分布式WebSocket实现(通过RabbitMQ)
1、实现思路
1、WebSocket接收用户或者接口传过来的数据时,统一发送到RabbitMQ
2、每个服务器监听RabbitMQ数据并获取数据,通过判断数据中persons是否为空来判断是单发还是群发,若persons不为空有用户id,每个服务器对比自己session中是否有这个用户id,若没有则不操作,若有则推送给该用户消息
3、使用websocket在线测试 (websocket-test.com)测试时,可以发送JSON数据来指定发给那个用户,例子如下:
若persons为空则为群发
{
"persons":["123","121"],
"msg":""
}
{
"persons":["123","121"],
"msg":""
}
2、效果截图
1、服务器单发消息(使用PostMan模拟)
2、服务器群发
3、服务器多发(用户121,123,124) 发送给121,123消息
4、用户群发消息
5、用户单发
6、用户多发同理,在persons中添加用户即可
7、定时推送,上面截图中已经有体现
3、代码实现
1、导入依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2、编写配置文件
spring:
rabbitmq:
port: 5672
username: test
password: test
virtual-host: /test
3、编写配置类
WebSocketConfig
RabbitConfig
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Bean
public TaskScheduler taskScheduler(){
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.initialize();
return taskScheduler;
}
}
@Configuration
public class RabbitConfig {
public static final String FANOUT_EXCHANGE ="FanoutExchange";
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
}
}
4、枚举类
@AllArgsConstructor
public enum ResultEnum {
/**
* 发送成功
*/
SEND_SUCCESS(1,"发送成功"),
/**
*发送用户未指定,进行群发
*/
PERSON_NULL_GROUP_SEND(3,"发送用户未指定,进行群发"),
/**
* "消息为空,不进行发送"
*/
MSG_NULL_NOT_SEND(4,"消息为空,不进行发送"),
/**
* 发送的消息为空,不进行发送
*/
SEND_NULL(5,"发送的消息为空,不进行发送");
/**
* 编码
*/
private int code;
/**
* 内容
*/
private String title;
/**
* 获取编码
* @return int
*/
public int getCode() {
return code;
}
/**
* 获取标题
* @return String
*/
public String getTitle() {
return title;
}
}
5、创建传输对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgDTO implements Serializable {
/**
* 用户组
*/
private List<String> persons;
/**
* 消息
*/
private String msg;
}
6、WebSocket服务端
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketEndPoint {
/**
* 用于存放所有在线客户端
*/
private static final Map<String, Session> SESSION_MAP = new ConcurrentHashMap<>();
private static AmqpTemplate amqpTemplate;
@Autowired
public synchronized void setAmqpTemplate(AmqpTemplate amqpTemplate) {
WebSocketEndPoint.amqpTemplate = amqpTemplate;
}
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
public static Map<String, Session> getSessionMap(){
return SESSION_MAP;
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
log.info("有新的客户端上线: {}", session.getId());
SESSION_MAP.put(userId, session);
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
String sessionId = session.getId();
log.info("有客户端离线: {}", sessionId);
SESSION_MAP.remove(sessionId);
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message) {
JSONObject object = JSON.parseObject(message);
MsgDTO msgDTO = JSON.toJavaObject(object, MsgDTO.class);
if (ObjectUtils.isEmpty(msgDTO) || !StringUtils.hasText(msgDTO.getMsg())){
return;
}
String msg = JSON.toJSONString(msgDTO);
log.info("消息:{}",msg);
msg = msg.replace("\\t","").replace("\\n","");
amqpTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"",msg);
log.info("{}","消息为空,不进行发送");
}
/**
* @param session 消息
* @param error 异常
*/
@OnError
public void onError(Session session, Throwable error) {
String sessionId = session.getId();
if (SESSION_MAP.get(sessionId) != null) {
log.info("发生了错误,移除客户端: {}", sessionId);
SESSION_MAP.remove(sessionId);
}
log.error("发生异常:",error);
}
/**
* 指定对象发送消息
*
* @param message 消息对象
*/
public static void sendToUser(List<String> persons, String message) {
persons.forEach(userId -> send(userId,message));
}
/**
* 单发 1对1
* @param userId 用户id
* @param message 消息
*/
public static void send(String userId,String message) {
log.info("{}",userId);
if (SESSION_MAP.containsKey(userId)) {
SESSION_MAP.get(userId).getAsyncRemote().sendText(message);
log.info("消息发送成功");
}
log.info("本服务器中无此用户session");
}
/**
* 群发
* @param message 消息
*/
public static void batchSend(String message) {
for(String sessionId : SESSION_MAP.keySet())
{
SESSION_MAP.get(sessionId).getAsyncRemote().sendText(message);
}
}
}
7、定时推送
@Component
@EnableScheduling
@Slf4j
public class ScheduleTask {
@Scheduled(cron = "0/10 * * * * ?")
public void sendMessage(){
String message = "定时报时间,现在:"+new Date();
log.info("{}","**********定时任务执行***********");
Map<String, Session> map = WebSocketEndPoint.getSessionMap();
for (String userId : map.keySet()) {
map.get(userId).getAsyncRemote().sendText(message);
}
}
}
8、发送消息服
SendMessageService
SendMessageServiceImpl
public interface SendMessageService {
/**
* 发送消息到RabbitMQ
* @param msgDTO 对象
* @return String
*/
String send(MsgDTO msgDTO);
/**
* 发送消息给用户
* @param msgDTO 对象
*/
void sendToUser(MsgDTO msgDTO);
}
@Service
@Slf4j
public class SendMessageServiceImpl implements SendMessageService {
final
AmqpTemplate amqpTemplate;
public SendMessageServiceImpl(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
}
@Override
public String send(MsgDTO msgDTO) {
String message = msgDTO.getMsg();
if (StringUtils.hasText(message)) {
String msg = JSON.toJSONString(msgDTO);
msg = msg.replace("\\t","").replace("\\n","");
amqpTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"",msg);
return ResultEnum.SEND_SUCCESS.getTitle();
}
return ResultEnum.SEND_NULL.getTitle();
}
/**
*
* @param msgDTO 实体类
*/
@Override
public void sendToUser(MsgDTO msgDTO) {
List<String> persons = msgDTO.getPersons();
String message = msgDTO.getMsg();
if (!StringUtils.hasText(message)) {
return;
}
if (ObjectUtils.isEmpty(persons)) {
WebSocketEndPoint.batchSend(message);
return;
}
WebSocketEndPoint.sendToUser(persons,message);
}
}
9、RabbitMQ监听
@Slf4j
@Service
public class RabbitMsgListener {
final SendMessageServiceImpl sendMessageService;
public RabbitMsgListener(SendMessageServiceImpl sendMessageService) {
this.sendMessageService = sendMessageService;
}
@RabbitListener(bindings={
//@QueueBinding注解要完成队列和交换机的
@QueueBinding(
//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
value = @Queue(),
//创建一个交换机
exchange=@Exchange(name= RabbitConfig.FANOUT_EXCHANGE,type="fanout")
)
})
public void fanoutReceive(String msg) {
JSONObject object = JSON.parseObject(msg);
MsgDTO msgDTO = JSON.toJavaObject(object, MsgDTO.class);
sendMessageService.sendToUser(msgDTO);
log.info("{}","监听并推送消息");
}
}
10、控制层
@RestController
@RequestMapping("/send")
public class PushMessageController {
private final SendMessageService sendService;
public PushMessageController(SendMessageService sendService) {
this.sendService = sendService;
}
/**
*
* @param map 用户+消息 用户为空则为群发
* @return String
*/
@GetMapping("/sendToUser")
public String sendToUser(@RequestBody Map<String, Object> map) {
JSONObject param =new JSONObject(map);
MsgDTO msgDTO = param.toJavaObject(MsgDTO.class);
return sendService.send(msgDTO);
}
}