1、协议举例

1.1、redis协议举例

  1. NioEventLoopGroup worker = new NioEventLoopGroup();
  2. byte[] LINE = {13, 10};
  3. try {
  4. Bootstrap bootstrap = new Bootstrap();
  5. bootstrap.channel(NioSocketChannel.class);
  6. bootstrap.group(worker);
  7. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  8. @Override
  9. protected void initChannel(SocketChannel ch) {
  10. ch.pipeline().addLast(new LoggingHandler());
  11. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  12. // 会在连接 channel 建立成功后,会触发 active 事件
  13. @Override
  14. public void channelActive(ChannelHandlerContext ctx) {
  15. set(ctx);
  16. get(ctx);
  17. }
  18. private void get(ChannelHandlerContext ctx) {
  19. ByteBuf buf = ctx.alloc().buffer();
  20. buf.writeBytes("*2".getBytes());
  21. buf.writeBytes(LINE);
  22. buf.writeBytes("$3".getBytes());
  23. buf.writeBytes(LINE);
  24. buf.writeBytes("get".getBytes());
  25. buf.writeBytes(LINE);
  26. buf.writeBytes("$3".getBytes());
  27. buf.writeBytes(LINE);
  28. buf.writeBytes("aaa".getBytes());
  29. buf.writeBytes(LINE);
  30. ctx.writeAndFlush(buf);
  31. }
  32. private void set(ChannelHandlerContext ctx) {
  33. ByteBuf buf = ctx.alloc().buffer();
  34. buf.writeBytes("*3".getBytes());
  35. buf.writeBytes(LINE);
  36. buf.writeBytes("$3".getBytes());
  37. buf.writeBytes(LINE);
  38. buf.writeBytes("set".getBytes());
  39. buf.writeBytes(LINE);
  40. buf.writeBytes("$3".getBytes());
  41. buf.writeBytes(LINE);
  42. buf.writeBytes("aaa".getBytes());
  43. buf.writeBytes(LINE);
  44. buf.writeBytes("$3".getBytes());
  45. buf.writeBytes(LINE);
  46. buf.writeBytes("bbb".getBytes());
  47. buf.writeBytes(LINE);
  48. ctx.writeAndFlush(buf);
  49. }
  50. @Override
  51. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  52. ByteBuf buf = (ByteBuf) msg;
  53. System.out.println(buf.toString(Charset.defaultCharset()));
  54. }
  55. });
  56. }
  57. });
  58. ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
  59. channelFuture.channel().closeFuture().sync();
  60. } catch (InterruptedException e) {
  61. log.error("client error", e);
  62. } finally {
  63. worker.shutdownGracefully();
  64. }

1.2、http协议举例

  1. @Slf4j
  2. public class TestHttp {
  3. public static void main(String[] args) {
  4. NioEventLoopGroup boss = new NioEventLoopGroup();
  5. NioEventLoopGroup worker = new NioEventLoopGroup();
  6. try {
  7. ServerBootstrap serverBootstrap = new ServerBootstrap();
  8. serverBootstrap.channel(NioServerSocketChannel.class);
  9. serverBootstrap.group(boss, worker);
  10. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  11. @Override
  12. protected void initChannel(SocketChannel ch) throws Exception {
  13. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
  14. ch.pipeline().addLast(new HttpServerCodec());
  15. ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
  16. @Override
  17. protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
  18. // 获取请求
  19. log.debug(msg.uri());
  20. // 返回响应
  21. DefaultFullHttpResponse response =
  22. new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
  23. byte[] bytes = "<h1>Hello, world!</h1>".getBytes();
  24. response.headers().setInt(CONTENT_LENGTH, bytes.length);
  25. response.content().writeBytes(bytes);
  26. // 写回响应
  27. ctx.writeAndFlush(response);
  28. }
  29. });
  30. /*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  31. @Override
  32. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  33. log.debug("{}", msg.getClass());
  34. if (msg instanceof HttpRequest) { // 请求行,请求头
  35. } else if (msg instanceof HttpContent) { //请求体
  36. }
  37. }
  38. });*/
  39. }
  40. });
  41. ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
  42. channelFuture.channel().closeFuture().sync();
  43. } catch (InterruptedException e) {
  44. log.error("server error", e);
  45. } finally {
  46. boss.shutdownGracefully();
  47. worker.shutdownGracefully();
  48. }
  49. }
  50. }

2、自定义协议

2.1、自定义协议要素

  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,指明消息正文采用哪种序列化/反序列化方式,便于日后扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,例如登录、注册、单聊、群聊等,与具体业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

2.2、示例

  1. @Slf4j
  2. public class MessageCodec extends ByteToMessageCodec<Message> {
  3. @Override
  4. protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
  5. //1、魔数 4字节
  6. byteBuf.writeInt(Integer.MAX_VALUE);
  7. //2、版本号 1字节
  8. byteBuf.writeByte(1);
  9. //3、序列化算法 1 json 1字节
  10. byteBuf.writeByte(1);
  11. //无效填充 1字节
  12. byteBuf.writeByte(128);
  13. //4、消息类型 1字节
  14. byteBuf.writeByte(message.getMessageType());
  15. //5、请求序号 4字节
  16. byteBuf.writeInt(message.getSeqNo());
  17. //6、数据长度 4字节
  18. final byte[] bytes = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8);
  19. byteBuf.writeInt(bytes.length);
  20. //7、数据
  21. byteBuf.writeBytes(bytes);
  22. }
  23. @Override
  24. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
  25. //1、魔数 4字节
  26. final int magicNumber = byteBuf.readInt();
  27. //2、版本号 1字节
  28. final byte version = byteBuf.readByte();
  29. //3、序列化算法 1 json 1字节
  30. final byte serializerType = byteBuf.readByte();
  31. //无效填充 1字节
  32. byteBuf.readByte();
  33. //4、消息类型 1字节
  34. final byte messageType = byteBuf.readByte();
  35. //5、请求序号 4字节
  36. final int seqNo = byteBuf.readInt();
  37. //6、数据长度 4字节
  38. final int length = byteBuf.readInt();
  39. //7、数据
  40. byte[] bytes = new byte[length];
  41. byteBuf.readBytes(bytes,0,length);
  42. final Message message = JSON.parseObject(bytes.toString(), Message.class);
  43. log.info("magicNumber: {} , version: {} , serializerType: {} , messageType: {} , seqNo: {} , length: {} , data: {}"
  44. ,magicNumber,version,serializerType,messageType,seqNo,length,message);
  45. list.add(message);
  46. }
  47. public static void main(String[] args) {
  48. final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new LoggingHandler(LogLevel.DEBUG),
  49. new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
  50. new MessageCodec());
  51. LoginRequestMessage loginRequestMessage = new LoginRequestMessage("zs","123456");
  52. embeddedChannel.writeOutbound(loginRequestMessage);
  53. }
  54. }
  1. @Data
  2. public abstract class Message implements Serializable {
  3. private String seqNo;
  4. public abstract Integer getMessageType();
  5. public abstract Integer getSeqNo();
  6. }
  1. @Data
  2. public class LoginRequestMessage extends Message{
  3. private String userName;
  4. private String password;
  5. public LoginRequestMessage(String userName, String password) {
  6. this.userName = userName;
  7. this.password = password;
  8. }
  9. @Override
  10. public Integer getMessageType() {
  11. return 1;
  12. }
  13. @Override
  14. public Integer getSeqNo() {
  15. return 1;
  16. }
  17. }