feat(java): Chunk by chunk predictive map serialization protocol (WIP)
What does this PR do?
Implement chunk-based map serialization (#925). This PR doesn't provide JIT support; that will be implemented in a later PR.
Related issues
Does this PR introduce any user-facing change?
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
Benchmark
Could we add a license header for MapChunkWriter?
Hi @Hen1ng, I noticed that we create a MapChunkWriter every time we serialize or deserialize a map. This introduces extra object allocation cost.
I also found that we use fields in MapChunkWriter to maintain state across the serialization of different elements. This is good for code readability but not efficient for execution: we read and update the fields frequently, which adds extra indirection and read/write cost. It would be faster to keep the state in local variables, so that all state updates happen on the stack. We also use many small methods; where a method body is small, we may need to inline it into the caller to reduce method call overhead.
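The difference between the two styles can be sketched as follows. This is a minimal, hypothetical illustration and not the code in this PR: the class and method names (ChunkStateSketch, FieldBasedCounter, chunkSizesWithFields, chunkSizesWithLocals) are made up, and the "work" is only counting how many consecutive keys share the same class. Both variants compute the same result; the second one keeps all loop state in local variables and allocates no helper object. Whether this is measurably faster in a given case depends on what the JIT compiler does and should be confirmed with a benchmark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Fury.
final class ChunkStateSketch {

  // Variant A: state lives in fields of a helper object allocated per call.
  static final class FieldBasedCounter {
    private Class<?> currentKeyType;
    private int chunkSize;
    private final List<Integer> chunkSizes = new ArrayList<>();

    void accept(Object key) {
      Class<?> type = key == null ? null : key.getClass();
      // A type change closes the current run of same-typed keys.
      if (chunkSize > 0 && type != currentKeyType) {
        chunkSizes.add(chunkSize);
        chunkSize = 0;
      }
      currentKeyType = type;
      chunkSize++;
    }

    List<Integer> finish() {
      if (chunkSize > 0) {
        chunkSizes.add(chunkSize);
      }
      return chunkSizes;
    }
  }

  static List<Integer> chunkSizesWithFields(Map<?, ?> map) {
    FieldBasedCounter counter = new FieldBasedCounter(); // extra allocation per map
    for (Object key : map.keySet()) {
      counter.accept(key); // every update goes through a field read/write
    }
    return counter.finish();
  }

  // Variant B: same logic, but the state is held in local variables
  // and the small helper method is inlined by hand into the loop.
  static List<Integer> chunkSizesWithLocals(Map<?, ?> map) {
    List<Integer> chunkSizes = new ArrayList<>();
    Class<?> currentKeyType = null;
    int chunkSize = 0;
    for (Object key : map.keySet()) {
      Class<?> type = key == null ? null : key.getClass();
      if (chunkSize > 0 && type != currentKeyType) {
        chunkSizes.add(chunkSize);
        chunkSize = 0;
      }
      currentKeyType = type;
      chunkSize++;
    }
    if (chunkSize > 0) {
      chunkSizes.add(chunkSize);
    }
    return chunkSizes;
  }
}
```

For an insertion-ordered map whose keys are two strings, then three integers, then one string, both methods return the run lengths 2, 3 and 1.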
Another thing I found is that this PR does not seem to support predicting the same type across elements. For example:
public static void main(String[] args) {
  Map<String, Integer> map = new HashMap<>(20);
  for (int i = 0; i < 20; i++) {
    map.put("Key" + i, i);
  }
  Fury fury = Fury.builder().withChunkSerializeMapEnable(true).build();
  byte[] result = null;
  for (int i = 0; i < 1000000000; i++) {
    result = fury.serialize(map);
  }
  fury.deserialize(result);
}
With this PR, we still need to write the key type and value type for every element. Could we optimize this in the current PR?
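To make the requested optimization concrete, here is a minimal sketch of the idea: write the key and value type once per run of same-typed entries instead of once per entry. It is an assumption-laden illustration, not Fury's wire format or API. The class TypePredictionSketch and its methods are hypothetical, type information is written as plain class names, payloads are written via toString(), and keys and values are assumed to be non-null.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; NOT Fury's wire format or API. Assumes non-null keys and values.
final class TypePredictionSketch {

  // Baseline: key and value type names are written for every entry.
  static byte[] writePerEntry(Map<?, ?> map) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(map.size());
      for (Map.Entry<?, ?> e : map.entrySet()) {
        out.writeUTF(e.getKey().getClass().getName());
        out.writeUTF(e.getKey().toString());
        out.writeUTF(e.getValue().getClass().getName());
        out.writeUTF(e.getValue().toString());
      }
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
    return bytes.toByteArray();
  }

  // Chunked: consecutive entries sharing key and value class form one chunk,
  // and the two type names are written once in the chunk header.
  static byte[] writeChunked(Map<?, ?> map) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(map.size());
      List<Map.Entry<?, ?>> chunk = new ArrayList<>();
      Class<?> keyType = null;
      Class<?> valueType = null;
      for (Map.Entry<?, ?> e : map.entrySet()) {
        Class<?> k = e.getKey().getClass();
        Class<?> v = e.getValue().getClass();
        // A change in either type ends the current chunk.
        if (!chunk.isEmpty() && (k != keyType || v != valueType)) {
          flush(out, keyType, valueType, chunk);
        }
        keyType = k;
        valueType = v;
        chunk.add(e);
      }
      if (!chunk.isEmpty()) {
        flush(out, keyType, valueType, chunk);
      }
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
    return bytes.toByteArray();
  }

  // Writes one chunk: header (size, key type, value type), then the payloads.
  private static void flush(
      DataOutputStream out, Class<?> keyType, Class<?> valueType, List<Map.Entry<?, ?>> chunk)
      throws IOException {
    out.writeInt(chunk.size());
    out.writeUTF(keyType.getName());
    out.writeUTF(valueType.getName());
    for (Map.Entry<?, ?> e : chunk) {
      out.writeUTF(e.getKey().toString());
      out.writeUTF(e.getValue().toString());
    }
    chunk.clear();
  }
}
```

For a homogeneous map such as the Map<String, Integer> above, writeChunked emits a single chunk header, so its output is smaller than that of writePerEntry. The sketch buffers entries until a chunk ends because the chunk size is only known then; a real implementation could instead reserve space for the header and patch it afterwards.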
Benchmark                          (size)  (tracking)  Mode  Cnt      Score      Error  Units
HnBenchmark.testGeneralChunkWrite      64        true  avgt   15    986.457 ±   64.592  ns/op
HnBenchmark.testGeneralChunkWrite      64       false  avgt   15    924.420 ±  201.541  ns/op
HnBenchmark.testGeneralChunkWrite     128        true  avgt   15   2029.786 ±   95.281  ns/op
HnBenchmark.testGeneralChunkWrite     128       false  avgt   15   1992.186 ±  173.750  ns/op
HnBenchmark.testGeneralChunkWrite     256        true  avgt   15   4305.362 ±  326.161  ns/op
HnBenchmark.testGeneralChunkWrite     256       false  avgt   15   3808.830 ±  120.738  ns/op
HnBenchmark.testGeneralChunkWrite     512        true  avgt   15  10309.996 ±  264.954  ns/op
HnBenchmark.testGeneralChunkWrite     512       false  avgt   15   9160.058 ± 1075.136  ns/op
HnBenchmark.testGeneralWrite           64        true  avgt   15   1384.715 ±  388.240  ns/op
HnBenchmark.testGeneralWrite           64       false  avgt   15   1522.562 ±  617.019  ns/op
HnBenchmark.testGeneralWrite          128        true  avgt   15   2718.254 ±  666.551  ns/op
HnBenchmark.testGeneralWrite          128       false  avgt   15   2916.637 ±  838.792  ns/op
HnBenchmark.testGeneralWrite          256        true  avgt   15   6086.414 ±  349.734  ns/op
HnBenchmark.testGeneralWrite          256       false  avgt   15   5871.677 ±  672.658  ns/op
HnBenchmark.testGeneralWrite          512        true  avgt   15  11769.064 ±  995.204  ns/op
HnBenchmark.testGeneralWrite          512       false  avgt   15  12268.517 ± 1140.401  ns/op
generalWrite benchmark comparison; the benchmark code is below.
@State(Scope.Benchmark)
@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 3, time = 5)
@Threads(1)
@Fork(5)
public class HnBenchmark {
  private Fury furyMapChunk;
  private Fury fury;
  Map<Integer, Integer> map;
  MapBean mapBean = new MapBean();
  Map<BeanB, BeanB> beanBBeanBMap = new HashMap<>();
  Bean bean;

  @Param({"64", "128", "256", "512"})
  int size;

  @Param({"true", "false"})
  boolean tracking = false;

  @Setup(Level.Trial)
  public void init() {
    furyMapChunk =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(true)
            .requireClassRegistration(false)
            .build();
    fury =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(false)
            .requireClassRegistration(false)
            .build();
    map = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
      map.put(i, i);
      beanBBeanBMap.put(new BeanB(), new BeanB());
    }
    bean = new Bean();
    bean.setMap(map);
    mapBean.setMap(beanBBeanBMap);
  }

  @Benchmark
  public void testGeneralChunkWrite() {
    final byte[] serialize = furyMapChunk.serialize(map);
  }

  @Benchmark
  public void testGeneralWrite() {
    final byte[] serialize = fury.serialize(map);
  }
}
Benchmark                        (size)  (tracking)  Mode  Cnt      Score      Error  Units
HnBenchmark.testFinalChunkWrite      64        true  avgt   15   1513.621 ±  266.821  ns/op
HnBenchmark.testFinalChunkWrite      64       false  avgt   15   1548.258 ±  374.848  ns/op
HnBenchmark.testFinalChunkWrite     128        true  avgt   15   2587.836 ±  263.128  ns/op
HnBenchmark.testFinalChunkWrite     128       false  avgt   15   2411.845 ±   51.029  ns/op
HnBenchmark.testFinalChunkWrite     256        true  avgt   15   5246.138 ±  444.736  ns/op
HnBenchmark.testFinalChunkWrite     256       false  avgt   15   5413.328 ± 1192.068  ns/op
HnBenchmark.testFinalChunkWrite     512        true  avgt   15  12151.494 ±  954.000  ns/op
HnBenchmark.testFinalChunkWrite     512       false  avgt   15  11942.548 ±  775.937  ns/op
HnBenchmark.testFinalWrite           64        true  avgt   15   1814.819 ±  491.802  ns/op
HnBenchmark.testFinalWrite           64       false  avgt   15   1878.988 ±  795.618  ns/op
HnBenchmark.testFinalWrite          128        true  avgt   15   3626.284 ± 1315.089  ns/op
HnBenchmark.testFinalWrite          128       false  avgt   15   3224.561 ±  402.316  ns/op
HnBenchmark.testFinalWrite          256        true  avgt   15   7173.662 ±  671.952  ns/op
HnBenchmark.testFinalWrite          256       false  avgt   15   6983.448 ±  663.652  ns/op
HnBenchmark.testFinalWrite          512        true  avgt   15  13694.154 ±  940.861  ns/op
HnBenchmark.testFinalWrite          512       false  avgt   15  13999.483 ± 1102.000  ns/op
finalWrite benchmark comparison; the benchmark code is below.
@State(Scope.Benchmark)
@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 3, time = 5)
@Threads(1)
@Fork(5)
public class HnBenchmark {
  private Fury furyMapChunk;
  private Fury fury;
  Map<Integer, Integer> map;
  MapBean mapBean = new MapBean();
  Map<BeanB, BeanB> beanBBeanBMap = new HashMap<>();
  Bean bean;

  @Param({"64", "128", "256", "512"})
  int size;

  @Param({"true", "false"})
  boolean tracking = false;

  @Setup(Level.Trial)
  public void init() {
    furyMapChunk =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(true)
            .requireClassRegistration(false)
            .build();
    fury =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(false)
            .requireClassRegistration(false)
            .build();
    map = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
      map.put(i, i);
    }
    bean = new Bean();
    bean.setMap(map);
  }

  @Benchmark
  public void testFinalChunkWrite() {
    final byte[] serialize = furyMapChunk.serialize(bean);
  }

  @Benchmark
  public void testFinalWrite() {
    final byte[] serialize = fury.serialize(bean);
  }
}
Benchmark                                 (size)  (tracking)  Mode  Cnt      Score     Error  Units
HnBenchmark.testNoFinalGenericChunkWrite      64        true  avgt   15   5173.244 ± 551.374  ns/op
HnBenchmark.testNoFinalGenericChunkWrite      64       false  avgt   15   2549.857 ± 213.828  ns/op
HnBenchmark.testNoFinalGenericChunkWrite     128        true  avgt   15   6468.278 ± 309.073  ns/op
HnBenchmark.testNoFinalGenericChunkWrite     128       false  avgt   15   4765.756 ± 192.533  ns/op
HnBenchmark.testNoFinalGenericChunkWrite     256        true  avgt   15  12581.604 ± 460.256  ns/op
HnBenchmark.testNoFinalGenericChunkWrite     256       false  avgt   15  10055.729 ± 586.683  ns/op
HnBenchmark.testNoFinalGenericWrite           64        true  avgt   15   5140.018 ± 158.204  ns/op
HnBenchmark.testNoFinalGenericWrite           64       false  avgt   15   3775.049 ± 190.314  ns/op
HnBenchmark.testNoFinalGenericWrite          128        true  avgt   15   9481.691 ± 310.432  ns/op
HnBenchmark.testNoFinalGenericWrite          128       false  avgt   15   7771.257 ± 442.158  ns/op
HnBenchmark.testNoFinalGenericWrite          256        true  avgt   15  17014.393 ± 995.254  ns/op
HnBenchmark.testNoFinalGenericWrite          256       false  avgt   15  15370.666 ± 809.569  ns/op
testNoFinalGenericWrite benchmark comparison; the benchmark code is below.
@State(Scope.Benchmark)
@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 3, time = 5)
@Threads(1)
@Fork(5)
public class HnBenchmark {
  private Fury furyMapChunk;
  private Fury fury;
  Map<Integer, Integer> map;
  MapBean mapBean = new MapBean();
  Map<BeanB, BeanB> beanBBeanBMap = new HashMap<>();
  Bean bean;

  @Param({"64", "128", "256"})
  int size;

  @Param({"true", "false"})
  boolean tracking = false;

  @Setup(Level.Trial)
  public void init() {
    furyMapChunk =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(true)
            .requireClassRegistration(false)
            .build();
    fury =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(false)
            .requireClassRegistration(false)
            .build();
    map = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
      map.put(i, i);
      beanBBeanBMap.put(new BeanB(), new BeanB());
    }
    bean = new Bean();
    bean.setMap(map);
    mapBean.setMap(beanBBeanBMap);
  }

  @Benchmark
  public void testNoFinalGenericChunkWrite() {
    final byte[] serialize = furyMapChunk.serialize(mapBean);
  }

  @Benchmark
  public void testNoFinalGenericWrite() {
    final byte[] serialize = fury.serialize(mapBean);
  }
}
Benchmark                                  (size)  (tracking)  Mode  Cnt     Score      Error  Units
HnBenchmark.testGeneralChunkWriteWithNull      64        true  avgt   15  1228.736 ±  153.330  ns/op
HnBenchmark.testGeneralChunkWriteWithNull      64       false  avgt   15  1150.012 ±   83.370  ns/op
HnBenchmark.testGeneralChunkWriteWithNull     128        true  avgt   15  2393.119 ±  168.874  ns/op
HnBenchmark.testGeneralChunkWriteWithNull     128       false  avgt   15  2607.314 ±  463.271  ns/op
HnBenchmark.testGeneralChunkWriteWithNull     256        true  avgt   15  4554.671 ±   91.605  ns/op
HnBenchmark.testGeneralChunkWriteWithNull     256       false  avgt   15  4977.650 ± 1019.474  ns/op
HnBenchmark.testGeneralWriteWithNull           64        true  avgt   15  1935.727 ± 1269.534  ns/op
HnBenchmark.testGeneralWriteWithNull           64       false  avgt   15  1704.501 ±  604.193  ns/op
HnBenchmark.testGeneralWriteWithNull          128        true  avgt   15  2741.738 ±  664.690  ns/op
HnBenchmark.testGeneralWriteWithNull          128       false  avgt   15  2431.270 ±   65.081  ns/op
HnBenchmark.testGeneralWriteWithNull          256        true  avgt   15  5496.098 ±  117.704  ns/op
HnBenchmark.testGeneralWriteWithNull          256       false  avgt   15  5845.516 ±   90.525  ns/op
@State(Scope.Benchmark)
@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 3, time = 5)
@Threads(1)
@Fork(5)
public class HnBenchmark {
  private Fury furyMapChunk;
  private Fury fury;
  Map<Integer, Integer> map;
  MapBean mapBean = new MapBean();
  Map<BeanB, BeanB> beanBBeanBMap = new HashMap<>();
  Bean bean;

  @Param({"64", "128", "256"})
  int size;

  @Param({"true", "false"})
  boolean tracking = false;

  @Setup(Level.Trial)
  public void init() {
    furyMapChunk =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(true)
            .requireClassRegistration(false)
            .build();
    fury =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(false)
            .requireClassRegistration(false)
            .build();
    map = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
      if (i == 0) {
        // One entry with a null key.
        map.put(null, i);
        beanBBeanBMap.put(null, new BeanB());
        continue;
      }
      if (i == 10) {
        // One entry with a null value.
        map.put(i, null);
        beanBBeanBMap.put(new BeanB(), null);
        continue;
      }
      map.put(i, i);
      beanBBeanBMap.put(new BeanB(), new BeanB());
    }
    bean = new Bean();
    bean.setMap(map);
    mapBean.setMap(beanBBeanBMap);
  }

  @Benchmark
  public void testGeneralChunkWriteWithNull() {
    final byte[] serialize = furyMapChunk.serialize(map);
  }

  @Benchmark
  public void testGeneralWriteWithNull() {
    final byte[] serialize = fury.serialize(map);
  }
}
The performance is basically the same, which is a little unexpected. Could you share some profiling data here?
generalJavaChunkWrite profiling data
current method generalJavaWrite profiling data
It seems MapChunkWriter is not used anymore. Could we remove it, @Hen1ng?