阅读文档时,更新阅读数
文档的点赞功能,更新点赞数
更新电子书的文档数,阅读数,点赞数
有文档被点暂时,前端可以收到通知
SpringBoot异步化,WebSocket,RocketMQ

文档阅读数更新

1. 自定义sql

前端点击某篇文档的时候,doc.view_count+1,这个功能的实现是需要在查找文档内容的时候去做的,也就是在DocService当中的findContent当中,我们更新Doc表当中的view_count自增1可以使用这样的sql:

  1. update doc set view_count = view_count + 1 where id = 1

在我们生成的mapper当中,这种sql是没有实现的,我们需要自己去自定义,我们首先在resources当中创建一个DocMapperCust.xml,内容如下:

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3. <mapper namespace="com.taopoppy.wiki.mapper.DocMapperCust" >
  4. <update id="increaseViewCount">
  5. update doc set view_count = view_count + 1 where id = #{id}
  6. </update>
  7. </mapper>

注意。如果是增删改的sql,都可以使用update包裹起来,查一般使用select包裹起来。另外increaseViewCount是要和西面的DocMapperCust的接口当中的函数名一一对应的。其中这个sql当中的id是动态传递的。看看接口DocMapperCust:

  1. package com.taopoppy.wiki.mapper;
  2. import org.apache.ibatis.annotations.Param;
  3. public interface DocMapperCust {
  4. public void increaseViewCount(@Param("id") Long id);
  5. }

这样的话,我们就可以在DocService.java当中进行使用:

  1. @Service
  2. public class DocService {
  3. @Resource
  4. private DocMapperCust docMapperCust;
  5. /**
  6. * 根据id去content表当中查找内容
  7. * @param id
  8. * @return
  9. */
  10. public String findContent(Long id) {
  11. Content content = contentMapper.selectByPrimaryKey(id);
  12. // 文档阅读数+1
  13. docMapperCust.increaseViewCount(id);
  14. if(ObjectUtils.isEmpty(content)) {
  15. return "";
  16. } else {
  17. return content.getContent();
  18. }
  19. }
  20. }

2. 前端代码

前端的对应的代码在doc.vue当中,显示了点赞和阅读的数量。

文档点赞功能开发

1. 后端代码实现

前端在文档内容的下方,增加点赞按钮,点击后doc.vote_count+1,实现的方式和阅读数的比较雷同

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3. <mapper namespace="com.taopoppy.wiki.mapper.DocMapperCust" >
  4. <update id="increaseViewCount">
  5. update doc set view_cout = view_count + 1 where id = #{id}
  6. </update>
  7. <update id="increaseVoteCount">
  8. update doc set vote_count = vote_count + 1 where id = #{id}
  9. </update>
  10. </mapper>
  1. package com.taopoppy.wiki.mapper;
  2. import org.apache.ibatis.annotations.Param;
  3. public interface DocMapperCust {
  4. public void increaseViewCount(@Param("id") Long id);
  5. public void increaseVoteCount(@Param("id") Long id);
  6. }
  1. /**
  2. * 根据id去给文档点赞
  3. * @param id
  4. * @return
  5. */
  6. public void vote(Long id) {
  7. docMapperCust.increaseVoteCount(id);
  8. }
  1. @GetMapping("/vote/{id}")
  2. public CommonResp vote(@PathVariable Long id) {
  3. CommonResp commonResp = new CommonResp();
  4. docService.vote(id);
  5. return commonResp;
  6. }

2. 点赞拦截

因为前端的点赞数量显示不是从后端拿的数据,而是在点赞的同时自己自增的1,而点赞按钮不能无限的点赞,所以需要实现拦截功能。

要实现的功能是一个用户只能点赞一次,而且是对外部开放的,也就是用户不需要登录也可以点赞,但是只能点赞一次,所以需要对用户的IP和文章的id要同时进行过滤,这个时候就要使用到redis,点赞的时候先看看redis中有没有IP+docId,有表示点赞过,没有就可以点赞。点赞成功后往redis中放IP+docId。

