
[KYUUBI #5959][Bug] Flink engine fetch the unbounded data timeout

Open · hadoopkandy opened this issue 1 year ago · 8 comments

The Flink engine should return the result when a fetch times out but some data has already been fetched.

There are already 3 rows in Kafka when we query from the earliest offset. I tested the following cases:

  1. kyuubi.session.engine.flink.max.rows=3 & kyuubi.session.engine.flink.fetch.timeout=PT60S: the client returns 3 rows. This meets our expectation.

  2. kyuubi.session.engine.flink.max.rows=5 & kyuubi.session.engine.flink.fetch.timeout=PT60S: the client returns Futures timed out after [60000 milliseconds]. This does not meet our expectation; we should return the result when the fetch times out but some data has already been fetched.

:mag: Description

Issue References 🔗

This pull request fixes #5959

Describe Your Solution 🔧


  1. Catch the TimeoutException and set the hasNext flag to false to prevent fetchNext from being called again (see the sketch after this list).
  2. Add a UT in org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite to reproduce this issue.
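
A minimal sketch of the idea behind change 1, with illustrative names (this is not the actual Kyuubi fetch iterator):

```scala
import java.util.concurrent.TimeoutException

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Illustrative sketch only; class and member names do not match the real code.
class StreamingFetchIterator(fetchTimeout: Duration) {
  type Row = Seq[Any]

  // Whether the engine believes more rows can be fetched.
  var hasNext: Boolean = true

  private val buffered = ArrayBuffer.empty[Row]

  def fetchNext(fetch: () => Future[Seq[Row]]): Seq[Row] = {
    try {
      buffered ++= Await.result(fetch(), fetchTimeout)
    } catch {
      case _: TimeoutException =>
        // The fetch timed out, but rows buffered by earlier fetches are still
        // valid. Flip hasNext to false so fetchNext is not called again and
        // the partial result is returned instead of a timeout error.
        hasNext = false
    }
    buffered.toSeq
  }
}
```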

Types of changes :bookmark:

  • [x] Bugfix (non-breaking change which fixes an issue)
  • [ ] New feature (non-breaking change which adds functionality)
  • [ ] Breaking change (fix or feature that would cause existing functionality to change)

Test Plan 🧪

Behavior Without This Pull Request :coffin:

Behavior With This Pull Request :tada:

Related Unit Tests

org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite# test result fetch timeout but already fetched some data
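
A rough shape of such a test; the session helper and assertions here are assumptions for illustration, not the suite's actual code:

```scala
// Hypothetical sketch of the test; the real one lives in FlinkOperationSuite
// and uses that suite's own session and JDBC helpers.
test("test result fetch timeout but already fetched some data") {
  // 3 rows exist in the Kafka source, but max.rows asks for 5, so the
  // fetch loop would block until the PT60S timeout without the fix.
  val sessionConf = Map(
    "kyuubi.session.engine.flink.max.rows" -> "5",
    "kyuubi.session.engine.flink.fetch.timeout" -> "PT60S")
  withSessionConf(sessionConf) { statement => // assumed helper
    val rs = statement.executeQuery("SELECT * FROM kafka_source")
    var count = 0
    while (rs.next()) count += 1
    // With the fix, the partially fetched rows come back instead of
    // a "Futures timed out" error.
    assert(count === 3)
  }
}
```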


Checklist 📝

Be nice. Be informative.

hadoopkandy · Jan 10 '24

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Comparison is base (698db5b) 61.29% compared to head (6cc0846) 61.16%. Report is 1 commit behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #5960      +/-   ##
============================================
- Coverage     61.29%   61.16%   -0.14%     
  Complexity       23       23              
============================================
  Files           622      622              
  Lines         36882    36882              
  Branches       5014     5014              
============================================
- Hits          22606    22558      -48     
- Misses        11850    11882      +32     
- Partials       2426     2442      +16     


codecov-commenter · Jan 11 '24

@hadoopkandy Thanks for updating!

I have one concern regarding the change: in the current version, if one fetch times out, the engine returns hasNext == true, and it's up to the client to decide whether to retry fetching. But with this PR, the engine would return hasNext == false, which indicates there are no more records. However, this may not be true, because the Flink job may still be initializing (e.g. restoring state). That would break the current use case.

I suggest optimizing the beeline client and leaving the engine as-is.

link3280 · Jan 12 '24

When the timeout is reached, hasNext should be true, as the streaming query is not finished. Beeline supports --incremental=true to print the fetched records immediately.
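
For example, a minimal usage sketch (the JDBC URL and query are illustrative):

```
beeline -u "jdbc:hive2://kyuubi-host:10009/" --incremental=true \
  -e "SELECT * FROM kafka_source"
```

With --incremental=true, beeline prints each fetched batch as it arrives rather than buffering the full result set first.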

pan3793 · Jan 15 '24

> When the timeout is reached, hasNext should be true, as the streaming query is not finished. Beeline supports --incremental=true to print the fetched records immediately.

How about adding an exit-on-fetch-timeout option to the Flink engine?

link3280 · Jan 15 '24

I'm not very familiar with stream processing; does Flink/Beam have a switch to control the behavior when the timeout is reached but the query is not completed?

pan3793 · Jan 15 '24

> I'm not very familiar with stream processing; does Flink/Beam have a switch to control the behavior when the timeout is reached but the query is not completed?

As I understand it, streaming queries don't have such a switch. Consider this case: if there are many rows of data in Kafka and we query from Kafka with limit 1, should we return one row of data or time out? For a JDBC client, I think we should return one row. With that situation in mind, when the timeout is reached, hasNext should be false even though the streaming query is not finished.

hadoopkandy · Jan 16 '24

The cases are different: if limit 1 is applied, then once 1 record is fetched the query is finished, no matter whether it is streaming or batch.

pan3793 · Jan 17 '24

> I'm not very familiar with stream processing; does Flink/Beam have a switch to control the behavior when the timeout is reached but the query is not completed?

Flink SQL Gateway exits the fetch loop and sets hasMoreResults to true once there's a fetch that returns no data. That is, as we discussed before, semantically incorrect.

See https://github.com/apache/flink/blob/fd673a2f46206ff65978f05fcb96b525696aead2/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java#L265 .

I think we could add an option to treat a fetch timeout as EOS, while keeping the current behavior as the default. WDYT? @pan3793 @hadoopkandy
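
For illustration, a sketch of what such an opt-in entry could look like in Kyuubi's config-builder style; the key name and wiring are hypothetical, not an actual Kyuubi config:

```scala
// Hypothetical config entry; the key name and default are illustrative only.
val ENGINE_FLINK_FETCH_TIMEOUT_AS_EOS: ConfigEntry[Boolean] =
  buildConf("kyuubi.session.engine.flink.fetch.timeout.as.eos")
    .doc("When true, a fetch timeout is treated as end-of-stream: the engine " +
      "returns the rows fetched so far and reports hasNext = false. When " +
      "false (the default), the engine keeps the current behavior and " +
      "reports hasNext = true so clients may retry.")
    .booleanConf
    .createWithDefault(false)
```

The fetch path would then flip hasNext to false on a TimeoutException only when this flag is enabled.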

link3280 · Jan 20 '24