Retrofit2源码分析
例子
从简单的例子开始分析Retrofit2是怎么和其他的库一起合作的,
下边是一个很简单的例子,是rxjava2 + retrofit2 + okhttp3 + gson混合使用,是访问淘宝的ip地址查询服务,返回信息输出到EditText里。
public static Retrofit getRetrofit() {
if (retrofit == null) {
synchronized (Retrofit.class) {
if (retrofit == null) {
retrofit = new Retrofit.Builder()
.baseUrl(BASE_URL)
.addConverterFactory(ScalarsConverterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.client(getOkHttpClient())
.build();
}
}
}
return retrofit;
}public interface IpServiceRx {
@Headers({
"Accept-Encoding: application/json",
"User-Agent: wz"
})
@GET("getIpInfo.php")
Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);
}/**
* rxjava2 + retrofit2 + okhttp3
*/
private void requestData3() {
Retrofit retrofit = NetworkUtils.getRetrofit();
IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
String ip = "117.100.130.5";
Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Response<IpModel>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Response<IpModel> ipModelResponse) {
IpModel ipModel = ipModelResponse.body();
if (ipModel == null) {
return;
}
IpData data = ipModel.getData();
if (data == null) {
return;
}
mEt.setText(getCSData(data));
}
@Override
public void onError(@NonNull Throwable e) {
mEt.setText(e.toString());
e.printStackTrace();
}
@Override
public void onComplete() {
}
});
}先从创建Retrofit时传递的几个factory看起
ConverterFactory
.addConverterFactory(GsonConverterFactory.create())
public Builder addConverterFactory(Converter.Factory factory) {
converterFactories.add(checkNotNull(factory, "factory == null"));
return this;
}把转换器加入到了一个list中
public final class GsonConverterFactory extends Converter.Factory {
/**
* Create an instance using a default {@link Gson} instance for conversion. Encoding to JSON and
* decoding from JSON (when no charset is specified by a header) will use UTF-8.
*/
public static GsonConverterFactory create() {
return create(new Gson());
}
/**
* Create an instance using {@code gson} for conversion. Encoding to JSON and
* decoding from JSON (when no charset is specified by a header) will use UTF-8.
*/
@SuppressWarnings("ConstantConditions") // Guarding public API nullability.
public static GsonConverterFactory create(Gson gson) {
if (gson == null) throw new NullPointerException("gson == null");
return new GsonConverterFactory(gson);
}
private final Gson gson;
private GsonConverterFactory(Gson gson) {
this.gson = gson;
}
//返回解析okhttp3.ResponseBody的Converter实例
@Override
public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,
Retrofit retrofit) {
TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
return new GsonResponseBodyConverter<>(gson, adapter);
}
//返回解析okhttp3.RequsetBody的Converter实例
@Override
public Converter<?, RequestBody> requestBodyConverter(Type type,
Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
return new GsonRequestBodyConverter<>(gson, adapter);
}
}public interface Converter<F, T> {
@Nullable T convert(F value) throws IOException;
/** Creates {@link Converter} instances based on a type and target usage. */
abstract class Factory {
/**
* Returns a {@link Converter} for converting an HTTP response body to {@code type}, or null if
* {@code type} cannot be handled by this factory. This is used to create converters for
* response types such as {@code SimpleResponse} from a {@code Call<SimpleResponse>}
* declaration.
*/
public @Nullable Converter<ResponseBody, ?> responseBodyConverter(Type type,
Annotation[] annotations, Retrofit retrofit) {
return null;
}
/**
* Returns a {@link Converter} for converting {@code type} to an HTTP request body, or null if
* {@code type} cannot be handled by this factory. This is used to create converters for types
* specified by {@link Body @Body}, {@link Part @Part}, and {@link PartMap @PartMap}
* values.
*/
public @Nullable Converter<?, RequestBody> requestBodyConverter(Type type,
Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
return null;
}
/**
* Returns a {@link Converter} for converting {@code type} to a {@link String}, or null if
* {@code type} cannot be handled by this factory. This is used to create converters for types
* specified by {@link Field @Field}, {@link FieldMap @FieldMap} values,
* {@link Header @Header}, {@link HeaderMap @HeaderMap}, {@link Path @Path},
* {@link Query @Query}, and {@link QueryMap @QueryMap} values.
*/
public @Nullable Converter<?, String> stringConverter(Type type, Annotation[] annotations,
Retrofit retrofit) {
return null;
}
/**
* Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
* example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
*/
protected static Type getParameterUpperBound(int index, ParameterizedType type) {
return Utils.getParameterUpperBound(index, type);
}
/**
* Extract the raw class type from {@code type}. For example, the type representing
* {@code List<? extends Runnable>} returns {@code List.class}.
*/
protected static Class<?> getRawType(Type type) {
return Utils.getRawType(type);
}
}
}CallAdapterFactory
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
public Builder addCallAdapterFactory(CallAdapter.Factory factory) {
callAdapterFactories.add(checkNotNull(factory, "factory == null"));
return this;
}public final class RxJava2CallAdapterFactory extends CallAdapter.Factory {
/**
* Returns an instance which creates synchronous observables that do not operate on any scheduler
* by default.
*/
public static RxJava2CallAdapterFactory create() {
return new RxJava2CallAdapterFactory(null, false);
}
private final @Nullable Scheduler scheduler;
private final boolean isAsync;
private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
this.scheduler = scheduler;
this.isAsync = isAsync;
}
...
}public interface CallAdapter<R, T> {
Type responseType();
//注意这里的Call其实是Retrofit自己写的Call,并不是okhttp里的。
T adapt(Call<R> call);
/**
* Creates {@link CallAdapter} instances based on the return type of {@linkplain
* Retrofit#create(Class) the service interface} methods.
*/
abstract class Factory {
/**
* Returns a call adapter for interface methods that return {@code returnType}, or null if it
* cannot be handled by this factory.
*/
public abstract @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations,
Retrofit retrofit);
/**
* Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
* example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
*/
protected static Type getParameterUpperBound(int index, ParameterizedType type) {
return Utils.getParameterUpperBound(index, type);
}
/**
* Extract the raw class type from {@code type}. For example, the type representing
* {@code List<? extends Runnable>} returns {@code List.class}.
*/
protected static Class<?> getRawType(Type type) {
return Utils.getRawType(type);
}
}
}上边只是暂时列出来,后边会慢慢分析。
然后看下build()
public Retrofit build() {
// 没有设置时会自动创建一个OkHttpClient
okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) {
callFactory = new OkHttpClient();
}
// platform是Android,defaultCallbackExecutor是主线程handler。
Executor callbackExecutor = this.callbackExecutor;
if (callbackExecutor == null) {
callbackExecutor = platform.defaultCallbackExecutor();
}
// 可以看到callAdapterFactories包含了我们设置的,还有platform自带的
// Make a defensive copy of the adapters and add the default Call adapter.
List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));
// 而converterFactories也是类似,包含了我们设置的,还有自带的几个。
// Make a defensive copy of the converters.
List<Converter.Factory> converterFactories = new ArrayList<>(
1 + this.converterFactories.size() + platform.defaultConverterFactoriesSize());
// Add the built-in converter factory first. This prevents overriding its behavior but also
// ensures correct behavior when using converters that consume all types.
converterFactories.add(new BuiltInConverters());
converterFactories.addAll(this.converterFactories);
converterFactories.addAll(platform.defaultConverterFactories());
return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
}接着看retrofit.create
IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
public <T> T create(final Class<T> service) {
...
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
// 此处platform是Android,抽象类Platform有两个继承类,一个叫Android,还有一个Java8。
private final Platform platform = Platform.get();
private final Object[] emptyArgs = new Object[0];
@Override public @Nullable Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
// 如果是object的方法则直接执行
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
// jdk8引入的接口默认方法,不过由于Java8这个类实现了invokeDefaultMethod,而Android这个类没有实现此方法所以跳过
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
}
});
}可以看到其实是使用了动态代理的方法,来把原类型创建出一个代理对象,
接着我们通过这个代理对象调用方法,
Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
就会执行InvocationHandler.invoke方法,
invoke方法里,如果是object的方法则直接执行并返回,接着默认方法也跳过,
直接看loadServiceMethod
ServiceMethod<?> loadServiceMethod(Method method) {
ServiceMethod<?> result = serviceMethodCache.get(method);
if (result != null) return result;
synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = ServiceMethod.parseAnnotations(this, method);
serviceMethodCache.put(method, result);
}
}
return result;
}ServiceMethod
static <T> ServiceMethod<T> parseAnnotations(Retrofit retrofit, Method method) {
// 这个类是用来把我们在方法上的注解和之后传递的参数生成一个okhttp的request,下边会用到。
RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);
Type returnType = method.getGenericReturnType();
if (Utils.hasUnresolvableType(returnType)) {
throw methodError(method,
"Method return type must not include a type variable or wildcard: %s", returnType);
}
// 返回类型不能时void
if (returnType == void.class) {
throw methodError(method, "Service methods cannot return void.");
}
return HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory);
}HttpServiceMethod
static <ResponseT, ReturnT> HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
Retrofit retrofit, Method method, RequestFactory requestFactory) {
boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
boolean continuationWantsResponse = false;
boolean continuationBodyNullable = false;
// 获取方法上的注解
Annotation[] annotations = method.getAnnotations();
Type adapterType;
if (isKotlinSuspendFunction) {
...
} else {
// 方法的返回Type类型
adapterType = method.getGenericReturnType();
}
// 在下边进行分析
CallAdapter<ResponseT, ReturnT> callAdapter = createCallAdapter(retrofit, method, adapterType, annotations);
// 校验返回类型是否正确,即Response<IpModel>
Type responseType = callAdapter.responseType();
// 就是说返回类型不能时okhttp3.Response
if (responseType == okhttp3.Response.class) {
throw methodError(method, "‘"
+ getRawType(responseType).getName()
+ "‘ is not a valid response body type. Did you mean ResponseBody?");
}
// 返回类型不能是Response,必须要包含泛型才行Response<String>,这个Response是retrofit2里定义的,不是okhttp3.Response
if (responseType == Response.class) {
throw methodError(method, "Response must include generic type (e.g., Response<String>)");
}
// TODO support Unit for Kotlin?
if (requestFactory.httpMethod.equals("HEAD") && !Void.class.equals(responseType)) {
throw methodError(method, "HEAD method must use Void as response type.");
}
// 在下边进行分析
Converter<ResponseBody, ResponseT> responseConverter = createResponseConverter(retrofit, method, responseType);
// callFactory 其实就是OkHttpClient
okhttp3.Call.Factory callFactory = retrofit.callFactory;
if (!isKotlinSuspendFunction) {
return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
} else
...
}
}最后创建了一个CallAdapted对象返回,
CallAdapted继承关系:
CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>
HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>
createCallAdapter
HttpServiceMethod.createCallAdapter
private static <ResponseT, ReturnT> CallAdapter<ResponseT, ReturnT> createCallAdapter(
Retrofit retrofit, Method method, Type returnType, Annotation[] annotations) {
try {
//noinspection unchecked
return (CallAdapter<ResponseT, ReturnT>) retrofit.callAdapter(returnType, annotations);
} catch (RuntimeException e) { // Wide exception range because factories are user code.
throw methodError(method, e, "Unable to create call adapter for %s", returnType);
}retrofit.callAdapter
public CallAdapter<?, ?> callAdapter(Type returnType, Annotation[] annotations) {
return nextCallAdapter(null, returnType, annotations);
}
public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) {
int start = callAdapterFactories.indexOf(skipPast) + 1;
for (int i = start, count = callAdapterFactories.size(); i < count; i++) {
CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this);
if (adapter != null) {
return adapter;
}
}
...
throw new IllegalArgumentException(builder.toString());
}总的来说就是从我们之前设置的和自带的calladapterFactory中找到一个,调用get获取一个CallAdapter的就直接返回。
就用RxJava2CallAdapterFactory.get来说明:
@Override public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
// 我们的returnType是Observable<Response<IpModel>>的Type。
// 此方法返回Observable,具体看下边getRawType源码
Class<?> rawType = getRawType(returnType);
// 显然下边都为false
boolean isFlowable = rawType == Flowable.class;
boolean isSingle = rawType == Single.class;
boolean isMaybe = rawType == Maybe.class;
if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
return null;
}
boolean isResult = false;
boolean isBody = false;
Type responseType;
// 返回泛型参数,即Response<IpModel>
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
// 再次返回Response<IpModel>的RawType,即retrofit的Response
Class<?> rawObservableType = getRawType(observableType);
if (rawObservableType == Response.class) {
// 再次返回Response<IpModel>的UpperBound,即IpModel
responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
} else if (rawObservableType == Result.class) {
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException("Result must be parameterized"
+ " as Result<Foo> or Result<? extends Foo>");
}
responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
isResult = true;
} else {
responseType = observableType;
isBody = true;
}
// 由上边可知,传递进构造函数的Boolean都是false,创建RxJava2CallAdapterFactory时scheduler为null,isAsync为false,
// responseType为IpModel
return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
}Utils.getRawType
static Class<?> getRawType(Type type) {
// 是具体类型
if (type instanceof Class<?>) {
// Type is a normal class.
return (Class<?>) type;
}
// 是带泛型的类型
if (type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) type;
// 返回Observable
Type rawType = parameterizedType.getRawType();
if (!(rawType instanceof Class)) throw new IllegalArgumentException();
return (Class<?>) rawType;
}
// 其他类型
...Utils.getParameterUpperBound
static Type getParameterUpperBound(int index, ParameterizedType type) {
Type[] types = type.getActualTypeArguments();
Type paramType = types[index];
return paramType;
}createResponseConverter
HttpServiceMethod.createResponseConverter
private static <ResponseT> Converter<ResponseBody, ResponseT> createResponseConverter(Retrofit retrofit, Method method, Type responseType) {
Annotation[] annotations = method.getAnnotations();
try {
return retrofit.responseBodyConverter(responseType, annotations);
} catch (RuntimeException e) { // Wide exception range because factories are user code.
throw methodError(method, e, "Unable to create converter for %s", responseType);
}
}retrofit.responseBodyConverter
public <T> Converter<ResponseBody, T> responseBodyConverter(Type type, Annotation[] annotations) {
return nextResponseBodyConverter(null, type, annotations);
}
public <T> Converter<ResponseBody, T> nextResponseBodyConverter(@Nullable Converter.Factory skipPast, Type type, Annotation[] annotations) {
int start = converterFactories.indexOf(skipPast) + 1;
for (int i = start, count = converterFactories.size(); i < count; i++) {
Converter<ResponseBody, ?> converter = converterFactories.get(i).responseBodyConverter(type, annotations, this);
if (converter != null) {
//noinspection unchecked
return (Converter<ResponseBody, T>) converter;
}
}
...
throw new IllegalArgumentException(builder.toString());
}总的来说就是从我们之前设置的和自带的converterFactory中找到一个,然后获取具体的responseBodyConverter。
就用GsonConverterFactory.responseBodyConverter来说明:
@Override
public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
return new GsonResponseBodyConverter<>(gson, adapter);
}GsonResponseBodyConverter
GsonRequestBodyConverter(Gson gson, TypeAdapter<T> adapter) {
this.gson = gson;
this.adapter = adapter;
}
@Override
public RequestBody convert(T value) throws IOException {
Buffer buffer = new Buffer();
Writer writer = new OutputStreamWriter(buffer.outputStream(), UTF_8);
JsonWriter jsonWriter = gson.newJsonWriter(writer);
adapter.write(jsonWriter, value);
jsonWriter.close();
return RequestBody.create(MEDIA_TYPE, buffer.readByteString());
}loadServiceMethod(method).invoke
一圈分析后在返回上边的retrofit.create内部分invoke的最后
loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
由上边可知loadServiceMethod方法返回的是CallAdapted,
而CallAdapted继承关系:
CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>
HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>
调用invoke是调用到的HttpServiceMethod.invoke
@Override final @Nullable ReturnT invoke(Object[] args) {
Call<ResponseT> call = new OkHttpCall<>(requestFactory, args, callFactory, responseConverter);
return adapt(call, args);
}注意此处的call都是retrofit的,不是okhttp的。
在其中创建了个OkHttpCall对象,顾名思义,里边肯定就是通过okhttp的call进行网络请求的,绕了一大圈终于找到实际请求的地方了。
接着看adapt
adapt实际调用的是CallAdapted.adapt
@Override
protected ReturnT adapt(Call<ResponseT> call, Object[] args) {
return callAdapter.adapt(call);
}此处的callAdapter其实就是上边的RxJava2CallAdapter,
所以就去RxJava2CallAdapter中看看
@Override
public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return RxJavaPlugins.onAssembly(observable);
}由上可知
- isAsync,isResult,isBody为false,
- scheduler = null
- isFlowable,isSingle,isMaybe,isCompletable都为false
所以说最终返回就是new CallExecuteObservable<>(call);
而RxJavaPlugins.onAssembly(observable);中
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}我们并没有对rxjava设置hook,所以返回的还是CallExecuteObservable,
CallExecuteObservable创建时传递的call就是OkHttpCall。
接着就是rxjava操作了
这里顺带把rxjava的一些源码也简单分析了。
Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Response<IpModel>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Response<IpModel> ipModelResponse) {
IpModel ipModel = ipModelResponse.body();
if (ipModel == null) {
return;
}
IpData data = ipModel.getData();
if (data == null) {
return;
}
mEt.setText(getCSData(data));
}
@Override
public void onError(@NonNull Throwable e) {
mEt.setText(e.toString());
e.printStackTrace();
}
@Override
public void onComplete() {
}
});rxjava每次调用一个转换操作,都会返回一个不同的observable,这个observable会记录上层的observable,从而形成一个从上到下的链,所以也叫链式操作。
直到最后调用subscribe,此时会触发向上订阅,即下层都会调用上层的subscribe,当然每层observable都有不同的subscribeActual实现,所以每层其实是上层的observer,同时又是下层的observable。
直到调用到顶层层的subscribeActual,即本例中的CallExecuteObservable的subscribeActual:
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
// 就是OkHttpCall
Call<T> call = originalCall.clone();
CallDisposable disposable = new CallDisposable(call);
observer.onSubscribe(disposable);
if (disposable.isDisposed()) {
return;
}
boolean terminated = false;
try {
// 此处会去调用OkHttpCall的execute,里边肯定就是okhttp的call.execute
Response<T> response = call.execute();
if (!disposable.isDisposed()) {
// 开始往下层传递消息
observer.onNext(response);
}
if (!disposable.isDisposed()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
...
}
}OkHttpCall.execute
@Override
public Response<T> execute() throws IOException {
okhttp3.Call call;
synchronized (this) {
//正确性检查
...
call = rawCall;
if (call == null) {
try {
// 创建一个新的网络请求,看下边代码
call = rawCall = createRawCall();
} catch (IOException | RuntimeException | Error e) {
throwIfFatal(e); // Do not assign a fatal error to creationFailure.
creationFailure = e;
throw e;
}
}
}
if (canceled) {
call.cancel();
}
// 解析 阻塞式call.execute() 返回的okhttp3.Response,看下边代码
return parseResponse(call.execute());
}private okhttp3.Call createRawCall() throws IOException {
okhttp3.Call call = callFactory.newCall(requestFactory.create(args));
return call;
}此处的callFactory就是上边ServiceMethod.parseAnnotations中创建的RequestFactory,通过RequestFactory构建出来一个okhttp的request对象,
最后生成一个okhttp3.Call返回。
Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();
// Remove the body‘s source (the only stateful object) so we can pass the response along.
rawResponse = rawResponse.newBuilder()
.body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
.build();
int code = rawResponse.code();
if (code < 200 || code >= 300) {
try {
// Buffer the entire body to avoid future I/O.
ResponseBody bufferedBody = Utils.buffer(rawBody);
return Response.error(bufferedBody, rawResponse);
} finally {
rawBody.close();
}
}
if (code == 204 || code == 205) {
rawBody.close();
return Response.success(null, rawResponse);
}
ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
try {
// 此处会用我们之前设置的Converter(即GsonResponseBodyConverter)来解析出具体的bean对象,
T body = responseConverter.convert(catchingBody);
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
// If the underlying source threw an exception, propagate that rather than indicating it was
// a runtime exception.
catchingBody.throwIfCaught();
throw e;
}
}GsonResponseBodyConverter.convert
@Override public T convert(ResponseBody value) throws IOException {
JsonReader jsonReader = gson.newJsonReader(value.charStream());
try {
T result = adapter.read(jsonReader);
if (jsonReader.peek() != JsonToken.END_DOCUMENT) {
throw new JsonIOException("JSON document was not fully consumed.");
}
return result;
} finally {
value.close();
}observer.onNext(response);
向下传递,此时还是subscribeOn(Schedulers.io())指定的线程上操作的,
当传递到observeOn(AndroidSchedulers.mainThread())时,此observable会把线程转换成mainThread,
最后传递到subscribe传递的observer的onNext中
其他
返回值中带不带Response逻辑有什么区别
Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);上边的分析都是基于带Response的,
那如果定义接口时不带呢,即
Observable<IpModel> getIpMsg(@Query("ip") String ip);那么接着上边的createCallAdapter分析里的RxJava2CallAdapterFactory.get来说明:
@Override public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
// 我们的returnType是Observable<IpModel>的Type。
// 此方法返回Observable,具体看下边getRawType源码
Class<?> rawType = getRawType(returnType);
// 显然下边都为false
boolean isFlowable = rawType == Flowable.class;
boolean isSingle = rawType == Single.class;
boolean isMaybe = rawType == Maybe.class;
if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
return null;
}
boolean isResult = false;
boolean isBody = false;
Type responseType;
// 返回泛型参数,即IpModel
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
// 还是IpModel
Class<?> rawObservableType = getRawType(observableType);
if (rawObservableType == Response.class) {
responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
} else if (rawObservableType == Result.class) {
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException("Result must be parameterized"
+ " as Result<Foo> or Result<? extends Foo>");
}
responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
isResult = true;
} else {
// 此时会进入此逻辑,isBody为true了
responseType = observableType;
isBody = true;
}
// 由上边可知,传递进构造函数的Boolean除了isBody为true,其他都是false,创建RxJava2CallAdapterFactory时scheduler为null,isAsync为false,
// responseType为IpModel
return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
}然后接着loadServiceMethod(method).invoke里
RxJava2CallAdapter.adapt
@Override
public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return RxJavaPlugins.onAssembly(observable);
}由上可知
- isAsync,isResult,
- isBody为true,
- scheduler = null,
- isFlowable,isSingle,isMaybe,isCompletable都为false
所以说最终返回就是new BodyObservable<>(responseObservable);
BodyObservable(Observable<Response<T>> upstream) {
this.upstream = upstream;
}
@Override protected void subscribeActual(Observer<? super T> observer) {
upstream.subscribe(new BodyObserver<T>(observer));
}就是说最上层是responseObservable,
那么当responseObservable开始下传数据时,会调用BodyObserver的onNext:
@Override
public void onNext(Response<R> response) {
if (response.isSuccessful()) {
// 会把body直接传递到下层,即IpModal
observer.onNext(response.body());
} else {
terminated = true;
Throwable t = new HttpException(response);
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}此处的response是retrofit的,
response会携带更多的此次网络请求的信息,如果只返回实际的bean/modal对象,那么就不能够有更多控制。