flink
[FLINK-29271] Change from ByteBuffer to byte array to improve performance and support direct byte buffers
What is the purpose of the change
This PR uses the byte array API instead of the ByteBuffer API when reading Parquet DataPage V2. This improves performance and makes the reader compatible with DirectByteBuffer, which is not backed by a byte array.
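Why direct buffers are a problem for array-based code: the ByteBuffer javadoc leaves it unspecified whether a buffer from allocateDirect has a backing array, and on OpenJDK hasArray() returns false, so array() throws UnsupportedOperationException. The sketch below is a hypothetical helper (toByteArray is not part of this PR or of parquet-mr) showing one way to get a byte[] from either kind of buffer: reuse the heap array when it covers exactly the remaining bytes, otherwise copy.

```java
import java.nio.ByteBuffer;

public class ByteBufferToArray {

    /**
     * Hypothetical helper: returns the remaining bytes of buf as a byte[].
     * Reuses the backing array only when it matches the remaining region exactly;
     * otherwise (always, for buffers without an accessible array) it copies.
     */
    static byte[] toByteArray(ByteBuffer buf) {
        if (buf.hasArray()
                && buf.arrayOffset() + buf.position() == 0
                && buf.remaining() == buf.array().length) {
            return buf.array(); // zero-copy: heap buffer spanning its whole array
        }
        byte[] copy = new byte[buf.remaining()];
        buf.duplicate().get(copy); // duplicate() keeps the caller's position unchanged
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3});
        ByteBuffer direct = ByteBuffer.allocateDirect(3);
        direct.put(new byte[] {1, 2, 3});
        direct.flip();

        System.out.println(heap.hasArray());   // true
        System.out.println(direct.hasArray()); // false on OpenJDK (unspecified by the javadoc)
        System.out.println(toByteArray(heap) == heap.array()); // true: no copy made
        System.out.println(toByteArray(direct).length);        // 3
    }
}
```

The copy costs one extra pass over the page bytes, so a reader would typically only take this path when hasArray() is false.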
Brief change log
- Use the byte array API instead of the ByteBuffer API.
Verifying this change
I ran a benchmark comparing the byte array API and the ByteBuffer API in the parquet-mr project. The result is: [Unpack8ValuesByteArray spent time] 80 ms, [Unpack8ValuesByteBuffer spent time] 133 ms.
The test code is:

package org.apache.parquet.column.values.bitpacking;

import java.nio.ByteBuffer;

public class ByteBufferTest {
    // Little-endian packer for 7-bit values: each unpack8Values call reads 7 bytes and writes 8 ints.
    private static final BytePacker bytePacker = Packer.LITTLE_ENDIAN.newBytePacker(7);
    private static final int COUNT = 100000;

    public static void main(String[] args) {
        byte[] in = new byte[1008];   // 144 groups of 7 packed bytes
        int[] out = new int[1152];    // 144 groups of 8 unpacked values
        int[] out1 = new int[1152];
        int[] out2 = new int[1152];
        int res = 0;
        for (int i = 0; i < in.length; i++) {
            in[i] = (byte) i;
        }

        // Warm up, then time the byte[] API.
        for (int i = 0; i < COUNT; i++) {
            res += unpack8ValuesBytes(in, out, i % out.length);
        }
        res = 0;
        long t1 = System.currentTimeMillis();
        for (int i = 0; i < COUNT; i++) {
            res += unpack8ValuesBytes(in, out1, i % out.length);
        }
        long t2 = System.currentTimeMillis();
        System.out.println("[Unpack8ValuesByteArray spent time] " + (t2 - t1) + " ms");

        // Warm up, then time the ByteBuffer API on a heap buffer wrapping the same bytes.
        ByteBuffer byteBuffer = ByteBuffer.wrap(in);
        for (int i = 0; i < COUNT; i++) {
            res += unpack8ValuesByteBuffer(byteBuffer, out, i % out.length);
        }
        res = 0;
        long t3 = System.currentTimeMillis();
        for (int i = 0; i < COUNT; i++) {
            res += unpack8ValuesByteBuffer(byteBuffer, out2, i % out.length);
        }
        long t4 = System.currentTimeMillis();
        System.out.println("[Unpack8ValuesByteBuffer spent time] " + (t4 - t3) + " ms");

        // Both APIs must produce identical output.
        for (int i = 0; i < out1.length; i++) {
            if (out1[i] != out2[i]) {
                System.out.println("diff: " + out1[i] + " " + out2[i]);
            }
        }
    }

    private static int unpack8ValuesBytes(byte[] in, int[] out, int ctr) {
        for (int i = 0, j = 0; i < in.length; i += 7, j += 8) {
            bytePacker.unpack8Values(in, i, out, j);
        }
        return out[ctr];
    }

    private static int unpack8ValuesByteBuffer(ByteBuffer in, int[] out, int ctr) {
        for (int i = 0, j = 0; i < in.capacity(); i += 7, j += 8) {
            bytePacker.unpack8Values(in, i, out, j);
        }
        return out[ctr];
    }
}
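The benchmark uses bit width 7, so each unpack8Values call consumes 7 input bytes and produces 8 ints; that is why the loops advance i by 7 and j by 8, and why 1008 input bytes (144 groups) fill exactly 1152 outputs. The sketch below is a hypothetical pure-Java reference for what a little-endian packer computes, assuming the LSB-first bit order that Parquet's bit-packed encoding uses. It is not the parquet-mr implementation (which is generated, unrolled code), but something like it could serve as an independent cross-check for out1 and out2.

```java
import java.util.Arrays;

public class ReferenceUnpack {

    /**
     * Hypothetical reference: unpacks 8 values of bitWidth bits each from
     * in[inPos .. inPos + bitWidth), reading bits least-significant first.
     */
    static void unpack8Values(byte[] in, int inPos, int[] out, int outPos, int bitWidth) {
        for (int k = 0; k < 8; k++) {
            int value = 0;
            for (int b = 0; b < bitWidth; b++) {
                int bitIndex = k * bitWidth + b; // absolute bit offset inside the group
                int bit = (in[inPos + bitIndex / 8] >>> (bitIndex % 8)) & 1;
                value |= bit << b;               // LSB-first within each value
            }
            out[outPos + k] = value;
        }
    }

    public static void main(String[] args) {
        int bitWidth = 7;
        // Pack eight sample values by hand using the same LSB-first layout.
        int[] values = {0, 1, 2, 3, 4, 5, 6, 127};
        byte[] packed = new byte[bitWidth];
        for (int k = 0; k < 8; k++) {
            for (int b = 0; b < bitWidth; b++) {
                int bitIndex = k * bitWidth + b;
                if (((values[k] >>> b) & 1) != 0) {
                    packed[bitIndex / 8] |= (byte) (1 << (bitIndex % 8));
                }
            }
        }
        int[] out = new int[8];
        unpack8Values(packed, 0, out, 0, bitWidth);
        System.out.println(Arrays.toString(out)); // [0, 1, 2, 3, 4, 5, 6, 127]
    }
}
```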
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- b7f2e988c287d2bfc9eb1c643c3742c08e15818a Azure: SUCCESS
@lsyldliu do you want to review this?
@jiangjiguang Thanks for the contribution, I will review it as soon as possible.
@lsyldliu Is there any progress?


@luoyuxia The packer is created by this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);, so it is impossible for it to run the method of the abstract parent class BytePacker.
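The argument rests on Java virtual dispatch: if an abstract parent offers a byte[] overload that merely wraps the array in a ByteBuffer, that fallback only runs when the concrete class returned by the factory does not override it. The classes below (AbstractPacker, GeneratedPacker, newPacker) are hypothetical stand-ins modeled on the comment, not parquet-mr's actual BytePacker hierarchy.

```java
import java.nio.ByteBuffer;

public class DispatchDemo {

    // Hypothetical parent: the byte[] overload is only a wrapping fallback.
    abstract static class AbstractPacker {
        abstract String unpack(ByteBuffer in);

        String unpack(byte[] in) {
            return unpack(ByteBuffer.wrap(in)); // fallback path through ByteBuffer
        }
    }

    // Hypothetical concrete packer that overrides both overloads.
    static final class GeneratedPacker extends AbstractPacker {
        @Override
        String unpack(ByteBuffer in) {
            return "ByteBuffer path";
        }

        @Override
        String unpack(byte[] in) {
            return "byte[] path"; // specialized array code; the parent fallback never runs
        }
    }

    // Hypothetical factory, analogous in spirit to Packer.LITTLE_ENDIAN.newBytePacker(bitWidth).
    static AbstractPacker newPacker() {
        return new GeneratedPacker();
    }

    public static void main(String[] args) {
        AbstractPacker packer = newPacker(); // static type is the abstract parent
        System.out.println(packer.unpack(new byte[7])); // byte[] path
    }
}
```

Overload selection (byte[] vs ByteBuffer) happens at compile time from the argument type, while the override that actually runs is chosen at run time from the object's class, so the static type being the abstract parent does not matter.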

@jiangjiguang Also, could you please rebase onto master?
@luoyuxia I have rebased onto master.
@wuchong Could you please help merge this PR? Thanks~