首先定义了两个工具类:

  1. package com.taopoppy.wiki.util;
  2. import java.io.Serializable;
  3. public class RequestContext implements Serializable {
  4. private static ThreadLocal<String> remoteAddr = new ThreadLocal<>();
  5. public static String getRemoteAddr() {
  6. return remoteAddr.get();
  7. }
  8. public static void setRemoteAddr(String remoteAddr) {
  9. RequestContext.remoteAddr.set(remoteAddr);
  10. }
  11. }
  1. package com.taopoppy.wiki.util;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.data.redis.core.RedisTemplate;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.Resource;
  7. import java.util.concurrent.TimeUnit;
  8. @Component
  9. public class RedisUtil {
  10. private static final Logger LOG = LoggerFactory.getLogger(RedisUtil.class);
  11. @Resource
  12. private RedisTemplate redisTemplate;
  13. /**
  14. * true:不存在,放一个KEY
  15. * false:已存在
  16. * @param key
  17. * @param second
  18. * @return
  19. */
  20. public boolean validateRepeat(String key, long second) {
  21. if (redisTemplate.hasKey(key)) {
  22. LOG.info("key已存在:{}", key);
  23. return false;
  24. } else {
  25. LOG.info("key不存在,放入:{},过期 {} 秒", key, second);
  26. redisTemplate.opsForValue().set(key, key, second, TimeUnit.SECONDS);
  27. return true;
  28. }
  29. }
  30. }

这两个工具类怎么使用呢,首先我们在日志打印得地方获取远程IP:

  1. @Aspect
  2. @Component
  3. public class LogAspect {
  4. @Before("controllerPointcut()")
  5. public void doBefore(JoinPoint joinPoint) throws Throwable {
  6. LOG.info("------------- 开始 -------------");
  7. LOG.info("请求地址: {} {}", request.getRequestURL().toString(), request.getMethod());
  8. LOG.info("类名方法: {}.{}", signature.getDeclaringTypeName(), name);
  9. LOG.info("远程地址: {}", request.getRemoteAddr());
  10. // 获取到远程IP放进线程本地变量
  11. RequestContext.setRemoteAddr(getRemoteIp(request));
  12. }
  13. /**
  14. * 使用nginx做反向代理,需要用该方法才能取到真实的远程IP
  15. * @param request
  16. * @return
  17. */
  18. public String getRemoteIp(HttpServletRequest request) {
  19. String ip = request.getHeader("x-forwarded-for");
  20. if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
  21. ip = request.getHeader("Proxy-Client-IP");
  22. }
  23. if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
  24. ip = request.getHeader("WL-Proxy-Client-IP");
  25. }
  26. if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
  27. ip = request.getRemoteAddr();
  28. }
  29. return ip;
  30. }
  31. }

然后在点赞的处理方法当中拿到远程IP,对redis当中的数据做判断:

  1. @Resource
  2. private RedisUtil redisUtil;
  3. /**
  4. * 根据id去给文档点赞
  5. * @param id
  6. * @return
  7. */
  8. public void vote(Long id) {
  9. // 远程IP + doc.id 作为key,24小时之内不能重复点赞
  10. String ip = RequestContext.getRemoteAddr();
  11. if(redisUtil.validateRepeat("DOC_VOTE_"+id+"_"+ip,3600*24)) {
  12. docMapperCust.increaseVoteCount(id);
  13. } else {
  14. throw new BusinessException(BusinessExceptionCode.VOTE_REPEAT);
  15. }
  16. }

电子书信息更新方案

由于电子书的也有点赞数,阅读数,而且是其所有文档的点赞数和阅读数的总和,所以当对文档点赞或者阅读,其所属的电子书的点赞数和阅读数也应该更新,更新方案有两种:

  • 实时更新:优点是数据准确,缺点是改动的地方多,需要在所有相关业务逻辑的地方去实时更新
  • 定时批量更新:优点是改动的地方小,侵入性小,缺点就是实时性不强

