[Improve][Connector-V2] Support maxcompute sink writer upsert/delete action with upsert session mode

Open dybyte opened this issue 6 months ago • 3 comments

Fixes https://github.com/apache/seatunnel/issues/8611

Purpose of this pull request

This PR adds support for upsert session mode in the MaxCompute sink. Previously, the sink supported only upload session mode, which inserts data into MaxCompute. With this change, users can also update and delete existing rows through upsert sessions, with rows matched by their primary key values. For reference, see MaxCompute Upsert Session.

Why add a PK-based locking mechanism? In a multi-threaded environment, concurrent upsert operations targeting the same primary key (PK) can cause race conditions, leading to unexpected behavior or data inconsistency. According to the official documentation:

"Due to the writing characteristics of the primary key table, we should carefully control the writing logic when writing to the same table (partition) concurrently. If there are multiple concurrent writes to the same primary key at the same time, unexpected behavior may occur. A common solution is to use the shuffle by pk operation to assign records with the same primary key to the same thread for writing."

Based on this, we introduced a PK-based locking mechanism that uses a striped lock pool keyed by a hash of the primary key values.
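
To illustrate the idea, here is a minimal sketch of a striped lock pool. It is not the code from this PR: the class name `PkStripedLocks`, its methods, and the stripe count are hypothetical, and it assumes the primary key values have stable `hashCode()` implementations. Rows with equal primary keys always hash to the same stripe, so writes to them are serialized, while rows with different keys usually proceed in parallel.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of a PK-keyed striped lock pool; not the PR's actual class. */
final class PkStripedLocks {
    private final ReentrantLock[] stripes;

    PkStripedLocks(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    /** Maps primary key values to a stripe index; equal keys always map to the same stripe. */
    public int stripeIndex(Object... pkValues) {
        if (pkValues == null || pkValues.length == 0) {
            throw new IllegalArgumentException("at least one primary key value is required");
        }
        for (Object value : pkValues) {
            if (value == null) {
                throw new IllegalArgumentException("primary key value must not be null");
            }
        }
        // floorMod keeps the index non-negative even when the hash is negative.
        return Math.floorMod(Arrays.hashCode(pkValues), stripes.length);
    }

    /** Runs the action while holding the stripe lock that guards the given primary key. */
    public void withLock(Runnable action, Object... pkValues) {
        ReentrantLock lock = stripes[stripeIndex(pkValues)];
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }
}
```

A fixed pool bounds memory regardless of how many distinct keys pass through the writer. The trade-off is that two different keys can occasionally share a stripe and wait on each other, which costs some parallelism but does not affect correctness.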

Does this PR introduce any user-facing change?

Yes.

This PR adds upsert and delete support for the MaxCompute connector, enabling users to perform these operations on MaxCompute tables. Additionally, it introduces a primary key based locking mechanism to handle concurrent upsert requests safely in multi-threaded scenarios, preventing race conditions and ensuring data consistency.

Users can now rely on the connector for more robust and concurrent write operations to MaxCompute tables with primary keys.

How was this patch tested?

  • Added unit tests to verify the upsert and delete functionalities.

  • Added multi-threaded test cases (testLockProcessWithSameId_MultiThreaded) to ensure that the primary key based lock mechanism works correctly and prevents race conditions when multiple threads try to upsert the same row concurrently.

  • Negative cases were also handled to ensure that invalid rows (e.g., null primary keys) throw proper exceptions.

  • Tried to add an integration test for MaxCompute, but it is currently disabled due to limitations in the maxcompute-emulator: @Disabled("maxcompute-emulator does not support upload session for now, we need to move to upsert session in MaxComputeWriter"). Full end-to-end tests will be enabled once local MaxCompute upsert session support is available.

Check list

  • [x] If any new Jar binary package adding in your PR, please add License Notice according New License Guide
  • [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
  • [ ] If you are contributing the connector code, please check that the following files are updated:
    1. Update plugin-mapping.properties and add new connector information in it
    2. Update the pom file of seatunnel-dist
    3. Add ci label in label-scope-conf
    4. Add e2e testcase in seatunnel-e2e
    5. Update connector plugin_config

dybyte avatar Jun 18 '25 08:06 dybyte

Please update the e2e test case.

Hisoka-X avatar Jun 18 '25 11:06 Hisoka-X

> Please update the e2e test case.

Hi @Hisoka-X, I tried to write an e2e test for the upsert session feature, but I keep getting the following error:

Caused by: ErrorCode=Local Error, ErrorMessage=Failed to create upsert session with tunnel endpoint http://127.0.0.1:8080

As far as I can tell, the current MaxCompute emulator does not support the Tunnel API for upsert sessions, so this test cannot run in the local mock environment.

Could you please confirm if this is expected? If so, I think we should skip the upsert session e2e test for now and only test it in a real environment.

dybyte avatar Jun 18 '25 14:06 dybyte

Could you refer to https://github.com/dingxin-tech/maxcompute-emulator/blob/master/src/main/java/com/aliyun/odps/controller/TunnelController.java#L84 and https://github.com/apache/flink-cdc/pull/3254/files#diff-d995689e2ec048a012389e6cd18fef5e08f2d00f9115e3fb4cb22606e4b32a99? The upsert tunnel should work with the emulator, because it has already been verified in flink-cdc.

Hisoka-X avatar Jun 19 '25 02:06 Hisoka-X

@Hisoka-X Thank you for the helpful materials; they were really useful during testing. The e2e test fails on Spark 3.3.0, and my current guess is that a Netty version conflict is the cause, although I have not confirmed it. To keep the test suite stable, I decided to skip this test for Spark 3.x for now. I think we should investigate the conflict further and find a proper solution later, so please help verify if you have more insight!

dybyte avatar Jun 22 '25 11:06 dybyte