[KYUUBI #5959][Bug] Flink engine fetch the unbounded data timeout
The Flink engine should return the fetched rows when the fetch times out but some data has already been fetched.
There are already 3 rows in Kafka when we query from the earliest offset. I tested two cases:

1. `kyuubi.session.engine.flink.max.rows=3` & `kyuubi.session.engine.flink.fetch.timeout=PT60S`
   The client returns 3 rows, which meets our expectation.
2. `kyuubi.session.engine.flink.max.rows=5` & `kyuubi.session.engine.flink.fetch.timeout=PT60S`
   The client returns `Futures timed out after [60000 milliseconds]`, which does not meet our expectation.

We should return the result when the fetch times out but some data has already been fetched.
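The two cases above can be illustrated with a minimal, self-contained simulation (plain Java, not Kyuubi's actual code; the blocking queue stands in for the Kafka-backed result stream): the fetch loop stops when either the row limit or the timeout is reached, and the desired behavior is to return the partial result rather than fail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class FetchSketch {
  // Drain up to maxRows from the queue, giving up once timeoutMs elapses.
  // On timeout, return whatever was collected instead of throwing.
  static List<String> fetch(BlockingQueue<String> rows, int maxRows, long timeoutMs) {
    List<String> out = new ArrayList<>();
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    try {
      while (out.size() < maxRows) {
        long remaining = deadline - System.nanoTime();
        if (remaining <= 0) break; // timeout: keep the partial result
        String row = rows.poll(remaining, TimeUnit.NANOSECONDS);
        if (row == null) break;    // timed out waiting for the next row
        out.add(row);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return out;
  }

  public static void main(String[] args) {
    BlockingQueue<String> kafka = new ArrayBlockingQueue<>(10);
    kafka.add("r1"); kafka.add("r2"); kafka.add("r3"); // 3 rows available

    // Case 1: max.rows=3 -> all 3 rows, returns before the timeout.
    System.out.println(fetch(kafka, 3, 200).size()); // 3

    kafka.add("r1"); kafka.add("r2"); kafka.add("r3");
    // Case 2: max.rows=5 -> the timeout hits with only 3 rows; the desired
    // behavior is to return those 3 rows rather than fail the fetch.
    System.out.println(fetch(kafka, 5, 200).size()); // 3
  }
}
```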
:mag: Description
Issue References 🔗
This pull request fixes #5959
Describe Your Solution 🔧
- Catch the `TimeoutException` and set the `hasNext` flag to `false` to prevent `fetchNext` from being called again.
- Add a unit test in `org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite` to reproduce this issue.
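The fix can be sketched as follows (a Java analogue of the Scala change; `next` and `hasNext` are illustrative names, not Kyuubi's actual API): the timeout is caught and `hasNext` is flipped to `false`, so the already-fetched rows are returned and no further fetch is attempted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutEos {
  private boolean hasNext = true; // whether another fetch should be attempted

  // Wait for the next batch; on timeout, mark the stream as exhausted so
  // fetchNext is not called again, instead of propagating the timeout.
  String next(CompletableFuture<String> pending, long timeoutMs) {
    try {
      return pending.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      hasNext = false; // timeout reached: stop further fetches
      return null;
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }

  boolean hasNext() { return hasNext; }

  public static void main(String[] args) {
    TimeoutEos op = new TimeoutEos();
    // A future that never completes simulates no data arriving in time.
    System.out.println(op.next(new CompletableFuture<>(), 100)); // null
    System.out.println(op.hasNext()); // false
  }
}
```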
Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
Test Plan 🧪
Behavior Without This Pull Request :coffin:
Behavior With This Pull Request :tada:
Related Unit Tests
org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite# test result fetch timeout but already fetched some data
Checklist 📝
- [ ] This patch was not authored or co-authored using Generative Tooling
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Comparison is base (`698db5b`) 61.29% compared to head (`6cc0846`) 61.16%. Report is 1 commit behind head on master.
Additional details and impacted files
```
@@             Coverage Diff              @@
##             master    #5960      +/-   ##
============================================
- Coverage     61.29%   61.16%   -0.14%
  Complexity       23       23
============================================
  Files           622      622
  Lines         36882    36882
  Branches       5014     5014
============================================
- Hits          22606    22558      -48
- Misses        11850    11882      +32
- Partials       2426     2442      +16
```
@hadoopkandy Thanks for updating!
I have one concern regarding the change: in the current version, if one fetch times out, the engine returns `hasNext == true`, and it's up to the client to decide whether to retry fetching. But with this PR, the engine would return `hasNext == false`, which indicates there are no more records. However, this may not be true, because the Flink job may still be in the initializing status (e.g. restoring state). That would break the current use case.
I suggest optimizing the beeline client and leaving the engine as-is.
When the timeout is reached, `hasNext` should be `true`, as the streaming query is not finished. `beeline` supports `--incremental=true` to print the fetched records immediately.
How about adding an `exit-on-fetch-timeout` option to the Flink engine?
I'm not very familiar with the streaming process; does Flink/Beam have such a switch to control the behavior when the timeout is reached but the query is not completed?
> I'm not very familiar with the streaming process; does Flink/Beam have such a switch to control the behavior when the timeout is reached but the query is not completed?
As I understand, a streaming query doesn't have such a switch. Consider this case: if there are many rows of data in Kafka and we query Kafka with `LIMIT 1`, should we return one row or time out? For a JDBC client, I think we should return one row. Considering such a situation, when the timeout is reached, `hasNext` should be `false` even though the streaming query is not finished.
The cases are different: if `LIMIT 1` is applied, once 1 record is fetched the query is finished, no matter streaming or batch.
> I'm not very familiar with the streaming process; does Flink/Beam have such a switch to control the behavior when the timeout is reached but the query is not completed?
Flink SQL Gateway exits the fetch loop and sets `hasMoreResults` to `true` once there's a fetch that returns no data. That is, as we discussed before, semantically incorrect.
See https://github.com/apache/flink/blob/fd673a2f46206ff65978f05fcb96b525696aead2/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java#L265 .
I think we could add an option to treat a fetch timeout as EOS, while keeping the current behavior as the default. WDYT? @pan3793 @hadoopkandy
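The proposed switch could look roughly like this (a hypothetical sketch; the option name comes from the discussion above, and the class and method names are illustrative, not an actual Kyuubi config or API):

```java
public class FetchTimeoutPolicy {
  // Hypothetical flag mirroring the proposed exit-on-fetch-timeout option:
  // when true, a fetch timeout is treated as end-of-stream (hasNext=false);
  // when false (the default, current behavior), the client may retry fetching.
  private final boolean exitOnFetchTimeout;
  private boolean hasNext = true;

  FetchTimeoutPolicy(boolean exitOnFetchTimeout) {
    this.exitOnFetchTimeout = exitOnFetchTimeout;
  }

  void onFetchTimeout() {
    if (exitOnFetchTimeout) {
      hasNext = false; // timeout acts as EOS
    }
    // otherwise keep hasNext=true: the streaming query is not finished
  }

  boolean hasNext() { return hasNext; }

  public static void main(String[] args) {
    FetchTimeoutPolicy legacy = new FetchTimeoutPolicy(false);
    legacy.onFetchTimeout();
    System.out.println(legacy.hasNext()); // true: client may retry

    FetchTimeoutPolicy eos = new FetchTimeoutPolicy(true);
    eos.onFetchTimeout();
    System.out.println(eos.hasNext()); // false: timeout treated as EOS
  }
}
```

This keeps backward compatibility: existing clients see no change unless they opt in to the new timeout-as-EOS behavior.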