socket.io文档链接: https://socket.io/docs/v4/

netty-socketio maven依赖



<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.7</version>
</dependency>

application.yml相关配置(以下内容为YAML格式;若使用application.properties,需改写为 socketio.port=8084 这样的键值形式)
netty socket io setting

socketio:
  # 内网ip
  # host: localhost
  port: 8084
  # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
  maxFramePayloadLength: 1048576
  # 设置http交互最大内容长度
  maxHttpContentLength: 1048576
  # socket连接数大小(如只监听一个端口boss线程组为1即可)
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  upgradeTimeout: 1000000
  # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  pingTimeout: 6000000
  # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
  pingInterval: 25000

SocketIOConfig.java配置文件相关配置

  1. package com.jiuli.api.config;
  2. import com.corundumstudio.socketio.SocketConfig;
  3. import com.corundumstudio.socketio.SocketIOServer;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class SocketIOConfig {
  9. @Value("${socketio.host}")
  10. private String host;
  11. @Value("${socketio.port}")
  12. private Integer port;
  13. @Value("${socketio.bossCount}")
  14. private int bossCount;
  15. @Value("${socketio.workCount}")
  16. private int workCount;
  17. @Value("${socketio.allowCustomRequests}")
  18. private boolean allowCustomRequests;
  19. @Value("${socketio.upgradeTimeout}")
  20. private int upgradeTimeout;
  21. @Value("${socketio.pingTimeout}")
  22. private int pingTimeout;
  23. @Value("${socketio.pingInterval}")
  24. private int pingInterval;
  25. /**
  26. * 以下配置在上面的application.properties中已经注明
  27. * @return
  28. */
  29. @Bean
  30. public SocketIOServer socketIOServer() {
  31. SocketConfig socketConfig = new SocketConfig();
  32. socketConfig.setTcpNoDelay(true);
  33. socketConfig.setSoLinger(0);
  34. com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
  35. config.setSocketConfig(socketConfig);
  36. config.setHostname(host);
  37. config.setPort(port);
  38. config.setBossThreads(bossCount);
  39. config.setWorkerThreads(workCount);
  40. config.setAllowCustomRequests(allowCustomRequests);
  41. config.setUpgradeTimeout(upgradeTimeout);
  42. config.setPingTimeout(pingTimeout);
  43. config.setPingInterval(pingInterval);
  44. return new SocketIOServer(config);
  45. }
  46. }

以下就是提供一个SocketIOService接口,供其他地方需要使用时调用。

  1. package com.jiuli.websocket.service;
  2. import com.jiuli.websocket.domian.PushMessage;
  3. /**
  4. * 功能描述
  5. *
  6. * @author: zyu
  7. * @description:
  8. * @date: 2019/4/23 10:41
  9. */
  10. public interface SocketIOService {
  11. //推送的事件
  12. public static final String PUSH_EVENT = "push_event";
  13. // 启动服务
  14. void start() throws Exception;
  15. // 停止服务
  16. void stop();
  17. // 推送信息
  18. void pushMessageToUser(PushMessage pushMessage);
  19. }

SocketIOServiceImpl.java接口实现类

  1. import com.corundumstudio.socketio.SocketIOClient;
  2. import com.corundumstudio.socketio.SocketIOServer;
  3. import com.jiuli.api.util.JwtUtil;
  4. import com.jiuli.websocket.domian.PushMessage;
  5. import com.jiuli.websocket.service.SocketIOService;
  6. import org.apache.commons.lang3.StringUtils;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Service;
  9. import javax.annotation.PostConstruct;
  10. import javax.annotation.PreDestroy;
  11. import java.util.List;
  12. import java.util.Map;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. @Service
  15. public class SocketIOServiceImpl implements SocketIOService {
  16. // 用来存已连接的客户端
  17. private static Map<String, SocketIOClient> clientMap = new ConcurrentHashMap<>();
  18. @Autowired
  19. private SocketIOServer socketIOServer;
  20. /**
  21. * Spring IoC容器创建之后,在加载SocketIOServiceImpl Bean之后启动
  22. *
  23. * @throws Exception
  24. */
  25. @PostConstruct
  26. private void autoStartup() throws Exception {
  27. start();
  28. }
  29. /**
  30. * Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题
  31. *
  32. * @throws Exception
  33. */
  34. @PreDestroy
  35. private void autoStop() throws Exception {
  36. stop();
  37. }
  38. @Override
  39. public void start() throws Exception {
  40. // 监听客户端连接
  41. socketIOServer.addConnectListener(client -> {
  42. String auth = getParamsByClient(client);
  43. String token = auth.replace("Bearer ", "");
  44. String username = JwtUtil.getUsername(token);
  45. if (StringUtils.isNotEmpty(username)) {
  46. clientMap.put(username, client);
  47. }
  48. });
  49. // 监听客户端断开连接
  50. socketIOServer.addDisconnectListener(client -> {
  51. String loginUserNum = getParamsByClient(client);
  52. if (loginUserNum != null) {
  53. clientMap.remove(loginUserNum);
  54. System.out.println("断开连接: " + loginUserNum);
  55. System.out.println("断开连接: " + client.getSessionId());
  56. client.disconnect();
  57. }
  58. });
  59. // 处理自定义的事件,与连接监听类似
  60. socketIOServer.addEventListener("text", Object.class, (client, data, ackSender) -> {
  61. // TODO do something
  62. client.getHandshakeData();
  63. System.out.println( " 客户端:************ " + data);
  64. });
  65. socketIOServer.start();
  66. }
  67. @Override
  68. public void stop() {
  69. if (socketIOServer != null) {
  70. socketIOServer.stop();
  71. socketIOServer = null;
  72. }
  73. }
  74. @Override
  75. public void pushMessageToUser(PushMessage pushMessage) {
  76. String loginUserNum = pushMessage.getLoginUserNum();
  77. if (StringUtils.isNotBlank(loginUserNum)) {
  78. SocketIOClient client = clientMap.get(loginUserNum);
  79. if (client != null) {
  80. client.sendEvent(PUSH_EVENT, pushMessage);
  81. }
  82. }
  83. }
  84. /**
  85. * 此方法为获取client连接中的参数,可根据需求更改
  86. * @param client
  87. * @return
  88. */
  89. private String getParamsByClient(SocketIOClient client) {
  90. // 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)
  91. Map<String, List<String>> params = client.getHandshakeData().getUrlParams();
  92. List<String> list = params.get("auth");
  93. if (list != null && list.size() > 0) {
  94. return list.get(0);
  95. }
  96. return null;
  97. }
  98. public static Map<String, SocketIOClient> getClientMap() {
  99. return clientMap;
  100. }
  101. public static void setClientMap(Map<String, SocketIOClient> clientMap) {
  102. SocketIOServiceImpl.clientMap = clientMap;
  103. }
  104. }