Android |《看完不忘系列》之okhttp
嗨,我是哈利迪~《看完不忘系列》将以从树干到细枝
的思路分析一些技术框架,本文将对开源项目okhttp
网络库进行介绍。
本文约3800字,阅读大约10分钟。如个别大图模糊,可前往个人站点阅读。
概览
源码基于3.14.9,即java版本的最新版
首先上职责图,各个类的名字基本可以见名知意了,就不翻译了,直接起飞~
树干
我们先看一趟飞行的大体流程,
好了,进入代码环节,引入依赖,
implementation 'com.squareup.okhttp3:okhttp:3.14.9'
简单使用(只分析异步请求,同步请求类似),
class OkhttpActivity extends AppCompatActivity { //创建机场,通常是单例 OkHttpClient mClient = new OkHttpClient(); void onCreate(Bundle savedInstanceState) { String url = "xxx"; //构建者模式创建Request请求,设置url(飞去哪里) Request request = new Request.Builder().url(url).build(); //知道目的地后,创建Call会话(本次航班) Call call = mClient.newCall(request); //异步请求入队(飞机进入就绪跑道) call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { //本次航班失败 - - } @Override public void onResponse(Call call, Response response) throws IOException { //抵达目的地! //body只能取一次,Response就会关闭,所以要用临时变量接收 String result = response.body().string(); //回调在子线程,要操作UI的话需切回主线程 runOnUiThread(() -> { mBinding.tv.setText(result); }); } }); } }
OkHttpClient
和Request
使用构建者模式创建即可,当然,如果OkHttpClient
不需要进行配置,直接new就行。知道了起点和终点,就可以创建航班Call
了,
//OkHttpClient.java Call newCall(Request request) { return RealCall.newRealCall(this, request, false); } //RealCall.java RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { RealCall call = new RealCall(client, originalRequest, forWebSocket); //Transmitter意为发射器,功能挺杂的,就先叫他机长吧 call.transmitter = new Transmitter(client, call); return call; }
可见Call
的实例是RealCall
,航班创建好后,进入就绪跑道,
//RealCall.java void enqueue(Callback responseCallback) { //机长回调eventListener,实时汇报航班状态,先忽略 transmitter.callStart(); //用AsyncCall封装Callback,由机场调度中心dispatcher安排进入就绪跑道 client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
AsyncCall
就是一个Runnable,run方法里调了execute方法,
//AsyncCall.java void execute() { try { //得到Response,抵达目的地 Response response = getResponseWithInterceptorChain(); //成功(一般response.isSuccessful()才是真正意义上的成功) responseCallback.onResponse(RealCall.this, response); } catch (IOException e) { //失败 responseCallback.onFailure(RealCall.this, e); } catch (Throwable t) { cancel(); IOException canceledException = new IOException("canceled due to " + t); canceledException.addSuppressed(t); //失败 responseCallback.onFailure(RealCall.this, canceledException); throw t; } finally { //结束航班,callsPerHost减1,runningAsyncCalls移除AsyncCall client.dispatcher().finished(this); } }
AsyncCall
里有一个原子计数器,
//目前每个主机(域名)有多少个会话call volatile AtomicInteger callsPerHost = new AtomicInteger(0);
Dispatcher
里有两个默认max值,
int maxRequests = 64; //最多同时请求数为64 int maxRequestsPerHost = 5; //每个主机最多同时请求数为5
什么意思呢?可以这么理解,机场的调度中心,限制了同时最多起飞的航班为64班;飞往同一个城市的航班,同时最多只能有5班,为什么做城市限制?跟连接池的复用有关,后面会讲。下面我们以上海为例,
看下enqueue方法做了啥,
//Dispatcher.java enqueue(AsyncCall call) { synchronized (this) { //飞机进入就绪跑道 readyAsyncCalls.add(call); if (!call.get().forWebSocket) { //查找飞往上海的AsyncCall AsyncCall existingCall = findExistingCallWithHost(call.host()); //复用上海的计数器callsPerHost,用于统计同一城市的航班 if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } //飞机进入起飞跑道 promoteAndExecute(); }
跟进promoteAndExecute,
//Dispatcher.java boolean promoteAndExecute() { //收集可以执行的AsyncCall List<AsyncCall> executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); //64个起飞跑道被占满,跳出 if (runningAsyncCalls.size() >= maxRequests) break; //飞往上海的航班达到5个,留在就绪跑道就行,跳过 if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; //离开就绪跑道 i.remove(); //上海航班计数器+1 asyncCall.callsPerHost().incrementAndGet(); //把AsyncCall存起来 executableCalls.add(asyncCall); //进入起飞跑道 runningAsyncCalls.add(asyncCall); } isRunning = runningCallsCount() > 0; } //把可以执行的AsyncCall,统统起飞 for (int i = 0, size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); asyncCall.executeOn(executorService()); } return isRunning; }
其中executorService()返回了一个线程池,
//Dispatcher.java synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
核心线程数为0,空闲了60秒后,所有线程会被清空;最大线程数无限制,其实还好,已经有调度中心Dispatcher
会限制请求数了。
继续跟进executeOn方法,
//AsyncCall.java void executeOn(ExecutorService executorService) { boolean success = false; try { //线程池运行Runnable,执行run,调用前面提到的AsyncCall.execute executorService.execute(this); success = true; } catch (RejectedExecutionException e) { InterruptedIOException ioException = new InterruptedIOException("executor rejected"); ioException.initCause(e); transmitter.noMoreExchanges(ioException); //失败回调 responseCallback.onFailure(RealCall.this, ioException); } finally { if (!success) { //结束航班 client.dispatcher().finished(this); } } }
可见,回调都在子线程里完成,所以Activity里要切回主线程才能操作UI。至此,核心流程就结束了。
细枝
拦截器链
前边得到Response
的地方,调了getResponseWithInterceptorChain,进去看看,
//RealCall.java Response getResponseWithInterceptorChain() throws IOException { List<Interceptor> interceptors = new ArrayList<>(); //添加自定义拦截器 interceptors.addAll(client.interceptors()); //添加默认拦截器 interceptors.add(new RetryAndFollowUpInterceptor(client)); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { //添加自定义网络拦截器(在ConnectInterceptor后面,此时网络连接已准备好) interceptors.addAll(client.networkInterceptors()); } //添加默认拦截器,共4+1=5个 interceptors.add(new CallServerInterceptor(forWebSocket)); //创建拦截器链 Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); //放行 Response response = chain.proceed(originalRequest); return response; }
拦截器链基于责任链模式
,即不同的拦截器有不同的职责,链上的拦截器会按顺序挨个处理,在Request
发出之前,Response
返回之前,插入一些定制逻辑,这样可以方便的扩展需求。当然责任链模式
也有不足,就是只要一个环节阻塞住了,就会拖慢整体运行(效率);同时链条越长,产生的中间对象就越多(内存)。
我们先跟proceed方法,
//RealInterceptorChain.java Response proceed(Request request, Transmitter transmitter,Exchange exchange) throws IOException { //传入index + 1,可以访问下一个拦截器 RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange, index + 1, request, call, connectTimeout, readTimeout, writeTimeout); Interceptor interceptor = interceptors.get(index); //执行第一个拦截器,同时传入next Response response = interceptor.intercept(next); //等所有拦截器处理完,就能返回Response了 return response; }
下面简要分析下各个拦截器的功能。
一、RetryAndFollowUpInterceptor
:
负责重试和重定向。
//最大重试次数 static final int MAX_FOLLOW_UPS = 20; Response intercept(Chain chain) throws IOException { Request request = chain.request(); RealInterceptorChain realChain = (RealInterceptorChain) chain; Transmitter transmitter = realChain.transmitter(); int followUpCount = 0; while (true) { //机长为Request准备一个连接 //主机、端口、协议都相同时,连接可复用 transmitter.prepareToConnect(request); //放行,让后面的拦截器执行 Response response = realChain.proceed(request, transmitter, null); //后面的拦截器执行完了,拿到Response,解析看下是否需要重试或重定向,需要则返回新的Request Request followUp = followUpRequest(response, route); if (followUp == null) { //新的Request为空,直接返回response return response; } RequestBody followUpBody = followUp.body(); if (followUpBody != null && followUpBody.isOneShot()) { //如果RequestBody有值且只许被调用一次,直接返回response return response; } if (++followUpCount > MAX_FOLLOW_UPS) { //重试次数上限,结束 throw new ProtocolException("Too many follow-up requests: " + followUpCount); } //将新的请求赋值给request,继续循环 request = followUp; } }
其中followUpRequest方***根据Response
不同的响应码做相应的处理,就不跟了。
二、BridgeInterceptor
:
桥接,负责把应用请求转换成网络请求,把网络响应转换成应用响应,说白了就是处理一些网络需要的header,简化应用层逻辑。
Response intercept(Chain chain) throws IOException { Request userRequest = chain.request(); Request.Builder requestBuilder = userRequest.newBuilder(); RequestBody body = userRequest.body(); if (body != null) { requestBuilder.header("Content-Type", contentType.toString()); //处理Content-Length、Transfer-Encoding //... } //处理Host、Connection、Accept-Encoding、Cookie、User-Agent、 //... //放行,把处理好的新请求往下传递,得到Response Response networkResponse = chain.proceed(requestBuilder.build()); Response.Builder responseBuilder = networkResponse.newBuilder() .request(userRequest); //处理新Response的Content-Encoding、Content-Length、Content-Type、gzip //返回新Response return responseBuilder.build(); }
这里需要注意的一点是,在服务器支持gzip压缩的前提下,客户端不设置Accept-Encoding=gzip的话,okhttp
会自动帮我们开启gzip和解压数据,如果客户端自己开启了gzip,就需要自己解压服务器返回的数据了。
三、CacheInterceptor
:
负责管理缓存,使用okio读写缓存。
InternalCache cache; Response intercept(Chain chain) throws IOException { //获取候选缓存 Response cacheCandidate = cache != null ? cache.get(chain.request()) : null; //创建缓存策略 CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); //网络请求 Request networkRequest = strategy.networkRequest; //缓存Response Response cacheResponse = strategy.cacheResponse; //如果网络请求和缓存Response都为空 if (networkRequest == null && cacheResponse == null) { //返回一个504的Response return new Response.Builder().code(504).xxx.build(); } //如果不使用网络,直接返回缓存 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)).build(); } //放行,往后走 Response networkResponse = chain.proceed(networkRequest); if (cacheResponse != null) { //获取到缓存响应码304,即缓存可用 if (networkResponse.code() == HTTP_NOT_MODIFIED) { Response response = cacheResponse.newBuilder().xxx.build(); //更新缓存,返回 cache.update(cacheResponse, response); return response; } } //获取网络Response Response response = networkResponse.newBuilder().xxx.build(); //写入缓存,返回 cache.put(response); return response; }
关于缓存策略CacheStrategy
会在缓存
章节展开。
四、ConnectInterceptor
:
负责创建连接Connection
。
Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); Transmitter transmitter = realChain.transmitter(); boolean doExtensiveHealthChecks = !request.method().equals("GET"); //机长创建一个交换器Exchange Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks); //放行,给下一个拦截器 return realChain.proceed(request, transmitter, exchange); }
newExchange方***在连接池
章节展开。
五、CallServerInterceptor
:
负责写请求和读响应。
Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Exchange exchange = realChain.exchange(); Request request = realChain.request(); //写请求头 exchange.writeRequestHeaders(request); Response.Builder responseBuilder = null; //处理请求体body... //读取响应头 responseBuilder = exchange.readResponseHeaders(false); //构建响应 Response response = responseBuilder .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); //读取响应体 response = response.newBuilder() .body(exchange.openResponseBody(response)) .build(); return response; }
缓存
缓存的实现是基于请求和响应的header来做的。CacheStrategy
即缓存策略,CacheInterceptor
拦截器会根据他拿到网络请求networkRequest、缓存响应cacheResponse,从而决定是使用网络还是缓存。
//CacheStrategy.java //内部类工厂,生产CacheStrategy static class Factory { //一些字段:servedDate、lastModified、expires、etag... Factory(long nowMillis, Request request, Response cacheResponse) { this.nowMillis = nowMillis; this.request = request; this.cacheResponse = cacheResponse; if (cacheResponse != null) { //解析cacheResponse,把参数赋值给自己的成员变量 this.sentRequestMillis = cacheResponse.sentRequestAtMillis(); //... Headers headers = cacheResponse.headers(); for (int i = 0, size = headers.size(); i < size; i++) { String fieldName = headers.name(i); String value = headers.value(i); if ("Date".equalsIgnoreCase(fieldName)) { servedDate = HttpDate.parse(value); servedDateString = value; } else if (xxx){ //... } } } } CacheStrategy get() { CacheStrategy candidate = getCandidate(); if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) { //返回策略,交给拦截器 return new CacheStrategy(null, null); } return candidate; } CacheStrategy getCandidate() { //根据header字段,得到各种策略,交给拦截器... return new CacheStrategy(xxx); } }
getCandidate里面就是根据header字段得到各种策略,然后交给拦截器处理,感兴趣的读者自行阅读啦。
那么缓存是如何写入磁盘的呢?跟进InternalCache
接口,他的实现在Cache
类里,
//Cache.java InternalCache internalCache = new InternalCache() { @Override public Response get(Request request) throws IOException { return Cache.this.get(request);//读取 } @Override public CacheRequest put(Response response) throws IOException { return Cache.this.put(response);//写入 } //... }; Response get(Request request) { String key = key(request.url()); //键 DiskLruCache.Snapshot snapshot; //缓存快照 Entry entry; snapshot = cache.get(key); //cache是okhttp的DiskLruCache if (snapshot == null) { return null; //没缓存,直接返回 } //快照得到输入流,用于创建缓存条目 entry = new Entry(snapshot.getSource(ENTRY_METADATA)); //得到响应 Response response = entry.response(snapshot); return response; } CacheRequest put(Response response) { String requestMethod = response.request().method(); if (!requestMethod.equals("GET")) { //不是get请求,不缓存 return null; } //封装成日志条目 Entry entry = new Entry(response); DiskLruCache.Editor editor = null; editor = cache.edit(key(response.request().url())); //写入缓存 entry.writeTo(editor); return new CacheRequestImpl(editor); }
okhttp的DiskLruCache
,就是根据最近最少使用算法,来管理磁盘缓存,他和Glide里的DiskLruCache
有几份相似,比如日志处理都一样,内部都有一个线程池来清理磁盘,不过okhttp有用到okio。感兴趣的读者可以留意下okhttp3.internal.cache.DiskLruCache
和com.bumptech.glide.disklrucache.DiskLruCache
。
注:缓存默认是关闭的,需要自行开启:
new OkHttpClient.Builder() .cache(new Cache(new File(MyApp.APP.getCacheDir(), "okhttp_cache"), //路径 50L * 1024L * 1024L)) //大小 .build();
连接池
还记得Transmitter
吗,前面我们叫他机长,他是应用和网络之间的桥梁,管理着连接、请求、响应和流。在拦截器
章节知道:
RetryAndFollowUpInterceptor
里调了transmitter.prepareToConnect;准备一个连接
ConnectInterceptor
里调了transmitter.newExchange;创建一个交换器
这里补充几个概念:
Connection,实现为RealConnection:连接,抽象概念,内部维护了Socket
ConnectionPool,持有RealConnectionPool:连接池,管理连接的复用
Exchange:交换器(管理请求和响应、持有ExchangeCodec)
ExchangeCodec:编解码器,用于编码请求,解码响应,实现有Http1ExchangeCodec和Http2ExchangeCodec
HTTP 1.1:引入keep-alive机制,支持连接保活,可以多个请求复用一个连接,但请求是串行的
HTTP 2.0:支持多路复用,一个连接的多个请求可以并行
先看RealConnectionPool
,
//RealConnectionPool.java //线程池,用于清理过期的连接。一个连接池最多运行一个线程 Executor executor = new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS, new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true)); //每个ip地址的最大空闲连接数,为5个 int maxIdleConnections; //空闲连接存活时间,为5分钟 long keepAliveDurationNs; //连接队列 Deque<RealConnection> connections = new ArrayDeque<>(); //获取连接 boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter, List<Route> routes, boolean requireMultiplexed) { for (RealConnection connection : connections) { //要求多路复用,跳过不支持多路复用的连接 if (requireMultiplexed && !connection.isMultiplexed()) continue; //不合条件,跳过 if (!connection.isEligible(address, routes)) continue; //给机长分配一个连接 transmitter.acquireConnectionNoEvents(connection); return true; } return false; } //移除连接,executor运行cleanupRunnable,调用了该方法 long cleanup(long now) { //查找移除的连接,或下一次移除的时间 synchronized (this) { for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { //... if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } } if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { //移除连接 connections.remove(longestIdleConnection); } } //关闭Socket closeQuietly(longestIdleConnection.socket()); }
RealConnection
代码有点多,知道他内部维护了Socket就行了。
前面提到过,同一主机的同时请求数被限制成maxRequestsPerHost = 5 ,为什么这么做?同主机的请求可以共用一个连接,所以大概是为了限流?比如同时飞往上海的航班如果不限数量,会把上海机场挤爆?有知道答案的小伙伴留下评论呀~
小结
okhhttp
具有以下优势:
- 使用简单,拦截器链的设计方便扩展
- 请求失败能自动重连和尝试主机的其他ip、能重定向
- 可以自动处理gzip
- 本地缓存可以避免重复请求
- 同主机的请求可以共享一个Socket,socket由Connection维护,ConnectionPool管理Connection的复用,避免频繁地创建和销毁连接
尾声
还是那句话,该系列旨在摸清技术的整体实现思路,okhhttp
里还有很多精彩细节,如cookie、route、dns、tls等处理,本文没有提到,大家还是要对着源码学习呀。哈迪在看源码过程还发现了很多不懂的地方,比如各种协议和标准,这也是个补充网络知识的好机会,一起飞~
系列文章: