源码地址:

https://gitee.com/jack_whh/dcs-websocket-session

在显示项目中遇到了一个问题,需要使用到websocket与小程序建立长链接。由于项目是负载均衡的,存在项目部署在多台机器上。这样就会存在一个问题,当一次请求负载到第一台服务器时,socketsession在第一台服务器线程上,第二次请求,负载到第二台服务器上,需要通过id查找当前用户的session时,是查找不到的。

image.png

  • 可以看到,由于websocket的session并没有实现序列化接口。所以无法将session序列化到redis中。
  • web的中的httpsession 主要是通过下面的两个管理器实现序列化的。
    1. org.apache.catalina.session.StandardManager
    2. org.apache.catalina.session.PersistentManager

StandardManager是Tomcat默认使用的,在web应用程序关闭时,对内存中的所有HttpSession对象进行持久化,把他们保存到文件系统中。默认的存储文件为

  1. <tomcat安装目录>/work/Catalina/<主机名>/<应用程序名>/sessions.ser

PersistentManager比StandardManager更为灵活,只要某个设备提供了实现org.apache.catalina.Store接口的驱动类,PersistentManager就可以将HttpSession对象保存到该设备。

image.png

所以spring-session-redis 解决分布场景下的session共享就是将session序列化到redis中间件中,使用filter 加装饰器模式解决分布式场景httpsession 共享问题。

# 解决方案

  • 使用消息中间件解决websocket session共享问题。
  • 使用redis的发布订阅模式解决

本文使用方式二

使用StringRedisTemplate的convertAndSend方法向指定频道发送指定消息:

  1. this.execute((connection) -> {
  2. connection.publish(rawChannel, rawMessage);
  3. return null;
  4. }, true);

redis的命令publish channel message

添加一个监听的容器以及一个监听器适配器

  1. @Bean
  2. RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter){
  3. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  4. container.setConnectionFactory(connectionFactory);
  5. // 可以添加多个 messageListener,配置不同的交换机
  6. container.addMessageListener(listenerAdapter, new PatternTopic(Constants.REDIS_CHANNEL));// 订阅最新消息频道
  7. return container;
  8. }
  9. @Bean
  10. MessageListenerAdapter listenerAdapter(RedisReceiver receiver){
  11. // 消息监听适配器
  12. return new MessageListenerAdapter(receiver, "onMessage");
  13. }

添加消息接收器

  1. /**
  2. * 消息监听对象,接收订阅消息
  3. */
  4. @Component
  5. public class RedisReceiver implements MessageListener {
  6. Logger log = LoggerFactory.getLogger(this.getClass());
  7. @Autowired
  8. private WebSocketServer webSocketServer;
  9. /**
  10. * 处理接收到的订阅消息
  11. */
  12. @Override
  13. public void onMessage(Message message, byte[] pattern)
  14. {
  15. String channel = new String(message.getChannel());// 订阅的频道名称
  16. String msg = "";
  17. try
  18. {
  19. msg = new String(message.getBody(), Constants.UTF8);//注意与发布消息编码一致,否则会乱码
  20. if (!StringUtils.isEmpty(msg)){
  21. if (Constants.REDIS_CHANNEL.endsWith(channel))// 最新消息
  22. {
  23. JSONObject jsonObject = JSON.parseObject(msg);
  24. webSocketServer.sendMessageByWayBillId(
  25. Long.parseLong(jsonObject.get(Constants.REDIS_MESSAGE_KEY).toString())
  26. ,jsonObject.get(Constants.REDIS_MESSAGE_VALUE).toString());
  27. }else{
  28. //TODO 其他订阅的消息处理
  29. }
  30. }else{
  31. log.info("消息内容为空,不处理。");
  32. }
  33. }
  34. catch (Exception e)
  35. {
  36. log.error("处理消息异常:"+e.toString());
  37. e.printStackTrace();
  38. }
  39. }
  40. }

websocket的配置类

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  5. /**
  6. * @description: websocket的配置类
  7. * @dateTime: 2021/6/16 15:43
  8. */
  9. @Configuration
  10. @EnableWebSocket
  11. public class WebSocketConfiguration {
  12. @Bean
  13. public ServerEndpointExporter serverEndpointExporter() {
  14. return new ServerEndpointExporter();
  15. }
  16. }

