flink icon indicating copy to clipboard operation
flink copied to clipboard

[FLINK-29271]Change to byte array from bytebuffer to improve performance and compatible direct byte buffers

Open jiangjiguang opened this issue 3 years ago • 3 comments
trafficstars

What is the purpose of the change

The PR uses byte array api instead of ByteBuffer api to improve performance and compatible with DirectByteBuffer which does not have byte array backed when reading parquet DataPage V2

Brief change log

  • Use the byte array api instead of ByteBuffer api.

Verifying this change

I have done a test to compare byte array api and ByteBuffer api on parquet-mr project. The result is: [Unpack8ValuesByteArray spent time] 80 ms [Unpack8ValuesByteBuffer spent time] 133 ms

The test code is: package org.apache.parquet.column.values.bitpacking;

import java.nio.ByteBuffer;

public class ByteBufferTest { private static final BytePacker bytePacker = Packer.LITTLE_ENDIAN.newBytePacker(7);

private static final int COUNT = 100000;

public static void main(String[] args) { byte [] in = new byte[1008]; int [] out = new int[1152]; int [] out1 = new int[1152]; int [] out2 = new int[1152];

int res = 0;

for(int i = 0; i < in.length; i++) {
  in[i] = (byte) i;
}

for(int i = 0; i < COUNT; i++) {
  res += unpack8ValuesBytes(in, out, i % out.length);
}

res = 0;
long t1 = System.currentTimeMillis();
for(int i = 0; i < COUNT; i++) {
  res += unpack8ValuesBytes(in, out1, i % out.length);
}
long t2 = System.currentTimeMillis();
System.out.println("[Unpack8ValuesByteArray spent time] " + (t2-t1) + " ms");

ByteBuffer byteBuffer = ByteBuffer.wrap(in);

for(int i = 0; i < COUNT; i++) {
  res += unpack8ValuesByteBuffer(byteBuffer, out, i % out.length);
}

res = 0;
long t3 = System.currentTimeMillis();
for(int i = 0; i < COUNT; i++) {
  res += unpack8ValuesByteBuffer(byteBuffer, out2, i % out.length);
}
long t4 = System.currentTimeMillis();
System.out.println("[Unpack8ValuesByteBuffer spent time] " + (t4-t3) + " ms");

for (int i=0; i<out1.length; i++) {
  if(out1[i] != out2[i]) {
    System.out.println("diff: " + out1[i] + " " + out2[i]);
  }
}

}

private static int unpack8ValuesBytes(byte [] in, int [] out, int ctr) { for(int i = 0, j = 0; i < in.length; i+=7, j+=8) { bytePacker.unpack8Values(in, i, out, j); } return out[ctr]; } private static int unpack8ValuesByteBuffer(ByteBuffer in, int [] out, int ctr) { for(int i = 0, j = 0; i < in.capacity(); i+=7, j+=8) { bytePacker.unpack8Values(in, i, out, j); } return out[ctr]; } }

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

jiangjiguang avatar Sep 14 '22 02:09 jiangjiguang

CI report:

  • b7f2e988c287d2bfc9eb1c643c3742c08e15818a Azure: SUCCESS
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Sep 14 '22 02:09 flinkbot

@flinkbot run azure re-run the last Azure build

jiangjiguang avatar Sep 14 '22 11:09 jiangjiguang

@lsyldliu do you want to review this?

wuchong avatar Sep 21 '22 10:09 wuchong

@jiangjiguang Thanks for contribution, I will review it as soon as posssible

lsyldliu avatar Oct 12 '22 12:10 lsyldliu

@lsyldliu Is there a progress ?

jiangjiguang avatar Oct 14 '22 06:10 jiangjiguang

image

jiangjiguang avatar Oct 24 '22 06:10 jiangjiguang

image

jiangjiguang avatar Oct 24 '22 06:10 jiangjiguang

@luoyuxia packer is made by this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); so it impossible to run the abstract parent class BytePacker'method

image

jiangjiguang avatar Oct 24 '22 06:10 jiangjiguang

@jiangjiguang Also, could you please help rebase the master?

luoyuxia avatar Oct 25 '22 02:10 luoyuxia

@luoyuxia I have rebased the master

jiangjiguang avatar Oct 25 '22 06:10 jiangjiguang

@wuchong Could you please help merge this pr, thanks~

jiangjiguang avatar Oct 26 '22 01:10 jiangjiguang