[BUG] [kyuubi-flink-sql-engine] FlinkSQL returns duplicate results when executing query statement
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Search before asking
- [X] I have searched in the issues and found no similar issues.
Describe the bug
The runQueryOperation method polls snapshotResult every 50 ms and appends each snapshot to rows. As a result, the query cannot return until the accumulated rows exceed the value of kyuubi.session.engine.flink.max.rows or the Flink SQL job ends.
Repeatedly appending snapshotResult to rows in runQueryOperation is not reasonable. Should the result be returned directly after snapshotResult is fetched?
These are the two changes I suggest. Are they appropriate?
- When snapshotResult gets a result, jump out of the loop:
if (rows.size != 0) {
loop = false
}
- Set a maximum timeout on the loop, to avoid being unable to exit when the result stays empty; a sketch follows below.
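A minimal sketch of the second suggestion, assuming a polling loop shaped like the one described above; fetchSnapshot, maxRows, and resultTimeout are hypothetical placeholders, not the actual engine API:

import java.time.Duration
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: bound the polling loop with a deadline so it can
// exit even when the source never produces enough rows.
def collectRows(
    fetchSnapshot: () => Seq[Long],
    maxRows: Int,
    resultTimeout: Duration): Seq[Long] = {
  val rows = ArrayBuffer.empty[Long]
  val deadline = System.nanoTime() + resultTimeout.toNanos
  var loop = true
  while (loop) {
    rows ++= fetchSnapshot()
    if (rows.size >= maxRows || System.nanoTime() >= deadline) {
      loop = false // stop on max rows or timeout instead of spinning forever
    } else {
      Thread.sleep(50) // the current 50 ms polling interval
    }
  }
  rows.toSeq
}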
Environment
- Flink 1.14.0
- Kyuubi 1.7.0-SNAPSHOT (compiled from master)
Reference UT
test("data is repeatedly added to the resultSet") {
withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "20"))(Map.empty) {
withJdbcStatement() { statement =>
statement.execute(
"""
|create table tbl_src (
| a bigint
| ) with (
| 'connector' = 'datagen',
| 'rows-per-second'='1',
| 'fields.a.kind'='sequence',
| 'fields.a.start'='1',
| 'fields.a.end'='5'
| )
|""".stripMargin)
val resultSet = statement.executeQuery(s"select a from tbl_src")
var rows = List[Long]()
while (resultSet.next()) {
rows :+= resultSet.getLong("a")
}
// rows size more than the input data
assert(rows.size <= 5)
}
}
}
Affects Version(s)
master
Kyuubi Server Log Output
No response
Kyuubi Engine Log Output
No response
Kyuubi Server Configurations
kyuubi.engine.type FLINK_SQL
Kyuubi Engine Configurations
No response
Additional context
Test Case
Case1
kyuubi-defaults.conf uses the default configuration, and an unbounded source is generated through datagen. In this case, Kyuubi produces no result output for a long time, and the Flink SQL job never finishes.
Connected to: Apache Flink (version 1.14.0)
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . > a bigint
. . . . . . . . . . . . . . . . . > ) with (
. . . . . . . . . . . . . . . . . > 'connector' = 'datagen',
. . . . . . . . . . . . . . . . . > 'rows-per-second'='1'
. . . . . . . . . . . . . . . . . > );
...
query[7ca29418-03ff-40d9-bdba-b46eedb560b3]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.001 seconds
+---------+
| result |
+---------+
+---------+
No rows selected (0.066 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
Case2
kyuubi-defaults.conf uses the default configuration, and a bounded source is generated through datagen. In this case, duplicate results are returned after the Flink SQL job status is FINISHED.
Connected to: Apache Flink (version 1.14.0)
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . > a bigint
. . . . . . . . . . . . . . . . . > ) with (
. . . . . . . . . . . . . . . . . > 'connector' = 'datagen',
. . . . . . . . . . . . . . . . . > 'rows-per-second'='1',
. . . . . . . . . . . . . . . . . > 'fields.a.kind'='sequence',
. . . . . . . . . . . . . . . . . > 'fields.a.start'='1',
. . . . . . . . . . . . . . . . . > 'fields.a.end'='5'
. . . . . . . . . . . . . . . . . > );
...
query[fb4fbfe3-7e7c-45d0-97a2-f4b5438714ee]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.004 seconds
+---------+
| result |
+---------+
+---------+
No rows selected (0.058 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
...
2023-01-04 10:06:20.336 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: An exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (ca16d67ca30cfe4ea6e3fc0f93c65d21)
at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:917)
at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:931)
at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:719)
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
End of exception on server side>]
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_211]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_211]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) [flink-table_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74) [flink-sql-client_2.12-1.14.0.jar:1.14.0]
...
query[e1581803-04c4-4fb0-a09b-f2e8ffd0b157]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.02 seconds
+----+
| a |
+----+
| 1 |
| 2 |
| 3 |
| 4 |
| 1 |
| 2 |
| 3 |
| 4 |
...
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
+----+
270 rows selected (6.675 seconds)
Case3
kyuubi-defaults.conf sets kyuubi.session.engine.flink.max.rows to 100, and a bounded source is generated through datagen. In this case, 100 duplicate results are returned and the Flink SQL job state ends up CANCELED.
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . > a bigint
. . . . . . . . . . . . . . . . . > ) with (
. . . . . . . . . . . . . . . . . > 'connector' = 'datagen',
. . . . . . . . . . . . . . . . . > 'rows-per-second'='1',
. . . . . . . . . . . . . . . . . > 'fields.a.kind'='sequence',
. . . . . . . . . . . . . . . . . > 'fields.a.start'='1',
. . . . . . . . . . . . . . . . . > 'fields.a.end'='5'
. . . . . . . . . . . . . . . . . > );
...
query[31a19428-0f34-4b2e-9f21-1acf8e5e1885]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.031 seconds
+---------+
| result |
+---------+
| OK |
+---------+
1 row selected (0.315 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
...
2023-01-04 10:14:14.304 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: Interrupted when sleeping before a retry
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_211]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:237) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:113) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) [flink-table_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74) [flink-sql-client_2.12-1.14.0.jar:1.14.0]
...
2023-01-04 10:14:14.315 INFO org.apache.kyuubi.engine.flink.operation.ExecuteStatement: Processing anonymous's query[063b61fe-8e20-4325-ac25-762f65b3ccf2]: RUNNING_STATE -> FINISHED_STATE, time taken: 4.648 seconds
2023-01-04 10:14:14.321 INFO org.apache.kyuubi.operation.ExecuteStatement: Query[4e003511-c4c9-4661-bd48-d77822f7352c] in FINISHED_STATE
2023-01-04 10:14:14.322 INFO org.apache.kyuubi.operation.ExecuteStatement: Processing anonymous's query[4e003511-c4c9-4661-bd48-d77822f7352c]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.004 seconds
+----+
| a |
+----+
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 1 |
| 2 |
| 1 |
| 2 |
...
| 1 |
| 2 |
| 3 |
| 1 |
| 2 |
+----+
100 rows selected (4.702 seconds)
Are you willing to submit PR?
- [ ] Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
- [ ] No. I cannot submit a PR at this time.
Hello @bobo495, thanks for taking the time to report the issue! We really appreciate the community's efforts to improve Apache Kyuubi.
cc @link3280 @deadwind4 @SteNicholas
@bobo495 Thanks for reporting the problem. I agree that the current implementation of result retrieval in the Flink engine can be improved. The final solution would be to refactor it to return results in a streaming manner, which has been mentioned before, but that work hasn't started yet.
WRT your suggestions:
When snapshotResult gets a result, jump out of the loop.
I think the current behavior is expected: snapshotResult returns paged results, and we fetch them page by page; a sketch of that pattern follows below.
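For context, a rough sketch of that page-by-page pattern; the method names mirror Flink 1.14's sql-client Executor interface, but treat the exact signatures as an assumption rather than a confirmed API:

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
import org.apache.flink.types.Row

// One polling pass: snapshotResult reports how many pages the current
// snapshot holds, and each page is then pulled individually. Method
// names are assumed from Flink 1.14's sql-client Executor.
def fetchOnePass(
    executor: Executor,
    sessionId: String,
    resultId: String,
    pageSize: Int,
    rows: ArrayBuffer[Row]): Unit = {
  val snapshot = executor.snapshotResult(sessionId, resultId, pageSize)
  if (snapshot.getType == TypedResult.ResultType.PAYLOAD) {
    val pageCount: Int = snapshot.getPayload
    for (page <- 1 to pageCount) {
      // A snapshot re-exposes all pages of the materialized result, so
      // appending them on every 50 ms pass is what duplicates rows here.
      rows ++= executor.retrieveResultPage(resultId, page).asScala
    }
  }
}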
Set a maximum timeout on the loop to avoid being unable to exit when the result stays empty.
It's a viable approach, in addition to max result rows, for now.
cc @bowenliang123
Set a maximum timeout on the loop to avoid being unable to exit when the result stays empty.
It's a viable approach, in addition to max result rows, for now.
@bobo495 @link3280 I also agree with adding a query timeout to prevent streaming jobs from running forever when the max result rows threshold cannot be met (or never will be). I will submit the corresponding PR as soon as possible.
A query timeout is one possible path, and capping the number of loop retries is also worth considering. With limited messages from an unbounded upstream source (like Kafka), a plain query timeout forces the end user to wait out the full timeout, whereas a max-retries limit may reduce the waiting time in this case; a sketch of that variant follows below.
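A tiny sketch of the max-retries idea, under the same hypothetical names as the timeout sketch in the issue description:

import scala.collection.mutable.ArrayBuffer

// Hypothetical variant: count consecutive empty snapshots instead of
// (or in addition to) a wall-clock timeout, so queries on an idle
// unbounded source can finish without waiting out a full timeout.
def collectWithRetries(
    fetchSnapshot: () => Seq[Long],
    maxRows: Int,
    maxEmptyRetries: Int): Seq[Long] = {
  val rows = ArrayBuffer.empty[Long]
  var emptyRetries = 0
  while (rows.size < maxRows && emptyRetries < maxEmptyRetries) {
    val batch = fetchSnapshot()
    if (batch.isEmpty) emptyRetries += 1 // another idle poll
    else { emptyRetries = 0; rows ++= batch } // reset on progress
    Thread.sleep(50)
  }
  rows.toSeq
}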
@yuruguo Thanks a lot for your help! Alternatively, we could turn to the incremental fetching approach, which solves the problem once and for all. I've started some related work and could get this done in the next release.
For streaming jobs, I personally find the Flink SQL engine unfriendly. I hit the same problem with a datagen query in Kyuubi: only 20 results were expected, but 4K+ results were returned. By the way, when I submit a streaming job via an INSERT operation, the Beeline client (the same happens over plain JDBC) blocks, which I think is not good for a SQL gateway.
@weaksloth Thanks for your feedback. I agree the Flink engine is not production-ready yet, and we have a plan to improve this situation (see https://github.com/apache/kyuubi/issues/2100). If things go well, many issues will be fixed in the next release.
By the way, when I submit a streaming job via an INSERT operation, the Beeline client (the same happens over plain JDBC) blocks, which I think is not good for a SQL gateway.
Did you set execution.attached to true in your Flink conf?
@link3280 Thank you! I'm looking forward to the next release.
I have tried setting execution.attached: true in my flink-conf.yaml. I can see this option on the JobManager configuration page of the Flink web UI, but the Beeline client is still blocked.
My environment: Kyuubi 1.6.1, Flink 1.15.3, standalone Flink cluster.
@weaksloth You're right. I've confirmed that the connection is blocked by Flink insert operations. The problem lies in https://issues.apache.org/jira/browse/FLINK-24461, which refactored the fetching of Flink table results and broke the rule that insert results return immediately.
Kyuubi 1.6.x and above are affected. I created an issue to track this: https://github.com/apache/kyuubi/issues/4446.
@link3280 Thank you, I've got it. I will keep an eye on this issue 😀
I think the Flink engine uses only the asynchronous mode, and it is wrong for an insert statement to just return the job ID. In offline scenarios, we still want the call to block until the job ends. I think it should be possible to support both asynchronous and synchronous modes.
I think the Flink engine uses only the asynchronous mode, and it is wrong for an insert statement to just return the job ID. In offline scenarios, we still want the call to block until the job ends. I think it should be possible to support both asynchronous and synchronous modes.
@waywtdcc Thanks for your input. Do you think insert statements should always return the number of affected rows? By default, Flink returns -1 as the number of affected rows for inserts; is that what you expected?
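If it helps to verify, a minimal JDBC probe for the affected-rows behavior; the URL and table names are placeholders for a concrete deployment:

import java.sql.DriverManager

// Probe what executeUpdate reports for an insert through Kyuubi.
// jdbc:hive2://localhost:10009/ and the table names are placeholders.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
try {
  val stmt = conn.createStatement()
  val affected = stmt.executeUpdate("insert into tbl_sink select * from tbl_src")
  println(s"affected rows: $affected") // per the comment above, Flink reports -1
} finally {
  conn.close()
}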
I think it should be possible to support both asynchronous and synchronous modes.
Agreed. Flink supports both modes natively; we could make use of that if necessary (a sketch follows below). WDYT? @pan3793
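For reference, Flink's native switch for synchronous DML is the table.dml-sync option; a hedged sketch via the Table API (the exact configuration surface varies across Flink versions):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Sketch of enabling Flink's synchronous DML mode. With table.dml-sync
// set to true, executeSql("INSERT ...") blocks until the job finishes
// instead of returning immediately with a job ID.
val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
tEnv.getConfig.getConfiguration.setString("table.dml-sync", "true")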
Hello, does the Flink SQL engine data duplication problem still exist?