分布式WebSocket实现(通过RabbitMQ)

1、实现思路

1、WebSocket接收用户或者接口传过来的数据时,统一发送到RabbitMQ

2、每个服务器监听RabbitMQ数据并获取数据,通过判断数据中persons是否为空来判断是单发还是群发,若persons不为空有用户id,每个服务器对比自己session中是否有这个用户id,若没有则不操作,若有则推送给该用户消息

3、使用websocket在线测试 (websocket-test.com)测试时,可以发送JSON数据来指定发给那个用户,例子如下:

若persons为空则为群发

  1. {
  2. "persons":["123","121"],
  3. "msg":""
  4. }
  1. {
  2. "persons":["123","121"],
  3. "msg":""
  4. }

2、效果截图

1、服务器单发消息(使用PostMan模拟)

image.png

image.png

2、服务器群发

image.png

image.png

3、服务器多发(用户121,123,124) 发送给121,123消息

image.png
image.png

image.png

4、用户群发消息

image.png

5、用户单发

image.png

6、用户多发同理,在persons中添加用户即可

7、定时推送,上面截图中已经有体现

3、代码实现

1、导入依赖

  1. <dependency>
  2. <groupId>com.alibaba</groupId>
  3. <artifactId>fastjson</artifactId>
  4. <version>1.2.75</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.projectlombok</groupId>
  8. <artifactId>lombok</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-amqp</artifactId>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-web</artifactId>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.springframework.boot</groupId>
  20. <artifactId>spring-boot-starter-websocket</artifactId>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter-test</artifactId>
  25. <scope>test</scope>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.amqp</groupId>
  29. <artifactId>spring-rabbit-test</artifactId>
  30. <scope>test</scope>
  31. </dependency>

2、编写配置文件

spring:
  rabbitmq:
    port: 5672
    username: test
    password: test
    virtual-host: /test

3、编写配置类

WebSocketConfig RabbitConfig

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Bean
    public TaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        return taskScheduler;
    }
}
@Configuration
public class RabbitConfig {
    public static final String FANOUT_EXCHANGE ="FanoutExchange";

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
    }

}

4、枚举类

@AllArgsConstructor
public enum ResultEnum {
    /**
     * 发送成功
     */
    SEND_SUCCESS(1,"发送成功"),
    /**
     *发送用户未指定,进行群发
     */
    PERSON_NULL_GROUP_SEND(3,"发送用户未指定,进行群发"),
    /**
     * "消息为空,不进行发送"
     */
    MSG_NULL_NOT_SEND(4,"消息为空,不进行发送"),
    /**
     * 发送的消息为空,不进行发送
     */

    SEND_NULL(5,"发送的消息为空,不进行发送");

    /**
     * 编码
     */
    private  int code;
    /**
     * 内容
     */
    private  String title;

    /**
     * 获取编码
     * @return int
     */
    public int getCode() {
        return code;
    }

    /**
     * 获取标题
     * @return String
     */
    public String getTitle() {
        return title;
    }
}

5、创建传输对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgDTO implements Serializable {
    /**
     * 用户组
     */
    private List<String> persons;
    /**
     * 消息
     */
    private String msg;
}

6、WebSocket服务端

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketEndPoint {

    /**
     * 用于存放所有在线客户端
     */
    private static final Map<String, Session> SESSION_MAP = new ConcurrentHashMap<>();

    private static AmqpTemplate amqpTemplate;

    @Autowired
    public synchronized void setAmqpTemplate(AmqpTemplate amqpTemplate) {
        WebSocketEndPoint.amqpTemplate = amqpTemplate;
    }

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    public static Map<String, Session> getSessionMap(){
        return SESSION_MAP;
    }
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        log.info("有新的客户端上线: {}", session.getId());
        SESSION_MAP.put(userId, session);
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        String sessionId = session.getId();
        log.info("有客户端离线: {}", sessionId);
        SESSION_MAP.remove(sessionId);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message)  {
        JSONObject object  = JSON.parseObject(message);
        MsgDTO msgDTO = JSON.toJavaObject(object, MsgDTO.class);
        if (ObjectUtils.isEmpty(msgDTO) || !StringUtils.hasText(msgDTO.getMsg())){
            return;
        }
        String msg = JSON.toJSONString(msgDTO);
        log.info("消息:{}",msg);
        msg = msg.replace("\\t","").replace("\\n","");
        amqpTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"",msg);
        log.info("{}","消息为空,不进行发送");
    }

    /**
     * @param session 消息
     * @param error 异常
     */
    @OnError
    public void onError(Session session, Throwable error) {
        String sessionId = session.getId();
        if (SESSION_MAP.get(sessionId) != null) {
            log.info("发生了错误,移除客户端: {}", sessionId);
            SESSION_MAP.remove(sessionId);
        }
        log.error("发生异常:",error);
    }

    /**
     * 指定对象发送消息
     *
     * @param message 消息对象
     */
     public static void sendToUser(List<String> persons, String message) {
         persons.forEach(userId -> send(userId,message));
     }

    /**
     * 单发 1对1
     * @param userId  用户id
     * @param message 消息
     */
     public  static void send(String userId,String message) {
         log.info("{}",userId);
         if (SESSION_MAP.containsKey(userId)) {
             SESSION_MAP.get(userId).getAsyncRemote().sendText(message);
             log.info("消息发送成功");
         }
         log.info("本服务器中无此用户session");
     }
    /**
     * 群发
     * @param message 消息
     */
    public static void batchSend(String message) {
        for(String sessionId : SESSION_MAP.keySet())
        {
            SESSION_MAP.get(sessionId).getAsyncRemote().sendText(message);
        }
    }
}