我们现在来使用定时批量更新的方式,首先统计doc所有ebook_id为1的sql是这样书写的,把ebook_id为1的全部查出来,将view_count和vote_count字段进行累加

  1. select count(1), sum(`view_count`), sum(`vote_count`) from doc where ebook_id=1

如果我们统一按照ebook_id进行分组,我们可以这样书写,按照ebook_id进行分组,将每一组里的数据进行view_count和vote_count进行累加。然后给count(1)起个名字doc_count,给sum(view_count)起个名字view_count,给sum(vote_count)起个名字vote_count,这样查询出来的结果是按照我们的名字罗列的

  1. select ebook_id, count(1) doc_count, sum(`view_count`) view_count, sum(`vote_count`) vote_count from doc group by ebook_id;

1648618674(1).png
其实上面查询出来的的数据也算一张表了,我们可以将其和ebook表进行关联更新:将ebook表设置为t1表,将上面分组查询出来的数据作为t2表,然后将t2的一些属性设置给t1,条件就是t1的id和t2的ebook_id是一样的。

  1. update ebook t1,(select ebook_id, count(1) doc_count, sum(`view_count`) view_count, sum(`vote_count`) vote_count from doc group by ebook_id) t2
  2. set t1.doc_count = t2.doc_count, t1.view_count = t2.view_count, t1.vote_count=t2.vote_count
  3. where t1.id = t2.ebook_id

SpringBoot定时任务示例

1. 启用定时器

启用定时任务的话只需要启用定时器,不需要引入依赖,首先要去启动类当中添加注解@EnableScheduling:

  1. @EnableScheduling
  2. public class WikiApplication {}

2. 两种定时器写法

  1. package com.taopoppy.wiki.job;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.scheduling.annotation.Scheduled;
  5. import org.springframework.stereotype.Component;
  6. import java.text.SimpleDateFormat;
  7. import java.util.Date;
  8. @Component
  9. public class TestJob {
  10. private static final Logger LOG = LoggerFactory.getLogger(TestJob.class);
  11. /**
  12. * 固定时间间隔,fixedRate单位毫秒
  13. */
  14. @Scheduled(fixedRate = 1000)
  15. public void simple() throws InterruptedException {
  16. SimpleDateFormat formatter = new SimpleDateFormat("mm:ss");
  17. String dateString = formatter.format(new Date());
  18. Thread.sleep(2000);
  19. LOG.info("每隔5秒钟执行一次: {}", dateString);
  20. }
  21. /**
  22. * 自定义cron表达式跑批
  23. * 只有等上一次执行完成,下一次才会在下一个时间点执行,错过就错过
  24. */
  25. @Scheduled(cron = "*/1 * * * * ?")
  26. public void cron() throws InterruptedException {
  27. SimpleDateFormat formatter = new SimpleDateFormat("mm:ss SSS");
  28. String dateString = formatter.format(new Date());
  29. Thread.sleep(1500);
  30. LOG.info("每隔1秒钟执行一次: {}", dateString);
  31. }
  32. }

这个是springboot内置的定时器的两种写法,前者是通过固定的时间间隔,后者是通过cron表达式,值得注意的事两个定时任务是属于同一个线程,所以如果你有一个比较大的调度中心的话应该去使用quartz这个第三方框架,这个是当下最流行的定时器框架

完成电子书信息更新

更新策略是需要根据网站的流量来决定的,可以每分钟每小时,或者每天,而且Cron表达式不需要去记,直接在Cron在线表达式去生成,现在我们就去把前面书写的sql集成到定时器当中:

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3. <mapper namespace="com.taopoppy.wiki.mapper.DocMapperCust" >
  4. // 自定义sql
  5. <update id="updateEbookInfo">
  6. update ebook t1,(select ebook_id, count(1) doc_count, sum(`view_count`) view_count, sum(`vote_count`) vote_count from doc group by ebook_id) t2
  7. set t1.doc_count = t2.doc_count, t1.view_count = t2.view_count, t1.vote_count=t2.vote_count
  8. where t1.id = t2.ebook_id
  9. </update>
  10. </mapper>
  1. public interface DocMapperCust {
  2. // 集成方法
  3. public void updateEbookInfo();
  4. }
  1. /**
  2. * 定时更新电子书当中的数据
  3. */
  4. public void updateEbookInfo() {
  5. docMapperCust.updateEbookInfo();
  6. }

最后在定时器当中去调用上面的方法:

  1. package com.taopoppy.wiki.job;
  2. import com.taopoppy.wiki.service.DocService;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.slf4j.MDC;
  6. import org.springframework.scheduling.annotation.Scheduled;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.Resource;
  9. @Component
  10. public class DocJob {
  11. private static final Logger LOG = LoggerFactory.getLogger(DocJob.class);
  12. @Resource
  13. private DocService docService;
  14. /**
  15. * 每30秒更新电子书信息
  16. */
  17. @Scheduled(cron = "5/30 * * * * ?")
  18. public void cron() {
  19. LOG.info("更新电子书下的文档数据开始");
  20. long start = System.currentTimeMillis();
  21. docService.updateEbookInfo();
  22. LOG.info("更新电子书下的文档数据结束,耗时:{}毫秒", System.currentTimeMillis() - start);
  23. }
  24. }

日志流水号的使用

1. 日志问题修改

由于我们的日志现在是杂乱无章,我们需要想一个办法,把同一个业务的日志,把他们用一个唯一的号把它标识起来。

2. logback增加自定义参数

在resources/logback-spring.xml当中,给日志定义了这样的一个形式,上面注释掉的是我们应该在生产环境当中使用的,需要具体到年月日,时分秒,而下面的一行是我们开发环境使用的简单的表达。

  1. <!--<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %blue(%-50logger{50}:%-4line) %thread %green(%-18X{LOG_ID}) %msg%n</Pattern>-->
  2. <Pattern>%d{ss.SSS} %highlight(%-5level) %blue(%-30logger{30}:%-4line) %thread %green(%-18X{LOG_ID}) %msg%n</Pattern>

这里面是是会显示log-id的,我们应该在线程的入口,对我们来说,线程的入口可以放在logAspect,log_id的设置就可以放在这里来。logback帮我们提供了一个参数MDC.put,这个是内置的一个函数,我们要在两个位置使用,一个是AOP打印日志的时候,一个是定时器当中:

  1. @Resource
  2. private SnowFlake snowFlake;
  3. @Before("controllerPointcut()")
  4. public void doBefore(JoinPoint joinPoint) throws Throwable {
  5. // 增加日志流水号
  6. MDC.put("LOG_ID", String.valueOf(snowFlake.nextId()));
  7. // 开始打印请求日志
  8. ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
  9. HttpServletRequest request = attributes.getRequest();
  10. Signature signature = joinPoint.getSignature();
  11. String name = signature.getName();
  12. }
  1. @Component
  2. public class DocJob {
  3. private static final Logger LOG = LoggerFactory.getLogger(DocJob.class);
  4. @Resource
  5. private DocService docService;
  6. @Resource
  7. private SnowFlake snowFlake; // 注入
  8. /**
  9. * 每30秒更新电子书信息
  10. */
  11. @Scheduled(cron = "5/30 * * * * ?")
  12. public void cron() {
  13. // 增加日志流水号
  14. MDC.put("LOG_ID", String.valueOf(snowFlake.nextId()));
  15. LOG.info("更新电子书下的文档数据开始");
  16. long start = System.currentTimeMillis();
  17. docService.updateEbookInfo();
  18. LOG.info("更新电子书下的文档数据结束,耗时:{}毫秒", System.currentTimeMillis() - start);
  19. }
  20. }

