ConnectInterceptor连接拦截器分析
源码地址:https://github.com/square/okhttp
经过前几个拦截器的预热,终于来到了拦截器的重头戏了,连接相关的拦截器。这个也耗费了较多时间去准备。(代码较多,撸代码请慎重)
在分析第一个拦截器中RetryAndFollowUpInterceptor,我们知道,当时初始化了一个StreamAllocation的连接对象,也提供了一些对连接对象操作的方法,如取消连接等,但是却没有立刻的做连接,只是一直把这个对象往下传递。而在各种初始化之后(Gzip, Header, 以及cookie的处理拦截器,缓存拦截器),再进行连接操作。
intercept(拦截)
@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); }
咋眼一看,这个拦截器的代码很简单啊,只有短短的几行代码。但是里面蕴含的东西,有点多,需要细细分析。
//获取在第一个拦截器就创建的StreamAllocation 类,而这个类,创建时候传入了一个ConnectionPool,以及地址相关信息
StreamAllocation streamAllocation = realChain.streamAllocation();
//通过streamAllocation ,newStream,这个里面会创建连接等一系列的操作。
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
//这里是是获取前一步的connection.
RealConnection connection = streamAllocation.connection();
//这里是把前面创建的连接,传递到下一个拦截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
步骤详细分析
PS: 源代码较多,大部分分析会在代码以注释的形式存在。
基本步骤就上面展示了,我们理清楚以下几个类的调用关系,来分析一下连接是如何一步步建立的:
- StreamAllocation
- ConnectionPool
- RealConnection
1. StreamAllocation实体
首先,StreamAllocation的初始化在第一个拦截器里面,
streamAllocation = new StreamAllocation( client.connectionPool(), createAddress(request.url()), callStackTrace);
传入了三个参数,一个连接池,一个地址类,一个调用堆栈跟踪相关的。
在StreamAllocation构造函数中,主要是把这个三个参数保存为内部变量,供后面使用,还有一个就是同时创建了一个线路选择器:
this.routeSelector = new RouteSelector(address, routeDatabase());
用于后面选择线路使用。
2. newStream()方法
StreamAllocation 的 newStream()是一个建立连接的重要方法,接下来就是一步步对里面的代码撸一撸:
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) { //1. 获取设置的连接超时时间,读写超时的时间,以及是否进行重连。 int connectTimeout = client.connectTimeoutMillis(); int readTimeout = client.readTimeoutMillis(); int writeTimeout = client.writeTimeoutMillis(); boolean connectionRetryEnabled = client.retryOnConnectionFailure(); try { // 2. 获取健康可用的连接 RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks); //3. 通过resultConnection初始化,对请求以及结果 编解码的类(分http 1.1 和http 2.0)。 // 这里主要是初始化,在后面一个拦截器才用到这相关的东西。 HttpCodec resultCodec = resultConnection.newCodec(client, this); synchronized (connectionPool) { codec = resultCodec; return resultCodec; } } catch (IOException e) { throw new RouteException(e); } }
在上面的代码中最重要的,是注释 第二点,获取健康可用的连接,那我们继续深入:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException { // 1. 加了个死循环,一直找可用的连接 while (true) { // 2. 这里继续去挖掘,寻找连接 RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled); // 3. 连接池同步获取,上面找到的连接是否是一个新的连接,如果是的话,就直接返回了,就是我们需要找 // 的连接了 // If this is a brand new connection, we can skip the extensive health checks. synchronized (connectionPool) { if (candidate.successCount == 0) { return candidate; } } //4. 如果不是一个新的连接,那么通过判断,是否一个可用的连接。 // 里面是通过Socket的一些方法进行判断的,有兴趣的,可以继续研究一下 // Do a (potentially slow) check to confirm that the pooled connection is still good. If it // isn't, take it out of the pool and start again. if (!candidate.isHealthy(doExtensiveHealthChecks)) { noNewStreams(); continue; } return candidate; } }
上面的代码,重要的也是注释的第二点:继续去挖掘,寻找连接, 我们一直在找连接,但是到现在为止,都是还没到真正的连接部分 ~~#!
我们继续撸啊撸:(其实你看到下面一大段代码,你就知道,其实应该就是我们要找的地方了)
/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) throws IOException { Route selectedRoute; // 1. 同步线程池,来获取里面的连接 synchronized (connectionPool) { // 2. 做些判断,是否已经释放,是否编解码类为空,是否用户已经取消 if (released) throw new IllegalStateException("released"); if (codec != null) throw new IllegalStateException("codec != null"); if (canceled) throw new IOException("Canceled"); // 3. 尝试用一下现在的连接,判断一下,是否有可用的连接 // Attempt to use an already-allocated connection. RealConnection allocatedConnection = this.connection; if (allocatedConnection != null && !allocatedConnection.noNewStreams) { return allocatedConnection; } // 4. 尝试在连接池中获取一个连接,get方法中会直接调用,注意最后一个参数为空 // 里面是一个for循环,在连接池里面,寻找合格的连接 // 而合格的连接会通过,StreamAllocation中的acquire方法,更新connection的值。 // Attempt to get a connection from the pool. Internal.instance.get(connectionPool, address, this, null); if (connection != null) { return connection; } selectedRoute = route; } //5. 判断上面得到的线路,是否空,如果为空的,寻找一个可用的线路 // 对于线路的选,可以深究一下这个RouteSeletor // If we need a route, make one. This is a blocking operation. if (selectedRoute == null) { selectedRoute = routeSelector.next(); } RealConnection result; //6. 继续线程池同步下去获取连接 synchronized (connectionPool) { if (canceled) throw new IOException("Canceled"); // 7. 由于上面我们获取了一个线路,无论是新建的,或者已有的。 // 我们通过这个线路,继续在连接池中寻找是否有可用的连接。 // Now that we have an IP address, make another attempt at getting a connection from the pool. // This could match due to connection coalescing. Internal.instance.get(connectionPool, address, this, selectedRoute); if (connection != null) return connection; // Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; // 8. 如果前面这么寻找,都没在连接池中找打可用的连接,那么就新建一个 result = new RealConnection(connectionPool, selectedRoute); acquire(result); } // 9. 这里就是就是连接的操作了,终于找到连接的正主了,这里会调用RealConnection的连接方法,进行连接操作。 // 如果是普通的http请求,会使用Socket进行连接 // 如果是https,会进行相应的握手,建立通道的操作。 // 这里就不对里面的操作进行详细分析了,有兴趣可以在进去看看 // Do TCP + TLS handshakes. This is a blocking operation. result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled); routeDatabase().connected(result.route()); Socket socket = null; // 10. 最后就是同步加到 连接池里面了 synchronized (connectionPool) { // Pool the connection. Internal.instance.put(connectionPool, result); // 最后加了一个多路复用的判断,这个是http2才有的 // If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } closeQuietly(socket); return result; }
总结一下,上面的代码分为以下几步:
- 前置做些判断,是否已经释放,是否编解码类为空,是否用户已经取消;
- 尝试用一下现在的连接,判断一下,是否有可用的连接,有就返回;
- 尝试在连接池中获取一个连接(线路为空);
- 获取线路;
- 通过获取到的线路,再去连接池取,是否有可用连接,有就返回;
- 前面都找不到可用连接,新建一个;
- 对新建的连接,进行连接操作(Socket);
- 把刚新建的连接,丢到连接池里面。
到这里为止,我们就已经获取到了一个连接了,这个连接拦截器的主要功能其实已经达到了。
回归到拦截器,下一个方法:streamAllocation.connection()。其实这个非常简单,就是获取前面创建的 realConnection而已。
最后的最后,就是把我们的StreamAlloaction, RealConnection, 以及新建的HttpCodec(请求,结果编解码类),传递到下一个拦截器去。
总结:
这是很重要的一个拦截器,这里面把连接建立起来了。同时新建了一个编解码的类,为后面的数据交换读取做了铺垫。
其实分析还是比较粗糙的,有很多地方,还需要深入去解剖,也留下了一下学习的空间:
- RouteSelector,线路的选择,是通过什么来选择线路的?
- 从连接池里获取已有的连接,是如何判断它是否可用的?
- 新建连接,进行连接操作时候,http 和 https是有什么差异的?
- http2的多路复用,是如何实现的?
系列:
OKhttp源码学习(一)—— 基本请求流程
OKhttp源码学习(二)—— OkHttpClient
OKhttp源码学习(三)—— Request, RealCall
OKhttp源码学习(四)—— RetryAndFollowUpInterceptor拦截器
OKhttp源码学习(五)—— BridgeInterceptor拦截器
OKhttp源码学习(六)—— CacheInterceptor拦截器
OKhttp源码学习(八)——CallServerInterceptor拦截器
OKhttp源码学习(九)—— 任务管理(Dispatcher)
原著是一个有趣的人,若有侵权,请通知删除
还没有人抢沙发呢~