一:Http请求的过程
http请求的生命周期如图所示,其建立连接,关闭连接环节,占据整个生命周期的不小比例。那么频繁的建立连接、释放连接势必造成不小的网络IO浪费。
那么如何尽可能地减少http连接的重复创建释放,是我们讨论的重点。
二:连接池
连接的创建和释放,很容易让人联想到线程创建和终止。跟线程一样,线程有线程池的概念,连接也有连接池。Apache的httpclient是我们服务端最常使用的http工具。
Apache官方的简单使用示例
CloseableHttpClient httpclient = HttpClients.createDefault();
HttpGet httpget = new HttpGet("http://localhost/");
CloseableHttpResponse response = httpclient.execute(httpget);
try {
HttpEntity entity = response.getEntity();
if (entity != null) {
long len = entity.getContentLength();
if (len != -1 && len < 2048) {
System.out.println(EntityUtils.toString(entity));
} else {
// Stream content out
}
}
} finally {
response.close();
}
示例中创建client使用的是createDefault方法,此外还提供的了例外的HttpClients.custom()方法,可以实现自定义httpClient。_custom方法返回的是一个_HttpClientBuilder类,这个类有setConnectionManager �方法可以实现定制httpclient连接池。饶了一圈,终于来到定制连接池这一步啦~。
PoolingHttpClientConnectionManager
官方doc
ClientConnectionPoolManager维护一个HttpClientConnection池,并且能够为来自多个执行线程的连接请求提供服务。连接按每条路由汇集。通过从池中租用连接而不是创建全新的连接,对管理器已经在池中可用的持久连接的路由的请求将成为服务。 ClientConnectionPoolManager维护每个路由的最大连接限制和总数。默认情况下,此实现将为每个给定路由创建不超过 2 个并发连接,并且总共不超过 20 个连接。对于许多现实世界的应用程序来说,这些限制可能过于局限,尤其是当他们使用 HTTP 作为其服务的传输协议时。但是,可以使用ConnPoolControl方法调整连接限制。 在构建时设置的总生存时间 (TTL) 定义了持久连接的最大生命周期,无论其过期设置如何。超过其 TTL 值后,不会再使用持久连接。 在 4.4 版中更改了旧连接的处理。以前,代码会在重新使用之前默认检查每个连接。代码现在仅在自上次使用连接后经过的时间超过已设置的超时时才检查连接。默认超时设置为 2000 毫秒
从官方的doc中,可以看出ClientConnectionPoolManager可以支持我们需要的减少创建释放连接的需求。<br />what给定路由?给定路由就是一个明确的域名or ip:port,那么使用默认的ClientConnectionPoolManager在httpclient访问同一个网址的时候,就只能支持两个并发的连接。<br />**ps:使用HttpClients.createDefault()去调用接口将严重限制我们系统的吞吐量!**
支持的参数
既然了解到默认的ClientConnectionPoolManager会导致吞吐量的严重下降,那么我们可以通过哪些参数改变路由数和总连接数?
setMaxTotal():该方法支持设置连接池最大连接数。
setDefaultMaxPerRoute():该方法支持设置连接池中每个路由的最大连接数。
连接池最大连接数和路由最大连接数有什么关系?
连接池的结构
�从上图连接池的结构中,可以看出连接池其实包含很多小的路由池,每个路由的连接数组成了连接池的连接数。那么当我们系统比较依赖于某几个特定的域名的话,我们就需要把该路由的最大连接数调高。
获取连接
获取连接的流程主要是判断池子中是否有可用的连接,没有则创建,创建时根据池子的最大连接数、每个路由的的最连接数进行创建。有则直接返回连接。
关闭超时的连接
如果连接“长期”没有使用,那么势必占用到我们连接池的数量了,ClientConnectionPoolManager还提供了closeExpiredConnections方法,支持关闭所有过期的空闲连接。closeIdleConnections方法关闭空闲的连接,它支持两个参数第一个空闲时间,第二个是时间的单位。
/**
* Closes all expired connections in the pool.
* <p>
* Open connections in the pool that have not been used for
* the timespan defined when the connection was released will be closed.
* Currently allocated connections are not subject to this method.
* Times will be checked with milliseconds precision.
* </p>
*/
void closeExpiredConnections();
void closeIdleConnections(long idletime, TimeUnit timeUnit);
我们可以用一条线程,定时的关闭连接池中空闲的、过期的连接。
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
// 回收过期的http连接,30秒空闲即回收
scheduledExecutorService.scheduleWithFixedDelay(() -> {
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(30, TimeUnit.SECONDS);
}, 30, 60, TimeUnit.SECONDS);
创建了单线程的延迟任务线程池,每60s执行一次回收连接的任务。
三:请求配置
HttpClient还支持定制http请求配置。
HttpClientBuilder的setDefaultRequestConfig方法支持我们设置默认的http请求配置。其接受的参数是一个RequestConfig对象。
可以通过以下的代码定义一个配置对象,连接超时2s,获取连接请求超时1s,读取超时时间5s。
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(2000).setConnectionRequestTimeout(1000).setSocketTimeout(5000).build();
ConnectTimeout:建立连接的超时时间
ConnectionRequestTimeout:从连接池里取出连接的超时时间
SocketTimeout:从服务端读取读取等待超时时间
四:重试次数
HttpClient支持设置请求的重试次数。
Apacha提供了HttpRequestRetryHandler接口,及其两个实现类DefaultHttpRequestRetryHandler和StandardHttpRequestRetryHandler。
StandardHttpRequestRetryHandler是httpClient默认的重试策略,默认允许三次重试。
我们可以实现HttpRequestRetryHandler接口重写retryRequest方法实现自己的重试策略。也可以简单的使用DefaultHttpRequestRetryHandler,它的构造方法支持自定义重试次数。
当我们不希望httpClient重试时,可以如下操作
// 默认不可重试,重试交由调用方实现
HttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler(0, false);
HttpClients.custom().setRetryHandler(retryHandler)
五:关闭流
HttpClient执行http请求后返回response对象,而响应的内容是封装在entity里的(其实是io流),如果没有读取entity关闭流的话,流会一直保持,连接没法回到连接池中或释放掉。这将导致后续从池子中取不到连接。
CloseableHttpResponse response = httpclient.execute(httpget);
HttpEntity entity = response.getEntity();
使用如下的方式,可以在读取entity的内容时关闭流。consume和toString方法底层会默认在读取的同时帮我们关闭io流。
EntityUtils.consume(execute.getEntity());
EntityUtils.toString(execute.getEntity(), "UTF-8")
六:关闭httpclient
我们希望程序关闭的时候,关闭httpClient。避免出现内存泄露。
每个java程序都可以通过Runtime.getRuntime()获取其应用的Runtime实例,通过实例addShutdownHook方法可以向jvm注册一个钩子函数,该函数将在程序关闭的时候被执行。(kill -9 不会)
下面的代码将展示如何注册钩子函数关闭httpClient。
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("close httpClient");
try {
if (HTTP_CLIENT != null) {
HTTP_CLIENT.close();
}
} catch (IOException e) {
log.error("close httpClient error", e);
}
}));
�七:基于连接池改造SoaClient
java版的soaClient是基于htppClient实现的。
我们希望对SoaClient进行改造得到一个具有以下特性的soaClient。
- 池化的http连接池,同时具备连接回收,超时关闭等特性。
- 仅对网络问题进行重试,业务报错不重试。减少不必要的网络io
- 支持透传参数,可以自定义连接池最大连接数和每个路由的最大连接数。
key code
SoaClient的请求是基于httpclient发出的。
public SoaResponse callService(String service, String method, Object[] form, SoaBaseParams soaBasic, String distinctRequestId, String env) {
......httprequest 参数拼接....
//尝试次数
int tryCount = 0;
//最多重试tryLimit次
while (tryCount <= tryLimit) {
try {
......正常的处理..
break;
} catch (SocketException | SocketTimeoutException socketException) {
tryCount++;
if (tryCount >3){
LOGGER.error("soa第"+ (tryCount -1) +"次重试失败",socketException);
}else {
LOGGER.warn("soa第"+ (tryCount -1) +"次重试失败",socketException);
}
} catch (JsonProcessingException jse) {
response.setCode("soa_response_error");
response.setMsg(MessageFormat.format("invalid json, {0}", responseStr));
break;
} catch (Exception e) {
response.setCode("soa_call_failed");
response.setMsg(MessageFormat.format("soa请求失败. soa地址:{0}, 异常描述:{1}", reqUrl.toString(), e.getMessage()));
LOGGER.error("soa请求失败", e);
break;
}
}
return response;
}
HttpclientUtils
/**
* http工具
*
* @author linzhe
* @date 2022/04/07
*/
@Slf4j
public class HttpClientUtil {
/**
* 编码
*/
private static final String ENCODING = "UTF-8";
/**
* 连接管理器
*/
private static PoolingHttpClientConnectionManager connectionManager = null;
/**
* http客户端
*/
private static final CloseableHttpClient HTTP_CLIENT;
/**
* 默认最大总连接数
*/
private static final Integer DEFAULT_MAX_TOTAL = 120;
/**
* 默认每个路由最大连接数
*/
private static final Integer DEFAULT_MAX_PER_ROUTE = 60;
static {
try {
// 配置同时支持 HTTP 和 HTTPS
SSLContextBuilder builder = new SSLContextBuilder();
builder.loadTrustMaterial(null, new TrustSelfSignedStrategy());
SSLConnectionSocketFactory sslConnectionSocketFactory = new SSLConnectionSocketFactory(builder.build());
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create().register("http", PlainConnectionSocketFactory.getSocketFactory()).register("https", sslConnectionSocketFactory).build();
connectionManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
} catch (Exception e) {
log.error("init http connection exception", e);
connectionManager = new PoolingHttpClientConnectionManager();
}
/*
maxTotal:总连接数
maxPerRoute:每个路由最大连接数
计算公式,系统的qps * 1.5~2
eg:j24线上单日被请求数5800w,qps 680左右,12台机器,单台qps56,故最大连接数保持在 90-120比较合适
*/
String maxTotal = EnvironmentUtil.getEnvironment().getProperty("taqu.httpClient.maxTotal");
String maxPerRoute = EnvironmentUtil.getEnvironment().getProperty("taqu.httpClient.maxPerRoute");
connectionManager.setMaxTotal(StringUtils.isBlank(maxTotal) ? DEFAULT_MAX_TOTAL : Integer.parseInt(maxTotal));
connectionManager.setDefaultMaxPerRoute(StringUtils.isBlank(maxPerRoute) ? DEFAULT_MAX_PER_ROUTE : Integer.parseInt(maxPerRoute));
/*
setConnectTimeout表示设置建立连接的超时时间
setSocketTimeout表示发出请求后等待对端应答的超时时间
setConnectionRequestTimeout表示从连接池中拿连接的等待超时时间
连接超时2s(开发环境网络较差,1s容易超时),socket超时5s,从池子中获取连接超时1s
*/
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(2000).setConnectionRequestTimeout(1000).setSocketTimeout(5000).build();
// 默认不可重试,重试交由调用方实现
HttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler(0, false);
HTTP_CLIENT = HttpClients.custom().setConnectionManager(connectionManager).setDefaultRequestConfig(requestConfig).setRetryHandler(retryHandler).build();
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
// 回收过期的http连接,30秒空闲即回收
scheduledExecutorService.scheduleWithFixedDelay(() -> {
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(30, TimeUnit.SECONDS);
}, 30, 60, TimeUnit.SECONDS);
// 关闭的钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("close httpClient");
try {
if (HTTP_CLIENT != null) {
HTTP_CLIENT.close();
}
} catch (IOException e) {
log.error("close httpClient error", e);
}
}));
}
/**
* post
*
* @param url url
* @param postBody json
* @return {@link String}
* @throws IOException ioexception
*/
public static String post(String url, String postBody) throws IOException {
HttpPost httpPost = new HttpPost(url);
StringEntity entity = new StringEntity(postBody, "utf-8");
entity.setContentEncoding(ENCODING);
entity.setContentType("application/json");
httpPost.setEntity(entity);
HttpResponse resp = HTTP_CLIENT.execute(httpPost);
String respContent = "";
if (resp.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
HttpEntity he = resp.getEntity();
// 读取完自动关闭
respContent = EntityUtils.toString(he, ENCODING);
} else {
EntityUtils.consume(resp.getEntity());
throw new ApiException(resp.getStatusLine().toString());
}
return respContent;
}
/**
* httpPost soa使用
*
* @param url url
* @param args
* @return {@link String}
* @throws IOException ioexception
*/
public static String postBySoa(String url, Object... args) throws Exception {
RequestBuilder rb = RequestBuilder.post();
rb.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
HashMap<String, Object> asMap = convertToMap(args);
for (Map.Entry<String, ? extends Object> entry : asMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
rb.addParameter(key, String.valueOf(value));
}
rb.setUri(url);
HttpUriRequest request = rb.build();
CloseableHttpResponse execute = HTTP_CLIENT.execute(request);
StatusLine statusLine = execute.getStatusLine();
if (HttpStatus.SC_OK != statusLine.getStatusCode()) {
EntityUtils.consume(execute.getEntity());
throw new ApiException(statusLine.toString());
}
return EntityUtils.toString(execute.getEntity(), "UTF-8");
}
/**
* 转换为map
*
* @param args
* @return {@link HashMap}<{@link String}, {@link Object}>
*/
private static HashMap<String, Object> convertToMap(Object... args) {
validateDataArgs(args);
HashMap<String, Object> asMap = Maps.newHashMap();
for (int i = 0; i < args.length; i += 2) {
Object key = args[i];
Object value = args[i + 1];
if ((key != null) && (value != null)) {
asMap.put(String.valueOf(key), value);
}
}
return asMap;
}
/**
* 校验soa参数
*
* @param args
*/
public static void validateDataArgs(Object... args) {
if (args.length > 0) {
if ((args.length % 2) != 0) {
throw new ApiException("args must be odd (key+value in order.)");
}
}
}
}
透传参数的获取
String maxTotal = EnvironmentUtil.getEnvironment().getProperty("taqu.httpClient.maxTotal");
String maxPerRoute = EnvironmentUtil.getEnvironment().getProperty("taqu.httpClient.maxPerRoute");
connectionManager.setMaxTotal(StringUtils.isBlank(maxTotal) ? DEFAULT_MAX_TOTAL : Integer.parseInt(maxTotal));
connectionManager.setDefaultMaxPerRoute(StringUtils.isBlank(maxPerRoute) ? DEFAULT_MAX_PER_ROUTE : Integer.parseInt(maxPerRoute));