
[BUG] [kyuubi-flink-sql-engine] FlinkSQL returns duplicate results when executing query statement

Open bobo495 opened this issue 2 years ago • 16 comments

Search before asking

  • [X] I have searched in the issues and found no similar issues.

Describe the bug

The runQueryOperation method takes a snapshot of the result (snapshotResult) every 50 ms and appends it to rows. As a result, the query does not return anything until the accumulated rows exceed the kyuubi.session.engine.flink.max.rows setting or the Flink SQL job ends.

Repeatedly appending snapshotResult to rows in runQueryOperation does not seem reasonable. Should the result be returned directly once snapshotResult is fetched?
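For illustration, a minimal sketch of the loop shape described above (not the actual Kyuubi source; snapshotPage is a hypothetical stand-in for the engine's snapshot call):

import scala.collection.mutable.ArrayBuffer

// Simplified paraphrase of the described behavior, illustrative only.
// Each iteration appends the *entire* current snapshot, so rows collected
// in earlier iterations are appended again, producing duplicates.
def runQueryLoop(snapshotPage: () => Seq[Long], maxRows: Int): Seq[Long] = {
  val rows = ArrayBuffer.empty[Long]
  var loop = true
  while (loop) {
    Thread.sleep(50) // 50 ms poll interval
    rows ++= snapshotPage()
    // the only exits: enough rows accumulated, or (not shown) the job ends
    if (rows.size >= maxRows) loop = false
  }
  rows.toSeq
}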

These are the two changes I suggest. Are they appropriate?

  1. When snapshotResult returns a non-empty result, break out of the loop:
if (rows.size != 0) {
    loop = false
}
  2. Set a maximum timeout for the loop, to avoid being unable to exit when the result stays empty (see the sketch below).
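
For illustration, a minimal sketch with both suggestions applied (hypothetical names, not a definitive patch):

import scala.collection.mutable.ArrayBuffer

// Suggestion 1: break once the snapshot yields any rows.
// Suggestion 2: bound the loop with a deadline so an always-empty result
// cannot spin forever. Illustrative only.
def runQueryLoopPatched(snapshotPage: () => Seq[Long], timeoutMs: Long): Seq[Long] = {
  val rows = ArrayBuffer.empty[Long]
  val deadline = System.currentTimeMillis() + timeoutMs
  var loop = true
  while (loop && System.currentTimeMillis() < deadline) {
    Thread.sleep(50)
    rows ++= snapshotPage()
    if (rows.nonEmpty) loop = false // return as soon as we have data
  }
  rows.toSeq
}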

Environment

  • Flink 1.14.0
  • Kyuubi 1.7.0-SNAPSHOT(Compile from master)

Reference UT

  test("data is repeatedly added to the resultSet") {
    withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "20"))(Map.empty) {
      withJdbcStatement() { statement =>
        statement.execute(
          """
            |create table tbl_src (
            |           a bigint
            |           ) with (
            |           'connector' = 'datagen',
            |          'rows-per-second'='1',
            |          'fields.a.kind'='sequence',
            |          'fields.a.start'='1',
            |          'fields.a.end'='5'
            |          )
            |""".stripMargin)
        val resultSet = statement.executeQuery(s"select a from tbl_src")
        var rows = List[Long]()
        while (resultSet.next()) {
          rows :+= resultSet.getLong("a")
        }
        // rows size more than the input data
        assert(rows.size <= 5)
      }
    }
  }

(screenshot: shot_2023-01-04_18 05 38)

Affects Version(s)

master

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

No response

Kyuubi Server Configurations

kyuubi.engine.type FLINK_SQL

Kyuubi Engine Configurations

No response

Additional context

Test Case

Case1

kyuubi-defaults.conf uses the default configuration, and an unbounded source is generated through datagen. In this case, Kyuubi produces no result output for a long time, and the Flink SQL job never finishes.

Connected to: Apache Flink (version 1.14.0)
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . >            a bigint
. . . . . . . . . . . . . . . . . >            ) with (
. . . . . . . . . . . . . . . . . >            'connector' = 'datagen',
. . . . . . . . . . . . . . . . . >           'rows-per-second'='1'
. . . . . . . . . . . . . . . . . >           );
...
query[7ca29418-03ff-40d9-bdba-b46eedb560b3]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.001 seconds
+---------+
| result  |
+---------+
+---------+
No rows selected (0.066 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;

Case2

kyuubi-defaults.conf uses the default configuration, and a bounded source is generated through datagen. In this case, duplicate results are returned after the Flink SQL job status is FINISHED.

Connected to: Apache Flink (version 1.14.0)
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . >            a bigint
. . . . . . . . . . . . . . . . . >            ) with (
. . . . . . . . . . . . . . . . . >            'connector' = 'datagen',
. . . . . . . . . . . . . . . . . >           'rows-per-second'='1',
. . . . . . . . . . . . . . . . . >           'fields.a.kind'='sequence',
. . . . . . . . . . . . . . . . . >           'fields.a.start'='1',
. . . . . . . . . . . . . . . . . >           'fields.a.end'='5'
. . . . . . . . . . . . . . . . . >           );
...
query[fb4fbfe3-7e7c-45d0-97a2-f4b5438714ee]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.004 seconds
+---------+
| result  |
+---------+
+---------+
No rows selected (0.058 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
...
2023-01-04 10:06:20.336 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: An exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (ca16d67ca30cfe4ea6e3fc0f93c65d21)
	at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:917)
	at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:931)
	at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:719)
	at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

End of exception on server side>]
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_211]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_211]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) [flink-table_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74) [flink-sql-client_2.12-1.14.0.jar:1.14.0]
...
query[e1581803-04c4-4fb0-a09b-f2e8ffd0b157]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.02 seconds
+----+
| a  |
+----+
| 1  |
| 2  |
| 3  |
| 4  |
| 1  |
| 2  |
| 3  |
| 4  |
...
| 1  |
| 2  |
| 3  |
| 4  |
| 5  |
| 1  |
| 2  |
| 3  |
| 4  |
| 5  |
+----+
270 rows selected (6.675 seconds)

Case3

kyuubi-defaults.conf sets kyuubi.session.engine.flink.max.rows to 100, and a bounded source is generated through datagen. In this case, 100 rows containing duplicates are returned, and the Flink SQL job state becomes CANCELED.

Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . >            a bigint
. . . . . . . . . . . . . . . . . >            ) with (
. . . . . . . . . . . . . . . . . >            'connector' = 'datagen',
. . . . . . . . . . . . . . . . . >           'rows-per-second'='1',
. . . . . . . . . . . . . . . . . >           'fields.a.kind'='sequence',
. . . . . . . . . . . . . . . . . >           'fields.a.start'='1',
. . . . . . . . . . . . . . . . . >           'fields.a.end'='5'
. . . . . . . . . . . . . . . . . >           );
...
query[31a19428-0f34-4b2e-9f21-1acf8e5e1885]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.031 seconds
+---------+
| result  |
+---------+
| OK      |
+---------+
1 row selected (0.315 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
...
2023-01-04 10:14:14.304 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: Interrupted when sleeping before a retry
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_211]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:237) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:113) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) [flink-table_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74) [flink-sql-client_2.12-1.14.0.jar:1.14.0]
...
2023-01-04 10:14:14.315 INFO org.apache.kyuubi.engine.flink.operation.ExecuteStatement: Processing anonymous's query[063b61fe-8e20-4325-ac25-762f65b3ccf2]: RUNNING_STATE -> FINISHED_STATE, time taken: 4.648 seconds
2023-01-04 10:14:14.321 INFO org.apache.kyuubi.operation.ExecuteStatement: Query[4e003511-c4c9-4661-bd48-d77822f7352c] in FINISHED_STATE
2023-01-04 10:14:14.322 INFO org.apache.kyuubi.operation.ExecuteStatement: Processing anonymous's query[4e003511-c4c9-4661-bd48-d77822f7352c]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.004 seconds
+----+
| a  |
+----+
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 2  |
| 1  |
| 2  |
...
| 1  |
| 2  |
| 3  |
| 1  |
| 2  |
+----+
100 rows selected (4.702 seconds)

Are you willing to submit PR?

  • [ ] Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
  • [ ] No. I cannot submit a PR at this time.

bobo495 avatar Jan 04 '23 10:01 bobo495

Hello @bobo495, Thanks for finding the time to report the issue! We really appreciate the community's efforts to improve Apache Kyuubi.

github-actions[bot] avatar Jan 04 '23 10:01 github-actions[bot]

cc @link3280 @deadwind4 @SteNicholas

pan3793 avatar Jan 04 '23 10:01 pan3793

@bobo495 Thanks for reporting the problem. I agree that the current implementation of result retrieval in the Flink engine can be improved. The final solution would be to refactor it to return results in a streaming manner, which has been mentioned before, but the related work hasn't started yet.

WRT your suggestions:

When snapshotResult returns a non-empty result, break out of the loop.

I think the current behavior is expected. snapshotResult returns paged results, and we fetch them page by page.

Set a maximum timeout for the loop, to avoid being unable to exit when the result stays empty.

For now, it's a viable approach in addition to the max result rows limit.

link3280 avatar Jan 04 '23 10:01 link3280

cc @bowenliang123

bowenliang123 avatar Jan 04 '23 10:01 bowenliang123

Set a maximum timeout for the loop, to avoid being unable to exit when the result stays empty.

For now, it's a viable approach in addition to the max result rows limit.

@bobo495 @link3280 I also agree with adding a query timeout to prevent streaming jobs from running forever when the max result rows threshold cannot be met, or never will be. I will submit the corresponding PR as soon as possible.

yuruguo avatar Mar 03 '23 01:03 yuruguo

A query timeout is one possible path, and a maximum number of loop tries is also worth considering. With limited messages in an unbounded upstream source (like Kafka), a plain query timeout forces the end user to wait out the full timeout, whereas a cap on loop retries may reduce the waiting time in that case.
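
For illustration, a minimal sketch of the retry-cap idea (hypothetical names): stop after N consecutive empty fetches instead of waiting out a fixed wall-clock timeout.

import scala.collection.mutable.ArrayBuffer

// Illustrative only: a drained bounded source ends the loop after a few
// empty fetches, while a max-rows guard still bounds unbounded sources.
def fetchWithRetryCap(
    fetchPage: () => Seq[Long],
    maxEmptyTries: Int,
    maxRows: Int): Seq[Long] = {
  val rows = ArrayBuffer.empty[Long]
  var emptyTries = 0
  while (emptyTries < maxEmptyTries && rows.size < maxRows) {
    val page = fetchPage()
    if (page.isEmpty) {
      emptyTries += 1
      Thread.sleep(50)
    } else {
      emptyTries = 0 // reset on progress
      rows ++= page
    }
  }
  rows.toSeq
}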

bowenliang123 avatar Mar 03 '23 02:03 bowenliang123

@yuruguo Thanks a lot for your help! Alternatively, we could turn to the incremental fetching approach, which solves the problem once and for all. I've started some related work and could get this done in the next release.
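
For illustration, a rough sketch of what incremental fetching could look like (hypothetical, not the actual work in progress): serve one page per client fetch instead of accumulating snapshots up front.

// Illustrative only: wrap the engine's result iterator and hand out one
// page per FetchResults call, so rows stream to the client without
// duplication and without buffering the whole result in memory.
class IncrementalFetcher[T](it: Iterator[T]) {
  def nextPage(pageSize: Int): Seq[T] = {
    val page = Seq.newBuilder[T]
    var n = 0
    while (n < pageSize && it.hasNext) {
      page += it.next()
      n += 1
    }
    page.result()
  }
}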

link3280 avatar Mar 03 '23 02:03 link3280

For streaming jobs, I personally think the Flink SQL engine is unfriendly. I hit the same problem with a datagen query in Kyuubi: only 20 results were expected, but 4K+ were returned. By the way, when I submit a streaming job via an insert operation, the Beeline client (same with plain JDBC) blocks, which I think is not good for a SQL gateway.

weaksloth avatar Mar 03 '23 03:03 weaksloth

@weaksloth Thanks for your feedback. I agree the Flink engine is not production-ready yet, and we have a plan to improve the situation (see https://github.com/apache/kyuubi/issues/2100). If things go well, many issues will be fixed in the next release.

By the way, when I submit a streaming job via an insert operation, the Beeline client (same with plain JDBC) blocks, which I think is not good for a SQL gateway.

Did you set execution.attached to true in your flink conf?

link3280 avatar Mar 03 '23 05:03 link3280

@link3280 Thank you! I'm looking forward to the next release.

I have tried setting execution.attached: true in my flink-conf.yaml. I can see this config option on my Flink web UI's JobManager configuration page, but the Beeline client is still blocked.

My environment: Kyuubi 1.6.1, Flink 1.15.3, Flink cluster mode: standalone.

weaksloth avatar Mar 03 '23 07:03 weaksloth

@weaksloth You're right. I've confirmed that the connection is blocked by Flink insert operations. The problem lies in https://issues.apache.org/jira/browse/FLINK-24461, which refactored the fetching of Flink table results and broke the previous behavior where insert results returned immediately.

Kyuubi 1.6.x and above are affected. I created an issue to track this: https://github.com/apache/kyuubi/issues/4446.

link3280 avatar Mar 05 '23 02:03 link3280

@link3280 Thank you, I've got it. I will also pay attention to this issue😀

weaksloth avatar Mar 05 '23 13:03 weaksloth

I think the Flink engine works entirely in asynchronous mode, and it is wrong for an insert statement to just return the job ID. In offline (batch) scenarios, users still expect the call to block until the job ends. I think it should be possible to offer both asynchronous and synchronous modes.

waywtdcc avatar Mar 20 '23 01:03 waywtdcc

I think the Flink engine works entirely in asynchronous mode, and it is wrong for an insert statement to just return the job ID. In offline (batch) scenarios, users still expect the call to block until the job ends. I think it should be possible to offer both asynchronous and synchronous modes.

@waywtdcc Thanks for your input. Do you think insert statements should always return the number of affected rows? By default, Flink returns -1 as the number of affected rows for inserts; is that what you expected?

link3280 avatar Mar 20 '23 02:03 link3280

I think it should be possible to offer both asynchronous and synchronous modes.

Agreed. Flink supports both modes natively; we could make use of that if necessary. WDYT? @pan3793
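
For reference, Flink exposes the synchronous mode for DML via the table.dml-sync option; a minimal sketch using the Table API (illustrative only, check the Flink docs for your version):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Illustrative only: with table.dml-sync enabled, executeSql("INSERT ...")
// blocks until the job finishes instead of returning the job id immediately.
val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
tableEnv.getConfig.getConfiguration.setString("table.dml-sync", "true")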

link3280 avatar Mar 20 '23 02:03 link3280

Hello, does the Flink SQL engine data duplication problem still exist?

waywtdcc avatar Dec 11 '23 05:12 waywtdcc