1、协议举例
1.1、Redis 协议举例
// Minimal Netty client that hand-encodes Redis protocol requests: once the connection is
// active it sends "set aaa bbb" followed by "get aaa" and prints every reply it receives.
NioEventLoopGroup worker = new NioEventLoopGroup();
// CR LF (13, 10): written after every element of a request below.
byte[] LINE = {13, 10};
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.group(worker);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new LoggingHandler());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                // The active event fires once the channel has been connected successfully.
                @Override
                public void channelActive(ChannelHandlerContext ctx) {
                    set(ctx);
                    get(ctx);
                }

                // Sends "get aaa": an array header of 2 elements, then each element
                // as "$<length>" followed by the element itself.
                private void get(ChannelHandlerContext ctx) {
                    ByteBuf buf = ctx.alloc().buffer();
                    buf.writeBytes("*2".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("get".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("aaa".getBytes());
                    buf.writeBytes(LINE);
                    ctx.writeAndFlush(buf);
                }

                // Sends "set aaa bbb": an array header of 3 elements, then each element
                // as "$<length>" followed by the element itself.
                private void set(ChannelHandlerContext ctx) {
                    ByteBuf buf = ctx.alloc().buffer();
                    buf.writeBytes("*3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("set".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("aaa".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("bbb".getBytes());
                    buf.writeBytes(LINE);
                    ctx.writeAndFlush(buf);
                }

                // Prints the raw server reply.
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    try {
                        System.out.println(buf.toString(Charset.defaultCharset()));
                    } finally {
                        // The message is consumed here and not passed further down the
                        // pipeline, so this handler has to release it to avoid a buffer leak.
                        buf.release();
                    }
                }
            });
        }
    });
    // Block until connected, then block until the channel is closed.
    ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
    channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
    log.error("client error", e);
    // Restore the interrupt flag so code further up the stack can observe the interruption.
    Thread.currentThread().interrupt();
} finally {
    worker.shutdownGracefully();
}
1.2、HTTP 协议举例
/**
 * Minimal Netty HTTP server listening on port 8080.
 *
 * <p>Every request head decoded by {@link HttpServerCodec} is answered with a fixed
 * {@code <h1>Hello, world!</h1>} body and status 200.
 */
