1、协议举例
1.1、redis协议举例
NioEventLoopGroup worker = new NioEventLoopGroup();byte[] LINE = {13, 10};try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 会在连接 channel 建立成功后,会触发 active 事件 @Override public void channelActive(ChannelHandlerContext ctx) { set(ctx); get(ctx); } private void get(ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes("*2".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("get".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("aaa".getBytes()); buf.writeBytes(LINE); ctx.writeAndFlush(buf); } private void set(ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes("*3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("set".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("aaa".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("bbb".getBytes()); buf.writeBytes(LINE); ctx.writeAndFlush(buf); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(buf.toString(Charset.defaultCharset())); } }); } }); ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync(); channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) { log.error("client error", e);} finally { worker.shutdownGracefully();}
1.2、http协议举例
@Slf4jpublic class TestHttp { public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { // 获取请求 log.debug(msg.uri()); // 返回响应 DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK); byte[] bytes = "<h1>Hello, world!</h1>".getBytes(); response.headers().setInt(CONTENT_LENGTH, bytes.length); response.content().writeBytes(bytes); // 写回响应 ctx.writeAndFlush(response); } }); /*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("{}", msg.getClass()); if (msg instanceof HttpRequest) { // 请求行,请求头 } else if (msg instanceof HttpContent) { //请求体 } } });*/ } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } }}
2、自定义协议
2.1、自定义协议要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文采用哪种序列化/反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊… 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
2.2、示例
@Slf4jpublic class MessageCodec extends ByteToMessageCodec<Message> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception { //1、魔数 4字节 byteBuf.writeInt(Integer.MAX_VALUE); //2、版本号 1字节 byteBuf.writeByte(1); //3、序列化算法 1 json 1字节 byteBuf.writeByte(1); //无效填充 1字节 byteBuf.writeByte(128); //4、消息类型 1字节 byteBuf.writeByte(message.getMessageType()); //5、请求序号 4字节 byteBuf.writeInt(message.getSeqNo()); //6、数据长度 4字节 final byte[] bytes = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8); byteBuf.writeInt(bytes.length); //7、数据 byteBuf.writeBytes(bytes); } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { //1、魔数 4字节 final int magicNumber = byteBuf.readInt(); //2、版本号 1字节 final byte version = byteBuf.readByte(); //3、序列化算法 1 json 1字节 final byte serializerType = byteBuf.readByte(); //无效填充 1字节 byteBuf.readByte(); //4、消息类型 1字节 final byte messageType = byteBuf.readByte(); //5、请求序号 4字节 final int seqNo = byteBuf.readInt(); //6、数据长度 4字节 final int length = byteBuf.readInt(); //7、数据 byte[] bytes = new byte[length]; byteBuf.readBytes(bytes,0,length); final Message message = JSON.parseObject(bytes.toString(), Message.class); log.info("magicNumber: {} , version: {} , serializerType: {} , messageType: {} , seqNo: {} , length: {} , data: {}" ,magicNumber,version,serializerType,messageType,seqNo,length,message); list.add(message); } public static void main(String[] args) { final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new LoggingHandler(LogLevel.DEBUG), new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0), new MessageCodec()); LoginRequestMessage loginRequestMessage = new LoginRequestMessage("zs","123456"); embeddedChannel.writeOutbound(loginRequestMessage); }}
/**
 * Base type for protocol messages. Subclasses supply the message type and the
 * sequence number through the two abstract accessors.
 */
@Data
public abstract class Message implements Serializable {
    // NOTE(review): this field is a String while getSeqNo() returns Integer, and nothing
    // visible here reads the field - confirm which type is intended before relying on it.
    private String seqNo;

    /** Returns the message type code. */
    public abstract Integer getMessageType();

    /** Returns the sequence number of this message. */
    public abstract Integer getSeqNo();
}
/**
 * Login request carrying a user name and a password. Reports message type 1 and
 * a fixed sequence number of 1.
 */
@Data
public class LoginRequestMessage extends Message {
    private String userName;
    private String password;

    /**
     * @param userName the user name to log in with
     * @param password the password to log in with
     */
    public LoginRequestMessage(String userName, String password) {
        this.userName = userName;
        this.password = password;
    }

    @Override
    public Integer getMessageType() {
        return 1;
    }

    @Override
    public Integer getSeqNo() {
        return 1;
    }
}