WebSocket使用示例

1. 网站通知功能

网站通知功能有两种实现方法:定时轮询被动通知

  • 定时轮询:打开一个网站,网站会自己发起一个定时器(setTimeout或者setTimeInterval),定时的去调用后端的一个接口,这种需要后端有一个可以存储网站通知内容的地方,前端访问的接口进来就能访问这个地方,把要通知的内容返回给前端。优点就是不占用我们的连接,什么时候想获取通知就去调用一次请求。
  • 被动通知:使用到websocket,就是网站显示出来之后,就会创建一个websocket连接,跟服务器连接起来,只要不关闭网站,这个连接就一直存在。有一个缺点就是会一直占用我们的连接,优点就是实时的。

2. 集成WebSocket

WebSocket的内容虽然比较多,但是模式比较固定,到时候只需要拷贝一下内容即可。

(1)服务端

首先要引入spring-boot内置的websocket的依赖:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

然后在config文件夹下添加下面的WebSocketConfig文件:

  1. package com.taopoppy.wiki.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  5. @Configuration
  6. public class WebSocketConfig {
  7. @Bean
  8. public ServerEndpointExporter serverEndpointExporter() {
  9. return new ServerEndpointExporter();
  10. }
  11. }

接着我们就去书写WebSocketServer服务了,其实和controller差不多:

  • 外部是可以通过/wx/{token}去访问websocket服务
  • 其次每个用户都需要传入一个唯一的token作为标识
  • map当中存放的是所有的连接
  • 连接成功就会往map当中put对应的token和session会话信息
  • 连接关闭就将token和session会话信息从map当中移除
  • 后面给前端推送消息主要使用的就是sendInfo这个方法 ```java package com.taopoppy.wiki.websocket;

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;

import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.HashMap;

@Component @ServerEndpoint(“/ws/{token}”) public class WebSocketServer { private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);

  1. /**
  2. * 每个客户端一个token
  3. */
  4. private String token = "";
  5. private static HashMap<String, Session> map = new HashMap<>();
  6. /**
  7. * 连接成功
  8. */
  9. @OnOpen
  10. public void onOpen(Session session, @PathParam("token") String token) {
  11. map.put(token, session);
  12. this.token = token;
  13. LOG.info("有新连接:token:{},session id:{},当前连接数:{}", token, session.getId(), map.size());
  14. }
  15. /**
  16. * 连接关闭
  17. */
  18. @OnClose
  19. public void onClose(Session session) {
  20. map.remove(this.token);
  21. LOG.info("连接关闭,token:{},session id:{}!当前连接数:{}", this.token, session.getId(), map.size());
  22. }
  23. /**
  24. * 收到消息
  25. */
  26. @OnMessage
  27. public void onMessage(String message, Session session) {
  28. LOG.info("收到消息:{},内容:{}", token, message);
  29. }
  30. /**
  31. * 连接错误
  32. */
  33. @OnError
  34. public void onError(Session session, Throwable error) {
  35. LOG.error("发生错误", error);
  36. }
  37. /**
  38. * 群发消息
  39. */
  40. public void sendInfo(String message) {
  41. for (String token : map.keySet()) {
  42. Session session = map.get(token);
  43. try {
  44. session.getBasicRemote().sendText(message);
  45. } catch (IOException e) {
  46. LOG.error("推送消息失败:{},内容:{}", token, message);
  47. }
  48. LOG.info("推送消息:{},内容:{}", token, message);
  49. }
  50. }

}

  1. <a name="IP840"></a>
  2. #### (2)客户端
  3. 客户端(前端)对应修改的地方有这么几个:

NODE_ENV=development VUE_APP_SERVER=http://127.0.0.1:8880 VUE_APP_WS_SERVER=ws://127.0.0.1:8880 // ws打头就是websocket协议

  1. 由于生产环境还没有域名,这里随便写即可

NODE_ENV=production VUE_APP_SERVER=http://wiki-server.courseimooc.com VUE_APP_WS_SERVER=ws://wiki-server.courseimooc.com

  1. util/tool.ts文件当中,我们新添加了一个功能,这是一个可以生成唯一token的方法,可以传入长度和进制:
  2. ```javascript
  3. /**
  4. * 随机生成[len]长度的[radix]进制数
  5. * @param len
  6. * @param radix 默认62
  7. * @returns {string}
  8. */
  9. public static uuid (len: number, radix = 62) {
  10. const chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');
  11. const uuid = [];
  12. radix = radix || chars.length;
  13. for (let i = 0; i < len; i++) {
  14. uuid[i] = chars[0 | Math.random() * radix];
  15. }
  16. return uuid.join('');
  17. }

