1.介绍
WebSocket是一种H5的规范,一个持久化的协议,从2011年成为国际标准(RFC6455)。其最大特点:通过握手机制在客户端和服务器之间能够建立一个类似TCP的连接,即服务器可以主动向客户端推送消息,客户端同样也可以主动向服务器推送消息,是真正双向平等。
WebSocket的主要特点:
- 建立在 TCP 协议之上,服务器端的实现比较容易。
- 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
- 数据格式比较轻量,性能开销小,通信高效。
- 可以发送文本,也可以发送二进制数据。
- 没有同源限制,客户端可以与任意服务器通信。
- 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。
2.基本属性
属性 | 描述 |
---|---|
Socket.readyState | 只读属性 readyState 表示连接状态,可以是以下值: 0 - 表示连接尚未建立。 1 - 表示连接已建立,可以进行通信。 2 - 表示连接正在进行关闭。 3 - 表示连接已经关闭或者连接不能打开 |
Socket.bufferedAmount | 只读属性 bufferedAmount 已被 send() 放入正在队列中等待传输,但还没有发出的 UTF-8文本字节数 |
事件 | 事件处理程序 | 描述 |
---|---|---|
open | Socket.onpen | 连接建立时触发 |
message | Socket.onmessage | 客户端接收服务端数据时触发 |
error | Socket.onerror | 通信发生错误时触发 |
close | Socket.onclose | 连接关闭时触发 |
方法 | 描述 |
---|---|
Socket.send() | 使用连接,发送数据 |
Socket.close() | 关闭连接 |
3. 使用
<template>
<div>
<el-input v-model="params" clearable/>
<el-button type="primary" @click="send">发消息</el-button>
</div>
</template>
<script>
export default {
data() {
return {
params: '',
path: "ws://localhost:8080/websocket",
socket: ""
}
},
mounted() {
// 初始化
this.init()
},
destroyed() {
// 销毁监听
this.socket.onclose = this.close
},
methods: {
init: function () {
if (typeof (WebSocket) === "undefined") {
console.log("您的浏览器不支持socket")
} else {
// 实例化socket
this.socket = new WebSocket(this.path)
// 监听socket连接成功回调
this.socket.onopen = this.open
// 监听socket连接失败回调
this.socket.onerror = this.error
// 监听后台返回的socket消息
this.socket.onmessage = this.getMessage
}
},
open: function () {
console.log("socket连接成功")
},
error: function () {
console.log("连接错误")
},
getMessage: function (msg) {
console.log(msg.data)
},
send: function () {
this.socket.send(params)
},
close: function () {
console.log("socket已经关闭")
}
}
}
</script>
# nginx的配置,有需要注意的
## 1. 需要添加一个map,用来区分协议是 http还是websocket
map $http_upgrade $connection_upgrade {
default keep-alive; #默认为keep-alive 可以支持 一般http请求
'websocket' upgrade; #如果为websocket 则为 upgrade
}
## 2. 请求上需要添加的配置
# 反向代理保留客户端地址
proxy_set_header Host $host;
proxy_set_header X-Real_IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# WebSocket 额外请求头
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
# WebSocket 设置断开连接的时间
# 该时间默认是60s,一定要大于心跳时间
proxy_read_timeout 2400s;
// Java的配置
import com.websocket.config.NettyConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import java.util.Date;
/**
* @ClassName: WebSocketHandler
* @Description: 接收/处理/响应客户端websocket请求的核心业务处理类
* @Author: liuhefei
* @Date: 2019/7/21
* @blog: https://www.imooc.com/u/1323320/articles
**/
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketServerHandshaker handshaker;
private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
//客户端与服务端创建连接的时候调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
//super.channelActive(ctx);
NettyConfig.group.add(ctx.channel()); //保存channel
System.out.println("客户端与服务端连接开启......");
}
//客户端与服务器断开连接的时候调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception{
//super.channelInactive(ctx);
NettyConfig.group.remove(ctx.channel()); //移除channel
System.out.println("客户端与服务端连接关闭......");
}
//服务器接收客户端发送过来的数据结束之后调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
//super.channelReadComplete(ctx);
ctx.flush();
}
//在工程出现异常的时候调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception{
//super.exceptionCaught(ctx, cause);
cause.printStackTrace();
ctx.close(); //关闭
}
//服务端处理客户端websocket请求的核心方法
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
//1. 客户端向服务器发送握手请求
//2. 建立websocket连接
//处理客户端向服务器发起http握手请求的业务
if(msg instanceof FullHttpRequest){
handHttpRequest(ctx, (FullHttpRequest) msg);
}else if(msg instanceof WebSocketFrame){ //处理websocket连接业务
handWebsocketFrame(ctx, (WebSocketFrame) msg);
}
}
/**
* 处理客户端与服务器之间的websocket业务
* @param ctx
* @param frame
*/
private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
//判断是否是关闭websocket的指令
if(frame instanceof CloseWebSocketFrame){
handshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
}
//判断是否是ping消息
if(frame instanceof PingWebSocketFrame){
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
//判断是否是二进制消息,如果是二进制消息,抛出异常, 不支持二进制消息
if(!(frame instanceof TextWebSocketFrame)){
System.out.println("目前我们不支持二进制消息");
throw new RuntimeException("【" + this.getClass().getName() + "】不支持消息");
}
//返回应答消息
//获取客户端向服务器端发送的消息
String request = ((TextWebSocketFrame) frame).text();
System.out.println("服务端收到客户端的消息====>>>" + request);
//创建TextWebSocketFrame对象,接收客户端发送过来的消息
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + "=====>>>>" + request);
//群发,服务端向每个连接上来的客户端群发消息
NettyConfig.group.writeAndFlush(tws);
}
/**
* 处理客户端向服务器发起http握手请求的服务
* @param ctx
* @param req
*/
private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
//如果不成功或不是websocket请求
if(!req.getDecoderResult().isSuccess() || !("websocket".equals(req.headers().get("Upgrade")))){
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
//创建工厂对象
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
//创建handshaker对象
handshaker = wsFactory.newHandshaker(req);
if(handshaker == null){
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
}else {
handshaker.handshake(ctx.channel(), req);
}
}
/**
* 服务器向客户端响应消息
* @param ctx
* @param req
* @param res
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res){
//请求失败
if(res.getStatus().code() != 200){
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
//服务端向客户端发送数据
ChannelFuture cf = ctx.channel().writeAndFlush(res);
if(res.getStatus().code() != 200){
cf.addListener(ChannelFutureListener.CLOSE); //关闭连接
}
}
}
参考文档: