[FLINK-33536] Fix Flink Table API CSV streaming sink fails with IOException: Stream closed
What is the purpose of the change
Fix the "IOException: Stream closed" error raised when the csv format is used to write data to S3.
The issue is caused by the following calls:
committer.commitAfterRecovery();
closeForCommit();
These calls close the stream for commit, which causes the IOException when the next commit is attempted.
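The failure mode can be illustrated with a small, self-contained Java model. This is a sketch, not Flink code: StreamClosedDemo and ModelS3Stream are hypothetical names. It assumes that the two calls above run at the end of the stream's sync(), and that the same stream is later closed for commit again, which then hits the already-closed stream.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical model, NOT Flink's S3RecoverableFsDataOutputStream.
class StreamClosedDemo {

    // Assumption: sync() ends with commitAfterRecovery() followed by closeForCommit().
    static class ModelS3Stream extends OutputStream {
        private final StringBuilder buffer = new StringBuilder();
        private boolean closed = false;

        private void ensureOpen() throws IOException {
            if (closed) {
                throw new IOException("Stream closed");
            }
        }

        @Override
        public void write(int b) throws IOException {
            ensureOpen();
            buffer.append((char) b);
        }

        public void sync() throws IOException {
            ensureOpen();
            commitAfterRecovery();
            closeForCommit(); // closes the stream as a side effect of sync()
        }

        private void commitAfterRecovery() {
            // No-op in this model; stands in for committing the uploaded data.
        }

        public void closeForCommit() throws IOException {
            ensureOpen(); // a second close for commit fails here
            closed = true;
        }
    }

    /** Returns the message of the IOException raised, or "no error". */
    static String run() {
        ModelS3Stream stream = new ModelS3Stream();
        try {
            stream.write('1');
            stream.sync();           // the stream is already closed after this call
            stream.closeForCommit(); // assumed next commit step on the same stream
            return "no error";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "Stream closed"
    }
}
```

In this model, the exception comes from closing a stream that sync() has already closed, not from the data itself.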
Verifying this change
In the SQL client:
Flink SQL> create table dummy_table (
> id int,
> data string
> ) with (
> 'connector' = 'filesystem',
> 'path' = 's3://prabhuflinks3/xyz_table/',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO dummy_table
2023-11-15 15:31:23,277 INFO org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at ip-172-31-34-252.us-west-2.compute.internal/172.31.34.252:8032
2023-11-15 15:31:23,500 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at ip-172-31-34-252.us-west-2.compute.internal/172.31.34.252:10200
2023-11-15 15:31:23,508 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-11-15 15:31:23,510 WARN org.apache.flink.yarn.YarnClusterDescriptor [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2023-11-15 15:31:23,613 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface ip-172-31-33-40.us-west-2.compute.internal:45505 of application 'application_1700033066610_0012'.
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8e2c6835714dd08b3f2ab0a0d1dd7b18
Flink SQL> select * from dummy_table;
2023-11-15 15:32:31,324 INFO org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at ip-172-xx-xx-xxx.us-west-2.compute.internal/172.31.34.252:8032
2023-11-15 15:32:31,328 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at ip-172-xx-xx-xxx.us-west-2.compute.internal/172.31.34.252:10200
2023-11-15 15:32:31,328 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-11-15 15:32:31,328 WARN org.apache.flink.yarn.YarnClusterDescriptor [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2023-11-15 15:32:31,338 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface ip-172-31-33-40.us-west-2.compute.internal:45505 of application 'application_1700033066610_0012'.
[INFO] Result retrieval cancelled.
Verified that the data is written to S3 properly:
id data
1 Hello World
2 Hi
2 Hi
3 Hello
3 World
4 ADD
5 LINE
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no) no
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) no
- The serializers: (yes / no / don't know) no
- The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
- The S3 file system connector: (yes / no / don't know) yes
Documentation
- Does this pull request introduce a new feature? (yes / no) no
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) not applicable
CI report:
- 55394279eee5edfc12df8d9185564ca53ebab2b7 Azure: SUCCESS
@PrabhuJoseph @MartijnVisser Please review whenever you have time.
@PrabhuJoseph Please review whenever you have time.
@Samrat002 I have verified the patch with the filesystem connector using both the csv and json formats. The patch works fine.
LGTM, +1 (non-binding)
@MartijnVisser Please review whenever you have time.
Thank you, @MartijnVisser, for diving into the PR.
This issue already existed before https://github.com/apache/flink/pull/21458 was merged.
How to reproduce this issue
- Create a cluster and run a Flink session.
- Ensure S3 is accessible from the cluster, and all keychains are configured properly.
- For simplicity, open a SQL client and execute the following commands:
CREATE TABLE test_table (
id INT,
data STRING
) WITH (
'connector' = 'filesystem',
'path' = 's3://<bucket_name>/<path_to_table>/',
'format' = 'csv'
);
INSERT INTO test_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (3, 'Hi'), (4, 'Hello'), (5, 'World'), (6, 'ADD'), (7, 'LINE'));
Note: Replace <bucket_name> and <path_to_table> before executing the two statements above.
This issue occurs only when the csv format is used to write data to S3. Other formats, such as json, or other delimiters do not cause any errors.
Why this error occurs only with CSV
In my code analysis, when the csv format is used in any connector, it calls the class CsvBulkWriter, which implements the BulkWriter interface.
Data is written to S3 when the flush method is executed. CsvBulkWriter holds an instance field stream of type FSDataOutputStream. When the target is S3, this stream is an S3RecoverableFsDataOutputStream object (S3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStream, and RecoverableFsDataOutputStream extends FSDataOutputStream). S3RecoverableFsDataOutputStream creates a stream of type RefCountedBufferingFileStream. The sync method call delegates to this object, and sync is not a supported operation in RefCountedBufferingFileStream.
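The delegation chain described above can be modelled in a few lines of Java. This is a hedged sketch, not Flink's code: SyncDelegationDemo and all Model* classes are hypothetical stand-ins for FSDataOutputStream, RefCountedBufferingFileStream, S3RecoverableFsDataOutputStream and CsvBulkWriter, and only the behaviour stated above is modelled (sync() delegates to the buffering stream, which does not support it).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class SyncDelegationDemo {

    // Hypothetical stand-in for FSDataOutputStream: an OutputStream with sync().
    abstract static class ModelFsDataOutputStream extends OutputStream {
        abstract void sync() throws IOException;
    }

    // Hypothetical stand-in for RefCountedBufferingFileStream: buffers bytes, rejects sync().
    static class ModelBufferingFileStream extends ModelFsDataOutputStream {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        @Override
        public void write(int b) {
            buffer.write(b);
        }

        @Override
        void sync() {
            throw new UnsupportedOperationException("sync() is not supported");
        }
    }

    // Hypothetical stand-in for S3RecoverableFsDataOutputStream: delegates to the buffering stream.
    static class ModelRecoverableStream extends ModelFsDataOutputStream {
        private final ModelBufferingFileStream fileStream = new ModelBufferingFileStream();

        @Override
        public void write(int b) {
            fileStream.write(b);
        }

        @Override
        void sync() throws IOException {
            fileStream.sync(); // delegation: the inner stream decides what sync() does
        }
    }

    // Hypothetical stand-in for CsvBulkWriter: it only knows the base stream type.
    static class ModelCsvWriter {
        private final ModelFsDataOutputStream stream;

        ModelCsvWriter(ModelFsDataOutputStream stream) {
            this.stream = stream;
        }

        void addElement(String csvLine) throws IOException {
            stream.write((csvLine + "\n").getBytes(StandardCharsets.UTF_8));
        }

        void sync() throws IOException {
            stream.sync();
        }
    }

    /** Returns the simple name of the exception raised by sync(), or "synced". */
    static String run() {
        ModelCsvWriter writer = new ModelCsvWriter(new ModelRecoverableStream());
        try {
            writer.addElement("1,Hello World");
            writer.sync();
            return "synced";
        } catch (UnsupportedOperationException | IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "UnsupportedOperationException"
    }
}
```

The point of the model is that the CSV writer only sees the FSDataOutputStream type, so whether sync() works depends entirely on the concrete stream that the S3 file system hands it.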
Next Step
I will look into the E2E test test_file_sink.sh s3 StreamingFileSink and try to add tests that cover this flow.
Hi @MartijnVisser, @hlteoh37,
I found that test_file_sink.sh s3 StreamingFileSink doesn't have a test case for the csv format.
https://issues.apache.org/jira/browse/FLINK-33995
I will work on it and raise a PR for it.