我们这边是进入页面就会和websocket进行连接,当然可以在登录之后执行下面这些代码:

  1. let websocket: any;
  2. let token: any;
  3. const onOpen = () => {
  4. console.log('WebSocket连接成功,状态码:', websocket.readyState)
  5. };
  6. const onMessage = (event: any) => {
  7. console.log('WebSocket收到消息:', event.data);
  8. notification['info']({
  9. message: '收到消息',
  10. description: event.data,
  11. });
  12. };
  13. const onError = () => {
  14. console.log('WebSocket连接错误,状态码:', websocket.readyState)
  15. };
  16. const onClose = () => {
  17. console.log('WebSocket连接关闭,状态码:', websocket.readyState)
  18. };
  19. const initWebSocket = () => {
  20. // 连接成功
  21. websocket.onopen = onOpen;
  22. // 收到消息的回调
  23. websocket.onmessage = onMessage;
  24. // 连接错误
  25. websocket.onerror = onError;
  26. // 连接关闭的回调
  27. websocket.onclose = onClose;
  28. };
  29. onMounted(() => {
  30. // WebSocket
  31. if ('WebSocket' in window) {
  32. token = Tool.uuid(10);
  33. // 连接地址:ws://127.0.0.1:8880/ws/xxx
  34. websocket = new WebSocket(process.env.VUE_APP_WS_SERVER + '/ws/' + token);
  35. initWebSocket()
  36. // 关闭
  37. // websocket.close();
  38. } else {
  39. alert('当前浏览器 不支持')
  40. }
  41. });

整个上面的代码就是websocket的连接集成,点赞的功能我们后续实现。

完成点赞通知功能

1. 点赞时后端往ws推送

  1. /**
  2. * 根据id去给文档点赞
  3. * @param id
  4. * @return
  5. */
  6. public void vote(Long id) {
  7. String ip = RequestContext.getRemoteAddr();
  8. if(redisUtil.validateRepeat("DOC_VOTE_"+id+"_"+ip,3600*24)) {
  9. docMapperCust.increaseVoteCount(id);
  10. } else {
  11. throw new BusinessException(BusinessExceptionCode.VOTE_REPEAT);
  12. }
  13. // 推送消息
  14. Doc docDb = docMapper.selectByPrimaryKey(id);
  15. webSocketServer.sendInfo("【" + docDb.getName() + "】被点赞");
  16. }

2. 前端收到ws消息,弹出消息

前端的代码我们上面已经展示,主要是在onMessage方法当中使用notification组件弹出提示即可。

异步化解耦点赞功能

我们将点赞和ws推送消息两个关联的功能在技术上进行解耦,比如点赞一多,ws一慢,前端如果迟迟收不到后端的返回,就会产生用户体验的问题。

1. 异步化解耦

所以我们把ws消息推送进行异步化,异步化也就是另外起一个线程来执行后面的内容。首先我们要启用异步化,先到启动类当中去添加一个这样的注解:

  1. @EnableAsync

那剩下的问题又来了,给哪个方法异步化呢?我们知道,springboot会给异步化的方法添加代理类,所以既不能将同一个类当中的方法异步化,也不能直接将webSocketServer.sendInfo异步化,一定要将异步化的代码写到另一个类当中去

