[BugFix] Fix bugs of Arrow Flight SQL
Sequence between client and FE/BE
Sequence diagrams for interactions between client and FE/BE
For native Arrow Flight SQL clients, such as `FlightSqlClient` in Java:
1. `getFlightInfoStatement`: Send SQL to the FE. The FE generates the plan, dispatches fragment instances to BEs, and returns the endpoint from which results can be fetched.
   - For queries: the endpoint is the address of the BE that hosts the result sink.
   - For non-queries: the endpoint is the address of the FE itself.
2. `getStreamStatement`: The client pulls results from the endpoint.
For ADBC/JDBC drivers, SQL is executed using the following sequence:
1. `createPreparedStatement`: Create a prepared statement and return a handle (`preparedStmtId`) to the client as its identifier. Note that a single connection can hold multiple prepared statements at the same time.
2. Execute the query:
   1. `acceptPutStatement`: Update the parameter values of a prepared statement. We currently do not implement this RPC, so only non-parameterized SQL is supported.
   2. `getFlightInfoPreparedStatement`: The FE starts executing the query associated with the given handle. The FE generates the plan, dispatches fragment instances to BEs, and returns the endpoint from which results can be fetched.
      - For queries: the endpoint is the address of the BE that hosts the result sink.
      - For non-queries: the endpoint is the address of the FE itself.
   3. `getStreamStatement`: Fetch results from the endpoint returned in the previous step.
3. `closePreparedStatement`: Close a prepared statement.
On the client side, a single cursor maintains only one prepared statement. For the same cursor, when calling `cursor.execute(SQL)`:
- If the SQL is the same as the previous one, it directly runs the three “execute query” steps.
- Otherwise, it first issues `closePreparedStatement`, then creates a new prepared statement via `createPreparedStatement`.
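The cursor behavior described above can be sketched as follows. This is a minimal illustration, not driver code: `SketchCursor`, its fields, and the locally generated statement ids are hypothetical (in practice the FE returns the `preparedStmtId`), and the class only records the names of the RPCs it would issue.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchCursor:
    """Hypothetical client-side cursor that logs the RPCs it would send."""
    sql: Optional[str] = None
    stmt_id: Optional[int] = None
    next_id: int = 1  # assumption: ids are made up locally for the sketch
    rpc_log: List[str] = field(default_factory=list)

    def execute(self, sql: str) -> None:
        if sql != self.sql:
            # Different SQL: close the old prepared statement (if any),
            # then create a new one.
            if self.stmt_id is not None:
                self.rpc_log.append(f"closePreparedStatement({self.stmt_id})")
            self.stmt_id = self.next_id
            self.next_id += 1
            self.sql = sql
            self.rpc_log.append(f"createPreparedStatement -> {self.stmt_id}")
        # The three "execute query" steps run in both cases.
        self.rpc_log.append(f"acceptPutStatement({self.stmt_id})")
        self.rpc_log.append(f"getFlightInfoPreparedStatement({self.stmt_id})")
        self.rpc_log.append("getStreamStatement")
```

Running the same SQL twice on one `SketchCursor` creates a single prepared statement; switching to a different SQL logs a close followed by a create.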
FE/BE Internal Execution Flow
There are four different execution flows, determined by two dimensions:
- Whether the request is forwarded to the leader.
- Whether it is a query or a non-query.
Non-forward Query
1. The client sends a `getFlightInfoPreparedStatement` RPC to the FE. After the FE generates the plan and dispatches fragment instances to BEs, it returns the result sink BE endpoint to the client.
2. The client uses the endpoint to send a `getStreamStatement` RPC to the BE and pulls results from the BE.
Non-forward Non-Query
1. The client sends a `getFlightInfoPreparedStatement` RPC to the FE. The FE’s behavior:
   - For `SHOW xxx` statements, the FE executes everything locally; there is no BE-side execution.
   - For DDL and DML, execution must run on BEs. The FE waits until execution fully completes, and only then returns to the client.
2. The client uses the endpoint to send `getStreamStatement` to the FE and fetch results from the FE.
Forward Query
1. The follower FE executes in a dedicated executor thread, forwards the request to the leader FE, blocks waiting for the response, and then writes `audit.log` and query detail.
2. After the leader FE finishes deploying fragment instances to BEs, it notifies the follower FE via a new RPC. The follower FE immediately returns the result sink BE endpoint to the client without waiting for step 1’s forward flow to complete.
Forward Non-Query
1. The follower FE executes in a dedicated executor thread, forwards the request to the leader FE, blocks waiting for the response, and then writes `audit.log` and query detail.
2. The follower FE waits until step 1 fully completes, writes the result into its local cache, and then returns its own FE endpoint to the client.
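The four flows can be summarized as a small decision function. This is only a sketch of the rules stated above; the function name, the returned keys, and the endpoint labels are hypothetical and do not correspond to FE code.

```python
def choose_flow(forwarded_to_leader: bool, is_query: bool) -> dict:
    """Summarize which flow applies and whose endpoint the client receives."""
    flow = ("Forward" if forwarded_to_leader else "Non-forward") + " " + (
        "Query" if is_query else "Non-Query")
    if is_query:
        # Queries are always fetched from the BE hosting the result sink.
        endpoint = "result sink BE"
    elif forwarded_to_leader:
        # The follower caches the forwarded result and serves it itself.
        endpoint = "follower FE"
    else:
        endpoint = "FE"
    return {
        "flow": flow,
        "endpoint": endpoint,
        # Only forwarded non-queries wait for the whole forward RPC;
        # forwarded queries return once the leader reports deployment.
        "waits_for_forward_response": forwarded_to_leader and not is_query,
    }
```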
Issues
1. Column type conversion
| SR Type | Arrow-Before | Arrow-PR |
|---|---|---|
| `TYPE_LARGEINT` | utf8 | Decimal128Type(38, 0) |
| `TYPE_DATE` | utf8 | date32 |
| `TYPE_DATETIME` | utf8 | timestamp(arrow::TimeUnit::MICRO, "UTC") |
| `TYPE_VARBINARY` | unsupported | binary |
| `TYPE_DECIMAL256` | unsupported | Decimal256 |
| `TYPE_HLL` | utf8 | binary (always returns NULL, same as MySQL) |
| `TYPE_OBJECT(BITMAP)` | unsupported | binary (always returns NULL, same as MySQL) |
| `TYPE_PERCENTILE` | unsupported | binary (always returns NULL, same as MySQL) |
The mapping from SR types to Arrow types is used not only by Arrow Flight SQL, but also by import/export. For compatibility reasons, only Arrow Flight SQL changes the mapping above.
See row_batch.h/cpp, starrocks_column_to_arrow.cpp.
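For the `date32` and `timestamp` rows, Arrow stores integer offsets from the Unix epoch (days for `date32`, microseconds for a `MICRO` timestamp). The sketch below shows that arithmetic in Python; it is not the BE's C++ code. The function names merely echo the helpers this PR adds, and treating a naive datetime as UTC is an assumption made here for illustration.

```python
from datetime import date, datetime, timezone

UNIX_EPOCH_DATE = date(1970, 1, 1)
UNIX_EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_days_since_unix_epoch(d: date) -> int:
    """Value stored in an Arrow date32 column for the given date."""
    return (d - UNIX_EPOCH_DATE).days


def to_unix_microsecond(dt: datetime) -> int:
    """Value stored in an Arrow timestamp(MICRO) column.

    Assumption: the naive datetime is interpreted as UTC.
    """
    delta = dt.replace(tzinfo=timezone.utc) - UNIX_EPOCH_UTC
    # Use integer arithmetic to avoid float rounding on large values.
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
```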
2. A single connection cannot handle multiple queries concurrently
Problem
Our ConnectContext is not thread-safe.
There are two scenarios where a single connection can be handling multiple queries at once:
- First, the ADBC driver allows a single connection to initiate multiple queries concurrently, as long as each cursor is only executing one query at a time.
- Second, even if the client issues queries serially, this can still occur because the client and the FE do not consider a query “finished” at exactly the same time. A query executes as follows:
  1. The client sends a `getFlightInfoPreparedStatement` RPC to the FE. The FE generates a plan and deploys it to BEs, then returns the result sink BE endpoint to the client.
  2. The client sends a `getStreamStatement` RPC to the BE and pulls results directly from the BE.
  3. After finishing result fetching, the client issues the next query. However, at this point the FE may still not have received all BE profile reports. As a result, two queries may be using the same `ConnectContext` simultaneously.
Fix
Ideally, we would split ConnectContext into ConnectContext and StatementContext. Only a small subset of ConnectContext fields would need to be thread-safe, while StatementContext would not.
However, this change would be too large; ConnectContext is used in hundreds of places. Therefore, the current approach is: each ConnectContext can execute only one SQL at a time, and subsequent SQLs wait up to arrow_flight_sql_connection_query_wait_timeout_ms before timing out.
Implementation-wise, we maintain a RunningToken (a semaphore) that can be held by only one owner at a time. Others block and wait for up to arrow_flight_sql_connection_query_wait_timeout_ms. If it times out, an error is returned.
- Before a SQL starts, acquire the `RunningToken`.
- After the SQL fully completes, release the `RunningToken`. “Fully completes” means `StmtExecutor::execute` finishes, including the extra 2 seconds waiting for profile reports and writing audit logs and query detail.
See ArrowFlightSqlConnectProcessor::execute.
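The acquire/release discipline can be illustrated with a single-permit semaphore and a bounded wait. This Python sketch is not the FE implementation (which is Java); the class shape, the `hold` method, and the error text are hypothetical, and `wait_timeout_ms` stands in for `arrow_flight_sql_connection_query_wait_timeout_ms`.

```python
import threading
from contextlib import contextmanager


class RunningToken:
    """Hypothetical per-connection token: one statement runs at a time."""

    def __init__(self, wait_timeout_ms: int) -> None:
        self._sem = threading.Semaphore(1)  # a single permit
        self._timeout_s = wait_timeout_ms / 1000.0

    @contextmanager
    def hold(self):
        # Block for at most the configured timeout waiting for the owner.
        if not self._sem.acquire(timeout=self._timeout_s):
            raise TimeoutError(
                "timed out waiting for the previous statement on this connection")
        try:
            yield  # the statement runs here, up to full completion
        finally:
            self._sem.release()
```

A caller would wrap the whole statement lifecycle in `with token.hold():`, so the token is released even if execution raises.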
3. No statistics collected for BE execution
Problem
BE fragments report statistics when ResultSink finishes (on ResultSink::close), piggybacked on the last batch sent to the FE. However, Arrow Flight SQL doesn’t have a ResultSink between FE and BE.
Fix
We still let the FE issue a fetch_data RPC to the BE’s ResultSink. When ResultSink::close is called, the BE responds to this fetch_data RPC and sends the statistics to the FE.
This also implicitly lets the FE know that the query has finished.
4. One ConnectionContext maintaining multiple prepared queries
The client’s RPC sequence for a single query is:
`[createPreparedStatement] -> [getFlightInfoPreparedStatement(preparedStmtId) -> getStreamStatement]`
- `createPreparedStatement`: Send the query to the FE, and the FE returns a `preparedStmtId`.
- `getFlightInfoPreparedStatement(preparedStmtId)`: Send a request to the FE to execute the query corresponding to the provided `preparedStmtId`. The FE returns the endpoint from which results can be fetched (BE for queries, FE for others).
- `getStreamStatement`: Fetch query results from the FE or BE.
Problem
ArrowFlightSQLConnectionContext currently stores only a single query field. In reality, a single ArrowFlightSQLConnectionContext may hold multiple different prepared statements at the same time.
More specifically, each client cursor maintains one prepared statement. For a given cursor:
- If `cursor.execute(sql)` uses the same SQL as before, it reuses the existing prepared statement and issues `getFlightInfoPreparedStatement(preparedStmtId) -> getStreamStatement`.
- Otherwise, it first closes the previous prepared statement and then creates a new one, issuing `closePreparedStatement(prevPreparedStmtId) -> createPreparedStatement -> getFlightInfoPreparedStatement -> getStreamStatement`.
Fix
ArrowFlightSQLConnectionContext should store a collection of preparedStatements instead of a single query.
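A minimal sketch of the idea, assuming a map keyed by statement id: the class and method names below are hypothetical and are not the fields of `ArrowFlightSQLConnectionContext`; string ids and the lock are illustration choices.

```python
import itertools
import threading
from typing import Dict


class SketchArrowFlightContext:
    """Hypothetical connection context holding several prepared statements."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._next_id = itertools.count(1)
        self._prepared: Dict[str, str] = {}  # preparedStmtId -> SQL text

    def create_prepared_statement(self, sql: str) -> str:
        with self._lock:
            stmt_id = str(next(self._next_id))
            self._prepared[stmt_id] = sql
            return stmt_id

    def query_for(self, stmt_id: str) -> str:
        # Raises KeyError for an unknown or already closed handle.
        with self._lock:
            return self._prepared[stmt_id]

    def close_prepared_statement(self, stmt_id: str) -> None:
        with self._lock:
            self._prepared.pop(stmt_id, None)
```

With this shape, two cursors on one connection can each keep their own handle, and closing one does not affect the other.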
5. Forwarding to the leader for execution
Flow changes
- For non-queries, the flow is the same as the MySQL protocol: wait for the response from the RPC forwarded to the leader, and then set the result.
- For queries, since the client fetches results directly from BEs, we cannot wait for the leader FE to return results to the follower FE. Instead, once the leader FE finishes deploying fragment instances to BEs, it sends a `notifyForwardDeploymentFinished` RPC (including BE information) to the follower FE. The follower FE directly returns this information to the client.
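The query path can be pictured as the follower waiting on a "deployment finished" signal rather than on the forward RPC itself. The sketch below models that with a `Future` completed by a callback; it is an assumption-laden illustration, not FE code. `follower_handle_query`, `make_fake_leader`, the endpoint string `be-1:1234`, and the 5-second timeouts are all hypothetical.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def follower_handle_query(executor: ThreadPoolExecutor, forward_to_leader):
    """Return the BE endpoint as soon as the leader reports deployment."""
    deployed: Future = Future()  # completed by the notify callback
    # The forward call keeps running on the dedicated executor thread.
    forward_done = executor.submit(forward_to_leader, deployed.set_result)
    endpoint = deployed.result(timeout=5)  # do not wait for forward_done
    return endpoint, forward_done


def make_fake_leader(release: threading.Event):
    """Build a stand-in for the leader FE, used only to exercise the sketch."""
    def forward(notify_deployment_finished):
        # Fragments are deployed: tell the follower where results live.
        notify_deployment_finished("be-1:1234")
        # The query is still running until the test releases it.
        release.wait(timeout=5)
        return "forward finished"
    return forward
```

The follower gets the endpoint while `forward_done` is still pending; audit logging would happen once that future completes.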
Kill-related fixes
- For MySQL, when canceling a query, the client automatically sends a `KILL connection_id` command. The follower FE forwards this `KILL connection_id` to the leader FE.
- For Arrow Flight SQL, there are two ways to cancel, both requiring the FE to actively send a `KILL connection_id` command to the leader FE:
  - Closing the connection: `closeSession` RPC.
  - Canceling the connection/query: `cancelFlightInfo` RPC (not yet implemented).
6. Passing the cancel reason to the BE
Problem
When one fragment fails, it passes the error message to the FE. The FE then cancels the other fragments, but only sends an INTERNAL_ERROR without the error message.
However, since the client pulls data directly from BEs, it cannot obtain the error message from the FE. It only sees INTERNAL_ERROR from the BE.
Fix
When the FE cancels other fragments, it also passes along the error message.
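As a rough sketch of the fix, the cancel request sent to each remaining fragment carries the original error text alongside the status. The `CancelRequest` type and `build_cancel_requests` helper are hypothetical stand-ins for illustration; only the idea of an `error_message` field on the cancel request comes from this PR.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class CancelRequest:
    """Hypothetical cancel payload sent from the FE to a BE fragment."""
    status: str
    error_message: str = ""


def build_cancel_requests(failed_fragment: str, error_message: str,
                          all_fragments: Iterable[str]) -> Dict[str, CancelRequest]:
    """Cancel every fragment except the one that already failed."""
    return {
        fragment: CancelRequest("INTERNAL_ERROR", error_message)
        for fragment in all_fragments
        if fragment != failed_fragment
    }
```

A BE that receives such a request can surface `error_message` to the client pulling from it instead of a bare `INTERNAL_ERROR`.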
7. Other issues
FE
- Exceptions thrown when closing a connection prevented the connection context from being released.
- `showExternalBasicStatsMeta` has one more column than `StatsMeta` for internal tables. MySQL results ignore this extra column, so there is no issue.
- Parser retry does not support large `IN` value lists.
BE
- The map from `query_id` to schema has unsynchronized read/write access (no locking).
- The `get_arrow_batch` method had incorrect checks on the terminal state `_status`, causing occasional deadlocks.
SQL Test
- Adding `--arrow` to `python run.py` will run the cases using the Arrow Flight SQL protocol.
- Adding `@no_arrow_flight_sql` to a case will cause that case to be skipped when running with `--arrow`.
- Adding `@arrow_flight_sql` to a case will make that case run using the Arrow Flight SQL protocol even when `--arrow` is not specified.
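The three rules above amount to a small selection function. The sketch below is not taken from `run.py`; the function name and return values are hypothetical, and two points are assumptions: the default protocol is labeled `"mysql"`, and `@arrow_flight_sql` is given precedence if a case carries both tags.

```python
from typing import Optional, Set


def protocol_for_case(tags: Set[str], arrow_flag: bool) -> Optional[str]:
    """Return the protocol a case runs with, or None if it is skipped."""
    if "@arrow_flight_sql" in tags:
        # Always Arrow Flight SQL, even without --arrow.
        return "arrow_flight_sql"
    if arrow_flag:
        # Under --arrow, opted-out cases are skipped.
        return None if "@no_arrow_flight_sql" in tags else "arrow_flight_sql"
    return "mysql"  # assumption: the default protocol
```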
TODO
- Parameterized prepared SQL is not supported.
- A single connection cannot initiate multiple SQLs concurrently.
- SQL retries are not supported, especially when BEs restart/fail.
  - Since the client pulls data directly from BEs, the FE cannot re-plan and retry. Once we support fetching data via an FE proxy, we can consider adding this feature.
- `enable_short_circuit` is not supported.
- `enable_constant_execute_in_fe` is not supported.
Why I'm doing:
What I'm doing:
Fixes #issue
What type of PR is this:
- [x] BugFix
- [ ] Feature
- [ ] Enhancement
- [ ] Refactor
- [ ] UT
- [ ] Doc
- [ ] Tool
Does this PR entail a change in behavior?
- [ ] Yes, this PR will result in a change in behavior.
- [x] No, this PR will not result in a change in behavior.
If yes, please specify the type of change:
- [ ] Interface/UI changes: syntax, type conversion, expression evaluation, display information
- [ ] Parameter changes: default values, similar parameters but with different default values
- [ ] Policy changes: use new policy to replace old one, functionality automatically enabled
- [ ] Feature removed
- [ ] Miscellaneous: upgrade & downgrade compatibility, etc.
Checklist:
- [x] I have added test cases for my bug fix or my new feature
- [ ] This pr needs user documentation (for new or modified features or behaviors)
- [ ] I have added documentation for my new feature or new function
- [ ] This is a backport pr
Bugfix cherry-pick branch check:
- [x] I have checked the version labels which the pr will be auto-backported to the target branch
- [x] 4.0
- [x] 3.5
- [ ] 3.4
- [ ] 3.3
[!NOTE] Overhauls Arrow Flight SQL across FE/BE: corrects type mappings (incl. DECIMAL256/VARBINARY/date/timestamp), adds prepared-statement and endpoint flow with leader-forward notify, propagates cancel errors, enforces per-connection concurrency, fixes ResultSink/status races, and updates tests/configs.
- Arrow Flight SQL flow (FE/BE):
  - Add per-connection concurrency control via running token; new timeout config `arrow_flight_sql_connection_query_wait_timeout_ms`.
  - Implement prepared-statement handling and endpoint selection; disable FE-only execution paths for Arrow Flight.
  - Support forwarding: leader notifies follower on deployment (`notifyForwardDeploymentFinished`); follower returns BE endpoint immediately for queries.
  - Propagate detailed cancel errors to BE; include error message in logs and RPCs.
- Types & Arrow conversion:
  - Map SR types for Flight SQL: `LARGEINT`→`Decimal128(38,0)`, `DATE`→`date32`, `DATETIME`→`timestamp(us,"UTC")`, `VARBINARY`→`binary`, `DECIMAL256`→`Decimal256`; convert HLL/BITMAP/PERCENTILE to `binary` with nulls.
  - New converters for date/datetime units, Decimal256, binary/string, arrays/maps/structs; handle map null-key rejection; improve chunk→RecordBatch errors.
- Runtime/Execution:
  - Carry Arrow Flight SQL version in requests; build schema per version; count written rows.
  - Fix `BufferControlBlock::get_arrow_batch` and `ResultBufferMgr::fetch_arrow_data` status handling to avoid deadlocks and return correct terminal status.
  - Add `DateValue::to_days_since_unix_epoch()` and `TimestampValue::to_unix_microsecond()`.
- RPC/Thrift:
  - Extend `PCancelPlanFragmentRequest` with `error_message`; add `TArrowFlightSQLVersion`; new FE RPC `notifyForwardDeploymentFinished`.
- FE plumbing & proxy:
  - Refactor `ConnectProcessor`/`LeaderOpExecutor`/`StmtExecutor` to support Arrow Flight SQL paths, return rows/statistics updates, and proxy-kill behavior.
- Tests & tooling:
  - Add Arrow Flight SQL execution mode, pooled connections, Arrow result conversion; tag cases with `@no_arrow_flight_sql`/`@arrow_flight_sql`; relax error matchers; bump `pyarrow>=21.0.0`.

Written by Cursor Bugbot for commit 55b2488cf95834819fd3ef5731d6ca7eb386fa91. This will update automatically on new commits.
@cursor review
🧪 CI Insights
Here's what we observed from your CI run for 55b2488c.
🟢 All jobs passed!
Quality Gate failed
Failed conditions
C Reliability Rating on New Code (required ≥ A)
[Java-Extensions Incremental Coverage Report]
:white_check_mark: pass : 0 / 0 (0%)
[FE Incremental Coverage Report]
:white_check_mark: pass : 335 / 403 (83.13%)
file detail
| | path | covered_line | new_line | coverage | not_covered_line_detail |
|---|---|---|---|---|---|
| :large_blue_circle: | com/starrocks/qe/scheduler/dag/ExecutionDAG.java | 0 | 3 | 00.00% | [620, 621, 637] |
| :large_blue_circle: | com/starrocks/service/arrow/flight/sql/ArrowFlightSqlProxyQueryManager.java | 7 | 16 | 43.75% | [37, 38, 39, 42, 43, 48, 55, 56, 57] |
| :large_blue_circle: | com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java | 1 | 2 | 50.00% | [387] |
| :large_blue_circle: | com/starrocks/qe/DefaultCoordinator.java | 6 | 10 | 60.00% | [1090, 1091, 1092, 1094] |
| :large_blue_circle: | com/starrocks/service/arrow/flight/sql/ArrowFlightSqlConnectProcessor.java | 109 | 134 | 81.34% | [90, 91, 92, 98, 100, 101, 102, 103, 112, 121, 125, 126, 127, 187, 218, 235, 236, 237, 245, 250, 251, 252, 253, 264, 328] |
| :large_blue_circle: | com/starrocks/qe/SimpleExecutor.java | 9 | 11 | 81.82% | [92, 93] |
| :large_blue_circle: | com/starrocks/qe/StmtExecutor.java | 42 | 50 | 84.00% | [815, 1209, 1634, 2247, 2248, 2257, 2258, 2259] |
| :large_blue_circle: | com/starrocks/qe/LeaderOpExecutor.java | 27 | 32 | 84.38% | [135, 140, 141, 163, 176] |
| :large_blue_circle: | com/starrocks/qe/ConnectProcessor.java | 13 | 15 | 86.67% | [858, 859] |
| :large_blue_circle: | com/starrocks/service/arrow/flight/sql/ArrowFlightSqlServiceImpl.java | 30 | 34 | 88.24% | [206, 207, 226, 227] |
| :large_blue_circle: | com/starrocks/service/arrow/flight/sql/ArrowFlightSqlConnectContext.java | 41 | 45 | 91.11% | [146, 147, 149, 151] |
| :large_blue_circle: | com/starrocks/service/FrontendServiceImpl.java | 16 | 17 | 94.12% | [1040] |
| :large_blue_circle: | com/starrocks/qe/ConnectScheduler.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/common/Config.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/qe/ConnectContext.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/service/arrow/flight/sql/ArrowUtils.java | 8 | 8 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/qe/scheduler/TFragmentInstanceFactory.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/mysql/MysqlChannel.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/ast/ShowBasicStatsMetaStmt.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/service/arrow/flight/sql/ArrowFlightSqlResultDescriptor.java | 15 | 15 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/sql/StatementPlanner.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/rpc/BackendServiceClient.java | 2 | 2 | 100.00% | [] |
[BE Incremental Coverage Report]
:white_check_mark: pass : 179 / 196 (91.33%)
file detail
| | path | covered_line | new_line | coverage | not_covered_line_detail |
|---|---|---|---|---|---|
| :large_blue_circle: | be/src/exec/arrow_flight_batch_reader.cpp | 3 | 4 | 75.00% | [35] |
| :large_blue_circle: | be/src/util/arrow/row_batch.cpp | 91 | 105 | 86.67% | [98, 99, 100, 109, 110, 111, 173, 174, 175, 222, 223, 224, 234, 235] |
| :large_blue_circle: | be/src/util/arrow/starrocks_column_to_arrow.cpp | 63 | 65 | 96.92% | [763, 764] |
| :large_blue_circle: | be/src/types/timestamp_value.cpp | 7 | 7 | 100.00% | [] |
| :large_blue_circle: | be/src/exec/pipeline/fragment_executor.cpp | 4 | 4 | 100.00% | [] |
| :large_blue_circle: | be/src/runtime/arrow_result_writer.cpp | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | be/src/service/internal_service.cpp | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | be/src/types/date_value.h | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | be/src/runtime/runtime_state.h | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | be/src/runtime/result_buffer_mgr.cpp | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | be/src/runtime/buffer_control_block.cpp | 3 | 3 | 100.00% | [] |
@Mergifyio backport branch-4.0
@Mergifyio backport branch-3.5
backport branch-4.0
✅ Backports have been created
- #66932 [BugFix] Fix bugs of Arrow Flight SQL (backport #65889) has been created for branch `branch-4.0` but encountered conflicts
backport branch-3.5
✅ Backports have been created
- #66933 [BugFix] Fix bugs of Arrow Flight SQL (backport #65889) has been created for branch `branch-3.5` but encountered conflicts