7、定时推送

@Component
@EnableScheduling
@Slf4j
public class ScheduleTask {
    @Scheduled(cron = "0/10 * *  * * ?")
    public void sendMessage(){
        String message = "定时报时间,现在:"+new Date();
        log.info("{}","**********定时任务执行***********");
        Map<String, Session> map = WebSocketEndPoint.getSessionMap();
        for (String userId : map.keySet()) {
            map.get(userId).getAsyncRemote().sendText(message);
        }
    }
}

8、发送消息服

SendMessageService SendMessageServiceImpl

public interface SendMessageService {

    /**
     *  发送消息到RabbitMQ
     * @param msgDTO 对象
     * @return String
     */
    String send(MsgDTO msgDTO);

    /**
     * 发送消息给用户
     * @param msgDTO 对象
     */
    void sendToUser(MsgDTO msgDTO);
}
@Service
@Slf4j
public class SendMessageServiceImpl implements SendMessageService {


    final
    AmqpTemplate amqpTemplate;

    public SendMessageServiceImpl(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }


    @Override
    public String send(MsgDTO msgDTO) {
        String message = msgDTO.getMsg();
        if (StringUtils.hasText(message)) {
            String msg = JSON.toJSONString(msgDTO);
            msg = msg.replace("\\t","").replace("\\n","");
            amqpTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"",msg);
            return ResultEnum.SEND_SUCCESS.getTitle();
        }
        return ResultEnum.SEND_NULL.getTitle();
    }
    /**
     *
     * @param msgDTO  实体类
     */
    @Override
    public void sendToUser(MsgDTO msgDTO) {
        List<String> persons = msgDTO.getPersons();
        String message = msgDTO.getMsg();
        if (!StringUtils.hasText(message)) {
            return;
        }
        if (ObjectUtils.isEmpty(persons)) {
            WebSocketEndPoint.batchSend(message);
            return;
        }
        WebSocketEndPoint.sendToUser(persons,message);
    }
}

9、RabbitMQ监听

@Slf4j
@Service
public class RabbitMsgListener {

    final SendMessageServiceImpl sendMessageService;

    public RabbitMsgListener(SendMessageServiceImpl sendMessageService) {
        this.sendMessageService = sendMessageService;
    }

    @RabbitListener(bindings={
            //@QueueBinding注解要完成队列和交换机的
            @QueueBinding(
                    //@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
                    value = @Queue(),
                    //创建一个交换机
                    exchange=@Exchange(name= RabbitConfig.FANOUT_EXCHANGE,type="fanout")
            )
    })
    public void fanoutReceive(String msg) {
        JSONObject object  = JSON.parseObject(msg);
        MsgDTO msgDTO = JSON.toJavaObject(object, MsgDTO.class);
        sendMessageService.sendToUser(msgDTO);
        log.info("{}","监听并推送消息");
    }
}

10、控制层

@RestController
@RequestMapping("/send")
public class PushMessageController {

    private final SendMessageService sendService;

    public PushMessageController(SendMessageService sendService) {
        this.sendService = sendService;
    }
    /**
     *
     * @param map 用户+消息  用户为空则为群发
     * @return String
     */
    @GetMapping("/sendToUser")
    public String sendToUser(@RequestBody Map<String, Object> map) {
        JSONObject param =new JSONObject(map);
        MsgDTO msgDTO = param.toJavaObject(MsgDTO.class);
        return sendService.send(msgDTO);
    }
}