阅读文档时,更新阅读数
文档的点赞功能,更新点赞数
更新电子书的文档数,阅读数,点赞数
有文档被点暂时,前端可以收到通知
SpringBoot异步化,WebSocket,RocketMQ
文档阅读数更新
1. 自定义sql
前端点击某篇文档的时候,doc.view_count+1,这个功能的实现是需要在查找文档内容的时候去做的,也就是在DocService当中的findContent当中,我们更新Doc表当中的view_count自增1可以使用这样的sql:
update doc set view_count = view_count + 1 where id = 1
在我们生成的mapper当中,这种sql是没有实现的,我们需要自己去自定义,我们首先在resources当中创建一个DocMapperCust.xml,内容如下:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.taopoppy.wiki.mapper.DocMapperCust" >
<update id="increaseViewCount">
update doc set view_count = view_count + 1 where id = #{id}
</update>
</mapper>
注意。如果是增删改的sql,都可以使用update包裹起来,查一般使用select包裹起来。另外increaseViewCount是要和西面的DocMapperCust的接口当中的函数名一一对应的。其中这个sql当中的id是动态传递的。看看接口DocMapperCust:
package com.taopoppy.wiki.mapper;
import org.apache.ibatis.annotations.Param;
public interface DocMapperCust {
public void increaseViewCount(@Param("id") Long id);
}
这样的话,我们就可以在DocService.java当中进行使用:
@Service
public class DocService {
@Resource
private DocMapperCust docMapperCust;
/**
* 根据id去content表当中查找内容
* @param id
* @return
*/
public String findContent(Long id) {
Content content = contentMapper.selectByPrimaryKey(id);
// 文档阅读数+1
docMapperCust.increaseViewCount(id);
if(ObjectUtils.isEmpty(content)) {
return "";
} else {
return content.getContent();
}
}
}
2. 前端代码
前端的对应的代码在doc.vue当中,显示了点赞和阅读的数量。
文档点赞功能开发
1. 后端代码实现
前端在文档内容的下方,增加点赞按钮,点击后doc.vote_count+1,实现的方式和阅读数的比较雷同
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.taopoppy.wiki.mapper.DocMapperCust" >
<update id="increaseViewCount">
update doc set view_cout = view_count + 1 where id = #{id}
</update>
<update id="increaseVoteCount">
update doc set vote_count = vote_count + 1 where id = #{id}
</update>
</mapper>
package com.taopoppy.wiki.mapper;
import org.apache.ibatis.annotations.Param;
public interface DocMapperCust {
public void increaseViewCount(@Param("id") Long id);
public void increaseVoteCount(@Param("id") Long id);
}
/**
* 根据id去给文档点赞
* @param id
* @return
*/
public void vote(Long id) {
docMapperCust.increaseVoteCount(id);
}
@GetMapping("/vote/{id}")
public CommonResp vote(@PathVariable Long id) {
CommonResp commonResp = new CommonResp();
docService.vote(id);
return commonResp;
}
2. 点赞拦截
因为前端的点赞数量显示不是从后端拿的数据,而是在点赞的同时自己自增的1,而点赞按钮不能无限的点赞,所以需要实现拦截功能。
要实现的功能是一个用户只能点赞一次,而且是对外部开放的,也就是用户不需要登录也可以点赞,但是只能点赞一次,所以需要对用户的IP和文章的id要同时进行过滤,这个时候就要使用到redis,点赞的时候先看看redis中有没有IP+docId,有表示点赞过,没有就可以点赞。点赞成功后往redis中放IP+docId。
首先定义了两个工具类:
package com.taopoppy.wiki.util;
import java.io.Serializable;
public class RequestContext implements Serializable {
private static ThreadLocal<String> remoteAddr = new ThreadLocal<>();
public static String getRemoteAddr() {
return remoteAddr.get();
}
public static void setRemoteAddr(String remoteAddr) {
RequestContext.remoteAddr.set(remoteAddr);
}
}
package com.taopoppy.wiki.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Component
public class RedisUtil {
private static final Logger LOG = LoggerFactory.getLogger(RedisUtil.class);
@Resource
private RedisTemplate redisTemplate;
/**
* true:不存在,放一个KEY
* false:已存在
* @param key
* @param second
* @return
*/
public boolean validateRepeat(String key, long second) {
if (redisTemplate.hasKey(key)) {
LOG.info("key已存在:{}", key);
return false;
} else {
LOG.info("key不存在,放入:{},过期 {} 秒", key, second);
redisTemplate.opsForValue().set(key, key, second, TimeUnit.SECONDS);
return true;
}
}
}
这两个工具类怎么使用呢,首先我们在日志打印得地方获取远程IP:
@Aspect
@Component
public class LogAspect {
@Before("controllerPointcut()")
public void doBefore(JoinPoint joinPoint) throws Throwable {
LOG.info("------------- 开始 -------------");
LOG.info("请求地址: {} {}", request.getRequestURL().toString(), request.getMethod());
LOG.info("类名方法: {}.{}", signature.getDeclaringTypeName(), name);
LOG.info("远程地址: {}", request.getRemoteAddr());
// 获取到远程IP放进线程本地变量
RequestContext.setRemoteAddr(getRemoteIp(request));
}
/**
* 使用nginx做反向代理,需要用该方法才能取到真实的远程IP
* @param request
* @return
*/
public String getRemoteIp(HttpServletRequest request) {
String ip = request.getHeader("x-forwarded-for");
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("WL-Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
return ip;
}
}
然后在点赞的处理方法当中拿到远程IP,对redis当中的数据做判断:
@Resource
private RedisUtil redisUtil;
/**
* 根据id去给文档点赞
* @param id
* @return
*/
public void vote(Long id) {
// 远程IP + doc.id 作为key,24小时之内不能重复点赞
String ip = RequestContext.getRemoteAddr();
if(redisUtil.validateRepeat("DOC_VOTE_"+id+"_"+ip,3600*24)) {
docMapperCust.increaseVoteCount(id);
} else {
throw new BusinessException(BusinessExceptionCode.VOTE_REPEAT);
}
}
电子书信息更新方案
由于电子书的也有点赞数,阅读数,而且是其所有文档的点赞数和阅读数的总和,所以当对文档点赞或者阅读,其所属的电子书的点赞数和阅读数也应该更新,更新方案有两种:
- 实时更新:优点是数据准确,缺点是改动的地方多,需要在所有相关业务逻辑的地方去实时更新
- 定时批量更新:优点是改动的地方小,侵入性小,缺点就是实时性不强
我们现在来使用定时批量更新的方式,首先统计doc所有ebook_id为1的sql是这样书写的,把ebook_id为1的全部查出来,将view_count和vote_count字段进行累加
select count(1), sum(`view_count`), sum(`vote_count`) from doc where ebook_id=1
如果我们统一按照ebook_id进行分组,我们可以这样书写,按照ebook_id进行分组,将每一组里的数据进行view_count和vote_count进行累加。然后给count(1)起个名字doc_count,给sum(view_count
)起个名字view_count,给sum(vote_count
)起个名字vote_count,这样查询出来的结果是按照我们的名字罗列的
select ebook_id, count(1) doc_count, sum(`view_count`) view_count, sum(`vote_count`) vote_count from doc group by ebook_id;
其实上面查询出来的的数据也算一张表了,我们可以将其和ebook表进行关联更新:将ebook表设置为t1表,将上面分组查询出来的数据作为t2表,然后将t2的一些属性设置给t1,条件就是t1的id和t2的ebook_id是一样的。
update ebook t1,(select ebook_id, count(1) doc_count, sum(`view_count`) view_count, sum(`vote_count`) vote_count from doc group by ebook_id) t2
set t1.doc_count = t2.doc_count, t1.view_count = t2.view_count, t1.vote_count=t2.vote_count
where t1.id = t2.ebook_id
SpringBoot定时任务示例
1. 启用定时器
启用定时任务的话只需要启用定时器,不需要引入依赖,首先要去启动类当中添加注解@EnableScheduling:
@EnableScheduling
public class WikiApplication {}
2. 两种定时器写法
package com.taopoppy.wiki.job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class TestJob {
private static final Logger LOG = LoggerFactory.getLogger(TestJob.class);
/**
* 固定时间间隔,fixedRate单位毫秒
*/
@Scheduled(fixedRate = 1000)
public void simple() throws InterruptedException {
SimpleDateFormat formatter = new SimpleDateFormat("mm:ss");
String dateString = formatter.format(new Date());
Thread.sleep(2000);
LOG.info("每隔5秒钟执行一次: {}", dateString);
}
/**
* 自定义cron表达式跑批
* 只有等上一次执行完成,下一次才会在下一个时间点执行,错过就错过
*/
@Scheduled(cron = "*/1 * * * * ?")
public void cron() throws InterruptedException {
SimpleDateFormat formatter = new SimpleDateFormat("mm:ss SSS");
String dateString = formatter.format(new Date());
Thread.sleep(1500);
LOG.info("每隔1秒钟执行一次: {}", dateString);
}
}
这个是springboot内置的定时器的两种写法,前者是通过固定的时间间隔,后者是通过cron表达式,值得注意的事两个定时任务是属于同一个线程,所以如果你有一个比较大的调度中心的话应该去使用quartz这个第三方框架,这个是当下最流行的定时器框架。
完成电子书信息更新
更新策略是需要根据网站的流量来决定的,可以每分钟每小时,或者每天,而且Cron表达式不需要去记,直接在Cron在线表达式去生成,现在我们就去把前面书写的sql集成到定时器当中:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.taopoppy.wiki.mapper.DocMapperCust" >
// 自定义sql
<update id="updateEbookInfo">
update ebook t1,(select ebook_id, count(1) doc_count, sum(`view_count`) view_count, sum(`vote_count`) vote_count from doc group by ebook_id) t2
set t1.doc_count = t2.doc_count, t1.view_count = t2.view_count, t1.vote_count=t2.vote_count
where t1.id = t2.ebook_id
</update>
</mapper>
public interface DocMapperCust {
// 集成方法
public void updateEbookInfo();
}
/**
* 定时更新电子书当中的数据
*/
public void updateEbookInfo() {
docMapperCust.updateEbookInfo();
}
最后在定时器当中去调用上面的方法:
package com.taopoppy.wiki.job;
import com.taopoppy.wiki.service.DocService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class DocJob {
private static final Logger LOG = LoggerFactory.getLogger(DocJob.class);
@Resource
private DocService docService;
/**
* 每30秒更新电子书信息
*/
@Scheduled(cron = "5/30 * * * * ?")
public void cron() {
LOG.info("更新电子书下的文档数据开始");
long start = System.currentTimeMillis();
docService.updateEbookInfo();
LOG.info("更新电子书下的文档数据结束,耗时:{}毫秒", System.currentTimeMillis() - start);
}
}
日志流水号的使用
1. 日志问题修改
由于我们的日志现在是杂乱无章,我们需要想一个办法,把同一个业务的日志,把他们用一个唯一的号把它标识起来。
2. logback增加自定义参数
在resources/logback-spring.xml当中,给日志定义了这样的一个形式,上面注释掉的是我们应该在生产环境当中使用的,需要具体到年月日,时分秒,而下面的一行是我们开发环境使用的简单的表达。
<!--<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %blue(%-50logger{50}:%-4line) %thread %green(%-18X{LOG_ID}) %msg%n</Pattern>-->
<Pattern>%d{ss.SSS} %highlight(%-5level) %blue(%-30logger{30}:%-4line) %thread %green(%-18X{LOG_ID}) %msg%n</Pattern>
这里面是是会显示log-id的,我们应该在线程的入口,对我们来说,线程的入口可以放在logAspect,log_id的设置就可以放在这里来。logback帮我们提供了一个参数MDC.put,这个是内置的一个函数,我们要在两个位置使用,一个是AOP打印日志的时候,一个是定时器当中:
@Resource
private SnowFlake snowFlake;
@Before("controllerPointcut()")
public void doBefore(JoinPoint joinPoint) throws Throwable {
// 增加日志流水号
MDC.put("LOG_ID", String.valueOf(snowFlake.nextId()));
// 开始打印请求日志
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
Signature signature = joinPoint.getSignature();
String name = signature.getName();
}
@Component
public class DocJob {
private static final Logger LOG = LoggerFactory.getLogger(DocJob.class);
@Resource
private DocService docService;
@Resource
private SnowFlake snowFlake; // 注入
/**
* 每30秒更新电子书信息
*/
@Scheduled(cron = "5/30 * * * * ?")
public void cron() {
// 增加日志流水号
MDC.put("LOG_ID", String.valueOf(snowFlake.nextId()));
LOG.info("更新电子书下的文档数据开始");
long start = System.currentTimeMillis();
docService.updateEbookInfo();
LOG.info("更新电子书下的文档数据结束,耗时:{}毫秒", System.currentTimeMillis() - start);
}
}
WebSocket使用示例
1. 网站通知功能
网站通知功能有两种实现方法:定时轮询和被动通知。
- 定时轮询:打开一个网站,网站会自己发起一个定时器(setTimeout或者setTimeInterval),定时的去调用后端的一个接口,这种需要后端有一个可以存储网站通知内容的地方,前端访问的接口进来就能访问这个地方,把要通知的内容返回给前端。优点就是不占用我们的连接,什么时候想获取通知就去调用一次请求。
- 被动通知:使用到websocket,就是网站显示出来之后,就会创建一个websocket连接,跟服务器连接起来,只要不关闭网站,这个连接就一直存在。有一个缺点就是会一直占用我们的连接,优点就是实时的。
2. 集成WebSocket
WebSocket的内容虽然比较多,但是模式比较固定,到时候只需要拷贝一下内容即可。
(1)服务端
首先要引入spring-boot内置的websocket的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
然后在config文件夹下添加下面的WebSocketConfig文件:
package com.taopoppy.wiki.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
接着我们就去书写WebSocketServer服务了,其实和controller差不多:
- 外部是可以通过/wx/{token}去访问websocket服务
- 其次每个用户都需要传入一个唯一的token作为标识
- map当中存放的是所有的连接
- 连接成功就会往map当中put对应的token和session会话信息
- 连接关闭就将token和session会话信息从map当中移除
- 后面给前端推送消息主要使用的就是sendInfo这个方法 ```java package com.taopoppy.wiki.websocket;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;
import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.HashMap;
@Component @ServerEndpoint(“/ws/{token}”) public class WebSocketServer { private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
/**
* 每个客户端一个token
*/
private String token = "";
private static HashMap<String, Session> map = new HashMap<>();
/**
* 连接成功
*/
@OnOpen
public void onOpen(Session session, @PathParam("token") String token) {
map.put(token, session);
this.token = token;
LOG.info("有新连接:token:{},session id:{},当前连接数:{}", token, session.getId(), map.size());
}
/**
* 连接关闭
*/
@OnClose
public void onClose(Session session) {
map.remove(this.token);
LOG.info("连接关闭,token:{},session id:{}!当前连接数:{}", this.token, session.getId(), map.size());
}
/**
* 收到消息
*/
@OnMessage
public void onMessage(String message, Session session) {
LOG.info("收到消息:{},内容:{}", token, message);
}
/**
* 连接错误
*/
@OnError
public void onError(Session session, Throwable error) {
LOG.error("发生错误", error);
}
/**
* 群发消息
*/
public void sendInfo(String message) {
for (String token : map.keySet()) {
Session session = map.get(token);
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
LOG.error("推送消息失败:{},内容:{}", token, message);
}
LOG.info("推送消息:{},内容:{}", token, message);
}
}
}
<a name="IP840"></a>
#### (2)客户端
客户端(前端)对应修改的地方有这么几个:
NODE_ENV=development VUE_APP_SERVER=http://127.0.0.1:8880 VUE_APP_WS_SERVER=ws://127.0.0.1:8880 // ws打头就是websocket协议
由于生产环境还没有域名,这里随便写即可
NODE_ENV=production VUE_APP_SERVER=http://wiki-server.courseimooc.com VUE_APP_WS_SERVER=ws://wiki-server.courseimooc.com
在util/tool.ts文件当中,我们新添加了一个功能,这是一个可以生成唯一token的方法,可以传入长度和进制:
```javascript
/**
* 随机生成[len]长度的[radix]进制数
* @param len
* @param radix 默认62
* @returns {string}
*/
public static uuid (len: number, radix = 62) {
const chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');
const uuid = [];
radix = radix || chars.length;
for (let i = 0; i < len; i++) {
uuid[i] = chars[0 | Math.random() * radix];
}
return uuid.join('');
}
我们这边是进入页面就会和websocket进行连接,当然可以在登录之后执行下面这些代码:
let websocket: any;
let token: any;
const onOpen = () => {
console.log('WebSocket连接成功,状态码:', websocket.readyState)
};
const onMessage = (event: any) => {
console.log('WebSocket收到消息:', event.data);
notification['info']({
message: '收到消息',
description: event.data,
});
};
const onError = () => {
console.log('WebSocket连接错误,状态码:', websocket.readyState)
};
const onClose = () => {
console.log('WebSocket连接关闭,状态码:', websocket.readyState)
};
const initWebSocket = () => {
// 连接成功
websocket.onopen = onOpen;
// 收到消息的回调
websocket.onmessage = onMessage;
// 连接错误
websocket.onerror = onError;
// 连接关闭的回调
websocket.onclose = onClose;
};
onMounted(() => {
// WebSocket
if ('WebSocket' in window) {
token = Tool.uuid(10);
// 连接地址:ws://127.0.0.1:8880/ws/xxx
websocket = new WebSocket(process.env.VUE_APP_WS_SERVER + '/ws/' + token);
initWebSocket()
// 关闭
// websocket.close();
} else {
alert('当前浏览器 不支持')
}
});
整个上面的代码就是websocket的连接集成,点赞的功能我们后续实现。
完成点赞通知功能
1. 点赞时后端往ws推送
/**
* 根据id去给文档点赞
* @param id
* @return
*/
public void vote(Long id) {
String ip = RequestContext.getRemoteAddr();
if(redisUtil.validateRepeat("DOC_VOTE_"+id+"_"+ip,3600*24)) {
docMapperCust.increaseVoteCount(id);
} else {
throw new BusinessException(BusinessExceptionCode.VOTE_REPEAT);
}
// 推送消息
Doc docDb = docMapper.selectByPrimaryKey(id);
webSocketServer.sendInfo("【" + docDb.getName() + "】被点赞");
}
2. 前端收到ws消息,弹出消息
前端的代码我们上面已经展示,主要是在onMessage方法当中使用notification组件弹出提示即可。
异步化解耦点赞功能
我们将点赞和ws推送消息两个关联的功能在技术上进行解耦,比如点赞一多,ws一慢,前端如果迟迟收不到后端的返回,就会产生用户体验的问题。
1. 异步化解耦
所以我们把ws消息推送进行异步化,异步化也就是另外起一个线程来执行后面的内容。首先我们要启用异步化,先到启动类当中去添加一个这样的注解:
@EnableAsync
那剩下的问题又来了,给哪个方法异步化呢?我们知道,springboot会给异步化的方法添加代理类,所以既不能将同一个类当中的方法异步化,也不能直接将webSocketServer.sendInfo异步化,一定要将异步化的代码写到另一个类当中去。
我们新创建一个Service类,专门负责异步化消息,给sendInfo方法添加@Async异步化的注解。
package com.taopoppy.wiki.service;
import com.taopoppy.wiki.websocket.WebSocketServer;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class WsService {
@Resource
private WebSocketServer webSocketServer;
@Async
public void sendInfo(String message) {
webSocketServer.sendInfo(message);
}
}
原来在DocService.java当中的vote方法中的推送消息部分就变成了下面这样:
// 推送消息
Doc docDb = docMapper.selectByPrimaryKey(id);
wsService.sendInfo("【" + docDb.getName() + "】被点赞");
2. 日志流水号
因为异步化之后,点赞业务和推送消息的业务日志线程号不一样,这是理所应当的,我们希望他们的日志流水号一样,这样两个关联的业务虽然不是同一个线程,但是属于同一个业务日志流水号,则查询运维的时候会很方便。
先在点赞的业务接口当中拿到业务日志流水号:
// 推送消息
Doc docDb = docMapper.selectByPrimaryKey(id);
String logId = MDC.get("LOG_ID");
wsService.sendInfo("【" + docDb.getName() + "】被点赞", logId);
然后在异步的推送线程当中取出点赞业务的流水号,放在当前推送的线程当中:
@Async
public void sendInfo(String message, String logId) {
MDC.put("LOG_ID", logId);
webSocketServer.sendInfo(message);
}
3. 特性扩展
刚才我们说的Async这个注解,这个注解要生效,对应的Async注解的方法,不能和调用的地方放在同一个类当中,也就是说同一个类当中,不能A调用Async注解的B。另外还有一个注解也有这样的特性,就是事务。
同时对两张表有增删改的操作,就要考虑加事务,否则会造成数据不准确。当然也有不加事务的场景,不能一概而论。比如同时操作两个表,前一个表操作成功,后一个表操作失败,就会造成数据不一致和不准确,所以要么同时成功,要么同时失败,这就是事务的一个作用。
比如说在DocService的save方法当中,要同时操作docMapper和contentMapper,就要添加一个@Transactional事务的注解:
@Transactional
public void save(DocSaveReq req) {
Doc doc = CopyUtil.copy(req, Doc.class);
Content content = CopyUtil.copy(req,Content.class);
if(ObjectUtils.isEmpty(req.getId())) {
// 新增文档doc
doc.setId(snowFlake.nextId());
doc.setViewCount(0);
doc.setVoteCount(0);
docMapper.insert(doc);
// 同时新增内容content
content.setId(doc.getId());
contentMapper.insert(content);
} else {
// 更新
docMapper.updateByPrimaryKey(doc);
// 包含大字段的操作
int count = contentMapper.updateByPrimaryKeyWithBLOBs(content);
if(count == 0) {
// 如果content当中没有该id的记录,就要插入
contentMapper.insert(content);
}
}
}
这样添加注解之后,就必须在其他类当中去调用这个方法,如果在DocService这个类当中其他方法调用save方法,那么这个事务注解就不起作用了。
MQ解耦点赞通知功能
1. Springboot异步化的问题
@Async
public void sendInfo(String message, String logId) {
MDC.put("LOG_ID", logId);
webSocketServer.sendInfo(message);
}
假如上面这部分的方法里面的程序很复杂,或者出错等原因,会导致我这个线程一直会被占用,假如点赞这个业务量很大的时候,就会开启很多个线程,当然这里可以添加一个线程池,让异步化规定只能开启多少个线程,一旦超过了这个线程池,它还是会变成同步的。
即使有线程池,假如这个业务是耗时很长的,业务量很大的话,线程越来越多,就会把服务器塞满,影响原有的业务内容,解决这个问题我们需要使用到MQ,MQ就是消息队列,和redis一样,是一个中间件,需要单独安装,常用的MQ有rocketmq,kafka,rabbitmq等等。
MQ分为发送方和接收方,我们上面构造好要发送给前端的通知消息之后,就往MQ里面送,MQ里面还有一个消费端,就是另起一个服务来监听MQ,收到MQ之后才会去执行点赞通知,所以MQ的发送和接收就变成两个服务器,即使接收很慢,接收的地方挂了,也不影响现有的点赞功能,只不过前端收不到点赞的通知罢了。
MQ就是一个第三方软件,到官网下载rocketmq-all-4.7.1-bin-release.zip,然后解压到电脑。进入bin目录,启动mqnamesrv.cmd和mqborker.cmd,前者是注册中心,后者是真正的MQ服务端,springboot就是MQ的客户端。
2. 使用RocketMQ解耦
先添加一下环境变量:ROCKETMQ_HOME(E:\rocketmq-all-4.7.1-bin-release\rocketmq-all-4.7.1-bin-release),然后打开两个新的cmd窗口,路径定位在MQ的bin目录下,先在一个窗口执行mqnamesrv.cmd,然后在另一个窗口执行mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,后面添加的这个参数后,SpringBoot可以发送任意的topic到RocketMQ,否则需要在RocketMQ里先创建好Topic。
(1)发送端
然后在pom.xml当中集成依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
然后在application.properties当中配置:
# rocketmq配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=default
在DocService.java当中去书写发送端:
@Resource
private RocketMQTemplate rocketMQTemplate;
public void vote(Long id) {
...
// 推送消息
Doc docDb = docMapper.selectByPrimaryKey(id);
String logId = MDC.get("LOG_ID");
// 给mq推送消息(参数:主题,内容)
rocketMQTemplate.convertAndSend("VOTE_TOPIC","【" + docDb.getName() + "】被点赞");
}
(2)接收端
创建一个rocketmq的package包,然后创建一个VoteTopicConsumer.java,内容如下:
package com.taopoppy.wiki.rocketmq;
import com.taopoppy.wiki.websocket.WebSocketServer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
@RocketMQMessageListener(consumerGroup = "default", topic = "VOTE_TOPIC")
public class VoteTopicConsumer implements RocketMQListener<MessageExt> {
private static final Logger LOG = LoggerFactory.getLogger(VoteTopicConsumer.class);
@Resource
public WebSocketServer webSocketServer;
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
LOG.info("ROCKETMQ收到消息:{}", new String(body));
webSocketServer.sendInfo(new String(body));
}
}
整个过程是:
- 前端发起点赞请求
- springboot收到请求,发送VOTE_TOPIC给mq
- mq把消息推送给监听的地方
- springboot就是监听VOTE_TOPIC的地方
- springboot再把消息通过websocket发给前端