flink icon indicating copy to clipboard operation
flink copied to clipboard

[FLINK-29347] [runtime] Fix ByteStreamStateHandle#read return -1 when read count is 0

Open Shenjiaqi opened this issue 1 year ago • 3 comments

What is the purpose of the change

When protobuf serializer serializes an object, which is built directly with builder without assign any value to field, the serializer will generate a zero length byte[] and then write it into state with content '\0'(indicates zero length data).

When recovered from checkpoint, protobuf seralizer deserialize the data. It get length 0, and call InputStream#read(byte[] bytes, int offset, int count) with count = 0.

The underlying Input implementation is NoFetchingInput. It will call Inputsteam#read(byte[] bytes, int offset, int count) with count = 0.

The InputStream implementation is ByteStateHandleInputStream,It will return -1 as long as no data left in memory, even if count is 0.

Any seralizer/deserializer with implementation like protobuf will encounter problem like this (can not recovery from checkpoint with empty length serialized value).

Brief change log

Add check before return -1 in ByteStateHandleInputStream#read(byte[] bytes, int offset, int count). If caller reads 0 bytes, it should always return 0 instead of -1.

Verifying this change

This change added tests and can be verified as follows:

  • Use a protobuf generated class as value in list state. Build object of the class with builder and set no field. Try recovery from checkpoint. Before this fix, recovery should fail.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (don't know)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

Shenjiaqi avatar Sep 20 '22 06:09 Shenjiaqi

CI report:

  • f8156ccd407493119fa7f85dea6ca6e675a427d1 Azure: SUCCESS
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Sep 20 '22 06:09 flinkbot

@flinkbot run azure

Shenjiaqi avatar Sep 21 '22 10:09 Shenjiaqi

@Shenjiaqi Thanks for this PR, LGTM. Could you please add the ticket id and module name [FLINK-29347] [runtime] to the commit message?

@Myasuka Would you like to take a look?

fredia avatar Sep 23 '22 05:09 fredia

  • @flinkbot run azure

@flinkbot run azure

Shenjiaqi avatar Oct 25 '22 06:10 Shenjiaqi

Thanks for the update, please resolve the broken CI due to compiling problem.

Thanks for review. Now code is fixed and CI passed.

Shenjiaqi avatar Oct 26 '22 02:10 Shenjiaqi