WebSocket技术

1、简介

HTML5开始提供的一种浏览器与服务器进行全双工通讯的网络技术,属于应用层协议。它基于TCP传输协议,并复用HTTP的握手通道。

对大部分web开发者来说,上面这段描述有点枯燥,其实只要记住几点:

  1. WebSocket可以在浏览器里使用
  2. 支持双向通信
  3. 使用很简单

1、优点

说到优点,这里的对比参照物是HTTP协议,概括地说就是:支持双向通信,更灵活,更高效,可扩展性更好。

  1. 支持双向通信,实时性更强。
  2. 更好的二进制支持。
  3. 较少的控制开销。连接创建后,ws客户端、服务端进行数据交换时,协议控制的数据包头部较小。在不包含头部的情况下,服务端到客户端的包头只有2~10字节(取决于数据包长度),客户端到服务端的的话,需要加上额外的4字节的掩码。而HTTP协议每次通信都需要携带完整的头部。
  4. 支持扩展。ws协议定义了扩展,用户可以扩展协议,或者实现自定义的子协议。(比如支持自定义压缩算法等)

16c2321937b73a6a.png
在客户端,new WebSocket实例化一个新的WebSocket客户端对象,请求类似 ws://yourdomain:port/path 的服务端WebSocket URL,客户端WebSocket对象会自动解析并识别为WebSocket请求,并连接服务端端口,执行双方握手过程,客户端发送数据格式类似:

  1. GET /webfin/websocket/ HTTP/1.1
  2. Host: localhost
  3. Upgrade: websocket
  4. Connection: Upgrade
  5. Sec-WebSocket-Key: xqBt3ImNzJbYqRINxEFlkg==
  6. Origin: http://localhost:8080
  7. Sec-WebSocket-Version: 13

可以看到,客户端发起的WebSocket连接报文类似传统HTTP报文,Upgrade:websocket参数值表明这是WebSocket类型请求,Sec-WebSocket-Key是WebSocket客户端发送的一个 base64编码的密文,要求服务端必须返回一个对应加密的Sec-WebSocket-Accept应答,否则客户端会抛出Error during WebSocket handshake错误,并关闭连接。

服务端收到报文后返回的数据格式类似:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: K7DJLdLooIwIG/MOpvWFB3y3FE8=

Sec-WebSocket-Accept的值是服务端采用与客户端一致的密钥计算出来后返回客户端的,HTTP/1.1 101 Switching Protocols表示服务端接受WebSocket协议的客户端连接,经过这样的请求-响应处理后,两端的WebSocket连接握手成功, 后续就可以进行TCP通讯了。

2、导入依赖

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>

3、代码实现

首先要注入ServerEndpointExporter,这个bean会自动注册使用@ServerEndpoint注解声明的Websocket endpoint。要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将有容器自己提供和管理

3.1 配置类

@Configuration
public class WebSocketConfig  {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Bean
    public TaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        return taskScheduler;
    }
}

3.2 WebSocket服务端

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketEndPoint {

    /**
     * 用于存放所有在线客户端
     */
    private static final Map<String, Session> SESSION_MAP = new ConcurrentHashMap<>();

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    public static Map<String, Session> getSessionMap(){
        return SESSION_MAP;
    }

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        log.info("有新的客户端上线: {}", session.getId());
        SESSION_MAP.put(userId, session);
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        String sessionId = session.getId();
        log.info("有客户端离线: {}", sessionId);
        SESSION_MAP.remove(sessionId);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message)  {
        for(String userId : SESSION_MAP.keySet()) {
            SESSION_MAP.get(userId).getAsyncRemote().sendText(message);
        }
    }

    /**
     * @param session 消息
     * @param error 异常
     */
    @OnError
    public void onError(Session session, Throwable error) {
        String sessionId = session.getId();
        if (SESSION_MAP.get(sessionId) != null) {
            log.info("发生了错误,移除客户端: {}", sessionId);
            SESSION_MAP.remove(sessionId);
        }
        log.error("发生异常:",error);
    }

    /**
     * 指定对象发送消息
     *
     * @param message 消息对象
     */
     public static void sendToUser(List<String> persons, String message) {
         persons.forEach(userId -> send(userId,message));
     }

    /**
     * 单发 1对1
     * @param userId  用户id
     * @param message 消息
     */
     public  static void send(String userId,String message) {
         SESSION_MAP.get(userId).getAsyncRemote().sendText(message);
     }

    /**
     * 群发
     * @param message 消息
     */
    public static void batchSend(String message) {
        for(String sessionId : SESSION_MAP.keySet())
        {
            SESSION_MAP.get(sessionId).getAsyncRemote().sendText(message);
        }
    }
}

3.3 定时任务

定时任务每10s一次 @Scheduled(cron = "0/10 * * * * ?")

@Component
@EnableScheduling
@Slf4j
public class ScheduleTask {
    @Scheduled(cron = "0/10 * *  * * ?")
    public void sendMessage(){
        String message = "定时报时间,现在:"+new Date();
        log.info("{}","**********定时任务执行***********");
        Map<String, Session> map = WebSocketEndPoint.getSessionMap();
        for (String userId : map.keySet()) {
            map.get(userId).getAsyncRemote().sendText(message);
        }
    }
}

3.4 服务层

public interface SendMessageService {
    /**
     * 一对一 一对多
     * @param map 消息和用户
     * @return 返回消息
     */
    String sendToUser(Map<String, Object> map);

    /**
     * 群发消息
     * @param message 消息
     * @return 返回消息
     */
    String batchSend(String message);
}
@Service
@Slf4j
public class SendMessageServiceImpl implements SendMessageService {

    /**
     *
     * @param map  用户和消息信息
     */
    @Override
    public String sendToUser(Map<String, Object> map) {
        List<String> persons = new ArrayList<>();
        Object obj = map.get("persons");
        if (ObjectUtils.isEmpty(obj)) {
            log.info("{}","用户为空,不进行发送");
            return "发送用户未指定,不进行发送";
        }
        if (obj instanceof ArrayList<?>) {
            for (Object o : (List<?>) obj) {
                persons.add(o.toString());
            }
        }
        String message = map.get("msg").toString();
        if (StringUtils.hasText(message)) {
            WebSocketEndPoint.sendToUser(persons,message);
            return "消息发送成功";
        }
        log.info("{}","消息为空,不进行发送");
        return "发送的消息为空,不进行发送";
    }


    /**
     * 群发消息
     * @param message 消息
     * @return 返回信息
     */
    @Override
    public String batchSend(String message) {
        if (StringUtils.hasText(message)) {
            WebSocketEndPoint.batchSend(message);
            return "消息发送成功";
        }
        log.info("{}","发送的消息为空");
        return "消息为空,不进行发送";
    }
}

3.5 编写Controller

@RestController
@RequestMapping("/send")
public class PushMessageController {

    private final SendMessageService sendService;

    public PushMessageController(SendMessageService sendService) {
        this.sendService = sendService;
    }

    /**
     *
     * @param message 消息
     * @return 群发送完成
     */
    @GetMapping("/batchSend")
    public String batchSend(@PathParam("message") String message) {
        return this.sendService.batchSend(message);
    }

    /**
     *
     * @param map 用户+消息
     * @return String
     */
    @GetMapping("/sendToUser")
    public String sendToUser(@RequestBody Map<String, Object> map) {
        return sendService.sendToUser(map);
    }
}