我们新创建一个Service类,专门负责异步化消息,给sendInfo方法添加@Async异步化的注解。

  1. package com.taopoppy.wiki.service;
  2. import com.taopoppy.wiki.websocket.WebSocketServer;
  3. import org.springframework.scheduling.annotation.Async;
  4. import org.springframework.stereotype.Service;
  5. import javax.annotation.Resource;
  6. @Service
  7. public class WsService {
  8. @Resource
  9. private WebSocketServer webSocketServer;
  10. @Async
  11. public void sendInfo(String message) {
  12. webSocketServer.sendInfo(message);
  13. }
  14. }

原来在DocService.java当中的vote方法中的推送消息部分就变成了下面这样:

  1. // 推送消息
  2. Doc docDb = docMapper.selectByPrimaryKey(id);
  3. wsService.sendInfo("【" + docDb.getName() + "】被点赞");

2. 日志流水号

因为异步化之后,点赞业务和推送消息的业务日志线程号不一样,这是理所应当的,我们希望他们的日志流水号一样,这样两个关联的业务虽然不是同一个线程,但是属于同一个业务日志流水号,则查询运维的时候会很方便。

先在点赞的业务接口当中拿到业务日志流水号:

  1. // 推送消息
  2. Doc docDb = docMapper.selectByPrimaryKey(id);
  3. String logId = MDC.get("LOG_ID");
  4. wsService.sendInfo("【" + docDb.getName() + "】被点赞", logId);

然后在异步的推送线程当中取出点赞业务的流水号,放在当前推送的线程当中:

  1. @Async
  2. public void sendInfo(String message, String logId) {
  3. MDC.put("LOG_ID", logId);
  4. webSocketServer.sendInfo(message);
  5. }

3. 特性扩展

刚才我们说的Async这个注解,这个注解要生效,对应的Async注解的方法,不能和调用的地方放在同一个类当中,也就是说同一个类当中,不能A调用Async注解的B。另外还有一个注解也有这样的特性,就是事务。

同时对两张表有增删改的操作,就要考虑加事务,否则会造成数据不准确。当然也有不加事务的场景,不能一概而论。比如同时操作两个表,前一个表操作成功,后一个表操作失败,就会造成数据不一致和不准确,所以要么同时成功,要么同时失败,这就是事务的一个作用。

比如说在DocService的save方法当中,要同时操作docMapper和contentMapper,就要添加一个@Transactional事务的注解:

  1. @Transactional
  2. public void save(DocSaveReq req) {
  3. Doc doc = CopyUtil.copy(req, Doc.class);
  4. Content content = CopyUtil.copy(req,Content.class);
  5. if(ObjectUtils.isEmpty(req.getId())) {
  6. // 新增文档doc
  7. doc.setId(snowFlake.nextId());
  8. doc.setViewCount(0);
  9. doc.setVoteCount(0);
  10. docMapper.insert(doc);
  11. // 同时新增内容content
  12. content.setId(doc.getId());
  13. contentMapper.insert(content);
  14. } else {
  15. // 更新
  16. docMapper.updateByPrimaryKey(doc);
  17. // 包含大字段的操作
  18. int count = contentMapper.updateByPrimaryKeyWithBLOBs(content);
  19. if(count == 0) {
  20. // 如果content当中没有该id的记录,就要插入
  21. contentMapper.insert(content);
  22. }
  23. }
  24. }

这样添加注解之后,就必须在其他类当中去调用这个方法,如果在DocService这个类当中其他方法调用save方法,那么这个事务注解就不起作用了。

MQ解耦点赞通知功能

1. Springboot异步化的问题

  1. @Async
  2. public void sendInfo(String message, String logId) {
  3. MDC.put("LOG_ID", logId);
  4. webSocketServer.sendInfo(message);
  5. }

