[FLINK-36124][filesystem][s3] Make S3RecoverableFsDataOutputStream.sync not close the stream
What is the purpose of the change
At the moment, S3RecoverableFsDataOutputStream.sync closes the stream, so every subsequent write operation fails consistently with the following exception:
java.io.IOException: Stream closed.
at org.apache.flink.core.fs.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:72)
at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:52)
at org.apache.flink.core.fs.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)
at org.apache.flink.core.fs.RefCountedBufferingFileStream.write(RefCountedBufferingFileStream.java:87)
at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.write(S3RecoverableFsDataOutputStream.java:112)
at java.base/java.io.OutputStream.write(OutputStream.java:122)
With this PR, sync no longer closes the file. Instead, it uploads the old content and creates a new file for subsequent writes.
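The idea of syncing without closing can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: `RollingSyncStream` is a hypothetical stand-in, an in-memory buffer plays the role of the local temporary file, and a list plays the role of the parts uploaded to S3.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for illustration; not Flink's S3RecoverableFsDataOutputStream. */
class RollingSyncStream {
    // Parts "uploaded" so far. A real implementation would send them to S3.
    private final List<byte[]> uploadedParts = new ArrayList<>();
    // In-memory buffer standing in for the current local temporary file.
    private ByteArrayOutputStream current = new ByteArrayOutputStream();
    private boolean closed = false;

    void write(byte[] data) throws IOException {
        requireOpened();
        current.write(data, 0, data.length);
    }

    /** Hands off the buffered content and starts a fresh buffer; the stream stays open. */
    void sync() throws IOException {
        requireOpened();
        if (current.size() > 0) {
            uploadedParts.add(current.toByteArray());
        }
        // Replace the buffer instead of closing it, so later writes still succeed.
        current = new ByteArrayOutputStream();
    }

    /** Flushes any remaining content, then marks the stream as closed. */
    void close() throws IOException {
        if (closed) {
            return;
        }
        sync();
        closed = true;
    }

    List<byte[]> uploadedParts() {
        return uploadedParts;
    }

    private void requireOpened() throws IOException {
        if (closed) {
            throw new IOException("Stream closed.");
        }
    }
}
```

In this sketch, a write after `sync()` lands in the fresh buffer, and only a write after `close()` raises the "Stream closed." exception.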
Brief change log
Changed S3RecoverableFsDataOutputStream.sync so that it no longer closes the stream.
Verifying this change
Adapted the existing unit test.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: yes
Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable