什么时候可以加 @Sharable

  • 当 handler 不保存状态时,就可以安全地在多线程下被共享
  • 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
  • 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. /**
  4. * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
  5. */
  6. public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
  7. @Override
  8. protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
  9. ByteBuf out = ctx.alloc().buffer();
  10. // 1. 4 字节的魔数
  11. out.writeBytes(new byte[]{1, 2, 3, 4});
  12. // 2. 1 字节的版本,
  13. out.writeByte(1);
  14. // 3. 1 字节的序列化方式 jdk 0 , json 1
  15. out.writeByte(0);
  16. // 4. 1 字节的指令类型
  17. out.writeByte(msg.getMessageType());
  18. // 5. 4 个字节
  19. out.writeInt(msg.getSequenceId());
  20. // 无意义,对齐填充
  21. out.writeByte(0xff);
  22. // 6. 获取内容的字节数组
  23. ByteArrayOutputStream bos = new ByteArrayOutputStream();
  24. ObjectOutputStream oos = new ObjectOutputStream(bos);
  25. oos.writeObject(msg);
  26. byte[] bytes = bos.toByteArray();
  27. // 7. 长度
  28. out.writeInt(bytes.length);
  29. // 8. 写入内容
  30. out.writeBytes(bytes);
  31. outList.add(out);
  32. }
  33. @Override
  34. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  35. int magicNum = in.readInt();
  36. byte version = in.readByte();
  37. byte serializerType = in.readByte();
  38. byte messageType = in.readByte();
  39. int sequenceId = in.readInt();
  40. in.readByte();
  41. int length = in.readInt();
  42. byte[] bytes = new byte[length];
  43. in.readBytes(bytes, 0, length);
  44. ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
  45. Message message = (Message) ois.readObject();
  46. log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
  47. log.debug("{}", message);
  48. out.add(message);
  49. }
  50. }