阅读文档时,更新阅读数
文档的点赞功能,更新点赞数
更新电子书的文档数,阅读数,点赞数
有文档被点暂时,前端可以收到通知
SpringBoot异步化,WebSocket,RocketMQ
文档阅读数更新
1. 自定义sql
前端点击某篇文档的时候,doc.view_count+1,这个功能的实现是需要在查找文档内容的时候去做的,也就是在DocService当中的findContent当中,我们更新Doc表当中的view_count自增1可以使用这样的sql:
update doc set view_count = view_count + 1 where id = 1
在我们生成的mapper当中,这种sql是没有实现的,我们需要自己去自定义,我们首先在resources当中创建一个DocMapperCust.xml,内容如下:
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" ><mapper namespace="com.taopoppy.wiki.mapper.DocMapperCust" ><update id="increaseViewCount">update doc set view_count = view_count + 1 where id = #{id}</update></mapper>
注意。如果是增删改的sql,都可以使用update包裹起来,查一般使用select包裹起来。另外increaseViewCount是要和西面的DocMapperCust的接口当中的函数名一一对应的。其中这个sql当中的id是动态传递的。看看接口DocMapperCust:
package com.taopoppy.wiki.mapper;import org.apache.ibatis.annotations.Param;public interface DocMapperCust {public void increaseViewCount(@Param("id") Long id);}
这样的话,我们就可以在DocService.java当中进行使用:
@Servicepublic class DocService {@Resourceprivate DocMapperCust docMapperCust;/*** 根据id去content表当中查找内容* @param id* @return*/public String findContent(Long id) {Content content = contentMapper.selectByPrimaryKey(id);// 文档阅读数+1docMapperCust.increaseViewCount(id);if(ObjectUtils.isEmpty(content)) {return "";} else {return content.getContent();}}}
2. 前端代码
前端的对应的代码在doc.vue当中,显示了点赞和阅读的数量。
文档点赞功能开发
1. 后端代码实现
前端在文档内容的下方,增加点赞按钮,点击后doc.vote_count+1,实现的方式和阅读数的比较雷同
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" ><mapper namespace="com.taopoppy.wiki.mapper.DocMapperCust" ><update id="increaseViewCount">update doc set view_cout = view_count + 1 where id = #{id}</update><update id="increaseVoteCount">update doc set vote_count = vote_count + 1 where id = #{id}</update></mapper>
package com.taopoppy.wiki.mapper;import org.apache.ibatis.annotations.Param;public interface DocMapperCust {public void increaseViewCount(@Param("id") Long id);public void increaseVoteCount(@Param("id") Long id);}
/*** 根据id去给文档点赞* @param id* @return*/public void vote(Long id) {docMapperCust.increaseVoteCount(id);}
@GetMapping("/vote/{id}")public CommonResp vote(@PathVariable Long id) {CommonResp commonResp = new CommonResp();docService.vote(id);return commonResp;}
2. 点赞拦截
因为前端的点赞数量显示不是从后端拿的数据,而是在点赞的同时自己自增的1,而点赞按钮不能无限的点赞,所以需要实现拦截功能。
要实现的功能是一个用户只能点赞一次,而且是对外部开放的,也就是用户不需要登录也可以点赞,但是只能点赞一次,所以需要对用户的IP和文章的id要同时进行过滤,这个时候就要使用到redis,点赞的时候先看看redis中有没有IP+docId,有表示点赞过,没有就可以点赞。点赞成功后往redis中放IP+docId。
首先定义了两个工具类:
package com.taopoppy.wiki.util;import java.io.Serializable;public class RequestContext implements Serializable {private static ThreadLocal<String> remoteAddr = new ThreadLocal<>();public static String getRemoteAddr() {return remoteAddr.get();}public static void setRemoteAddr(String remoteAddr) {RequestContext.remoteAddr.set(remoteAddr);}}
package com.taopoppy.wiki.util;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.concurrent.TimeUnit;@Componentpublic class RedisUtil {private static final Logger LOG = LoggerFactory.getLogger(RedisUtil.class);@Resourceprivate RedisTemplate redisTemplate;/*** true:不存在,放一个KEY* false:已存在* @param key* @param second* @return*/public boolean validateRepeat(String key, long second) {if (redisTemplate.hasKey(key)) {LOG.info("key已存在:{}", key);return false;} else {LOG.info("key不存在,放入:{},过期 {} 秒", key, second);redisTemplate.opsForValue().set(key, key, second, TimeUnit.SECONDS);return true;}}}
这两个工具类怎么使用呢,首先我们在日志打印得地方获取远程IP:
@Aspect@Componentpublic class LogAspect {@Before("controllerPointcut()")public void doBefore(JoinPoint joinPoint) throws Throwable {LOG.info("------------- 开始 -------------");LOG.info("请求地址: {} {}", request.getRequestURL().toString(), request.getMethod());LOG.info("类名方法: {}.{}", signature.getDeclaringTypeName(), name);LOG.info("远程地址: {}", request.getRemoteAddr());// 获取到远程IP放进线程本地变量RequestContext.setRemoteAddr(getRemoteIp(request));}/*** 使用nginx做反向代理,需要用该方法才能取到真实的远程IP* @param request* @return*/public String getRemoteIp(HttpServletRequest request) {String ip = request.getHeader("x-forwarded-for");if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("Proxy-Client-IP");}if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("WL-Proxy-Client-IP");}if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {ip = request.getRemoteAddr();}return ip;}}
然后在点赞的处理方法当中拿到远程IP,对redis当中的数据做判断:
@Resourceprivate RedisUtil redisUtil;/*** 根据id去给文档点赞* @param id* @return*/public void vote(Long id) {// 远程IP + doc.id 作为key,24小时之内不能重复点赞String ip = RequestContext.getRemoteAddr();if(redisUtil.validateRepeat("DOC_VOTE_"+id+"_"+ip,3600*24)) {docMapperCust.increaseVoteCount(id);} else {throw new BusinessException(BusinessExceptionCode.VOTE_REPEAT);}}
电子书信息更新方案
由于电子书的也有点赞数,阅读数,而且是其所有文档的点赞数和阅读数的总和,所以当对文档点赞或者阅读,其所属的电子书的点赞数和阅读数也应该更新,更新方案有两种:
- 实时更新:优点是数据准确,缺点是改动的地方多,需要在所有相关业务逻辑的地方去实时更新
- 定时批量更新:优点是改动的地方小,侵入性小,缺点就是实时性不强
我们现在来使用定时批量更新的方式,首先统计doc所有ebook_id为1的sql是这样书写的,把ebook_id为1的全部查出来,将view_count和vote_count字段进行累加
select count(1), sum(`view_count`), sum(`vote_count`) from doc where ebook_id=1
如果我们统一按照ebook_id进行分组,我们可以这样书写,按照ebook_id进行分组,将每一组里的数据进行view_count和vote_count进行累加。然后给count(1)起个名字doc_count,给sum(view_count)起个名字view_count,给sum(vote_count)起个名字vote_count,这样查询出来的结果是按照我们的名字罗列的
select ebook_id, count(1) doc_count, sum(`view_count`) view_count, sum(`vote_count`) vote_count from doc group by ebook_id;

其实上面查询出来的的数据也算一张表了,我们可以将其和ebook表进行关联更新:将ebook表设置为t1表,将上面分组查询出来的数据作为t2表,然后将t2的一些属性设置给t1,条件就是t1的id和t2的ebook_id是一样的。
update ebook t1,(select ebook_id, count(1) doc_count, sum(`view_count`) view_count, sum(`vote_count`) vote_count from doc group by ebook_id) t2set t1.doc_count = t2.doc_count, t1.view_count = t2.view_count, t1.vote_count=t2.vote_countwhere t1.id = t2.ebook_id
SpringBoot定时任务示例
1. 启用定时器
启用定时任务的话只需要启用定时器,不需要引入依赖,首先要去启动类当中添加注解@EnableScheduling:
@EnableSchedulingpublic class WikiApplication {}
2. 两种定时器写法
package com.taopoppy.wiki.job;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;import java.util.Date;@Componentpublic class TestJob {private static final Logger LOG = LoggerFactory.getLogger(TestJob.class);/*** 固定时间间隔,fixedRate单位毫秒*/@Scheduled(fixedRate = 1000)public void simple() throws InterruptedException {SimpleDateFormat formatter = new SimpleDateFormat("mm:ss");String dateString = formatter.format(new Date());Thread.sleep(2000);LOG.info("每隔5秒钟执行一次: {}", dateString);}/*** 自定义cron表达式跑批* 只有等上一次执行完成,下一次才会在下一个时间点执行,错过就错过*/@Scheduled(cron = "*/1 * * * * ?")public void cron() throws InterruptedException {SimpleDateFormat formatter = new SimpleDateFormat("mm:ss SSS");String dateString = formatter.format(new Date());Thread.sleep(1500);LOG.info("每隔1秒钟执行一次: {}", dateString);}}
这个是springboot内置的定时器的两种写法,前者是通过固定的时间间隔,后者是通过cron表达式,值得注意的事两个定时任务是属于同一个线程,所以如果你有一个比较大的调度中心的话应该去使用quartz这个第三方框架,这个是当下最流行的定时器框架。
完成电子书信息更新
更新策略是需要根据网站的流量来决定的,可以每分钟每小时,或者每天,而且Cron表达式不需要去记,直接在Cron在线表达式去生成,现在我们就去把前面书写的sql集成到定时器当中:
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" ><mapper namespace="com.taopoppy.wiki.mapper.DocMapperCust" >// 自定义sql<update id="updateEbookInfo">update ebook t1,(select ebook_id, count(1) doc_count, sum(`view_count`) view_count, sum(`vote_count`) vote_count from doc group by ebook_id) t2set t1.doc_count = t2.doc_count, t1.view_count = t2.view_count, t1.vote_count=t2.vote_countwhere t1.id = t2.ebook_id</update></mapper>
public interface DocMapperCust {// 集成方法public void updateEbookInfo();}
/*** 定时更新电子书当中的数据*/public void updateEbookInfo() {docMapperCust.updateEbookInfo();}
最后在定时器当中去调用上面的方法:
package com.taopoppy.wiki.job;import com.taopoppy.wiki.service.DocService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.slf4j.MDC;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import javax.annotation.Resource;@Componentpublic class DocJob {private static final Logger LOG = LoggerFactory.getLogger(DocJob.class);@Resourceprivate DocService docService;/*** 每30秒更新电子书信息*/@Scheduled(cron = "5/30 * * * * ?")public void cron() {LOG.info("更新电子书下的文档数据开始");long start = System.currentTimeMillis();docService.updateEbookInfo();LOG.info("更新电子书下的文档数据结束,耗时:{}毫秒", System.currentTimeMillis() - start);}}
日志流水号的使用
1. 日志问题修改
由于我们的日志现在是杂乱无章,我们需要想一个办法,把同一个业务的日志,把他们用一个唯一的号把它标识起来。
2. logback增加自定义参数
在resources/logback-spring.xml当中,给日志定义了这样的一个形式,上面注释掉的是我们应该在生产环境当中使用的,需要具体到年月日,时分秒,而下面的一行是我们开发环境使用的简单的表达。
<!--<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %blue(%-50logger{50}:%-4line) %thread %green(%-18X{LOG_ID}) %msg%n</Pattern>--><Pattern>%d{ss.SSS} %highlight(%-5level) %blue(%-30logger{30}:%-4line) %thread %green(%-18X{LOG_ID}) %msg%n</Pattern>
这里面是是会显示log-id的,我们应该在线程的入口,对我们来说,线程的入口可以放在logAspect,log_id的设置就可以放在这里来。logback帮我们提供了一个参数MDC.put,这个是内置的一个函数,我们要在两个位置使用,一个是AOP打印日志的时候,一个是定时器当中:
@Resourceprivate SnowFlake snowFlake;@Before("controllerPointcut()")public void doBefore(JoinPoint joinPoint) throws Throwable {// 增加日志流水号MDC.put("LOG_ID", String.valueOf(snowFlake.nextId()));// 开始打印请求日志ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = attributes.getRequest();Signature signature = joinPoint.getSignature();String name = signature.getName();}
@Componentpublic class DocJob {private static final Logger LOG = LoggerFactory.getLogger(DocJob.class);@Resourceprivate DocService docService;@Resourceprivate SnowFlake snowFlake; // 注入/*** 每30秒更新电子书信息*/@Scheduled(cron = "5/30 * * * * ?")public void cron() {// 增加日志流水号MDC.put("LOG_ID", String.valueOf(snowFlake.nextId()));LOG.info("更新电子书下的文档数据开始");long start = System.currentTimeMillis();docService.updateEbookInfo();LOG.info("更新电子书下的文档数据结束,耗时:{}毫秒", System.currentTimeMillis() - start);}}
WebSocket使用示例
1. 网站通知功能
网站通知功能有两种实现方法:定时轮询和被动通知。
- 定时轮询:打开一个网站,网站会自己发起一个定时器(setTimeout或者setTimeInterval),定时的去调用后端的一个接口,这种需要后端有一个可以存储网站通知内容的地方,前端访问的接口进来就能访问这个地方,把要通知的内容返回给前端。优点就是不占用我们的连接,什么时候想获取通知就去调用一次请求。
- 被动通知:使用到websocket,就是网站显示出来之后,就会创建一个websocket连接,跟服务器连接起来,只要不关闭网站,这个连接就一直存在。有一个缺点就是会一直占用我们的连接,优点就是实时的。
2. 集成WebSocket
WebSocket的内容虽然比较多,但是模式比较固定,到时候只需要拷贝一下内容即可。
(1)服务端
首先要引入spring-boot内置的websocket的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
然后在config文件夹下添加下面的WebSocketConfig文件:
package com.taopoppy.wiki.config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configurationpublic class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}
接着我们就去书写WebSocketServer服务了,其实和controller差不多:
- 外部是可以通过/wx/{token}去访问websocket服务
- 其次每个用户都需要传入一个唯一的token作为标识
- map当中存放的是所有的连接
- 连接成功就会往map当中put对应的token和session会话信息
- 连接关闭就将token和session会话信息从map当中移除
- 后面给前端推送消息主要使用的就是sendInfo这个方法 ```java package com.taopoppy.wiki.websocket;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;
import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.HashMap;
@Component @ServerEndpoint(“/ws/{token}”) public class WebSocketServer { private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
/*** 每个客户端一个token*/private String token = "";private static HashMap<String, Session> map = new HashMap<>();/*** 连接成功*/@OnOpenpublic void onOpen(Session session, @PathParam("token") String token) {map.put(token, session);this.token = token;LOG.info("有新连接:token:{},session id:{},当前连接数:{}", token, session.getId(), map.size());}/*** 连接关闭*/@OnClosepublic void onClose(Session session) {map.remove(this.token);LOG.info("连接关闭,token:{},session id:{}!当前连接数:{}", this.token, session.getId(), map.size());}/*** 收到消息*/@OnMessagepublic void onMessage(String message, Session session) {LOG.info("收到消息:{},内容:{}", token, message);}/*** 连接错误*/@OnErrorpublic void onError(Session session, Throwable error) {LOG.error("发生错误", error);}/*** 群发消息*/public void sendInfo(String message) {for (String token : map.keySet()) {Session session = map.get(token);try {session.getBasicRemote().sendText(message);} catch (IOException e) {LOG.error("推送消息失败:{},内容:{}", token, message);}LOG.info("推送消息:{},内容:{}", token, message);}}
}
<a name="IP840"></a>#### (2)客户端客户端(前端)对应修改的地方有这么几个:
NODE_ENV=development VUE_APP_SERVER=http://127.0.0.1:8880 VUE_APP_WS_SERVER=ws://127.0.0.1:8880 // ws打头就是websocket协议
由于生产环境还没有域名,这里随便写即可
NODE_ENV=production VUE_APP_SERVER=http://wiki-server.courseimooc.com VUE_APP_WS_SERVER=ws://wiki-server.courseimooc.com
在util/tool.ts文件当中,我们新添加了一个功能,这是一个可以生成唯一token的方法,可以传入长度和进制:```javascript/*** 随机生成[len]长度的[radix]进制数* @param len* @param radix 默认62* @returns {string}*/public static uuid (len: number, radix = 62) {const chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');const uuid = [];radix = radix || chars.length;for (let i = 0; i < len; i++) {uuid[i] = chars[0 | Math.random() * radix];}return uuid.join('');}
我们这边是进入页面就会和websocket进行连接,当然可以在登录之后执行下面这些代码:
let websocket: any;let token: any;const onOpen = () => {console.log('WebSocket连接成功,状态码:', websocket.readyState)};const onMessage = (event: any) => {console.log('WebSocket收到消息:', event.data);notification['info']({message: '收到消息',description: event.data,});};const onError = () => {console.log('WebSocket连接错误,状态码:', websocket.readyState)};const onClose = () => {console.log('WebSocket连接关闭,状态码:', websocket.readyState)};const initWebSocket = () => {// 连接成功websocket.onopen = onOpen;// 收到消息的回调websocket.onmessage = onMessage;// 连接错误websocket.onerror = onError;// 连接关闭的回调websocket.onclose = onClose;};onMounted(() => {// WebSocketif ('WebSocket' in window) {token = Tool.uuid(10);// 连接地址:ws://127.0.0.1:8880/ws/xxxwebsocket = new WebSocket(process.env.VUE_APP_WS_SERVER + '/ws/' + token);initWebSocket()// 关闭// websocket.close();} else {alert('当前浏览器 不支持')}});
整个上面的代码就是websocket的连接集成,点赞的功能我们后续实现。
完成点赞通知功能
1. 点赞时后端往ws推送
/*** 根据id去给文档点赞* @param id* @return*/public void vote(Long id) {String ip = RequestContext.getRemoteAddr();if(redisUtil.validateRepeat("DOC_VOTE_"+id+"_"+ip,3600*24)) {docMapperCust.increaseVoteCount(id);} else {throw new BusinessException(BusinessExceptionCode.VOTE_REPEAT);}// 推送消息Doc docDb = docMapper.selectByPrimaryKey(id);webSocketServer.sendInfo("【" + docDb.getName() + "】被点赞");}
2. 前端收到ws消息,弹出消息
前端的代码我们上面已经展示,主要是在onMessage方法当中使用notification组件弹出提示即可。
异步化解耦点赞功能
我们将点赞和ws推送消息两个关联的功能在技术上进行解耦,比如点赞一多,ws一慢,前端如果迟迟收不到后端的返回,就会产生用户体验的问题。
1. 异步化解耦
所以我们把ws消息推送进行异步化,异步化也就是另外起一个线程来执行后面的内容。首先我们要启用异步化,先到启动类当中去添加一个这样的注解:
@EnableAsync
那剩下的问题又来了,给哪个方法异步化呢?我们知道,springboot会给异步化的方法添加代理类,所以既不能将同一个类当中的方法异步化,也不能直接将webSocketServer.sendInfo异步化,一定要将异步化的代码写到另一个类当中去。
我们新创建一个Service类,专门负责异步化消息,给sendInfo方法添加@Async异步化的注解。
package com.taopoppy.wiki.service;import com.taopoppy.wiki.websocket.WebSocketServer;import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Service;import javax.annotation.Resource;@Servicepublic class WsService {@Resourceprivate WebSocketServer webSocketServer;@Asyncpublic void sendInfo(String message) {webSocketServer.sendInfo(message);}}
原来在DocService.java当中的vote方法中的推送消息部分就变成了下面这样:
// 推送消息Doc docDb = docMapper.selectByPrimaryKey(id);wsService.sendInfo("【" + docDb.getName() + "】被点赞");
2. 日志流水号
因为异步化之后,点赞业务和推送消息的业务日志线程号不一样,这是理所应当的,我们希望他们的日志流水号一样,这样两个关联的业务虽然不是同一个线程,但是属于同一个业务日志流水号,则查询运维的时候会很方便。
先在点赞的业务接口当中拿到业务日志流水号:
// 推送消息Doc docDb = docMapper.selectByPrimaryKey(id);String logId = MDC.get("LOG_ID");wsService.sendInfo("【" + docDb.getName() + "】被点赞", logId);
然后在异步的推送线程当中取出点赞业务的流水号,放在当前推送的线程当中:
@Asyncpublic void sendInfo(String message, String logId) {MDC.put("LOG_ID", logId);webSocketServer.sendInfo(message);}
3. 特性扩展
刚才我们说的Async这个注解,这个注解要生效,对应的Async注解的方法,不能和调用的地方放在同一个类当中,也就是说同一个类当中,不能A调用Async注解的B。另外还有一个注解也有这样的特性,就是事务。
同时对两张表有增删改的操作,就要考虑加事务,否则会造成数据不准确。当然也有不加事务的场景,不能一概而论。比如同时操作两个表,前一个表操作成功,后一个表操作失败,就会造成数据不一致和不准确,所以要么同时成功,要么同时失败,这就是事务的一个作用。
比如说在DocService的save方法当中,要同时操作docMapper和contentMapper,就要添加一个@Transactional事务的注解:
@Transactionalpublic void save(DocSaveReq req) {Doc doc = CopyUtil.copy(req, Doc.class);Content content = CopyUtil.copy(req,Content.class);if(ObjectUtils.isEmpty(req.getId())) {// 新增文档docdoc.setId(snowFlake.nextId());doc.setViewCount(0);doc.setVoteCount(0);docMapper.insert(doc);// 同时新增内容contentcontent.setId(doc.getId());contentMapper.insert(content);} else {// 更新docMapper.updateByPrimaryKey(doc);// 包含大字段的操作int count = contentMapper.updateByPrimaryKeyWithBLOBs(content);if(count == 0) {// 如果content当中没有该id的记录,就要插入contentMapper.insert(content);}}}
这样添加注解之后,就必须在其他类当中去调用这个方法,如果在DocService这个类当中其他方法调用save方法,那么这个事务注解就不起作用了。
MQ解耦点赞通知功能
1. Springboot异步化的问题
@Asyncpublic void sendInfo(String message, String logId) {MDC.put("LOG_ID", logId);webSocketServer.sendInfo(message);}
假如上面这部分的方法里面的程序很复杂,或者出错等原因,会导致我这个线程一直会被占用,假如点赞这个业务量很大的时候,就会开启很多个线程,当然这里可以添加一个线程池,让异步化规定只能开启多少个线程,一旦超过了这个线程池,它还是会变成同步的。
即使有线程池,假如这个业务是耗时很长的,业务量很大的话,线程越来越多,就会把服务器塞满,影响原有的业务内容,解决这个问题我们需要使用到MQ,MQ就是消息队列,和redis一样,是一个中间件,需要单独安装,常用的MQ有rocketmq,kafka,rabbitmq等等。
MQ分为发送方和接收方,我们上面构造好要发送给前端的通知消息之后,就往MQ里面送,MQ里面还有一个消费端,就是另起一个服务来监听MQ,收到MQ之后才会去执行点赞通知,所以MQ的发送和接收就变成两个服务器,即使接收很慢,接收的地方挂了,也不影响现有的点赞功能,只不过前端收不到点赞的通知罢了。
MQ就是一个第三方软件,到官网下载rocketmq-all-4.7.1-bin-release.zip,然后解压到电脑。进入bin目录,启动mqnamesrv.cmd和mqborker.cmd,前者是注册中心,后者是真正的MQ服务端,springboot就是MQ的客户端。
2. 使用RocketMQ解耦
先添加一下环境变量:ROCKETMQ_HOME(E:\rocketmq-all-4.7.1-bin-release\rocketmq-all-4.7.1-bin-release),然后打开两个新的cmd窗口,路径定位在MQ的bin目录下,先在一个窗口执行mqnamesrv.cmd,然后在另一个窗口执行mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,后面添加的这个参数后,SpringBoot可以发送任意的topic到RocketMQ,否则需要在RocketMQ里先创建好Topic。
(1)发送端
然后在pom.xml当中集成依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version></dependency>
然后在application.properties当中配置:
# rocketmq配置rocketmq.name-server=127.0.0.1:9876rocketmq.producer.group=default
在DocService.java当中去书写发送端:
@Resourceprivate RocketMQTemplate rocketMQTemplate;public void vote(Long id) {...// 推送消息Doc docDb = docMapper.selectByPrimaryKey(id);String logId = MDC.get("LOG_ID");// 给mq推送消息(参数:主题,内容)rocketMQTemplate.convertAndSend("VOTE_TOPIC","【" + docDb.getName() + "】被点赞");}
(2)接收端
创建一个rocketmq的package包,然后创建一个VoteTopicConsumer.java,内容如下:
package com.taopoppy.wiki.rocketmq;import com.taopoppy.wiki.websocket.WebSocketServer;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service@RocketMQMessageListener(consumerGroup = "default", topic = "VOTE_TOPIC")public class VoteTopicConsumer implements RocketMQListener<MessageExt> {private static final Logger LOG = LoggerFactory.getLogger(VoteTopicConsumer.class);@Resourcepublic WebSocketServer webSocketServer;@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();LOG.info("ROCKETMQ收到消息:{}", new String(body));webSocketServer.sendInfo(new String(body));}}
整个过程是:
- 前端发起点赞请求
- springboot收到请求,发送VOTE_TOPIC给mq
- mq把消息推送给监听的地方
- springboot就是监听VOTE_TOPIC的地方
- springboot再把消息通过websocket发给前端