假如上面这部分的方法里面的程序很复杂,或者出错等原因,会导致我这个线程一直会被占用,假如点赞这个业务量很大的时候,就会开启很多个线程,当然这里可以添加一个线程池,让异步化规定只能开启多少个线程,一旦超过了这个线程池,它还是会变成同步的

即使有线程池,假如这个业务是耗时很长的,业务量很大的话,线程越来越多,就会把服务器塞满,影响原有的业务内容,解决这个问题我们需要使用到MQ,MQ就是消息队列,和redis一样,是一个中间件,需要单独安装,常用的MQ有rocketmq,kafka,rabbitmq等等。

MQ分为发送方和接收方,我们上面构造好要发送给前端的通知消息之后,就往MQ里面送,MQ里面还有一个消费端,就是另起一个服务来监听MQ,收到MQ之后才会去执行点赞通知,所以MQ的发送和接收就变成两个服务器,即使接收很慢,接收的地方挂了,也不影响现有的点赞功能,只不过前端收不到点赞的通知罢了。

MQ就是一个第三方软件,到官网下载rocketmq-all-4.7.1-bin-release.zip,然后解压到电脑。进入bin目录,启动mqnamesrv.cmd和mqborker.cmd,前者是注册中心,后者是真正的MQ服务端,springboot就是MQ的客户端。

2. 使用RocketMQ解耦

先添加一下环境变量:ROCKETMQ_HOME(E:\rocketmq-all-4.7.1-bin-release\rocketmq-all-4.7.1-bin-release),然后打开两个新的cmd窗口,路径定位在MQ的bin目录下,先在一个窗口执行mqnamesrv.cmd,然后在另一个窗口执行mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,后面添加的这个参数后,SpringBoot可以发送任意的topic到RocketMQ,否则需要在RocketMQ里先创建好Topic。
1649309244(1).png

(1)发送端

然后在pom.xml当中集成依赖:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.0.2</version>
  5. </dependency>

然后在application.properties当中配置:

  1. # rocketmq配置
  2. rocketmq.name-server=127.0.0.1:9876
  3. rocketmq.producer.group=default

在DocService.java当中去书写发送端:

  1. @Resource
  2. private RocketMQTemplate rocketMQTemplate;
  3. public void vote(Long id) {
  4. ...
  5. // 推送消息
  6. Doc docDb = docMapper.selectByPrimaryKey(id);
  7. String logId = MDC.get("LOG_ID");
  8. // 给mq推送消息(参数:主题,内容)
  9. rocketMQTemplate.convertAndSend("VOTE_TOPIC","【" + docDb.getName() + "】被点赞");
  10. }

(2)接收端

创建一个rocketmq的package包,然后创建一个VoteTopicConsumer.java,内容如下:

  1. package com.taopoppy.wiki.rocketmq;
  2. import com.taopoppy.wiki.websocket.WebSocketServer;
  3. import org.apache.rocketmq.common.message.MessageExt;
  4. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  5. import org.apache.rocketmq.spring.core.RocketMQListener;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.stereotype.Service;
  9. import javax.annotation.Resource;
  10. @Service
  11. @RocketMQMessageListener(consumerGroup = "default", topic = "VOTE_TOPIC")
  12. public class VoteTopicConsumer implements RocketMQListener<MessageExt> {
  13. private static final Logger LOG = LoggerFactory.getLogger(VoteTopicConsumer.class);
  14. @Resource
  15. public WebSocketServer webSocketServer;
  16. @Override
  17. public void onMessage(MessageExt messageExt) {
  18. byte[] body = messageExt.getBody();
  19. LOG.info("ROCKETMQ收到消息:{}", new String(body));
  20. webSocketServer.sendInfo(new String(body));
  21. }
  22. }

整个过程是:

  • 前端发起点赞请求
  • springboot收到请求,发送VOTE_TOPIC给mq
  • mq把消息推送给监听的地方
  • springboot就是监听VOTE_TOPIC的地方
  • springboot再把消息通过websocket发给前端