[FLINK-29347] [runtime] Fix ByteStreamStateHandle#read return -1 when read count is 0
What is the purpose of the change
When the protobuf serializer serializes an object that was built directly from its builder without assigning a value to any field, it produces a zero-length byte[] and writes it into state as the single byte '\0' (indicating zero-length data).
When recovering from a checkpoint, the protobuf serializer deserializes the data. It reads a length of 0 and calls InputStream#read(byte[] bytes, int offset, int count) with count = 0.
The underlying Input implementation is NoFetchingInput, which in turn calls InputStream#read(byte[] bytes, int offset, int count) with count = 0.
The InputStream implementation is ByteStateHandleInputStream, which returns -1 whenever no data is left in memory, even if count is 0.
Any serializer/deserializer implemented like the protobuf one will hit the same problem: it cannot recover from a checkpoint that contains a zero-length serialized value.
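The failure path described above can be reproduced in isolation. The following is a minimal sketch, not Flink code: `PreFixStream` and `readValue` are hypothetical stand-ins for the pre-fix ByteStateHandleInputStream and for a reader that, like the one described above, treats -1 as an unexpected end of input. It assumes a one-byte length prefix purely for illustration.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

class ZeroLengthReadDemo {

    /** Hypothetical stand-in for the pre-fix stream: returns -1 whenever no bytes remain. */
    static final class PreFixStream extends InputStream {
        private final byte[] data;
        private int index;

        PreFixStream(byte[] data) {
            this.data = data;
        }

        @Override
        public int read() {
            return index < data.length ? data[index++] & 0xFF : -1;
        }

        @Override
        public int read(byte[] b, int off, int len) {
            int bytesLeft = data.length - index;
            if (bytesLeft > 0) {
                int bytesToCopy = Math.min(len, bytesLeft);
                System.arraycopy(data, index, b, off, bytesToCopy);
                index += bytesToCopy;
                return bytesToCopy;
            }
            return -1; // also returned when len == 0, which is the problem
        }
    }

    /** Hypothetical reader: reads a one-byte length prefix, then that many bytes. */
    static byte[] readValue(InputStream in) throws IOException {
        int length = in.read();
        if (length == -1) {
            throw new EOFException("Missing length prefix.");
        }
        byte[] value = new byte[length];
        int bytesRead = 0;
        while (true) {
            // This call is issued even when length == 0.
            int c = in.read(value, bytesRead, length - bytesRead);
            if (c == -1) {
                throw new EOFException("No more bytes left.");
            }
            bytesRead += c;
            if (bytesRead == length) {
                return value;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // The state content is the single byte '\0': a zero-length value.
        InputStream in = new PreFixStream(new byte[] {0});
        try {
            readValue(in);
            System.out.println("read succeeded");
        } catch (EOFException e) {
            System.out.println("read failed: " + e.getMessage());
        }
    }
}
```

With the zero-length value as the last item in the buffer, the zero-byte read is answered with -1 and the reader reports an end-of-input error even though nothing is actually missing.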
Brief change log
Add a check before returning -1 in ByteStateHandleInputStream#read(byte[] bytes, int offset, int count). If the caller reads 0 bytes, it should always return 0 instead of -1.
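For illustration, the check could look roughly like the sketch below. This is not the actual patch: `ZeroLengthAwareStream` is a hypothetical, simplified class, and its field names are assumptions. The behavior follows the general contract of java.io.InputStream#read(byte[], int, int), which specifies that a request for zero bytes returns 0.

```java
import java.io.InputStream;

/** Hypothetical simplified in-memory stream that returns 0 for zero-length reads. */
final class ZeroLengthAwareStream extends InputStream {
    private final byte[] data;
    private int index;

    ZeroLengthAwareStream(byte[] data) {
        this.data = data;
    }

    @Override
    public int read() {
        return index < data.length ? data[index++] & 0xFF : -1;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        int bytesLeft = data.length - index;
        if (bytesLeft > 0) {
            int bytesToCopy = Math.min(len, bytesLeft);
            System.arraycopy(data, index, b, off, bytesToCopy);
            index += bytesToCopy;
            return bytesToCopy;
        }
        // No data left: a zero-length request is still satisfied, so return 0;
        // only a request for at least one byte signals end of stream.
        return len == 0 ? 0 : -1;
    }

    public static void main(String[] args) {
        ZeroLengthAwareStream in = new ZeroLengthAwareStream(new byte[0]);
        byte[] buf = new byte[1];
        System.out.println(in.read(buf, 0, 0)); // zero-length read at end of data
        System.out.println(in.read(buf, 0, 1)); // one-byte read at end of data
    }
}
```

Callers that loop until -1 are unaffected, because a read that asks for at least one byte still gets -1 once the data is exhausted.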
Verifying this change
This change added tests and can be verified as follows:
- Use a protobuf-generated class as the value in a list state. Build an object of the class with its builder and set no fields. Try to recover from a checkpoint. Before this fix, recovery fails.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (don't know)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (no)
CI report:
- f8156ccd407493119fa7f85dea6ca6e675a427d1 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
@Shenjiaqi Thanks for this PR, LGTM.
Could you please add the ticket id and module name [FLINK-29347] [runtime] to the commit message?
@Myasuka Would you like to take a look?
@flinkbot run azure
Thanks for the update. Please fix the CI failure caused by the compilation error.
Thanks for the review. The code is now fixed and CI passes.