Codecs and Buffer

字节码操作对大量数据管道配置类的应用来说是一个核心关注点。它广泛应用于reactor-net模块来序列化和反序列化从IO中接收或发送的字节码。

reactor.io.buffer.Buffer是Java中ByteBuffer的装饰器,增加了一系列操作。目的是通过ByteBuffer的限制和读取或者覆盖预分配的字节来减少对字节的复制。追踪ByteBuffer的位置能把开发者折腾的头痛不已。Buffer帮我们处理了这些问题,我们决定把这个简单的工具带给用户。

下面是在Groovy Spock测试中写的一个简单操作Buffer的例子:

  1. import reactor.io.buffer.Buffer
  2. //...
  3. given: "an empty Buffer and a full Buffer"
  4. def buff = new Buffer()
  5. def fullBuff = Buffer.wrap("Hello World!")
  6. when: "a Buffer is appended"
  7. buff.append(fullBuff)
  8. then: "the Buffer was added"
  9. buff.position() == 12
  10. buff.flip().asString() == "Hello World!"

Buffer的一种十分有用的应用是Buffer.View,很多操作如split()就会返回它。它提供一种无需拷贝的方式来扫描和检索ByteBuffer中的字节码。Buffer.View也是一种Buffer,所以有些操作也是对外暴露的。

使用一个分隔符和Buffer.View来复用一些相同的字节码:

  1. byte delimiter = (byte) ';';
  2. byte innerDelimiter = (byte) ',';
  3. Buffer buffer = Buffer.wrap("a;b-1,b-2;c;d;");
  4. List<Buffer.View> views = buffer.split(delimiter);
  5. int viewCount = views.size();
  6. Assert.isTrue(viewCount == 4);
  7. for (Buffer.View view : views) {
  8. System.out.println(view.get().asString()); //prints "a" then "b-1,b-2", then "c" and finally "d"
  9. if(view.indexOf(innerDelimiter) != -1){
  10. for(Buffer.View innerView : view.get().split(innerDelimiter)){
  11. System.out.println(innerView.get().asString()); //prints "b-1" and "b-2"
  12. }
  13. }
  14. }

在正常的序列化/反序列化的场景中使用Buffer会显得有些层级过低,Reactor提供一些称为Codec的预定义转换器。一些Codec在使用时需要在classpath下增加一些额外的依赖,比如处理Json时需要用到Jackson。

Codec有两种工作方式,第一种是实现Funcation来直接编码并且返回编码后的数据,通常以Buffer的表单形式返回。这非常棒,但仅限于与无状态的Codec工作才能起效,另一个可选的方法是使用Codec.encoder()返回的编码函数。

Codec.encoder() vs Codec.apply(Source)

  • Codec.encoder() 返回一个唯一编码函数,且不能在多个线程间共享。
  • Codec.apply() 直接编码(并保存分配的编码器),但是Codec本身需要在多个线程间共享。

Reactor Net模块实际上通过每个新线程调用一次Codec.encoder来避免线程安全问题

对于大部分实现了BufferCodec来说,Codec同样也能通过源类型解码数据。我们需要通过Codec.decoder()来获取一个解码函数用于解码数据。和编码不同的是,没有可重写的方法用于解码数据。和编码相同的是,解码方法也不是线程安全的。

有两种形式的Codec.decoder()函数,其中一个简单的返回解码数据,另一个Codec.decoder(Consumer)将会调用consumer处理每次解码事件。

Codec.decoder() vs Codec.decoder(Consumer)

  • Codec.decoder() 是一种阻塞式的解码函数,它将会直接返回传入源数据对应的解码数据。
  • Codec.decoder(Consumer) 可以用于非阻塞式解码,调用时不会返回任何东西,仅仅在解码完毕时对传入Consumer进行回调。它可以与任何异步场景进行整合。

使用一个预定义的Codecs在单元测试中进行验证:

  1. import reactor.io.json.JsonCodec
  2. //...
  3. given: 'A JSON codec'
  4. def codec = new JsonCodec<Map<String, Object>, Object>(Map);
  5. def latch = new CountDownLatch(1)
  6. when: 'The decoder is passed some JSON'
  7. Map<String, Object> decoded;
  8. def callbackDecoder = codec.decoder{
  9. decoded = it
  10. latch.countDown()
  11. }
  12. def blockingDecoder = codec.decoder()
  13. //yes this is real simple async strategy, but that's not the point here :)
  14. Thread.start{
  15. callbackDecoder.apply(Buffer.wrap("{\"a\": \"alpha\"}"))
  16. }
  17. def decodedMap = blockingDecoder.apply(Buffer.wrap("{\"a\": \"beta\"}")
  18. then: 'The decoded maps have the expected entries'
  19. latch.await()
  20. decoded.size() == 1
  21. decoded['a'] == 'alpha'
  22. decodedMap['a'] == 'beta'

Table 4. 可用的核心Codec:

名称 描述 需要的依赖
ByteArrayCodec Wrap/unwrap byte arrays from/to Buffer. N/A
DelimitedCodec Split/Aggregate Buffer and delegate to the passed Codec for unit marshalling. N/A
FrameCodec Split/Aggregate Buffer into Frame Buffers according to successive prefix lengths. N/A
JavaSerializationCodec Deserialize/Serialize Buffers using Java Serialization. N/A
PassThroughCodec Leave the Buffers untouched. N/A
StringCodec Convert String to/from Buffer N/A
LengthFieldCodec Find the length and decode/encode the appropriate number of bytes into/from Buffer N/A
KryoCodec Convert Buffer into Java objects using Kryo with Buffers com.esotericsoftware.kryo:kryo
JsonCodec,JacksonJsonCodec Convert Buffer into Java objects using Jackson with Buffers com.fasterxml.jackson.core:jackson-databind
SnappyCodec A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer org.xerial.snappy:snappy-java
GZipCodec A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer N/A