1.背景
在做活动或者抢购场景,系统查询的请求并发量非常高
如果并发的访问数据库,会给数据库带来很大的压力,
这时候我们可以考虑将多个查询请求合并成一个查询请求返回给客户端,
比如:根据id查询爆款产品
并发10000次
3000次查询 id=1的产品
4000次查询id=2的产品
2000次查询id=3的产品
1000次查询id=4的产品
如果请求不合并,将到数据库查询10000次,
如果采用请求合并,只需到数据库查询1次,共查询出4个产品,然后按照id把结果给每一个请求,
这样大大降低了数据库的压力
2.代码
控制层代码:
/**
* 查询订单
*
* @return
*/
@RequestMapping("/api/product")
public Object product(String id) throws ExecutionException, InterruptedException {
// 为了便于分析,设置一个线程号
Thread.currentThread().setName("thread-" + id);
Map<String, Object> map = productServiceImpl.queryList(id);
// 模拟随机耗时
ThreadUtil.sleepRandom();
return map;
}
业务实现层代码
package com.ldp.jucproject.service.impl;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.*;
/**
* @author 姿势帝-博客园
* @address https://www.cnblogs.com/newAndHui/
* @WeChat 851298348
* @create 11/06 12:51
* @description
*/
@Service
public class ProductServiceImpl {
/**
* 请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
* CompletableFuture将处理结果返回
*/
class Request {
String code;
CompletableFuture completableFuture;
}
/**
* 存放请求的队列
*/
LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue(200);
@PostConstruct
public void init() {
// 建立定时执行的线程
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if (size == 0) {
return;
}
List<Request> list = new ArrayList<>();
System.out.println("合并请求数:" + size);
//将队列的请求消费到一个集合保存
for (int i = 0; i < size; i++) {
Request poll = queue.poll();
if (poll == null) {
System.out.println("无请求对象!");
break;
}
list.add(poll);
}
//拿到我们需要去数据库查询的特征,保存为集合
List<String> listParam = new ArrayList<>();
for (Request request : list) {
listParam.add(request.code);
}
//将参数传入service处理
Map<String, HashMap<String, Object>> response = queryByCodeBatch(listParam);
//将处理结果返回各自的请求
for (Request request : list) {
Map<String, Object> result = response.get(request.code);
//completableFuture.complete方法完成赋值,这一步执行完毕,阻塞的请求可以继续执行了
request.completableFuture.complete(result);
}
}, 0, 30, TimeUnit.MILLISECONDS);
}
/**
* 模拟从数据库查询
*
* @param codes
* @return
*/
public Map<String, HashMap<String, Object>> queryByCodeBatch(List<String> codes) {
Map<String, HashMap<String, Object>> result = new HashMap();
for (String code : codes) {
HashMap<String, Object> hashMap = new HashMap<>();
hashMap.put("productCode", new Random().nextInt(999999999));
hashMap.put("code", code);
hashMap.put("productNome", "苹果-" + new Random().nextInt(5));
hashMap.put("price", new Random().nextInt(100));
result.put(code, hashMap);
}
return result;
}
public Map<String, Object> queryList(String code) throws ExecutionException, InterruptedException {
Request request = new Request();
request.code = code;
CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
request.completableFuture = future;
//将对象传入队列
boolean offer = queue.offer(request);
if (!offer) {
// 放入队列失败,说明队列满了,返回系统忙
Map<String, Object> result = new HashMap<>();
result.put("code", "9999");
result.put("message", "系统忙!");
return result;
}
//如果这时候没完成赋值,那么就会阻塞,知道能够拿到值
return future.get();
}
}
3.测试
模拟并发请求
@Test
void product() throws InterruptedException {
// 并发请求数
int num = 1000;
CountDownLatch countDownLatch = new CountDownLatch(num);
for (int i = 1; i <= num; i++) {
// 计数器减一
countDownLatch.countDown();
Integer id = i;
new Thread(() -> {
try {
String url = "http://localhost:8001/api/product?id=" + id;
// 等待计数器归零,归零前都是处于阻塞状态
System.out.println("待查询订单号=" + id);
countDownLatch.await();
HttpRequest request = HttpUtil.createGet(url);
String response = request.execute().body();
System.out.println("response=" + response);
} catch (Exception e) {
log.error("模拟并发出错:{}", e);
}
}).start();
}
// 避免线程终止
Thread.sleep(90 * 1000);
}