@Slf4j
public class TestHttp {
    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new HttpServerCodec());
                    // Only messages of type HttpRequest reach channelRead0 of this handler.
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                            // Inspect the request
                            log.debug(msg.uri());
                            // Build the response
                            DefaultFullHttpResponse response =
                                    new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                            // Encode with an explicit charset instead of the platform default so
                            // the bytes sent do not depend on the machine's locale settings.
                            byte[] bytes = "<h1>Hello, world!</h1>"
                                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                            response.headers().setInt(CONTENT_LENGTH, bytes.length);
                            response.content().writeBytes(bytes);
                            // Write the response back
                            ctx.writeAndFlush(response);
                        }
                    });
                    /*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            log.debug("{}", msg.getClass());
                            if (msg instanceof HttpRequest) { // request line and request headers
                            } else if (msg instanceof HttpContent) { // request body
                            }
                        }
                    });*/
                }
            });
            // Block until the port is bound, then block until the server channel is closed.
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
2、自定义协议
2.1、自定义协议要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:JSON、Protobuf、Hessian、JDK
- 指令类型,是登录、注册、单聊、群聊… 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
2.2、示例
/**
 * Codec for the custom protocol: turns a {@link Message} into bytes and back.
 *
 * <p>Frame layout, in write order:
 * <ol>
 *   <li>magic number, 4 bytes</li>
 *   <li>version, 1 byte</li>
 *   <li>serialization algorithm, 1 byte (1 = JSON)</li>
 *   <li>padding, 1 byte (value ignored on decode)</li>
 *   <li>message type, 1 byte</li>
 *   <li>sequence number, 4 bytes</li>
 *   <li>body length, 4 bytes</li>
 *   <li>body, JSON encoded as UTF-8</li>
 * </ol>
 * The body length field therefore starts at offset 12, which is what the
 * {@code LengthFieldBasedFrameDecoder} in {@link #main(String[])} is configured with.
 */
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
    /**
     * Writes the header fields followed by the JSON body of {@code message} into {@code byteBuf}.
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
        // 1. Magic number, 4 bytes
        byteBuf.writeInt(Integer.MAX_VALUE);
        // 2. Version, 1 byte
        byteBuf.writeByte(1);
        // 3. Serialization algorithm (1 = JSON), 1 byte
        byteBuf.writeByte(1);
        // Meaningless padding, 1 byte
        byteBuf.writeByte(128);
        // 4. Message type, 1 byte
        byteBuf.writeByte(message.getMessageType());
        // 5. Sequence number, 4 bytes
        byteBuf.writeInt(message.getSeqNo());
        // 6. Body length, 4 bytes
        final byte[] bytes = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8);
        byteBuf.writeInt(bytes.length);
        // 7. Body
        byteBuf.writeBytes(bytes);
    }

    /**
     * Reads one frame from {@code byteBuf}, logs its header fields and adds the parsed
     * message to {@code list}.
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 1. Magic number, 4 bytes
        final int magicNumber = byteBuf.readInt();
        // 2. Version, 1 byte
        final byte version = byteBuf.readByte();
        // 3. Serialization algorithm (1 = JSON), 1 byte
        final byte serializerType = byteBuf.readByte();
        // Meaningless padding, 1 byte
        byteBuf.readByte();
        // 4. Message type, 1 byte
        final byte messageType = byteBuf.readByte();
        // 5. Sequence number, 4 bytes
        final int seqNo = byteBuf.readInt();
        // 6. Body length, 4 bytes
        final int length = byteBuf.readInt();
        // 7. Body
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes, 0, length);
        // Decode the payload with the charset used by encode(). Calling toString() on a
        // byte[] only yields the array's identity string, never its content.
        final String json = new String(bytes, StandardCharsets.UTF_8);
        // NOTE(review): Message is declared abstract in these notes; confirm the JSON library
        // can build an instance from it, or map messageType to a concrete class first.
        final Message message = JSON.parseObject(json, Message.class);
        log.info("magicNumber: {} , version: {} , serializerType: {} , messageType: {} , seqNo: {} , length: {} , data: {}"
                , magicNumber, version, serializerType, messageType, seqNo, length, message);
        list.add(message);
    }

    /**
     * Pushes one {@link LoginRequestMessage} outbound through an {@link EmbeddedChannel}
     * so the encoded bytes show up in the {@link LoggingHandler} output.
     */
    public static void main(String[] args) {
        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new LoggingHandler(LogLevel.DEBUG),
                new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
                new MessageCodec());
        LoginRequestMessage loginRequestMessage = new LoginRequestMessage("zs", "123456");
        embeddedChannel.writeOutbound(loginRequestMessage);
    }
}
/**
 * Base type for everything exchanged over the custom protocol.
 *
 * <p>Subclasses supply the message type and the sequence number that the codec
 * writes into the frame header.
 */
@Data
public abstract class Message implements Serializable {
// NOTE(review): this field is a String while getSeqNo() below returns Integer — confirm which type is intended.
private String seqNo;
/** Returns the message type the codec writes into the frame header. */
public abstract Integer getMessageType();
/** Returns the sequence number the codec writes into the frame header. */
public abstract Integer getSeqNo();
}
/**
 * Login request holding the user name and password entered by the user.
 */
@Data
public class LoginRequestMessage extends Message {
    /** Fixed value returned by {@link #getMessageType()}. */
    private static final int MESSAGE_TYPE = 1;
    /** Fixed value returned by {@link #getSeqNo()}. */
    private static final int SEQ_NO = 1;

    private String userName;
    private String password;

    /**
     * Creates a login request.
     *
     * @param userName name of the user logging in
     * @param password password of the user logging in
     */
    public LoginRequestMessage(String userName, String password) {
        this.userName = userName;
        this.password = password;
    }

    /** Returns the constant message type of a login request. */
    @Override
    public Integer getMessageType() {
        return MESSAGE_TYPE;
    }

    /** Returns the constant sequence number used by this example. */
    @Override
    public Integer getSeqNo() {
        return SEQ_NO;
    }
}