[Java][Protocol] Chunk by chunk predictive map serialization protocol
Is your feature request related to a problem? Please describe.
Optimize Collection/Map serialization by exploiting the likely homogeneity of the elements:
- Collection: elements are mostly non-null, and all elements have the same type.
- Map: keys are mostly non-null and all have the same type; values are mostly non-null and all have the same type.
Using this information, serialization can be made faster and the serialized binary smaller.
For a collection, we can compute the header before serializing the elements, since iterating a collection is cheap. Map iteration, however, is expensive: for a Map<Integer, Integer> it costs about as much as the serialization itself.
So we need to finish writing the key-value pairs and the header in a single iteration.
Describe the solution you'd like
Users can use a MapFieldInfo annotation to provide the header in advance. Otherwise, Fury will optimistically predict the header from the first key-value pair, and update the chunk header if the prediction fails at some later pair.
Fury will serialize the map chunk by chunk; every chunk holds at most 127 pairs.
+----------------+----------------+~~~~~~~~~~~~~~~~~+
| chunk size: N  |   KV header    |   N*2 objects   |
+----------------+----------------+~~~~~~~~~~~~~~~~~+
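A minimal sketch of how the chunk layout above could be written in a single pass. It is not Fury's implementation: the class and method names are hypothetical, only the two "has null" bits of the KV header are modelled, keys and values are restricted to Integer, and a failed prediction is handled by closing the current chunk and starting a new one (one possible way to keep already written pairs readable), rather than by patching the header in place.

```java
import java.nio.ByteBuffer;
import java.util.Map;

/** Hypothetical sketch of chunk-by-chunk map writing; not Fury's API. */
final class ChunkedMapWriterSketch {
  static final int MAX_CHUNK_SIZE = 127;
  // Bit positions follow the KV header description in this issue.
  static final int KEY_HAS_NULL = 0b10;
  static final int VALUE_HAS_NULL = 0b100000;

  /** Flags that a single pair requires; also used to predict a chunk header. */
  static int predict(Integer key, Integer value) {
    int header = 0;
    if (key == null) header |= KEY_HAS_NULL;
    if (value == null) header |= VALUE_HAS_NULL;
    return header;
  }

  /** Writes the map as [size][header][pairs...] chunks; returns the chunk count. */
  static int write(Map<Integer, Integer> map, ByteBuffer out) {
    int chunks = 0;
    int sizePos = -1; // position of the current chunk's size byte
    int header = 0;
    int size = 0;
    for (Map.Entry<Integer, Integer> e : map.entrySet()) {
      Integer key = e.getKey();
      Integer value = e.getValue();
      int needed = predict(key, value);
      // The pair fits if the chunk is open, not full, and its header covers the pair.
      boolean fits = sizePos >= 0 && size < MAX_CHUNK_SIZE && (needed & ~header) == 0;
      if (!fits) {
        if (sizePos >= 0) out.put(sizePos, (byte) size); // patch the finished chunk's size
        sizePos = out.position();
        out.put((byte) 0); // placeholder for the chunk size
        header = needed; // predict the header from the chunk's first pair
        out.put((byte) header);
        size = 0;
        chunks++;
      }
      writeElement(out, key, (header & KEY_HAS_NULL) != 0);
      writeElement(out, value, (header & VALUE_HAS_NULL) != 0);
      size++;
    }
    if (sizePos >= 0) out.put(sizePos, (byte) size);
    return chunks;
  }

  /** Writes a null marker byte only when the chunk header says nulls may occur. */
  static void writeElement(ByteBuffer out, Integer x, boolean nullable) {
    if (nullable) {
      out.put((byte) (x == null ? 0 : 1));
      if (x == null) return;
    }
    out.putInt(x);
  }
}
```

With this sketch, a map with no nulls pays no per-element null byte at all, and a single null only affects the chunk in which it first appears.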
KV header:
- If key references are tracked, use the 1st bit 0b1 of the header to flag it.
- If a key may be null, use the 2nd bit 0b10 of the header to flag it. If ref tracking is enabled for this key type, this flag is invalid.
- If the map key type is not the declared type, use the 3rd bit 0b100 of the header to flag it.
- If the map key types differ, use the 4th bit 0b1000 of the header to flag it.
- If value references are tracked, use the 5th bit 0b10000 of the header to flag it.
- If a value may be null, use the 6th bit 0b100000 of the header to flag it. If ref tracking is enabled for this value type, this flag is invalid.
- If the map value type is not the declared type, use the 7th bit 0b1000000 of the header to flag it.
- If the map value types differ, use the 8th bit 0b10000000 of the header to flag it.
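The bit layout above can be sketched as follows. The key flags occupy the low four bits and the value flags the high four bits, so one helper can build either half. The class, constant, and method names are hypothetical, and leaving the null flag unset when ref tracking is on is just one reading of "this flag is invalid".

```java
/** Hypothetical sketch of the one-byte KV header; names are not Fury's API. */
final class KvHeaderSketch {
  // Per-side flags; the value side uses the same flags shifted left by 4.
  static final int TRACK_REF = 0b0001;
  static final int HAS_NULL = 0b0010;
  static final int NOT_DECL_TYPE = 0b0100;
  static final int NOT_SAME_TYPE = 0b1000;

  /** Builds the four flags for one side (key or value). */
  static int sideFlags(
      boolean trackRef, boolean hasNull, boolean notDeclType, boolean notSameType) {
    int flags = 0;
    if (trackRef) flags |= TRACK_REF;
    // Assumption: with ref tracking on, the null flag carries no meaning, so leave it unset.
    if (hasNull && !trackRef) flags |= HAS_NULL;
    if (notDeclType) flags |= NOT_DECL_TYPE;
    if (notSameType) flags |= NOT_SAME_TYPE;
    return flags;
  }

  /** Combines key flags (bits 1-4) and value flags (bits 5-8) into one header byte. */
  static int header(int keyFlags, int valueFlags) {
    return (valueFlags << 4) | keyFlags;
  }

  static boolean keyHasNull(int header) {
    return (header & HAS_NULL) != 0;
  }

  static boolean valueHasNull(int header) {
    return ((header >>> 4) & HAS_NULL) != 0;
  }
}
```

For example, nullable keys combined with ref-tracked values of differing types give key flags 0b0010 and value flags 0b1001, so the header byte is 0b10010010.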
If streaming write is enabled, Fury can't go back and update a chunk size that has already been written. In such cases, the map key-value data format will be:
+----------------+~~~~~~~~~~~~~~~~~+
|   KV header    |   N*2 objects   |
+----------------+~~~~~~~~~~~~~~~~~+
The KV header will be the header declared via MapFieldInfo in Java. For languages such as Go, it can mostly be computed in advance for non-interface types.
Additional context
#923
map iteration and serialization benchmark:
@Benchmark
public Object computeMapHeader() {
  boolean containKeyNull = false;
  boolean containValueNull = false;
  Class<?> keyClass = null, valueClass = null;
  boolean keySameClass = true;
  boolean valueSameClass = true;
  int count = 0;
  for (Map.Entry<Object, Object> entry : mapForIterating.entrySet()) {
    Object key = entry.getKey();
    Object value = entry.getValue();
    count++;
    if (key == null) {
      containKeyNull = true;
    } else if (keyClass == null) {
      keyClass = key.getClass();
    } else if (keyClass != key.getClass()) {
      keySameClass = false;
    }
    if (value == null) {
      containValueNull = true;
    } else if (valueClass == null) {
      valueClass = value.getClass();
    } else if (valueClass != value.getClass()) {
      valueSameClass = false;
    }
  }
  return new boolean[] {
    containKeyNull, keySameClass, containValueNull, valueSameClass, count % 2 != 0
  };
}
@Benchmark
public Object iterateMap() {
  int count = 0;
  for (Map.Entry<Object, Object> entry : mapForIterating.entrySet()) {
    Object key = entry.getKey();
    Object value = entry.getValue();
    // hole.consume(key);
    // hole.consume(value);
    count++;
  }
  return count;
}
@Benchmark
public Object serializeMap() {
  buffer.writerIndex(0);
  for (Map.Entry<Object, Object> entry : mapForIterating.entrySet()) {
    Object key = entry.getKey();
    Object value = entry.getValue();
    fury.writeRef(buffer, key);
    fury.writeRef(buffer, value);
  }
  return buffer;
}
@Benchmark
public Object serializeMap2() {
  buffer.writerIndex(0);
  Map<Integer, Integer> map = (Map) mapForIterating;
  for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
    Integer key = entry.getKey();
    Integer value = entry.getValue();
    buffer.writeInt(key);
    buffer.writeInt(value);
  }
  return buffer;
}
@Benchmark
public Object serialize() {
  return Tuple2.of(computeMapHeader(), serializeMap2());
}
@Benchmark
public Object serializeOpt() {
  buffer.writerIndex(0);
  Map<Integer, Integer> map = (Map) mapForIterating;
  boolean containKeyNull = false;
  boolean containValueNull = false;
  Class<?> keyClass = null, valueClass = null;
  boolean keySameClass = true;
  boolean valueSameClass = true;
  int count = 0;
  for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
    Integer key = entry.getKey();
    Integer value = entry.getValue();
    count++;
    if (key == null) {
      containKeyNull = true;
    } else if (keyClass == null) {
      keyClass = key.getClass();
    } else if (keyClass != key.getClass()) {
      keySameClass = false;
    }
    if (value == null) {
      containValueNull = true;
    } else if (valueClass == null) {
      valueClass = value.getClass();
    } else if (valueClass != value.getClass()) {
      valueSameClass = false;
    }
    buffer.writeInt(key);
    buffer.writeInt(value);
  }
  return Tuple2.of(
      new boolean[] {
        containKeyNull, keySameClass, containValueNull, valueSameClass, count % 2 != 0
      },
      buffer);
}
Iteration is almost as slow as writing the data for a map of size 100.
Benchmark                   Mode  Cnt        Score         Error  Units
MapSuite.computeMapHeader  thrpt   15   962145.162 ±   23937.297  ops/s
MapSuite.iterateMap        thrpt   15  1001955.918 ±   66110.121  ops/s
MapSuite.serialize         thrpt   15   445024.638 ±   65968.399  ops/s
MapSuite.serializeMap      thrpt   15   620651.930 ±   14956.265  ops/s
MapSuite.serializeMap2     thrpt   15   911471.607 ±  123342.194  ops/s
MapSuite.serializeOpt      thrpt   15   919619.780 ±   11398.077  ops/s
I am interested in this issue. I have some questions:
- How does a user use the MapFieldInfo annotation to provide the header? Could you provide an example?
- Is AbstractMapSerializer the main class modified by this change?
- Could you assign this issue to me?
- MapFieldInfo is not implemented currently; you can leave it to a follow-up PR. The usage would look like:
public class Struct {
  @MapFieldInfo(keyNullable = false, valueNullable = false)
  Map<String, Integer> map;
}
- AbstractMapSerializer is the main file. Another file is BaseObjectCodecBuilder, but that can be left to another PR.
- Assigned, thanks for contributing to Apache Fury.
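Since MapFieldInfo does not exist yet, here is a rough sketch of what such an annotation and its lookup might look like. Everything in it is an assumption for illustration: the annotation is declared locally, only the keyNullable and valueNullable attributes from the usage above are modelled, their defaults are guessed, and headerFor is a hypothetical helper rather than a Fury method.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;

/** Hypothetical sketch; the real MapFieldInfo may look different once implemented. */
final class MapFieldInfoSketch {
  @Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
  @Target(ElementType.FIELD)
  @interface MapFieldInfo {
    boolean keyNullable() default true; // assumed default: nulls allowed
    boolean valueNullable() default true;
  }

  static class Struct {
    @MapFieldInfo(keyNullable = false, valueNullable = false)
    Map<String, Integer> map;

    Map<String, Integer> plain; // no annotation: nothing is known in advance
  }

  /** Derives the two "has null" header bits from the annotation, if present. */
  static int headerFor(Field field) {
    MapFieldInfo info = field.getAnnotation(MapFieldInfo.class);
    int header = 0;
    if (info == null || info.keyNullable()) header |= 0b10; // key may be null
    if (info == null || info.valueNullable()) header |= 0b100000; // value may be null
    return header;
  }

  /** Convenience wrapper that hides the checked reflection exception. */
  static int headerFor(Class<?> type, String fieldName) {
    try {
      return headerFor(type.getDeclaredField(fieldName));
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException(e);
    }
  }
}
```

In this sketch the annotated field yields a header with both null bits cleared, while the unannotated field falls back to the conservative header with both bits set.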