手写OkHttp核心代码与责任链详细分析
OkHttp源码看了好多遍,时间长了还是记不住,怎么破? 从头手写一遍OkHttp的核心代码,你就再也不会忘记. 手写一遍对知识进行梳理,更加深入的去了解,当时作者为什么会这样写,这样写的好处是什么?
Ok,要想手写okhttp,就需要对okhttp的源码以及架构有一定的了解,可以去看这篇文章OkHttp 源码解析及OkHttp的设计思想 .我们虽然了解了okhttp的源码,但是并没有真正的掌握okhttp的核心代码,死扣细节才能真正的掌握,哪怕以后使用其他的网络框架也是同样的原理.
本篇文章较长,建议根据文章步骤手敲代码更容易理解.
基本的包装类
请求URL的包装类,主要包装了,host file protocol(是http还是https) port (端口).
主要就是对请求url地址的分解,如对http://www.kuaidi100.com/query?type=yuantong&postid=222222222
的分解,host:www.kuaidi100.com/query
file:query?type=yuantong&postid=222222222
, protocol:http
,prot:80
这些数据都要在socket发送请求时用到,详细的使用在下面讲解.
public class HttpUrl {
private String host;
private String file;
private String protocol;
private int port;
public HttpUrl(String url) throws MalformedURLException {
URL urls = new URL(url);
host = urls.getHost();//host
file = urls.getFile();// /query?.....
file = TextUtils.isEmpty(file) ? "/" : file;
protocol = urls.getProtocol();//http/https
port = urls.getPort();//端口 如:80
port = port == -1 ? urls.getDefaultPort() : port;
}
public String getHost() {
return host;
}
public String getFile() {
return file;
}
public String getProtocol() {
return protocol;
}
public int getPort() {
return port;
}
}
请求的包装类,主要是包装了请求的方式(GET/POST)、请求头、请求URL、请求体的包装,通过builder的方式代码很简单如下:
public class Request {
//请求头
private Map<String, String> headers;
//请求方式 GET/POST
private Method method;
//请求URL
private HttpUrl url;
//请求体 post请求方式需要用到
private RequestBody body;
public Map<String, String> getHeaders() {
return headers;
}
public Method getMethod() {
return method;
}
public HttpUrl getUrl() {
return url;
}
public RequestBody getBody() {
return body;
}
public Request(Builder builder) {
this.headers = builder.headers;
this.method = builder.method;
this.url = builder.url;
this.body = builder.body;
}
public final static class Builder {
//请求头
Map<String, String> headers = new HashMap<>();
//请求方式 GET/POST
Method method = Method.GET;
HttpUrl url;
RequestBody body;
public Builder url(String url) {
try {
this.url = new HttpUrl(url);
} catch (MalformedURLException e) {
e.printStackTrace();
}
return this;
}
public Builder addHeader(String name, String value) {
headers.put(name, value);
return this;
}
public Builder removeHeader(String name) {
headers.remove(name);
return this;
}
public Builder get() {
method = Method.GET;
return this;
}
public Builder post(RequestBody body) {
this.body = body;
method = Method.POST;
return this;
}
public Request build() {
if (url == null) {
throw new IllegalStateException("HttpUrl this is url,not null");
}
return new Request(this);
}
}
}
关于请求体RequestBody,这里只用了表单的提交方式,代码如下: 只有post请求的时候才会使用RequestBody, 真正的请求体 就是一个key=value & …字符串
public class RequestBody {
/**
* 表单提交 使用urlencoded编码 只模拟表单提交
*/
private final static String CONTENT_TYPE = "application/x-www-form-urlencoded";
private final static String CHARSET = "utf-8";
private Map<String,String> encodedBodys = new HashMap<>();
public String contentType(){
return CONTENT_TYPE;
}
public long contentLength(){
return body().getBytes().length;
}
/**
* 真正的请求体 就是一个key:value & ...字符串
* @return
*/
public String body(){
StringBuffer sb = new StringBuffer();
for (Map.Entry<String, String> entry : encodedBodys.entrySet()) {
sb.append(entry.getKey())
.append("=")
.append(entry.getValue())
.append("&");
}
if (sb.length() != 0){
sb.deleteCharAt(sb.length()-1);
}
return sb.toString();
}
public RequestBody add(String name,String value){
try {
encodedBodys.put(URLEncoder.encode(name,CHARSET),URLEncoder.encode(value,CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return this;
}
}
调度器(Dispatcher)的实现
上述我们实现了最简单的几个基本的包装类,这些其实没有什么可讲解的,下面我们来重点讲解调度器的实现.
新建一个Dispatcher
类,主要负责并发调度请求和控制最大的并发请求数.首先我们先要定义最大同时进行的请求数和执行队列和等待队列.队列使用双端队列,新数据添加到尾部,移除的是头部数据.
//最大同时进行的请求数
private int maxRequests = 64;
//同时请求的相同的host的最大数
private int maxRequestsPreHost = 5;
//等待执行双端队列
private Deque<Call.AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在执行双端队列
private Deque<Call.AsyncCall> runningAsyncCalls = new ArrayDeque<>();
public Dispatcher() {
this(64,5);
}
public Dispatcher(int maxRequests, int maxRequestsPreHost) {
this.maxRequests = maxRequests;
this.maxRequestsPreHost = maxRequestsPreHost;
}
接下来创建一个线程池,所有的任务都在线程池中执行.
//线程池 所有的任务都交给线程池来管理
private ExecutorService executorService;
/**
* 创建一个默认的线程池
*/
public synchronized ExecutorService executorService(){
if (executorService == null){
//线程工厂就是创建线程的
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable,"HttpClient");
}
};
executorService = new ThreadPoolExecutor(0,Integer.MAX_VALUE,60,
TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
}
return executorService;
}
队列中添加的是Call.AsyncCall
,我们先把Call
类写完,Call
类需要Request
的包装类和HttpClient
的设置,首先创建一个HttpClient
作为的主要类,先完成部分:
public class HttpClient {
private final Dispatcher dispatcher;
public HttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
}
public static final class Builder {
Dispatcher dispatcher;
/**
* 用户自定义调度器
*
* @param dispatcher
* @return
*/
public Builder dispatcher(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
return this;
}
public HttpClient build() {
if (null == dispatcher) {
dispatcher = new Dispatcher();
}
return new HttpClient(this);
}
}
然后写Call
类:
//需要请求的包装类
Request request;
//HttpClient 中配置的参数
HttpClient client;
//标记是否执行过
boolean executed = false;
//标记是否取消请求
boolean cancel = false;
public Call(Request request, HttpClient client) {
this.request = request;
this.client = client;
}
在上述代码中Dispatcher
,任务是在线程池中执行的,而Call
类就是要执行的任务,所以我们需要加入一个执行网络请求的线程内部类AsyncCall
如下:
我们先建立一个空类,run如何执行我们在后面讲解,
/**
* 执行网络请求的线程
*/
class AsyncCall implements Runnable {
private CallBack callBack;
public AsyncCall(CallBack callBack) {
this.callBack = callBack;
}
@Override
public void run() {}
public String host() {
return request.getUrl().getHost();
}
}
同时我们还需要一个回调类CallBack
public interface CallBack {
void onFailure(Call call,Throwable throwable);
void onResponse(Call call, Response response);
}
然后创建equeue
方法,将AsyncCall
传递给Dispatcher
执行.通过HttpClient
获取Dispatcher
代码如下:
public void enqueue(CallBack callBack) {
synchronized (this) {
if (executed) {
throw new IllegalStateException("已经执行过了");
}
//标记已经执行过了
executed = true;
}
//把任务交给调度器调度
client.getDispatcher().enqueue(new AsyncCall(callBack));
}
然后回调Dispatcher
类中,创建enqueue
来处理任务将任务加入到线程池中执行.
首先,我们先要判断正在执行的Call
是否超过了最大请求数与最大相同host请求数,这里我们就用到了HttpUrl
中的host
,如下代码,返回了将要执行Call
的host
请求数,这里的host
从Call.AsyncCall
中通过Call
传递的Request
返回的HttpUrl
中包装的host
.
/**
* 当前正在执行的host
* @param call 正在执行的host
* @return
*/
private int runningCallsForHost(Call.AsyncCall call){
int result = 0;
for (Call.AsyncCall aysncCall : runningAsyncCalls) {
//正在执行队列 和当前将要执行的call的host进行比对,如果相等计数加1
if (aysncCall.host().equals(call.host())){
result++;
}
}
return result;
}
如果执行队列超过了数量的限制则将Call加入到等待队列中.enqueue
方法如下:
/**
* 异步任务调度
*/
public void enqueue(Call.AsyncCall call){
//将要执行的call,判断正在执行的call不能超过最大请求数与相同host的请求数
if (runningAsyncCalls.size()<maxRequests && runningCallsForHost(call)<maxRequestsPreHost){
runningAsyncCalls.add(call);
executorService().execute(call);
}else {
//如果超过了限制的数量 则将call加入到等待队列中
readyAsyncCalls.add(call);
}
}
OK,在上述的代码中,我们实现了调度器异步的任务调度,通过HttpClient
来初始化Call
,传递Request
和HttpClient
,然后通过Call
类中的enqueue
方法new AnsycCall
线程,然后交给调度器处理,调度器是通过HttpClient
获取到的.
我们要在HttpClient
中加入newCall
方法:
public Call newCall(Request request) {
return new Call(request, this);
}
既然有添加任务请求,那么肯定有完成任务请求,因为我们将任务加入到了队列中,当任务完成时,我们需要将任务移除.
那么如何判断任务请求完成了呢? 我们需要在Call
类中添加getResponse
方法这个方法负责将请求结果返回.我们暂时先写个空方法,我会在后面带大家一步一步实现
private Response getResponse() throws Exception {}
getResponse()
方法,在AsyncCall
类中的run
方法中调用,这样就补全了上述AsyncCall
类中空的run
方法,代码如下:
run方法实现也很简单通过一个singalledCallbacked
标示是否回调过了,调用getResponse
方法获取请求结果,然后判断是否取消请求了的处理,最终在finally
中调用了调度器dispacher
的finished
表示任务请求完毕.
@Override
public void run() {
//信号 是否回调过
boolean singalledCallbacked = false;
try {
//真正的实现请求逻辑
Response response = getResponse();
//如果取消了请求,就回调一个onFailure
if (cancel) {
//回调通知过了
singalledCallbacked = true;
callBack.onFailure(Call.this, new IOException("Canceled"));
} else {
singalledCallbacked = true;
//链接成功了
callBack.onResponse(Call.this, response);
}
} catch (Exception e) {
e.printStackTrace();
//如果信号没有通知过 则回调
if (!singalledCallbacked) {
callBack.onFailure(Call.this, e);
}
} finally {
//将这个任务从调度器移除
client.getDispatcher().finished(this);
}
}
我们来看一下调度器的finished方法是如何实现的呢? 思路是这样的:首先将这个任务从执行队列中移除,并且检查正在执行队列的数量是否达到最大和等待队列中是否还有等待中的任务,如果有等待中的任务,则通过while循环队列,依次从等待队列中移除和加入到正在执行队列中执行,同时要判断是否达到了最大相同host数量和最大请求数量,代码如下:
public void finished(Call.AsyncCall asyncCall) {
synchronized (this){
runningAsyncCalls.remove(asyncCall);
//检查是否可以运行ready
checkReady();
}
}
private void checkReady() {
//达到了同时请求最大数
if(runningAsyncCalls.size()>=maxRequests){
return;
}
//没有等待执行的任务
if (readyAsyncCalls.isEmpty()){
return;
}
Iterator<Call.AsyncCall> iterator = readyAsyncCalls.iterator();
while (iterator.hasNext()){
//获得一个等待执行的任务
Call.AsyncCall asyncCall = iterator.next();
//如果等待执行的任务,加入正在执行小于最大相同host数
if (runningCallsForHost(asyncCall) < maxRequestsPreHost){
iterator.remove();//从等待执行列表移除
runningAsyncCalls.add(asyncCall);
executorService().execute(asyncCall);
}
//如果正在执行队列达到了最大值,则不在请求 return
if (runningAsyncCalls.size() >= maxRequests){
return;
}
}
}
调度器的同步请求实现,Call类中添加execute方法,同样也要标记是否执行过了,
然后请求网络返回结果,最终调用finished方法
public Response execute() throws Exception {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
try {
client.getDispatcher().execute(this);
Response response = getResponse();
if (response == null) throw new IOException("Canceled");
return response;
} catch (Exception e) {
throw e;
} finally {
client.getDispatcher().finished(this);
}
}
Dispatcher类添加execute方法,Dispacher中处理就相当简单了,我们只需要将这个任务添加到队列中就可以了,不用在线程池中执行也不用判断是否达到最大请求,因为同步执行,只能执行一个请求,代码如下:
private Deque<Call> runningSyncCall = new ArrayDeque<>();
public void execute(Call call) {
runningSyncCall.add(call);
}
public void finished(Call call) {
synchronized (this) {
if (!runningSyncCall.remove(call)) throw new AssertionError("Call wasn't in-flight");
}
}
这样整个调度器的实现就全部完成了. 下面我们来责任链也是最核心的部分,代码地址 还有不清楚的直接看代码.
责任链详细分析
我们知道其实okhttp的每个责任链都是一个拦截器,首先我们要实现拦截器接口
public interface Interceptor {
Response interceptor(InterceptorChain chain) throws IOException;
}
InterceptorChain
类主要作用是执行拦截器,将链条一条一条的执行下去.
public class InterceptorChain {
List<Interceptor> interceptors;
int index;
Call call;
public InterceptorChain(List<Interceptor> interceptors, int index, Call call) {
this.interceptors = interceptors;
this.index = index;
this.call = call;
}
/**
* 执行拦截器
*/
public Response process() throws IOException {
if (index >= interceptors.size()) throw new IOException("Interceptor Chain Error");
//获得拦截器 从第0个拦截器开始
Interceptor interceptor = interceptors.get(index);
//链条一条一条执行 同时index加1
InterceptorChain next = new InterceptorChain(interceptors, index + 1, call);
Response response = interceptor.interceptor(next);
return response;
}
}
重试与重定向拦截器 RetryInterceptor
这里我只是实现了很简单的重试拦截器,通过for循环来循环重试次数,如果有respone响应返回则说明请求成功,跳出循环.okhttp的实现就较为复杂,通过while(true)循环来重试如果返回了response不为空则跳出循环,是一样的原理.
public class RetryInterceptor implements Interceptor {
private static final String TAG = "RetryInterceptor";
@Override
public Response interceptor(InterceptorChain chain) throws IOException {
Log.e(TAG, "interceptor: RetryInterceptor");
Call call = chain.call;
HttpClient client = call.getClient();
IOException exception = null;
for (int i = 0; i < client.getRetrys() + 1; i++) {
//如果取消了则抛出异常
if (call.isCanceled()) {
throw new IOException("Canceled");
}
try {
//执行链条中下一个拦截器 如果有返回response 则表示请求成功直接return结束for循环
Response response = chain.process();
return response;
} catch (IOException e) {
exception = e;
}
}
throw exception;
}
}
请求头拦截器 BridgeInterceptor
负责把用户构造的请求转换为发送到服务器的请求 、把服务器返回的响应转换为用户友好的响应 处理 配置请求头等信息. 从应用程序代码到网络代码的桥梁。首先,它根据用户请求构建网络请求。然后它继续呼叫网络。最后,它根据网络响应构建用户响应。BridgeInterceptor 主要是添加一些默认的请求头,和对响应数据的处理.
代码如下: 通过InterceptorChain
获取到正在执行的Call
对象,然后通过Call
获取到request
, BridgeInterceptor
其实就是给Request
的headers
添加一些默认的请求头.
在Call类中的getResponse()方法中添加如下代码:
将我们写好的两个拦截器添加,注意顺序不能出错,自定义的拦截器一定是添加到第一个位置.
private Response getResponse() throws Exception {
ArrayList<Interceptor> interceptors = new ArrayList<>();
//自定义拦截器
interceptors.addAll(client.getInterceptors());
//添加重试拦截器
interceptors.add(new RetryInterceptor());
//添加请求头拦截器
interceptors.add(new BridgeInterceptor());
//创建链
InterceptorChain chain = new InterceptorChain(interceptors, 0, this);
//执行责任链
return chain.process();
}
BridgeInterceptor的实现:
public class BridgeInterceptor implements Interceptor {
private static final String TAG = "BridgeInterceptor";
@Override
public Response interceptor(InterceptorChain chain) throws IOException {
Log.e(TAG, "interceptor: BridgeInterceptor");
//必须有host connection:keep-alive 保持长连接
Request request = chain.call.request();
Map<String, String> headers = request.getHeaders();
//如果没有配置Connection 默认给添加上
if (!headers.containsKey(HEAD_CONNECTION)) {
//保持长连接
headers.put(HEAD_CONNECTION, HEAD_VALUE_KEEP_ALIVE);
}
//host 必须和url中的host一致的
headers.put(HEAD_HOST, request.getUrl().getHost());
//是否有请求体 如果有请求体需要添加请求体的长度和请求体的类型
if (null != request.getBody()) {
//获取到RequestBody
RequestBody body = request.getBody();
//获取请求体的长度
long contentLength = body.contentLength();
//请求体长度
if (contentLength != 0) {
headers.put(HEAD_CONTENT_LENGTH, String.valueOf(contentLength));
}
//请求体类型,这里只实现了一种,其他的同样的道理
String contentType = body.contentType();
if (null != contentType) {
headers.put(HEAD_CONTENT_TYPE, contentType);
}
}
Log.e(TAG, "BridgeInterceptor: 设置的请求头");
for (Map.Entry<String, String> entry : headers.entrySet()) {
Log.e(TAG, "BridgeInterceptor key:" + entry.getKey() + " value:" + entry.getValue());
}
//执行下一个链
return chain.process();
}
}
一些常量静态变量存放在HttpCodec类中:
//定义拼接常用的常量
static final String CRLF = "\r\n";
static final int CR = 13;
static final int LF = 10;
static final String SPACE = " ";
static final String VERSION = "HTTP/1.1";
static final String COLON = ":";
public static final String HEAD_HOST = "Host";
public static final String HEAD_CONNECTION = "Connection";
public static final String HEAD_CONTENT_TYPE = "Content-Type";
public static final String HEAD_CONTENT_LENGTH = "Content-Length";
public static final String HEAD_TRANSFER_ENCODING = "Transfer-Encoding";
public static final String HEAD_VALUE_KEEP_ALIVE = "Keep-Alive";
public static final String HEAD_VALUE_CHUNKED = "chunked";
ConnectionInterceptor
获得有效链接的拦截器, 主要功能是从连接池中获取可复用的连接,如果没有可复用的连接则创建连接添加到连接池中,以提供下次的复用.ConnectionInterceptor的实现较为复杂.
首先实现连接池ConnectionPool
类,同样的连接池可以被用户自己实现连接池的规则,通过HttpClient来获取连接池,在HttpClient类中加入如下代码:
同时也将自定义拦截器的list也加入
private final ConnectionPool connectionPool;
private final List<Interceptor> interceptors;
public ConnectionPool getConnectionPool() {
return connectionPool;
}
public HttpClient(Builder builder) {
....
this.connectionPool = builder.connectionPool;
this.interceptors = builder.interceptors;
....
}
public List<Interceptor> getInterceptors() {
return interceptors;
}
在HttpClient的内部类Builder类中加入如下代码:
ConnectionPool connectionPool;
List<Interceptor> interceptors = new ArrayList<>();
/**
* 添加自定义拦截器
*
* @param interceptor
* @return
*/
public Builder addInterceptor(Interceptor interceptor) {
interceptors.add(interceptor);
return this;
}
/**
* 添加自定义的连接池
*
* @param connectionPool
* @return
*/
public Builder connectionPool(ConnectionPool connectionPool) {
this.connectionPool = connectionPool;
return this;
}
public HttpClient build() {
if (null == dispatcher) {
dispatcher = new Dispatcher();
}
if (null == connectionPool) {
connectionPool = new ConnectionPool();
}
return new HttpClient(this);
}
Ok,加入完毕后我们来看如何实现连接池ConnectionPool
类,首先我们要知道连接池的作用是什么? 实现思路: 首先初始化每个连接的保持时间,如果这个连接超出定义的保持时间则将此连接移除连接池,如果没有超出连接时间则复用这个连接,通过host和port来判断连接池中是否存在这个连接. 那么什么时候将这个连接加入到连接池呢? 在获取到服务器的响应的时候判断响应头``是否允许保持长连接,如果允许将此连接加入到连接池中,然后检查连接池中是否有超过最长时间的限制连接.
首先我们定义三个变量:
/**
* 每个链接的检查时间,默认60s
* <p>
* 例如:每隔5s检查是否可用,无效则将其从链接池移除
*/
private long keepAlive;
//是否清理了连接池
private boolean cleanupRunning = false;
//HttpConnection 包装的连接类 存储连接队列
private Deque<HttpConnection> connectionDeque = new ArrayDeque<>();
public ConnectionPool() {
this(1, TimeUnit.MINUTES);
}
public ConnectionPool(long keepAlive, TimeUnit utils) {
this.keepAlive = utils.toMillis(keepAlive);
}
HttpConnction
是真正的请求连接的包装类, 连接池中存储的就是该类,我们先实现连接: 这里我们先不实现Socket请求服务器的方法,我会在后面进行实现,我们先实现连接池中需要用的方法和变量
Socket socket;
Request request;
private HttpClient client;
/**
* 当前链接的socket是否与对应的host port一致
*
* @param host
* @param port
* @return
*/
public boolean isSameAddress(String host, int port) {
if (null == socket) {
return false;
}
return TextUtils.equals(socket.getInetAddress().getHostName(), host) && (port == socket.getPort());
}
/**
* 释放关闭 socket
*/
public void close() {
if (null != socket) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void setRequest(Request request) {
this.request = request;
}
public void setClient(HttpClient client) {
this.client = client;
}
好下面我们实现ConnectionPool连接池的获取连接的方法get:
Get方法很简单遍历连接队列,如果有相同的host和port的连接则,将其从连接池中移除复用该连接,注意get方法需要synchronized.
public synchronized HttpConnection get(String host, int port) {
Iterator<HttpConnection> iterator = connectionDeque.iterator();
while (iterator.hasNext()) {
HttpConnection next = iterator.next();
//如果查找到链接池中存在相同host port 的链接就可以直接使用
if (next.isSameAddress(host, port)) {
iterator.remove();
return next;
}
}
return null;
}
下面看如何将连接加入到连接池中,代码如下.判断是否执行清理连接线程,如果没有则在线程池中执行,这样可以防止多次加入连接池,执行多次线程池.在连接加入到连接池后需要在线程池中遍历所有的闲置连接,超出时间则将连接移除连接池,直到所有连接移除连接池,执行完毕.
/**
* 加入链接到链接池
*
* @param connection
*/
public void put(HttpConnection connection) {
//如果没有执行清理线程 则执行
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connectionDeque.add(connection);
}
/**
* 执行清理线程的线程池
*/
private static final Executor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(@NonNull Runnable r) {
Thread thread = new Thread("Connection Pool");
//设置为守护线程 有什么用呢?
thread.setDaemon(true);
return thread;
}
});
我们来看一下cleanupRunnable
线程的实现.代码如下.
看下面的逻辑如果当前的时间传入给cleanup()方法,如果返回-1则说明连接池中已没有任何连接,循环结束.如果返回的不是-1则等待返回的时间,将线程暂时挂起,然后在继续执行.
cleanup方法的实现是遍历所有连接,判断其是否超过了最大的闲置时间,如果超过了则进行移除,关闭socket连接.然后继续遍历下一个,记录下所有连接中没有超过最长闲置时间的最长的时间,然后返回keepAlive - longestIdleDuration
.将线程挂起这么长的时间后,重新遍历所有连接然后清除.直到所有连接清除完毕.
/**
* 清理链接池的线程
*/
private Runnable cleanupRunnable = new Runnable() {
@Override
public void run() {
while (true) {
//得到下次的检查时间
long waitDuration = cleanup(System.currentTimeMillis());
//如果返回-1 则说明连接池中没有连接 直接结束
if (waitDuration < 0) {
return;
}
if (waitDuration > 0) {
synchronized (ConnectionPool.this) {
try {
//线程暂时被挂起
ConnectionPool.this.wait(waitDuration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
};
private long cleanup(long now) {
//记录比较每个链接的闲置时间
long longestIdleDuration = -1;
synchronized (this) {
Iterator<HttpConnection> iterator = connectionDeque.iterator();
//为什么要迭代它呢? 如果某个链接在最长的闲置时间没有使用则进行移除
while (iterator.hasNext()) {
HttpConnection connection = iterator.next();
//获取这个链接的闲置时间
long idleDuration = now - connection.lastUseTime;
//如果闲置时间超过了最大的闲置时间则进行移除
if (idleDuration > keepAlive) {
iterator.remove();
//释放关闭连接
connection.close();
Log.e("connection pool", "cleanup: 超过闲置时间,移除链接池");
//继续检查下一个
continue;
}
//记录最长的闲置时间
if (longestIdleDuration < idleDuration) {
longestIdleDuration = idleDuration;
}
}
//假如keepAlive 10s longestIdleDuration是5s 那么就等5s后在检查连接池中的连接
if (longestIdleDuration > 0) {
return keepAlive - longestIdleDuration;
}
//标记连接池中没有连接
cleanupRunning = false;
return longestIdleDuration;
}
}
Ok,以上就是整个连接池的实现,是不是get到了很多技能点.你可以去看okhttp的代码也是这样实现的,不过okhttp在ConnectionPool中还加入了最大连接数量的判断,感兴趣的可以去了解一番.
讲了这么多ConnectionInterceptor
类还没有实现, ConnectionInterceptor
的作用主要是从连接池中获取有效的连接(HttpConnection
)然后将这个有效的连接(HttpConnection
),传递给下一个拦截器来实现真正的连接请求以及获取服务器的响应.
代码如下.很简单我们通过chain获取到request和HttpClient,然后通过HttpClient来获取到连接池,通过request过去到host和port传递给连接池的get方法来获得有效连接,如果没有可复用的连接则new HttpConnection,创建一个连接.然后将该连接传递到下一个责任链中,来实现真正的通信.当责任链返回最终的服务器的响应然后判断返回的响应是否允许保持长连接,如果允许,将该连接加入到连接池中复用.如果不允许关闭该连接.
OK,至此整个ConnectionInterceptor
到逻辑就完成了.
public class ConnectionInterceptor implements Interceptor {
private static final String TAG = "ConnectionInterceptor";
@Override
public Response interceptor(InterceptorChain chain) throws IOException {
Log.e(TAG, "interceptor: ConnectionInterceptor");
Request request = chain.call.request();
HttpClient client = chain.call.getClient();
//获取http url
HttpUrl url = request.getUrl();
//从连接池中获取连接 需要具有相同的host 和 port
HttpConnection httpConnection = client.getConnectionPool().get(url.getHost(), url.getPort());
//没有可复用的连接
if (httpConnection == null) {
Log.e(TAG, "ConnectionInterceptor: 新建连接");
httpConnection = new HttpConnection();
} else {
Log.e(TAG, "ConnectionInterceptor: 从连接池中获取连接");
}
//将request传递给连接
httpConnection.setRequest(request);
//将client传递给连接
httpConnection.setClient(client);
try {
//执行下一个拦截器,将连接传递给下一个拦截器
Response process = chain.process(httpConnection);
//如果服务器返回的响应,如果服务其允许长连接
if (process.isKeepAlive) {
//将连接添加到连接池中
Log.e(TAG, "ConnectionInterceptor: 得到服务器响应:isKeepAlive=true,保持长连接,将此连接加入到连接池中");
client.getConnectionPool().put(httpConnection);
} else {
//如果不允许保持连接 则使用连接完毕后直接关闭连接
Log.e(TAG, "ConnectionInterceptor: 得到服务器响应:isKeepAlive=false,不保持长连接,关闭连接");
httpConnection.close();
}
return process;
} catch (IOException e) {
httpConnection.close();
throw e;
}
}
}
在InterceptorChain类中加入以下覆写方法,主要是将HttpConnection进行赋值
在通信拦截器中就可以直接获取到连接,来实现通信.
HttpConnection httpConnection;
public InterceptorChain(List<Interceptor> interceptors, int index, Call call, HttpConnection httpConnection) {
this.interceptors = interceptors;
this.index = index;
this.call = call;
this.httpConnection = httpConnection;
}
/**
* 使下一个拦截器拿到HttpConnection
*
* @param connection
* @return
*/
public Response process(HttpConnection connection) throws IOException {
this.httpConnection = connection;
return process();
}
通信拦截器CallServerInterceptor
首先我们了完善HttpConnection类,来实现socket通信.代码如下.很简单的基础问题,注意https需要创建socketFactory,可以允许用户自己设置,在HttpClient中设置即可.
/**
* 与服务器通信
*
* @return InputStream 服务器返回的数据
*/
public InputStream call(HttpCodec httpCodec) throws IOException {
//创建socket
createSocket();
//发送请求
//按格式拼接 GET 地址参数 HTTP
httpCodec.writeRequest(out, request);
//返回服务器响应(InputStream)
return in;
}
private InputStream in;
private OutputStream out;
/**
* 创建socket
*/
private void createSocket() throws IOException {
if (null == socket || socket.isClosed()) {
HttpUrl httpUrl = request.getUrl();
//如果是https
if (httpUrl.getProtocol().equalsIgnoreCase("https")) {
//也可以用户自己设置
socket = client.getSocketFactory().createSocket();
} else {
socket = new Socket();
}
socket.connect(new InetSocketAddress(httpUrl.getHost(), httpUrl.getPort()));
in = socket.getInputStream();
out = socket.getOutputStream();
}
}
我们在HttpCodec
类中来拼接http协议信息,不懂可以看这个地址了解.主要是拼接:请求行(如GET请求:GET /query?type=yuantong&postid=222222222 / HTTP/1.1 rn),请求头这个就不说了,还有请求体post请求需要用到.
public void writeRequest(OutputStream out, Request request) throws IOException {
StringBuffer sb = new StringBuffer();
//请求行 GET / 。。。。。/ HTTP/1.1\r\n
sb.append(request.getMethod());
sb.append(SPACE);
sb.append(request.getUrl().getFile());
sb.append(SPACE);
sb.append(VERSION);
sb.append(CRLF);
//请求头
Map<String, String> headers = request.getHeaders();
for (Map.Entry<String, String> entry : headers.entrySet()) {
sb.append(entry.getKey());
sb.append(COLON);
sb.append(SPACE);
sb.append(entry.getValue());
sb.append(CRLF);
}
sb.append(CRLF);
//请求体 POST 请求会用到
RequestBody body = request.getBody();
if (null != body) {
sb.append(body.body());
}
out.write(sb.toString().getBytes());
out.flush();
}
HttpCodec来读取服务器返回来的响应.
例如读取如下响应头信息:
Server: nginx
Date:
Mon, 19 Aug 2019 10:43:42 GMT-1m 16s
Content-Type: text/html;charset=UTF-8
Transfer-Encoding: chunked
Connection: keep-alive
Vary: Accept-Encoding
P3P: CP="IDC DSP COR ADM DEVi TAIi PSA PSD IVAi IVDi CONi HIS OUR IND CNT"
Cache-Control:
no-cache
Content-Encoding: gzip
读取一行
/**
* 读取一行
*
* @param in 服务器返回的数据
* @return
* @throws IOException
*/
public String readLine(InputStream in) throws IOException {
//清理
byteBuffer.clear();
//标记
byteBuffer.mark();
boolean isMabeEofLine = false;
byte b;
while ((b = (byte) in.read()) != -1) {
byteBuffer.put(b);
//如果读到一个\r
if (b == CR) {
isMabeEofLine = true;
} else if (isMabeEofLine) {
//读到\n一行结束
if (b == LF) {
//一行数据
byte[] lineBytes = new byte[byteBuffer.position()];
//将标记设置为0
byteBuffer.reset();
//从allocate获得数据
byteBuffer.get(lineBytes);
byteBuffer.clear();
byteBuffer.mark();
//将一行数据返回
return new String(lineBytes);
}
//如果下一个不是\n 置为false
isMabeEofLine = false;
}
}
//如果读完了都没有读到 则服务器出现问题
throw new IOException("Response read line");
}
读取响应头信息
/**
* 读取响应头
*
* @param in
* @return
*/
public Map<String, String> readHeader(InputStream in) throws IOException {
HashMap<String, String> headers = new HashMap<>();
while (true) {
//读取一行
String line = readLine(in);
//如果读到空行\r\n 表示响应头读取完毕了
if (isEmptyLine(line)) {
break;
}
//读取响应头的key value
int index = line.indexOf(":");
if (index > 0) {
String key = line.substring(0, index);
//key与value还有空格
String value = line.substring(index + 2, line.length() - 2);
headers.put(key, value);
}
}
return headers;
}
CallServerInterceptor的完整实现如下代码:最终将响应信息返回.
public class CallServerInterceptor implements Interceptor {
private static final String TAG = "CallServerInterceptor";
@Override
public Response interceptor(InterceptorChain chain) throws IOException {
Log.e(TAG, "interceptor: CallServerInterceptor");
HttpConnection connection = chain.httpConnection;
//进行I/O操作
HttpCodec httpCodec = new HttpCodec();
InputStream in = connection.call(httpCodec);
//读取响应
//读取响应行: HTTP/1.1 200 OK\r\n
String statusLine = httpCodec.readLine(in);
Log.e(TAG, "CallServerInterceptor: 得到响应行:" + statusLine);
//读取响应头
Map<String, String> headers = httpCodec.readHeader(in);
for (Map.Entry<String, String> entry : headers.entrySet()) {
Log.e(TAG, "CallServerInterceptor: 得到响应头 key:" + entry.getKey() + " value:" + entry.getValue());
}
//读取响应体
//判断请求头是否有 content-length 如果有就直接读取这大的长度就可以
int content_length = -1;
if (headers.containsKey(HEAD_CONTENT_LENGTH)) {
content_length = Integer.valueOf(headers.get(HEAD_CONTENT_LENGTH));
}
//根据分块编码解析
boolean isChunked = false;
if (headers.containsKey(HEAD_TRANSFER_ENCODING)) {
isChunked = headers.get(HEAD_TRANSFER_ENCODING).equalsIgnoreCase(HEAD_VALUE_CHUNKED);
}
String body = null;
if (content_length > 0) {
byte[] bytes = httpCodec.readBytes(in, content_length);
body = new String(bytes);
} else if (isChunked) {
body = httpCodec.readChunked(in);
}
//status[1] 就是响应码
String[] status = statusLine.split(" ");
//判断服务器是否允许长连接
boolean isKeepAlive = false;
if (headers.containsKey(HEAD_CONNECTION)) {
isKeepAlive = headers.get(HEAD_CONNECTION).equalsIgnoreCase(HEAD_VALUE_KEEP_ALIVE);
}
//更新一下这个连接的时间
connection.updateLastUserTime();
//返回响应包装类
return new Response(Integer.valueOf(status[1]), content_length, headers, body, isKeepAlive);
}
}
需要注意的是,要在HttpConnection中添加updateLastUserTime方法来更新当前连接的时间,用到连接池ConnectionPool中.
最后完善Call类的getResponse
方法:
private Response getResponse() throws Exception {
ArrayList<Interceptor> interceptors = new ArrayList<>();
//自定义拦截器
interceptors.addAll(client.getInterceptors());
//添加重试拦截器
interceptors.add(new RetryInterceptor());
//添加请求头拦截器
interceptors.add(new BridgeInterceptor());
//添加连接拦截器
interceptors.add(new ConnectionInterceptor());
//添加通信拦截器
interceptors.add(new CallServerInterceptor());
InterceptorChain chain = new InterceptorChain(interceptors, 0, this, null);
return chain.process();
}
测试
OK,我们花费了好大的力气终于写完了一个类似okhttp的网络请求框架.我们需要测试看看是否成立.
创建HttpClient
httpClient = new HttpClient.Builder().retrys(3).build();
测试get方法
Request request = new Request.Builder()
.get()
.url("http://www.kuaidi100.com/query?type=yuantong&postid=222222222")
.build();
Call call = httpClient.newCall(request);
call.enqueue(new CallBack() {
@Override
public void onFailure(Call call, Throwable throwable) {
}
@Override
public void onResponse(Call call, Response response) {
Log.e(TAG, "onResponse: " + response.getBody());
}
});
LOG如下:
interceptor: RetryInterceptor
BridgeInterceptor: interceptor: BridgeInterceptor
BridgeInterceptor: BridgeInterceptor: 设置的请求头
BridgeInterceptor: BridgeInterceptor key:Connection value:Keep-Alive
BridgeInterceptor: BridgeInterceptor key:Host value:www.kuaidi100.com
ConnectionInterceptor: interceptor: ConnectionInterceptor
ConnectionInterceptor: ConnectionInterceptor: 新建连接
CallServerInterceptor: interceptor: CallServerInterceptor
CallServerInterceptor: CallServerInterceptor: 得到响应行:HTTP/1.1 200 OK
CallServerInterceptor: CallServerInterceptor: 得到响应头 key:Server value:nginx
CallServerInterceptor: CallServerInterceptor: 得到响应头 key:Cache-Control value:no-cache
CallServerInterceptor: CallServerInterceptor: 得到响应头 key:Connection value:keep-alive
CallServerInterceptor: CallServerInterceptor: 得到响应头 key:Vary value:Accept-Encoding
CallServerInterceptor: CallServerInterceptor: 得到响应头 key:Content-Length value:204
CallServerInterceptor: CallServerInterceptor: 得到响应头 key:P3P value:CP="IDC DSP COR ADM DEVi TAIi PSA PSD IVAi IVDi CONi HIS OUR IND CNT"
CallServerInterceptor: CallServerInterceptor: 得到响应头 key:Date value:Mon, 19 Aug 2019 11:25:24 GMT
CallServerInterceptor: CallServerInterceptor: 得到响应头 key:Content-Type value:text/html;charset=UTF-8
ConnectionInterceptor: ConnectionInterceptor: 得到服务器响应:isKeepAlive=true,保持长连接,将此连接加入到连接池中
onResponse: {"message":"ok","nu":"222222222","ischeck":"1","com":"yuantong","status":"200","condition":"F00","state":"3","data":[{"time":"2019-08-09 19:25:24","context":"查无结果","ftime":"2019-08-09 19:25:24"}]}
我们在请求GET方法,看下连接是否从连接池中获取.从下面的log可以看到我们从连接池中拿到了可复用的连接,测试成功.
RetryInterceptor: interceptor: RetryInterceptor
BridgeInterceptor: interceptor: BridgeInterceptor
BridgeInterceptor: BridgeInterceptor: 设置的请求头
BridgeInterceptor: BridgeInterceptor key:Connection value:Keep-Alive
BridgeInterceptor: BridgeInterceptor key:Host value:www.kuaidi100.com
ConnectionInterceptor: interceptor: ConnectionInterceptor
ConnectionInterceptor: ConnectionInterceptor: 从连接池中获取连接
OK,我们从头到尾梳理了OKhttp的实现思路,当然我只是以最简单的方式实现了,OKhttp的还有好多细节,比如缓存拦截器,这个拦截器比较复杂但是思路简单:将响应的数据存储到了LruCache中等逻辑大家可以自己分析.