[Java][Protocol] Chunk by chunk predictive map serialization protocol

Open chaokunyang opened this issue 2 years ago • 3 comments

Is your feature request related to a problem? Please describe.

Optimize Collection/Map serialization by exploiting the likely homogeneity of elements:

  • Collection: elements are mostly non-null, and all elements have the same type.
  • Map: keys are mostly non-null and all have the same type; values are mostly non-null and all have the same type.

Using this information, serialization can be made faster and the serialized binary smaller.

For a collection, we can compute the header before serializing the elements, since iterating a collection is cheap. Iterating a map, however, is expensive: it costs about as much as serializing a Map<Integer, Integer>.

We need to finish both KV writing and header writing in a single iteration.

Describe the solution you'd like

Users can use the MapFieldInfo annotation to provide the header in advance. Otherwise Fury will optimistically predict the header from the first key-value pair, and update the chunk header if the prediction fails at some pair.

Fury will serialize the map chunk by chunk; every chunk has at most 127 pairs.

+----------------+----------------+~~~~~~~~~~~~~~~~~+
| chunk size: N  |    KV header   |   N*2 objects   |
+----------------+----------------+~~~~~~~~~~~~~~~~~+
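A minimal sketch of how this layout could be written in a single iteration, under several assumptions that are not stated in this issue: the class and method names are hypothetical, only the two null flags of the KV header are modeled, a failed prediction is handled by closing the current chunk and opening a new one, and nullable elements are prefixed with a one-byte marker. It uses a plain java.nio.ByteBuffer rather than Fury's buffer class.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch, not Fury's implementation: one-pass chunked writing of a Map<Integer, Integer>. */
final class ChunkedMapWriterSketch {
  static final int MAX_CHUNK_SIZE = 127;
  static final int KEY_HAS_NULL = 0b10;       // 2nd header bit
  static final int VALUE_HAS_NULL = 0b100000; // 6th header bit

  /** Header bits that a single pair requires (only null flags are modeled here). */
  static int headerFor(Integer key, Integer value) {
    return (key == null ? KEY_HAS_NULL : 0) | (value == null ? VALUE_HAS_NULL : 0);
  }

  /** Writes all pairs in one iteration and returns the number of chunks written. */
  static int write(Map<Integer, Integer> map, ByteBuffer buf) {
    int chunks = 0;
    int sizePos = -1; // position of the open chunk's size byte, -1 if no chunk is open
    int header = 0;
    int size = 0;
    for (Map.Entry<Integer, Integer> e : map.entrySet()) {
      int needed = headerFor(e.getKey(), e.getValue());
      // The prediction fails when this pair needs a flag the open chunk's header lacks.
      boolean predictionFailed = sizePos >= 0 && (needed & ~header) != 0;
      if (sizePos < 0 || predictionFailed || size == MAX_CHUNK_SIZE) {
        if (sizePos >= 0) {
          buf.put(sizePos, (byte) size); // backfill the finished chunk's size
        }
        sizePos = buf.position();
        buf.put((byte) 0);      // placeholder for the chunk size
        header = needed;        // predict that the following pairs look like this one
        buf.put((byte) header); // KV header
        size = 0;
        chunks++;
      }
      writeElement(buf, e.getKey(), (header & KEY_HAS_NULL) != 0);
      writeElement(buf, e.getValue(), (header & VALUE_HAS_NULL) != 0);
      size++;
    }
    if (sizePos >= 0) {
      buf.put(sizePos, (byte) size); // backfill the last chunk's size
    }
    return chunks;
  }

  /** Assumed element encoding: an optional one-byte null marker, then a 4-byte int. */
  static void writeElement(ByteBuffer buf, Integer v, boolean nullable) {
    if (nullable) {
      buf.put((byte) (v == null ? 0 : 1));
      if (v == null) {
        return;
      }
    }
    buf.putInt(v);
  }

  public static void main(String[] args) {
    Map<Integer, Integer> map = new LinkedHashMap<>();
    map.put(1, 1);
    map.put(2, null); // breaks the "no null values" prediction made from the first pair
    map.put(3, 3);
    System.out.println(write(map, ByteBuffer.allocate(64))); // prints 2
  }
}
```

In this sketch the chunk size byte is backfilled once the chunk is complete, which is why the 127-pair limit fits in a single byte and why the approach needs a buffer whose already written bytes can still be updated.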

KV header:

  • If key ref tracking is enabled, use the 1st bit 0b1 of the header to flag it.
  • If a key is null, use the 2nd bit 0b10 of the header to flag it. If ref tracking is enabled for this key type, this flag is invalid.
  • If the map key type is not the declared type, use the 3rd bit 0b100 of the header to flag it.
  • If the map key types differ, use the 4th bit 0b1000 of the header to flag it.
  • If value ref tracking is enabled, use the 5th bit 0b10000 of the header to flag it.
  • If a value is null, use the 6th bit 0b100000 of the header to flag it. If ref tracking is enabled for this value type, this flag is invalid.
  • If the map value type is not the declared type, use the 7th bit 0b1000000 of the header to flag it.
  • If the map value types differ, use the 8th bit 0b10000000 of the header to flag it.
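The bit assignments above can be written down as constants. The following is only an illustrative sketch: the class name, constant names, and helper methods are hypothetical and are not taken from Fury's code base; only the bit positions come from the list above.

```java
/** Hypothetical helper, not part of Fury's API: names the eight KV header bits listed above. */
final class KvHeaderSketch {
  static final int TRACKING_KEY_REF = 0b1;            // 1st bit
  static final int KEY_HAS_NULL = 0b10;               // 2nd bit
  static final int KEY_NOT_DECL_TYPE = 0b100;         // 3rd bit
  static final int KEY_TYPE_DIFFERENT = 0b1000;       // 4th bit
  static final int TRACKING_VALUE_REF = 0b10000;      // 5th bit
  static final int VALUE_HAS_NULL = 0b100000;         // 6th bit
  static final int VALUE_NOT_DECL_TYPE = 0b1000000;   // 7th bit
  static final int VALUE_TYPE_DIFFERENT = 0b10000000; // 8th bit

  /** Returns true if the given flag bit is set in the header byte. */
  static boolean has(int header, int flag) {
    return (header & flag) != 0;
  }

  /** Per the list above, the key null flag is invalid when key ref tracking is enabled. */
  static boolean keyNullFlagApplies(int header) {
    return !has(header, TRACKING_KEY_REF);
  }

  /** Per the list above, the value null flag is invalid when value ref tracking is enabled. */
  static boolean valueNullFlagApplies(int header) {
    return !has(header, TRACKING_VALUE_REF);
  }

  public static void main(String[] args) {
    // Keys may be null; values are not of the declared type.
    int header = KEY_HAS_NULL | VALUE_NOT_DECL_TYPE;
    System.out.println(Integer.toBinaryString(header)); // prints 1000010
  }
}
```

All eight flags fit in one byte, so a header of 0 describes the fully homogeneous case: no ref tracking, no nulls, declared types, and a single key type and value type.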

If streaming write is enabled, Fury can't update the chunk size after it has been written. In that case, the map key-value data format will be:

+----------------+~~~~~~~~~~~~~~~~~+
|    KV header   |   N*2 objects   |
+----------------+~~~~~~~~~~~~~~~~~+

The KV header will be the header declared by MapFieldInfo in Java. For languages such as Go, it can mostly be computed in advance for non-interface types.

Additional context

#923

chaokunyang avatar Oct 01 '23 09:10 chaokunyang

map iteration and serialization benchmark:

  @Benchmark
  public Object computeMapHeader() {
    boolean containKeyNull = false;
    boolean containValueNull = false;
    Class<?> keyClass = null, valueClass = null;
    boolean keySameClass = true;
    boolean valueSameClass = true;
    int count = 0;
    for (Map.Entry<Object, Object> entry : mapForIterating.entrySet()) {
      Object key = entry.getKey();
      Object value = entry.getValue();
      count++;
      if (key == null) {
        containKeyNull = true;
      } else if (keyClass == null) {
        keyClass = key.getClass();
      } else if (keyClass != key.getClass()) {
        keySameClass = false;
      }
      if (value == null) {
        containValueNull = true;
      } else if (valueClass == null) {
        valueClass = value.getClass();
      } else if (valueClass != value.getClass()) {
        valueSameClass = false;
      }
    }
    return new boolean[] {containKeyNull, keySameClass, containValueNull, valueSameClass, count % 2 != 0};
  }


  @Benchmark
  public Object iterateMap() {
    int count = 0;
    for (Map.Entry<Object, Object> entry : mapForIterating.entrySet()) {
      Object key = entry.getKey();
      Object value = entry.getValue();
      // hole.consume(key);
      // hole.consume(value);
      count++;
    }
    return count;
  }

  @Benchmark
  public Object serializeMap() {
    buffer.writerIndex(0);
    for (Map.Entry<Object, Object> entry : mapForIterating.entrySet()) {
      Object key = entry.getKey();
      Object value = entry.getValue();
      fury.writeRef(buffer, key);
      fury.writeRef(buffer, value);
    }
    return buffer;
  }

  @Benchmark
  public Object serializeMap2() {
    buffer.writerIndex(0);
    Map<Integer, Integer> map = (Map) mapForIterating;
    for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
      Integer key = entry.getKey();
      Integer value = entry.getValue();
      buffer.writeInt(key);
      buffer.writeInt(value);
    }
    return buffer;
  }

  @Benchmark
  public Object serialize() {
    return Tuple2.of(computeMapHeader(), serializeMap2());
  }

  @Benchmark
  public Object serializeOpt() {
    buffer.writerIndex(0);
    Map<Integer, Integer> map = (Map) mapForIterating;
    boolean containKeyNull = false;
    boolean containValueNull = false;
    Class<?> keyClass = null, valueClass = null;
    boolean keySameClass = true;
    boolean valueSameClass = true;
    int count = 0;
    for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
      Integer key = entry.getKey();
      Integer value = entry.getValue();
      count++;
      if (key == null) {
        containKeyNull = true;
      } else if (keyClass == null) {
        keyClass = key.getClass();
      } else if (keyClass != key.getClass()) {
        keySameClass = false;
      }
      if (value == null) {
        containValueNull = true;
      } else if (valueClass == null) {
        valueClass = value.getClass();
      } else if (valueClass != value.getClass()) {
        valueSameClass = false;
      }
      buffer.writeInt(key);
      buffer.writeInt(value);
    }
    return Tuple2.of(new boolean[] {containKeyNull, keySameClass, containValueNull, valueSameClass, count % 2 != 0}, buffer);
  }

Iteration is almost as slow as writing the data for a map of size 100.

Benchmark                   Mode  Cnt        Score        Error  Units
MapSuite.computeMapHeader  thrpt   15   962145.162 ±  23937.297  ops/s
MapSuite.iterateMap        thrpt   15  1001955.918 ±  66110.121  ops/s
MapSuite.serialize         thrpt   15   445024.638 ±  65968.399  ops/s
MapSuite.serializeMap      thrpt   15   620651.930 ±  14956.265  ops/s
MapSuite.serializeMap2     thrpt   15   911471.607 ± 123342.194  ops/s
MapSuite.serializeOpt      thrpt   15   919619.780 ±  11398.077  ops/s

chaokunyang avatar Dec 22 '23 12:12 chaokunyang

I am interested in this issue. I have some questions:

  1. How does a user use the MapFieldInfo annotation to provide the header? Could you provide an example?
  2. Is AbstractMapSerializer the main file modified by this change?
  3. Could you assign this issue to me?

Hen1ng avatar Jun 30 '24 07:06 Hen1ng

  1. MapFieldInfo is not implemented currently; you can leave it for a later PR. The usage would be like:
public class Struct {
  @MapFieldInfo(keyNullable = false, valueNullable = false)
  Map<String, Integer> map;
}
  2. AbstractMapSerializer is the main file; another file is BaseObjectCodecBuilder, but that can be left for another PR.
  3. Assigned, thanks for contributing to Apache Fury.
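Since MapFieldInfo does not exist yet, the following is only a sketch of what such an annotation and its translation into header bits might look like. Everything here is an assumption: the annotation definition, its defaults, the enclosing class, and the helper method are hypothetical; only the keyNullable and valueNullable attribute names come from the usage above. An unannotated field is conservatively treated as nullable on both sides.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;

/** Hypothetical sketch: MapFieldInfo is not implemented in Fury at the time of this thread. */
final class MapFieldInfoSketch {
  /** Assumed shape of the annotation; must be retained at runtime to be read reflectively. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface MapFieldInfo {
    boolean keyNullable() default true;
    boolean valueNullable() default true;
  }

  static class Struct {
    @MapFieldInfo(keyNullable = false, valueNullable = false)
    Map<String, Integer> map;

    Map<String, Integer> unannotated;
  }

  static final int KEY_HAS_NULL = 0b10;       // 2nd header bit
  static final int VALUE_HAS_NULL = 0b100000; // 6th header bit

  /** Derives the null-related header bits for a map field from its annotation, if present. */
  static int headerForField(Class<?> type, String fieldName) {
    Field field;
    try {
      field = type.getDeclaredField(fieldName);
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException("No field " + fieldName + " in " + type, e);
    }
    MapFieldInfo info = field.getAnnotation(MapFieldInfo.class);
    if (info == null) {
      return KEY_HAS_NULL | VALUE_HAS_NULL; // nothing declared: assume nulls are possible
    }
    return (info.keyNullable() ? KEY_HAS_NULL : 0) | (info.valueNullable() ? VALUE_HAS_NULL : 0);
  }

  public static void main(String[] args) {
    System.out.println(headerForField(Struct.class, "map")); // prints 0
  }
}
```

A header known ahead of time like this is what would let the serializer skip per-pair prediction for the annotated field.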

chaokunyang avatar Jun 30 '24 11:06 chaokunyang