
[FLINK-33536] Fix Flink Table API CSV streaming sink fails with IOException: Stream closed

Open Samrat002 opened this issue 1 year ago • 7 comments

What is the purpose of the change

Fix the error caused by IOException: Stream closed.

The issue is caused by the following call sequence:

committer.commitAfterRecovery();
closeForCommit();

This sequence closes the stream, which causes the IOException on the next commit.
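The failure mode can be illustrated with a minimal sketch. The class and method names below (other than closeForCommit) are hypothetical stand-ins, not Flink's actual API: once a recoverable stream has been closed for a commit, any further write attempt fails with the reported "Stream closed" error.

```java
import java.io.IOException;

// Minimal stand-in for a recoverable output stream: once closeForCommit()
// has been called, any further write fails, mirroring the reported
// "IOException: Stream closed".
class RecoverableStream {
    private boolean closed = false;

    void write(String data) throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }
        // ... buffer data for a later commit ...
    }

    // Closes the stream so its buffered contents can be committed.
    void closeForCommit() {
        closed = true;
    }
}

public class StreamClosedDemo {
    public static void main(String[] args) {
        RecoverableStream stream = new RecoverableStream();
        stream.closeForCommit(); // the commit/recovery path closes the stream first
        try {
            stream.write("row"); // any later write now fails
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints "Stream closed"
        }
    }
}
```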

Verifying this change

In the SQL client:

Flink SQL> create table dummy_table (
>   id int,
>   data string
> ) with (
>   'connector' = 'filesystem',
>   'path' = 's3://prabhuflinks3/xyz_table/',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO dummy_table
2023-11-15 15:31:23,277 INFO  org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at ip-172-31-34-252.us-west-2.compute.internal/172.31.34.252:8032
2023-11-15 15:31:23,500 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at ip-172-31-34-252.us-west-2.compute.internal/172.31.34.252:10200
2023-11-15 15:31:23,508 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-11-15 15:31:23,510 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2023-11-15 15:31:23,613 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface ip-172-31-33-40.us-west-2.compute.internal:45505 of application 'application_1700033066610_0012'.

[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8e2c6835714dd08b3f2ab0a0d1dd7b18


Flink SQL> select * from dummy_table;
2023-11-15 15:32:31,324 INFO  org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at ip-172-xx-xx-xxx.us-west-2.compute.internal/172.31.34.252:8032
2023-11-15 15:32:31,328 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at ip-172-xx-xx-xxx.us-west-2.compute.internal/172.31.34.252:10200
2023-11-15 15:32:31,328 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-11-15 15:32:31,328 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2023-11-15 15:32:31,338 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface ip-172-31-33-40.us-west-2.compute.internal:45505 of application 'application_1700033066610_0012'.

[INFO] Result retrieval cancelled.

Verified data in S3:

          id                           data
           1                    Hello World
           2                             Hi
           2                             Hi
           3                          Hello
           3                          World
           4                            ADD
           5                           LINE


Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no) no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) no
  • The serializers: (yes / no / don't know) no
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
  • The S3 file system connector: (yes / no / don't know) yes

Documentation

  • Does this pull request introduce a new feature? (yes / no) no
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) not applicable

Samrat002 avatar Nov 15 '23 15:11 Samrat002

CI report:

  • 55394279eee5edfc12df8d9185564ca53ebab2b7 Azure: SUCCESS
Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Nov 15 '23 15:11 flinkbot

@PrabhuJoseph @MartijnVisser Please review whenever you have time.

Samrat002 avatar Nov 15 '23 19:11 Samrat002

@PrabhuJoseph Please review whenever you have time.

Samrat002 avatar Dec 02 '23 13:12 Samrat002

@Samrat002 I have verified the patch with the filesystem connector using both csv and json formats. The patch is working fine.

LGTM, +1 (non-binding)

PrabhuJoseph avatar Dec 10 '23 16:12 PrabhuJoseph

@MartijnVisser Please review whenever you have time.

Samrat002 avatar Dec 14 '23 09:12 Samrat002

Thank you, @MartijnVisser, for diving into the PR.

This issue already existed before the merge of https://github.com/apache/flink/pull/21458.

How to reproduce this issue

  1. Create a cluster and run Flink session.
  2. Ensure S3 is accessible from the cluster, and all keychains are configured properly.
  3. For simplicity, open a SQL client and execute the following commands:
CREATE TABLE test_table (
  id INT,
  data STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://<bucket_name>/<path_to_table>/',
  'format' = 'csv'
);

INSERT INTO test_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (3, 'Hi'), (4, 'Hello'), (5, 'World'), (6, 'ADD'), (7, 'LINE'));

Note: Update <bucket_name> and <path_to_table> before executing the two queries above.

This issue occurs only when the csv format is used to write data into S3. Other formats, such as json, don't cause any errors.

Why this error only with CSV

In my code analysis: when the csv format is used in any connector, writes go through the CsvBulkWriter class, which implements the BulkWriter interface.

Data is written to S3 when the flush method is executed. CsvBulkWriter holds a stream of type FSDataOutputStream. When S3 is the target file system, this stream is an S3RecoverableFsDataOutputStream (S3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStream, which extends FSDataOutputStream). S3RecoverableFsDataOutputStream in turn wraps a stream of type RefCountedBufferingFileStream. The sync method call delegates to this object, and sync is not a supported operation on RefCountedBufferingFileStream.
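The delegation chain described above can be sketched as follows. This is a simplified, hypothetical model (the class names stand in for CsvBulkWriter, S3RecoverableFsDataOutputStream, and RefCountedBufferingFileStream; the exception message is illustrative, not Flink's actual text): the CSV writer's flush path calls sync() on its stream, which delegates down to a buffering stream that does not support sync.

```java
import java.io.IOException;

// Stand-in for RefCountedBufferingFileStream: sync is unsupported here.
class BufferingFileStream {
    void sync() {
        throw new UnsupportedOperationException("sync is not supported by this buffering stream");
    }
}

// Stand-in for S3RecoverableFsDataOutputStream: delegates sync() to the
// buffering stream it wraps.
class RecoverableOutputStream {
    private final BufferingFileStream delegate = new BufferingFileStream();

    void sync() throws IOException {
        delegate.sync();
    }
}

// Stand-in for the CSV bulk writer: its flush path ends up calling sync()
// on the stream, which is where the failure surfaces for the csv format.
class CsvWriterSketch {
    private final RecoverableOutputStream stream = new RecoverableOutputStream();

    void flush() throws IOException {
        stream.sync();
    }
}

public class CsvSyncDemo {
    public static void main(String[] args) throws IOException {
        try {
            new CsvWriterSketch().flush();
        } catch (UnsupportedOperationException e) {
            System.out.println("flush failed: " + e.getMessage());
        }
    }
}
```

Formats that do not route their flush through sync() never reach the unsupported operation, which matches the observation that only csv is affected.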

Next Step

I will look into the test_file_sink.sh s3 StreamingFileSink E2E test and try to add tests that cover this flow.

Samrat002 avatar Jan 01 '24 08:01 Samrat002

Hi @MartijnVisser , @hlteoh37 ,

I found that the test_file_sink.sh s3 StreamingFileSink E2E test doesn't have a test case for the csv format: https://issues.apache.org/jira/browse/FLINK-33995. I will work on it and raise a PR for the same.

Samrat002 avatar Jan 04 '24 17:01 Samrat002