添加websocket的服务组件

  1. import com.alibaba.fastjson.JSON;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.data.redis.core.StringRedisTemplate;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.util.StringUtils;
  8. import javax.validation.constraints.NotNull;
  9. import javax.websocket.*;
  10. import javax.websocket.server.PathParam;
  11. import javax.websocket.server.ServerEndpoint;
  12. import java.io.IOException;
  13. import java.io.UnsupportedEncodingException;
  14. import java.util.HashMap;
  15. import java.util.Map;
  16. import java.util.concurrent.ConcurrentHashMap;
  17. import java.util.concurrent.atomic.AtomicInteger;
  18. @ServerEndpoint("/websocket/{id}")
  19. @Component
  20. public class WebSocketServer {
  21. private static final long sessionTimeout = 600000;
  22. private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
  23. /**
  24. * 当前在线连接数
  25. */
  26. private static AtomicInteger onlineCount = new AtomicInteger(0);
  27. /**
  28. * 用来存放每个客户端对应的 WebSocketServer 对象
  29. */
  30. private static ConcurrentHashMap<Long, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
  31. /**
  32. * 与某个客户端的连接会话,需要通过它来给客户端发送数据
  33. */
  34. private Session session;
  35. /**
  36. * 接收 id
  37. */
  38. private Long id;
  39. @Autowired
  40. private StringRedisTemplate template;
  41. /**
  42. * 连接建立成功调用的方法
  43. */
  44. @OnOpen
  45. public void onOpen(Session session, @PathParam("id") Long id) {
  46. session.setMaxIdleTimeout(sessionTimeout);
  47. this.session = session;
  48. this.id = id;
  49. if (webSocketMap.containsKey(id)) {
  50. webSocketMap.remove(id);
  51. webSocketMap.put(id, this);
  52. } else {
  53. webSocketMap.put(id, this);
  54. addOnlineCount();
  55. }
  56. log.info("编号id:" + id + "连接,当前在线数为:" + getOnlineCount());
  57. try {
  58. sendMessage("连接成功!");
  59. } catch (IOException e) {
  60. log.error("编号id:" + id + ",网络异常!!!!!!");
  61. }
  62. }
  63. /**
  64. * 连接关闭调用的方法
  65. */
  66. @OnClose
  67. public void onClose() {
  68. if (webSocketMap.containsKey(id)) {
  69. webSocketMap.remove(id);
  70. subOnlineCount();
  71. }
  72. log.info("编号id:" + id + "退出,当前在线数为:" + getOnlineCount());
  73. }
  74. /**
  75. * 收到客户端消息后调用的方法
  76. *
  77. * @param message 客户端发送过来的消息
  78. */
  79. @OnMessage
  80. public void onMessage(String message, Session session) {
  81. log.info("编号id消息:" + id + ",报文:" + message);
  82. }
  83. /**
  84. * 发生错误时调用
  85. *
  86. * @param session
  87. * @param error
  88. */
  89. @OnError
  90. public void onError(Session session, Throwable error) {
  91. log.error("编号id错误:" + this.id + ",原因:" + error.getMessage());
  92. error.printStackTrace();
  93. }
  94. /**
  95. * @description: 分布式 使用redis 去发布消息
  96. * @dateTime: 2021/6/17 10:31
  97. */
  98. public void sendMessage(@NotNull String key,String message) {
  99. String newMessge= null;
  100. try {
  101. newMessge = new String(message.getBytes(Constants.UTF8), Constants.UTF8);
  102. } catch (UnsupportedEncodingException e) {
  103. e.printStackTrace();
  104. }
  105. Map<String,String> map = new HashMap<String, String>();
  106. map.put(Constants.REDIS_MESSAGE_KEY, key);
  107. map.put(Constants.REDIS_MESSAGE_VALUE, newMessge);
  108. template.convertAndSend(Constants.REDIS_CHANNEL, JSON.toJSONString(map));
  109. }
  110. /**
  111. * @description: 单机使用 外部接口通过指定的客户id向该客户推送消息。
  112. * @dateTime: 2021/6/16 17:49
  113. */
  114. public void sendMessageByWayBillId(@NotNull Long key, String message) {
  115. WebSocketServer webSocketServer = webSocketMap.get(key);
  116. if (!StringUtils.isEmpty(webSocketServer)) {
  117. try {
  118. webSocketServer.sendMessage(message);
  119. log.info("编号id为:"+key+"发送消息:"+message);
  120. } catch (IOException e) {
  121. e.printStackTrace();
  122. log.error("编号id为:"+key+"发送消息失败");
  123. }
  124. }
  125. log.error("编号id号为:"+key+"未连接");
  126. }
  127. /**
  128. * 实现服务器主动推送
  129. */
  130. public void sendMessage(String message) throws IOException {
  131. this.session.getBasicRemote().sendText(message);
  132. }
  133. public static synchronized AtomicInteger getOnlineCount() {
  134. return onlineCount;
  135. }
  136. public static synchronized void addOnlineCount() {
  137. WebSocketServer.onlineCount.getAndIncrement();
  138. }
  139. public static synchronized void subOnlineCount() {
  140. WebSocketServer.onlineCount.getAndDecrement();
  141. }
  142. }
  • 项目结构

image.png

将该项目使用三个端口号启动三个服务

image.png

使用下面的这个网站进行演示。

http://www.easyswoole.com/wstool.html

image.png
启动两个页面网址分别是:

  • ws://127.0.0.1:8081/websocket/456
  • ws://127.0.0.1:8082/websocket/456

使用postman给http://localhost:8080/socket/456 发送请求

image.png

可以看到,我们给8080服务发送的消息,我们订阅的8081和8082 服务可以也可以使用该编号进行消息的推送。

使用8082服务发送这个消息格式{“KEY”:456,”VALUE”:”aaaa”} 的消息。其他的服务也会收到这个信息。

image.png

以上就是使用redis的发布订阅解决websocket 的分布式session 问题。