1.介绍

WebSocket是一种H5的规范,一个持久化的协议,从2011年成为国际标准(RFC6455)。其最大特点:通过握手机制在客户端和服务器之间能够建立一个类似TCP的连接,即服务器可以主动向客户端推送消息,客户端同样也可以主动向服务器推送消息,是真正双向平等。
image.png

WebSocket的主要特点:

  • 建立在 TCP 协议之上,服务器端的实现比较容易。
  • 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
  • 数据格式比较轻量,性能开销小,通信高效。
  • 可以发送文本,也可以发送二进制数据。
  • 没有同源限制,客户端可以与任意服务器通信。
  • 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

2.基本属性

属性 描述
Socket.readyState 只读属性 readyState 表示连接状态,可以是以下值:
0 - 表示连接尚未建立。
1 - 表示连接已建立,可以进行通信。
2 - 表示连接正在进行关闭。
3 - 表示连接已经关闭或者连接不能打开
Socket.bufferedAmount 只读属性 bufferedAmount 已被 send() 放入正在队列中等待传输,但还没有发出的 UTF-8文本字节数
事件 事件处理程序 描述
open Socket.onpen 连接建立时触发
message Socket.onmessage 客户端接收服务端数据时触发
error Socket.onerror 通信发生错误时触发
close Socket.onclose 连接关闭时触发
方法 描述
Socket.send() 使用连接,发送数据
Socket.close() 关闭连接

3. 使用

  1. <template>
  2. <div>
  3. <el-input v-model="params" clearable/>
  4. <el-button type="primary" @click="send">发消息</el-button>
  5. </div>
  6. </template>
  7. <script>
  8. export default {
  9. data() {
  10. return {
  11. params: '',
  12. path: "ws://localhost:8080/websocket",
  13. socket: ""
  14. }
  15. },
  16. mounted() {
  17. // 初始化
  18. this.init()
  19. },
  20. destroyed() {
  21. // 销毁监听
  22. this.socket.onclose = this.close
  23. },
  24. methods: {
  25. init: function () {
  26. if (typeof (WebSocket) === "undefined") {
  27. console.log("您的浏览器不支持socket")
  28. } else {
  29. // 实例化socket
  30. this.socket = new WebSocket(this.path)
  31. // 监听socket连接成功回调
  32. this.socket.onopen = this.open
  33. // 监听socket连接失败回调
  34. this.socket.onerror = this.error
  35. // 监听后台返回的socket消息
  36. this.socket.onmessage = this.getMessage
  37. }
  38. },
  39. open: function () {
  40. console.log("socket连接成功")
  41. },
  42. error: function () {
  43. console.log("连接错误")
  44. },
  45. getMessage: function (msg) {
  46. console.log(msg.data)
  47. },
  48. send: function () {
  49. this.socket.send(params)
  50. },
  51. close: function () {
  52. console.log("socket已经关闭")
  53. }
  54. }
  55. }
  56. </script>
# nginx的配置,有需要注意的
## 1. 需要添加一个map,用来区分协议是 http还是websocket

map $http_upgrade $connection_upgrade {
    default        keep-alive;  #默认为keep-alive 可以支持 一般http请求
    'websocket'    upgrade;     #如果为websocket 则为 upgrade 
}


## 2. 请求上需要添加的配置
# 反向代理保留客户端地址
proxy_set_header Host $host;
proxy_set_header X-Real_IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

# WebSocket 额外请求头
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;

# WebSocket 设置断开连接的时间
# 该时间默认是60s,一定要大于心跳时间
proxy_read_timeout 2400s;
//  Java的配置
import com.websocket.config.NettyConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

import java.util.Date;

/**
 * @ClassName: WebSocketHandler
 * @Description: 接收/处理/响应客户端websocket请求的核心业务处理类
 * @Author: liuhefei
 * @Date: 2019/7/21
 * @blog: https://www.imooc.com/u/1323320/articles
 **/
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker handshaker;
    private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";

    //客户端与服务端创建连接的时候调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
        //super.channelActive(ctx);
        NettyConfig.group.add(ctx.channel());  //保存channel
        System.out.println("客户端与服务端连接开启......");
    }

    //客户端与服务器断开连接的时候调用
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception{
        //super.channelInactive(ctx);
        NettyConfig.group.remove(ctx.channel());  //移除channel
        System.out.println("客户端与服务端连接关闭......");
    }

    //服务器接收客户端发送过来的数据结束之后调用
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
        //super.channelReadComplete(ctx);
        ctx.flush();
    }

    //在工程出现异常的时候调用
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception{
        //super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        ctx.close();  //关闭
    }

    //服务端处理客户端websocket请求的核心方法
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        //1. 客户端向服务器发送握手请求
        //2. 建立websocket连接

        //处理客户端向服务器发起http握手请求的业务
        if(msg instanceof FullHttpRequest){
             handHttpRequest(ctx, (FullHttpRequest) msg);
        }else if(msg instanceof WebSocketFrame){   //处理websocket连接业务
            handWebsocketFrame(ctx, (WebSocketFrame) msg);
        }

    }

    /**
     * 处理客户端与服务器之间的websocket业务
     * @param ctx
     * @param frame
     */
    private void handWebsocketFrame(ChannelHandlerContext ctx,  WebSocketFrame frame){
        //判断是否是关闭websocket的指令
        if(frame instanceof CloseWebSocketFrame){
            handshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
        }

        //判断是否是ping消息
        if(frame instanceof PingWebSocketFrame){
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }

        //判断是否是二进制消息,如果是二进制消息,抛出异常,  不支持二进制消息
        if(!(frame instanceof TextWebSocketFrame)){
            System.out.println("目前我们不支持二进制消息");
            throw new RuntimeException("【" + this.getClass().getName() + "】不支持消息");
        }

        //返回应答消息
        //获取客户端向服务器端发送的消息
        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("服务端收到客户端的消息====>>>" + request);
        //创建TextWebSocketFrame对象,接收客户端发送过来的消息
        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + "=====>>>>" + request);

        //群发,服务端向每个连接上来的客户端群发消息
        NettyConfig.group.writeAndFlush(tws);

    }

    /**
     * 处理客户端向服务器发起http握手请求的服务
     * @param ctx
     * @param req
     */
    private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
        //如果不成功或不是websocket请求
        if(!req.getDecoderResult().isSuccess()  || !("websocket".equals(req.headers().get("Upgrade")))){
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        //创建工厂对象
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
        //创建handshaker对象
        handshaker = wsFactory.newHandshaker(req);
        if(handshaker == null){
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        }else {
            handshaker.handshake(ctx.channel(), req);
        }


    }

    /**
     * 服务器向客户端响应消息
     * @param ctx
     * @param req
     * @param res
     */
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res){
        //请求失败
        if(res.getStatus().code() != 200){
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        //服务端向客户端发送数据
        ChannelFuture cf = ctx.channel().writeAndFlush(res);
        if(res.getStatus().code() != 200){
            cf.addListener(ChannelFutureListener.CLOSE);  //关闭连接
        }
    }
}

参考文档:

  1. https://zhuanlan.zhihu.com/p/74