
[BugFix] Fix bugs of Arrow Flight SQL

Open ZiheLiu opened this issue 1 month ago • 18 comments

Sequence between client and FE/BE

Sequence diagrams for interactions between client and FE/BE

For native Arrow Flight SQL clients, such as FlightSqlClient in Java:

  1. getFlightInfoStatement: Send SQL to the FE. The FE generates the plan, dispatches fragment instances to BEs, and returns the endpoint from which results can be fetched.
    • For queries: the endpoint is the address of the BE that hosts the result sink.
    • For non-queries: the endpoint is the address of the FE itself.
  2. getStreamStatement: The client pulls results from the endpoint.
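The endpoint rule in step 1 can be sketched as follows. This is only an illustration, not StarRocks code: `FlightEndpointStub`, `plan_and_pick_endpoint`, the host names, and the crude `is_query` check are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FlightEndpointStub:
    """Hypothetical stand-in for the endpoint returned by getFlightInfoStatement."""
    host: str
    role: str  # "BE" or "FE"


def is_query(sql: str) -> bool:
    # Assumption for illustration only: statements starting with SELECT/WITH
    # are treated as queries. The real FE decides this from the parsed plan.
    return sql.lstrip().upper().startswith(("SELECT", "WITH"))


def plan_and_pick_endpoint(sql: str, fe_host: str, result_sink_be_host: str) -> FlightEndpointStub:
    """Return where the client should pull results from."""
    if is_query(sql):
        # Queries: results are pulled from the BE that hosts the result sink.
        return FlightEndpointStub(result_sink_be_host, "BE")
    # Non-queries: results are served by the FE itself.
    return FlightEndpointStub(fe_host, "FE")
```

The client then sends getStreamStatement to whichever host the returned endpoint names.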

For ADBC/JDBC drivers, SQL is executed using the following sequence:

  1. createPreparedStatement: Create a prepared statement and return a handle (preparedStmtId) to the client as its identifier.
  2. Execute the query. Note that a single connection can hold multiple prepared statements at the same time.
    1. acceptPutStatement: Update the parameter values of a prepared statement.
      • We currently do not implement this RPC, so only non-parameterized SQL is supported.
    2. getFlightInfoPreparedStatement: The FE starts executing the query associated with the given handle. The FE generates the plan, dispatches fragment instances to BEs, and returns the endpoint from which results can be fetched.
      • For queries: the endpoint is the address of the BE that hosts the result sink.
      • For non-queries: the endpoint is the address of the FE itself.
    3. getStreamStatement: Fetch results from the endpoint returned in the previous step.
  3. closePreparedStatement: Close a prepared statement.

On the client side, a single cursor only maintains one prepared statement. For the same cursor, when calling cursor.execute(SQL):

  • If the SQL is the same as the previous one, it directly runs the three “execute query” steps.
  • Otherwise, it first issues closePreparedStatement for the previous statement, then creates a new prepared statement via createPreparedStatement before running the “execute query” steps.
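The cursor behavior described above can be sketched as a small state machine that records which RPCs would be sent. `CursorStub` and its integer handles are hypothetical, and the sketch assumes a new statement is created with createPreparedStatement (step 1 of the sequence). The parameter-binding step (acceptPutStatement) is left out because it is not implemented on our side.

```python
class CursorStub:
    """Hypothetical client cursor that holds at most one prepared statement."""

    def __init__(self):
        self.rpc_log = []      # (rpc name, preparedStmtId) in the order sent
        self._sql = None       # SQL text of the current prepared statement
        self._stmt_id = None   # handle of the current prepared statement
        self._next_id = 0

    def execute(self, sql: str) -> None:
        if sql != self._sql:
            # Different SQL: close the old statement (if any), then create a new one.
            if self._stmt_id is not None:
                self.rpc_log.append(("closePreparedStatement", self._stmt_id))
            self._next_id += 1
            self._stmt_id = self._next_id
            self._sql = sql
            self.rpc_log.append(("createPreparedStatement", self._stmt_id))
        # Same or freshly created statement: run the execute steps.
        self.rpc_log.append(("getFlightInfoPreparedStatement", self._stmt_id))
        self.rpc_log.append(("getStreamStatement", self._stmt_id))
```

Calling `execute` twice with the same SQL reuses the handle, while a different SQL closes handle 1 and opens handle 2.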

FE/BE Internal Execution Flow

There are four different execution flows, determined by two dimensions:

  • Whether the request is forwarded to the leader.
  • Whether it is a query or a non-query.

Non-forward Query

  1. The client sends a getFlightInfoPreparedStatement RPC to the FE. After the FE generates the plan and dispatches fragment instances to BEs, it returns the result sink BE endpoint to the client.
  2. The client uses the endpoint to send a getStreamStatement RPC to the BE and pulls results from the BE.

Non-forward Non-Query

  1. The client sends a getFlightInfoPreparedStatement RPC to the FE. The FE’s behavior:
    • For SHOW xxx statements, the FE executes everything locally; there is no BE-side execution.
    • For DDL and DML, execution must run on BEs. The FE waits until execution fully completes, and only then returns to the client.
  2. The client uses the endpoint to send getStreamStatement to the FE and fetch results from the FE.

Forward Query

  1. The follower FE executes in a dedicated executor thread, forwards the request to the leader FE, blocks waiting for the response, and then writes audit.log and query detail.
  2. After the leader FE finishes deploying fragment instances to BEs, it notifies the follower FE via a new RPC. The follower FE immediately returns the result sink BE endpoint to the client without waiting for step 1’s forward flow to complete.

Forward Non-Query

  1. The follower FE executes in a dedicated executor thread, forwards the request to the leader FE, blocks waiting for the response, and then writes audit.log and query detail.
  2. The follower FE waits until step 1 fully completes, writes the result into its local cache, and then returns its own FE endpoint to the client.
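The four flows can be summarized as a lookup keyed by the two dimensions. The table below only restates the text; `FLOWS` and `describe_flow` are hypothetical names, and the strings are informal labels, not identifiers from the code base.

```python
# (forwarded_to_leader, is_query) -> (endpoint returned to client, when the FE replies)
FLOWS = {
    (False, True): ("BE", "after fragment instances are dispatched"),
    (False, False): ("FE", "after execution fully completes"),
    (True, True): ("BE", "after the leader reports deployment finished"),
    (True, False): ("FE", "after the forwarded request fully completes"),
}


def describe_flow(forwarded: bool, is_query: bool) -> str:
    """Render one of the four flows as a short sentence."""
    endpoint, reply_point = FLOWS[(forwarded, is_query)]
    kind = "query" if is_query else "non-query"
    path = "forward" if forwarded else "non-forward"
    return f"{path} {kind}: client fetches from {endpoint}; FE replies {reply_point}"
```

Note that only the query flows hand out a BE endpoint; both non-query flows serve results from the FE that the client is connected to.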

Issues

1. Column type conversion

| SR type | Arrow type (before) | Arrow type (this PR) |
| --- | --- | --- |
| TYPE_LARGEINT | utf8 | Decimal128Type(38, 0) |
| TYPE_DATE | utf8 | date32 |
| TYPE_DATETIME | utf8 | timestamp(arrow::TimeUnit::MICRO, "UTC") |
| TYPE_VARBINARY | unsupported | binary |
| TYPE_DECIMAL256 | unsupported | Decimal256 |
| TYPE_HLL | utf8 | binary (always returns NULL, same as MySQL) |
| TYPE_OBJECT(BITMAP) | unsupported | binary (always returns NULL, same as MySQL) |
| TYPE_PERCENTILE | unsupported | binary (always returns NULL, same as MySQL) |

The mapping from SR types to Arrow types is used not only by Arrow Flight SQL, but also by import/export. For compatibility reasons, only Arrow Flight SQL changes the mapping above.

See row_batch.h/cpp, starrocks_column_to_arrow.cpp.
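For the two temporal mappings, Arrow's date32 stores days since the UNIX epoch and timestamp(MICRO) stores microseconds since the UNIX epoch. The sketch below shows that arithmetic in Python. The function names echo the `to_days_since_unix_epoch` and `to_unix_microsecond` helpers this PR adds on the BE, but the bodies are illustrative only, and treating a naive datetime as UTC wall-clock time is an assumption of the sketch.

```python
from datetime import date, datetime, timezone

_EPOCH_DATE = date(1970, 1, 1)
_EPOCH_DATETIME = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_days_since_unix_epoch(d: date) -> int:
    """Value stored in an Arrow date32 column for the given date."""
    return (d - _EPOCH_DATE).days


def to_unix_microsecond(dt: datetime) -> int:
    """Value stored in an Arrow timestamp(us) column for the given datetime."""
    if dt.tzinfo is None:
        # Assumption: naive values are interpreted as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    delta = dt - _EPOCH_DATETIME
    # Integer arithmetic avoids the rounding that float seconds would introduce.
    return delta.days * 86_400_000_000 + delta.seconds * 1_000_000 + delta.microseconds
```

Both functions also work for values before 1970, where the results are negative.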

2. A single connection cannot handle multiple queries concurrently

Problem

Our ConnectContext is not thread-safe.

There are two scenarios where a single connection can be handling multiple queries at once:

  • First, the ADBC driver allows a single connection to initiate multiple queries concurrently, as long as each cursor is only executing one query at a time.
  • Second, even if the client issues queries serially, this can still occur because the client and the FE do not consider a query “finished” at exactly the same time. A query executes as follows:
    1. The client sends a getFlightInfoPreparedStatement RPC to the FE. The FE generates a plan and deploys it to BEs, then returns the result sink BE endpoint to the client.
    2. The client sends a getStreamStatement RPC to the BE and pulls results directly from the BE.
    3. After finishing result fetching, the client issues the next query. However, at this point the FE may still not have received all BE profile reports. As a result, two queries may be using the same ConnectContext simultaneously.

Fix

Ideally, we would split ConnectContext into ConnectContext and StatementContext. Only a small subset of ConnectContext fields would need to be thread-safe, while StatementContext would not.

However, this change would be too large; ConnectContext is used in hundreds of places. Therefore, the current approach is: each ConnectContext can execute only one SQL statement at a time, and any subsequent statement waits up to arrow_flight_sql_connection_query_wait_timeout_ms for the running one to finish before failing with a timeout error.

Implementation-wise, we maintain a RunningToken (a semaphore) that can be held by only one owner at a time. Others block and wait for up to arrow_flight_sql_connection_query_wait_timeout_ms. If it times out, an error is returned.

  • Before a SQL starts, acquire the RunningToken.
  • After the SQL fully completes, release the RunningToken. “Fully completes” means StmtExecutor::execute finishes, including the extra 2 seconds waiting for profile reports and writing audit logs and query detail.

See ArrowFlightSqlConnectProcessor::execute.
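The token discipline can be sketched with a one-permit semaphore and a bounded wait. This is a Python illustration of the idea, not the FE implementation: `RunningToken` here, `RunningTokenTimeout`, and `run_statement` are hypothetical, and only the config name comes from this PR.

```python
import threading


class RunningTokenTimeout(Exception):
    """Raised when the token is not obtained within the wait timeout."""


class RunningToken:
    """One-permit semaphore: at most one statement per connection at a time."""

    def __init__(self, wait_timeout_ms: int):
        # wait_timeout_ms plays the role of
        # arrow_flight_sql_connection_query_wait_timeout_ms.
        self._sem = threading.Semaphore(1)
        self._timeout_s = wait_timeout_ms / 1000.0

    def acquire(self) -> None:
        if not self._sem.acquire(timeout=self._timeout_s):
            raise RunningTokenTimeout("timed out waiting for the previous statement")

    def release(self) -> None:
        self._sem.release()


def run_statement(token: RunningToken, execute):
    """Hold the token for the whole statement, including its post-processing."""
    token.acquire()
    try:
        return execute()
    finally:
        # Released only after execute() has fully returned, even on error.
        token.release()
```

Releasing in `finally` matters: if a failed statement kept the token, every later statement on that connection would time out.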

3. No statistics collected for BE execution

Problem

BE fragments report statistics when ResultSink finishes (on ResultSink::close), piggybacked on the last batch sent to the FE. However, Arrow Flight SQL doesn’t have a ResultSink between FE and BE.

Fix

We still let the FE issue a fetch_data RPC to the BE’s ResultSink. When ResultSink::close is called, the BE responds to this fetch_data RPC and sends the statistics to the FE.

This also implicitly lets the FE know that the query has finished.
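A minimal model of this handshake: the FE's fetch_data call blocks until the sink is closed, and the reply carries the statistics. `ResultSinkStub` and the statistics dictionary are hypothetical; the real RPC and its payload are defined in the BE.

```python
import threading


class ResultSinkStub:
    """Hypothetical BE-side sink that answers fetch_data only once it is closed."""

    def __init__(self):
        self._closed = threading.Event()
        self._stats = None

    def close(self, stats: dict) -> None:
        # Called when the fragment finishes; publishes the final statistics.
        self._stats = dict(stats)
        self._closed.set()

    def fetch_data(self, timeout_s: float = 5.0) -> dict:
        # Called by the FE; returns only after close(), so a reply also
        # tells the FE that the query has finished.
        if not self._closed.wait(timeout_s):
            raise TimeoutError("result sink was not closed in time")
        return self._stats
```

The rows themselves still flow from the BE to the client over Arrow Flight; this side channel carries only the statistics and the completion signal.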

4. One ConnectContext maintaining multiple prepared queries

The client’s RPC sequence for a single query is:

[createPreparedStatement] -> [getFlightInfoPreparedStatement(preparedStmtId) -> getStreamStatement]

  1. createPreparedStatement: Send the query to the FE, and the FE returns a preparedStmtId.
  2. getFlightInfoPreparedStatement(preparedStmtId): Send a request to the FE to execute the query corresponding to the provided preparedStmtId. The FE returns the endpoint from which results can be fetched (BE for queries, FE for others).
  3. getStreamStatement: Fetch query results from the FE or BE.

Problem

ArrowFlightSqlConnectContext currently stores only a single query field. In reality, a single ArrowFlightSqlConnectContext may hold multiple different prepared statements at the same time.

More specifically, each client cursor maintains one prepared statement. For a given cursor:

  • If cursor.execute(sql) uses the same SQL as before, it reuses the existing prepared statement and issues getFlightInfoPreparedStatement(preparedStmtId) -> getStreamStatement.
  • Otherwise, it first closes the previous prepared statement and then creates a new one, issuing closePreparedStatement(prevPreparedStmtId) -> createPreparedStatement -> getFlightInfoPreparedStatement(preparedStmtId) -> getStreamStatement.

Fix

ArrowFlightSqlConnectContext should store a collection of preparedStatements, keyed by preparedStmtId, instead of a single query.
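The fix amounts to replacing one field with a map from handle to statement. A sketch, assuming string handles and a plain dictionary; `ConnectContextStub` and its method names are hypothetical and do not mirror the Java class.

```python
import itertools


class ConnectContextStub:
    """Hypothetical connection context holding several prepared statements."""

    def __init__(self):
        self._prepared = {}               # preparedStmtId -> SQL text
        self._ids = itertools.count(1)

    def create_prepared_statement(self, sql: str) -> str:
        stmt_id = f"stmt-{next(self._ids)}"
        self._prepared[stmt_id] = sql
        return stmt_id

    def get_query(self, stmt_id: str) -> str:
        # Raises KeyError for an unknown or already closed handle.
        return self._prepared[stmt_id]

    def close_prepared_statement(self, stmt_id: str) -> None:
        self._prepared.pop(stmt_id, None)
```

With this shape, two cursors on the same connection can each keep their own statement alive, and closing one does not disturb the other.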

5. Forwarding to the leader for execution

Flow changes

  • For non-queries, the flow is the same as the MySQL protocol: wait for the response from the RPC forwarded to the leader, and then set the result.
  • For queries, since the client fetches results directly from BEs, we cannot wait for the leader FE to return results to the follower FE. Instead, once the leader FE finishes deploying fragment instances to BEs, it sends a notifyForwardDeploymentFinished RPC (including BE information) to the follower FE. The follower FE directly returns this information to the client.
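The query case can be modeled with a future that the follower waits on and the leader's notification completes. Only the RPC name notifyForwardDeploymentFinished comes from this PR; `FollowerFeStub`, the query id, and the endpoint string are hypothetical.

```python
import threading
from concurrent.futures import Future


class FollowerFeStub:
    """Hypothetical follower FE that waits for the leader's deployment notice."""

    def __init__(self):
        self._pending = {}              # query_id -> Future resolving to BE endpoint
        self._lock = threading.Lock()

    def start_forwarded_query(self, query_id: str) -> Future:
        # Registered before the request is forwarded, so that an early
        # notification from the leader is not lost.
        fut = Future()
        with self._lock:
            self._pending[query_id] = fut
        return fut

    def notify_forward_deployment_finished(self, query_id: str, be_endpoint: str) -> bool:
        # Invoked when the leader has deployed all fragment instances.
        with self._lock:
            fut = self._pending.pop(query_id, None)
        if fut is None:
            return False                # unknown or already completed query
        fut.set_result(be_endpoint)
        return True
```

The follower's getFlightInfoPreparedStatement handler would return `fut.result(timeout=...)` to the client while the forwarded request keeps running in its executor thread.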

Kill-related fixes

  • For MySQL, the client automatically sends a KILL connection_id command when it cancels a query. The follower FE forwards this KILL connection_id to the leader FE.
  • For Arrow Flight SQL, there are two ways to cancel, both requiring the FE to actively send a KILL connection_id command to the leader FE:
    • Closing the connection: closeSession RPC.
    • Canceling the connection/query: cancelFlightInfo RPC (not yet implemented).

6. Passing the cancel reason to the BE

Problem

When one fragment fails, it passes the error message to the FE. The FE then cancels the other fragments, but only sends an INTERNAL_ERROR without the error message.

However, since the client pulls data directly from BEs, it cannot obtain the error message from the FE. It only sees INTERNAL_ERROR from the BE.

Fix

When the FE cancels other fragments, it also passes along the error message.
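The effect on what the client sees can be sketched as follows. The field name error_message matches the one added to PCancelPlanFragmentRequest in this PR; `CancelRequestStub`, `client_visible_error`, and the message format are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class CancelRequestStub:
    """Hypothetical cancel request sent from the FE to a BE fragment."""
    reason: str               # e.g. "INTERNAL_ERROR"
    error_message: str = ""   # detail from the fragment that failed first


def client_visible_error(req: CancelRequestStub) -> str:
    """What a client pulling from the canceled BE would be told."""
    if req.error_message:
        return f"{req.reason}: {req.error_message}"
    # Before the fix: only the bare reason reaches the client.
    return req.reason
```

For example, a request with reason `INTERNAL_ERROR` and no message yields just `INTERNAL_ERROR`, while one that carries the original failure text surfaces that text to the client.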

7. Other issues

FE

  1. Exceptions thrown when closing a connection prevented the connection context from being released.
  2. showExternalBasicStatsMeta has one more column than StatsMeta for internal tables. MySQL results ignore this extra column, so there is no issue.
  3. Parser retry does not support large IN value lists.

BE

  1. The map from query_id to schema has unsynchronized read/write access (no locking).
  2. The get_arrow_batch method had incorrect checks on the terminal state _status, causing occasional deadlocks.

SQL Test

  • Adding --arrow to python run.py will run the cases using the Arrow Flight SQL protocol.
  • Adding @no_arrow_flight_sql to a case will cause that case to be skipped when running with --arrow.
  • Adding @arrow_flight_sql to a case will make that case run using the Arrow Flight SQL protocol even when --arrow is not specified.
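The three rules combine into a small decision function. This is a sketch of the rules as stated, not the code in run.py: `resolve_protocol` is a hypothetical name, "mysql" as the default protocol is an assumption, and a case that carries both tags is not covered by the text (here the skip rule wins under --arrow).

```python
def resolve_protocol(case_tags: set, arrow_flag: bool) -> str:
    """Return "skip", "arrow_flight_sql", or "mysql" for one test case."""
    if arrow_flag and "@no_arrow_flight_sql" in case_tags:
        return "skip"                  # opted out of Arrow Flight SQL runs
    if arrow_flag or "@arrow_flight_sql" in case_tags:
        return "arrow_flight_sql"      # global flag or per-case opt-in
    return "mysql"                     # assumed default protocol
```

An untagged case therefore follows the --arrow flag, and the two tags only override it in one direction each.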

TODO

  1. Parameterized prepared SQL is not supported.
  2. A single connection cannot initiate multiple SQLs concurrently.
  3. SQL retries are not supported, especially when BEs restart/fail.
    • Since the client pulls data directly from BEs, the FE cannot re-plan and retry. Once we support fetching data via an FE proxy, we can consider adding this feature.
  4. enable_short_circuit is not supported.
  5. enable_constant_execute_in_fe is not supported.

Why I'm doing:

What I'm doing:

Fixes #issue

What type of PR is this:

  • [x] BugFix
  • [ ] Feature
  • [ ] Enhancement
  • [ ] Refactor
  • [ ] UT
  • [ ] Doc
  • [ ] Tool

Does this PR entail a change in behavior?

  • [ ] Yes, this PR will result in a change in behavior.
  • [x] No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • [ ] Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • [ ] Parameter changes: default values, similar parameters but with different default values
  • [ ] Policy changes: use new policy to replace old one, functionality automatically enabled
  • [ ] Feature removed
  • [ ] Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • [x] I have added test cases for my bug fix or my new feature
  • [ ] This pr needs user documentation (for new or modified features or behaviors)
    • [ ] I have added documentation for my new feature or new function
  • [ ] This is a backport pr

Bugfix cherry-pick branch check:

  • [x] I have checked the version labels which the pr will be auto-backported to the target branch
    • [x] 4.0
    • [x] 3.5
    • [ ] 3.4
    • [ ] 3.3

[!NOTE] Overhauls Arrow Flight SQL across FE/BE: corrects type mappings (incl. DECIMAL256/VARBINARY/date/timestamp), adds prepared-statement and endpoint flow with leader-forward notify, propagates cancel errors, enforces per-connection concurrency, fixes ResultSink/status races, and updates tests/configs.

  • Arrow Flight SQL flow (FE/BE):
    • Add per-connection concurrency control via running token; new timeout config arrow_flight_sql_connection_query_wait_timeout_ms.
    • Implement prepared-statement handling and endpoint selection; disable FE-only execution paths for Arrow Flight.
    • Support forwarding: leader notifies follower on deployment (notifyForwardDeploymentFinished); follower returns BE endpoint immediately for queries.
    • Propagate detailed cancel errors to BE; include error message in logs and RPCs.
  • Types & Arrow conversion:
    • Map SR types for Flight SQL: LARGEINT→Decimal128(38,0), DATE→date32, DATETIME→timestamp(us,"UTC"), VARBINARY→binary, DECIMAL256→Decimal256; convert HLL/BITMAP/PERCENTILE to binary with nulls.
    • New converters for date/datetime units, Decimal256, binary/string, arrays/maps/structs; handle map null-key rejection; improve chunk→RecordBatch errors.
  • Runtime/Execution:
    • Carry Arrow Flight SQL version in requests; build schema per version; count written rows.
    • Fix BufferControlBlock::get_arrow_batch and ResultBufferMgr::fetch_arrow_data status handling to avoid deadlocks and return correct terminal status.
    • Add DateValue::to_days_since_unix_epoch() and TimestampValue::to_unix_microsecond().
  • RPC/Thrift:
    • Extend PCancelPlanFragmentRequest with error_message; add TArrowFlightSQLVersion; new FE RPC notifyForwardDeploymentFinished.
  • FE plumbing & proxy:
    • Refactor ConnectProcessor/LeaderOpExecutor/StmtExecutor to support Arrow Flight SQL paths, return rows/statistics updates, and proxy-kill behavior.
  • Tests & tooling:
    • Add Arrow Flight SQL execution mode, pooled connections, Arrow result conversion; tag cases with @no_arrow_flight_sql/@arrow_flight_sql; relax error matchers; bump pyarrow>=21.0.0.

Written by Cursor Bugbot for commit 55b2488cf95834819fd3ef5731d6ca7eb386fa91.

ZiheLiu avatar Nov 24 '25 08:11 ZiheLiu

@cursor review

alvin-celerdata avatar Nov 24 '25 17:11 alvin-celerdata

@cursor review

alvin-celerdata avatar Nov 25 '25 04:11 alvin-celerdata

@cursor review

alvin-celerdata avatar Nov 25 '25 17:11 alvin-celerdata

@cursor review

alvin-celerdata avatar Nov 26 '25 17:11 alvin-celerdata

@cursor review

alvin-celerdata avatar Nov 27 '25 05:11 alvin-celerdata

@cursor review

alvin-celerdata avatar Nov 27 '25 15:11 alvin-celerdata

@cursor review

alvin-celerdata avatar Dec 03 '25 04:12 alvin-celerdata

@cursor review

alvin-celerdata avatar Dec 03 '25 17:12 alvin-celerdata

@cursor review

alvin-celerdata avatar Dec 04 '25 03:12 alvin-celerdata

🧪 CI Insights

Here's what we observed from your CI run for 55b2488c.

🟢 All jobs passed!

But CI Insights is watching 👀

mergify[bot] avatar Dec 04 '25 09:12 mergify[bot]

@cursor review

alvin-celerdata avatar Dec 04 '25 17:12 alvin-celerdata

@cursor review

alvin-celerdata avatar Dec 04 '25 18:12 alvin-celerdata

@cursor review

alvin-celerdata avatar Dec 05 '25 17:12 alvin-celerdata

@cursor review

alvin-celerdata avatar Dec 10 '25 04:12 alvin-celerdata

Quality Gate failed

Failed conditions
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud


sonarqubecloud[bot] avatar Dec 16 '25 06:12 sonarqubecloud[bot]

[Java-Extensions Incremental Coverage Report]

:white_check_mark: pass : 0 / 0 (0%)

github-actions[bot] avatar Dec 16 '25 09:12 github-actions[bot]

[FE Incremental Coverage Report]

:white_check_mark: pass : 335 / 403 (83.13%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
| --- | --- | --- | --- | --- |
| com/starrocks/qe/scheduler/dag/ExecutionDAG.java | 0 | 3 | 00.00% | [620, 621, 637] |
| com/starrocks/service/arrow/flight/sql/ArrowFlightSqlProxyQueryManager.java | 7 | 16 | 43.75% | [37, 38, 39, 42, 43, 48, 55, 56, 57] |
| com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java | 1 | 2 | 50.00% | [387] |
| com/starrocks/qe/DefaultCoordinator.java | 6 | 10 | 60.00% | [1090, 1091, 1092, 1094] |
| com/starrocks/service/arrow/flight/sql/ArrowFlightSqlConnectProcessor.java | 109 | 134 | 81.34% | [90, 91, 92, 98, 100, 101, 102, 103, 112, 121, 125, 126, 127, 187, 218, 235, 236, 237, 245, 250, 251, 252, 253, 264, 328] |
| com/starrocks/qe/SimpleExecutor.java | 9 | 11 | 81.82% | [92, 93] |
| com/starrocks/qe/StmtExecutor.java | 42 | 50 | 84.00% | [815, 1209, 1634, 2247, 2248, 2257, 2258, 2259] |
| com/starrocks/qe/LeaderOpExecutor.java | 27 | 32 | 84.38% | [135, 140, 141, 163, 176] |
| com/starrocks/qe/ConnectProcessor.java | 13 | 15 | 86.67% | [858, 859] |
| com/starrocks/service/arrow/flight/sql/ArrowFlightSqlServiceImpl.java | 30 | 34 | 88.24% | [206, 207, 226, 227] |
| com/starrocks/service/arrow/flight/sql/ArrowFlightSqlConnectContext.java | 41 | 45 | 91.11% | [146, 147, 149, 151] |
| com/starrocks/service/FrontendServiceImpl.java | 16 | 17 | 94.12% | [1040] |
| com/starrocks/qe/ConnectScheduler.java | 2 | 2 | 100.00% | [] |
| com/starrocks/common/Config.java | 1 | 1 | 100.00% | [] |
| com/starrocks/qe/ConnectContext.java | 1 | 1 | 100.00% | [] |
| com/starrocks/service/arrow/flight/sql/ArrowUtils.java | 8 | 8 | 100.00% | [] |
| com/starrocks/qe/scheduler/TFragmentInstanceFactory.java | 1 | 1 | 100.00% | [] |
| com/starrocks/mysql/MysqlChannel.java | 2 | 2 | 100.00% | [] |
| com/starrocks/sql/ast/ShowBasicStatsMetaStmt.java | 1 | 1 | 100.00% | [] |
| com/starrocks/service/arrow/flight/sql/ArrowFlightSqlResultDescriptor.java | 15 | 15 | 100.00% | [] |
| com/starrocks/sql/StatementPlanner.java | 1 | 1 | 100.00% | [] |
| com/starrocks/rpc/BackendServiceClient.java | 2 | 2 | 100.00% | [] |

github-actions[bot] avatar Dec 16 '25 10:12 github-actions[bot]

[BE Incremental Coverage Report]

:white_check_mark: pass : 179 / 196 (91.33%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
| --- | --- | --- | --- | --- |
| be/src/exec/arrow_flight_batch_reader.cpp | 3 | 4 | 75.00% | [35] |
| be/src/util/arrow/row_batch.cpp | 91 | 105 | 86.67% | [98, 99, 100, 109, 110, 111, 173, 174, 175, 222, 223, 224, 234, 235] |
| be/src/util/arrow/starrocks_column_to_arrow.cpp | 63 | 65 | 96.92% | [763, 764] |
| be/src/types/timestamp_value.cpp | 7 | 7 | 100.00% | [] |
| be/src/exec/pipeline/fragment_executor.cpp | 4 | 4 | 100.00% | [] |
| be/src/runtime/arrow_result_writer.cpp | 1 | 1 | 100.00% | [] |
| be/src/service/internal_service.cpp | 2 | 2 | 100.00% | [] |
| be/src/types/date_value.h | 2 | 2 | 100.00% | [] |
| be/src/runtime/runtime_state.h | 2 | 2 | 100.00% | [] |
| be/src/runtime/result_buffer_mgr.cpp | 1 | 1 | 100.00% | [] |
| be/src/runtime/buffer_control_block.cpp | 3 | 3 | 100.00% | [] |

github-actions[bot] avatar Dec 16 '25 10:12 github-actions[bot]

@cursor review

alvin-celerdata avatar Dec 16 '25 17:12 alvin-celerdata

@Mergifyio backport branch-4.0

github-actions[bot] avatar Dec 18 '25 05:12 github-actions[bot]

@Mergifyio backport branch-3.5

github-actions[bot] avatar Dec 18 '25 05:12 github-actions[bot]

backport branch-4.0

✅ Backports have been created

mergify[bot] avatar Dec 18 '25 05:12 mergify[bot]

backport branch-3.5

✅ Backports have been created

mergify[bot] avatar Dec 18 '25 05:12 mergify[bot]