[Bug] [Connector-V2-Paimon] Data changes are lost when sinking into Paimon using batch mode.

Open · MirrerZu opened this issue 1 year ago · 18 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

When I use a MySQL JDBC source and a Paimon sink in batch mode, some rows with the same primary key are not updated correctly, even though I'm sure the target is a PK table using the deduplicate merge engine. If I add checkpoint.interval=1000 to the job env config, all the data is updated correctly, but the job throws an exception, and I didn't find any errors or warnings in the HDFS logs. To verify, I used Flink's batch mode with JDBC external tables to write the same data, and everything was updated correctly without any errors.

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  jdbc {
    url = "jdbc:mysql://10.1.xxx.xxx:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 180
    user = "root"
    password = "*****"
    table_path = "test.test2"
  }
}

sink {
  Paimon {
    warehouse = "hdfs://10.1.xxx.xxx:9000/paimon"
    database = "mytest"
    table = "test2"
    paimon.table.write-props = {
      bucket = 1
    }
  }
}

Running Command

./bin/seatunnel.sh --config ./config/v2.mysql.config -e local

Error Exception

2024-05-10 17:32:01,333 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@6c0e24be
java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703) [seatunnel-starter.jar:2.3.5]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) [seatunnel-starter.jar:2.3.5]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_402]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_402]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_402]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_402]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_402]
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234) ~[seatunnel-transforms-v2.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
        ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_402]
        at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_402]
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232) ~[seatunnel-transforms-v2.jar:2.3.5]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
        ... 16 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165) ~[connector-paimon-2.3.5.jar:2.3.5]
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
        ... 5 more
Caused by: java.lang.IllegalStateException: BatchTableWrite only support one-time committing.
        at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182) ~[connector-paimon-2.3.5.jar:2.3.5]
        at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201) ~[connector-paimon-2.3.5.jar:2.3.5]
        at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157) ~[connector-paimon-2.3.5.jar:2.3.5]
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
        ... 5 more
2024-05-10 17:32:01,339 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, taskId = 70000, taskGroup = TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}
2024-05-10 17:32:01,340 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Interrupted task 60000 - org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@49d2a9d8
2024-05-10 17:32:01,340 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, taskId = 60000, taskGroup = TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}
2024-05-10 17:32:01,340 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - Release classloader for job 841246167579754497 with jars [file:/bigdata/seatunnel/connectors/connector-paimon-2.3.5.jar, file:/bigdata/seatunnel/connectors/connector-jdbc-2.3.5.jar]
2024-05-10 17:32:01,349 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread st-multi-table-sink-writer-1
2024-05-10 17:32:01,350 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}
2024-05-10 17:32:01,350 INFO  [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread st-multi-table-sink-writer-2
2024-05-10 17:32:01,351 INFO  [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] Task TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000} complete with state FAILED
2024-05-10 17:32:01,351 INFO  [o.a.s.e.s.CoordinatorService  ] [hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] Received task end from execution TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}, state FAILED
2024-05-10 17:32:01,353 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] turned from state RUNNING to FAILED.
2024-05-10 17:32:01,353 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] state process is stopped
2024-05-10 17:32:01,353 ERROR [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] end with state FAILED and Exception: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191)
        ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232)
        ... 17 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
        at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165)
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217)
        ... 5 more
Caused by: java.lang.IllegalStateException: BatchTableWrite only support one-time committing.
        at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182)
        at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201)
        at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157)
        ... 6 more

Zeta or Flink or Spark Version

Zeta 2.3.5

Java or Scala Version

jdk 1.8.0_402

Screenshots

(two screenshots attached, not reproduced here)

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

MirrerZu · May 10 '24 09:05

If you want to use Paimon's upsert feature, your source should be mysql-cdc. In addition, the checkpoint.interval parameter is not supported for batch writes in the Paimon sink, because a batch write in the Paimon sink can be committed only once.
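
For example, a rough sketch of that setup (treat it as an untested illustration: the host, credentials, and table name are just the placeholders from this issue):

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    # placeholders copied from the issue's JDBC config
    base-url = "jdbc:mysql://10.1.xxx.xxx:3306/test"
    username = "root"
    password = "*****"
    table-names = ["test.test2"]
  }
}

sink {
  Paimon {
    warehouse = "hdfs://10.1.xxx.xxx:9000/paimon"
    database = "mytest"
    table = "test2"
    paimon.table.write-props = {
      bucket = 1
    }
  }
}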

dailai · May 10 '24 09:05

Thanks for your reply. Actually, I want to use batch mode to synchronize SAP HANA's data to Paimon, but I can't find a CDC tool for HANA. I'm curious about the difference between Flink's batch mode and SeaTunnel's batch mode when paired with Paimon, because Flink's batch insert seems to write the data correctly.
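
For context, the HANA side would just be a plain batch JDBC pull, roughly like this sketch (the jdbc:sap:// URL and com.sap.db.jdbc.Driver class are the standard SAP HANA JDBC settings; host, port, credentials, and the query are placeholders, and the HANA JDBC driver jar must be on the classpath):

source {
  jdbc {
    # host, port, credentials, and query are placeholders
    url = "jdbc:sap://hana-host:39015"
    driver = "com.sap.db.jdbc.Driver"
    user = "SYSTEM"
    password = "*****"
    query = "SELECT * FROM MYSCHEMA.MYTABLE"
  }
}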

MirrerZu · May 10 '24 11:05

The JDBC source cannot capture CDC events. I think your test may not be quite right.

dailai · May 11 '24 00:05

I tried to explain why I use the JDBC source connector and batch mode, but perhaps I wasn't clear. There's a bug that only occurs in SeaTunnel's batch mode (CDC works well): when I write data in batch mode using SeaTunnel's Paimon sink, Paimon doesn't keep the latest record, even though the table has a PK and uses the deduplicate merge engine. Both Flink's batch mode and Spark handle the same table and data correctly. So I think Paimon's deduplicate merge engine should work even when there are only inserts and no update events. It might be helpful to run a simple test to see if this is the case.

MirrerZu · May 15 '24 12:05

I don't think this has anything to do with the Paimon sink. In batch mode, the JDBC source only reads the data at the moment it executes the JDBC query. After the data is read, it is sent downstream; whatever is updated or inserted in the source afterwards will not be synchronized downstream.

dailai · May 16 '24 06:05

Let's focus only on the insert behavior of the sink and Paimon. Take a PK table using the deduplicate merge engine, like create table test(id int, tchar string, primary key(id) not enforced);, with some data already in it:

id  tchar
1   abc
2   abc
3   abc

If we insert rows whose primary keys already exist:

id  tchar
1   ccc
2   ccc

then querying the table (default: latest snapshot) should return the following, which is the result when Flink/Spark does the insert:

id  tchar
1   ccc
2   ccc
3   abc

When using SeaTunnel's sink to insert, it looks like Paimon has not correctly merged all of the data:

id  tchar
1   abc
2   ccc
3   abc

I also tried querying with time travel, and none of Paimon's snapshots are correct; it's as if some data were lost. That's why I think something may be wrong with the Paimon sink, or that the Paimon sink can't be used in batch mode to insert rows with the same PK.
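
For comparison, the Flink SQL session that produces the correct result looks roughly like the sketch below. The catalog name paimon_cat is made up for illustration; the warehouse path is the one from my config.

CREATE CATALOG paimon_cat WITH (
  'type' = 'paimon',
  'warehouse' = 'hdfs://10.1.xxx.xxx:9000/paimon'
);
USE CATALOG paimon_cat;

-- PK table; Paimon's default merge engine for PK tables is deduplicate
CREATE TABLE test (id INT, tchar STRING, PRIMARY KEY (id) NOT ENFORCED);

SET 'execution.runtime-mode' = 'batch';
INSERT INTO test VALUES (1, 'abc'), (2, 'abc'), (3, 'abc');
INSERT INTO test VALUES (1, 'ccc'), (2, 'ccc');

-- the latest snapshot should return (1,'ccc'), (2,'ccc'), (3,'abc')
SELECT * FROM test;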

MirrerZu · May 16 '24 09:05

Which connector does your source use?

dailai · May 16 '24 09:05

JDBC

MirrerZu · May 16 '24 09:05

Does your source table have a primary key?

dailai · May 16 '24 09:05

Yes, the same primary key id, and I specify it in the sink with paimon.table.primary-keys.

MirrerZu · May 16 '24 09:05

If your source table has a primary key, is inserting data with the same primary key actually a modification operation?

dailai · May 16 '24 09:05

Yes.

MirrerZu · May 16 '24 09:05

The JDBC source cannot capture update events. I think you can use this test case to verify; test inserts only, with the same key. https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf

dailai · May 16 '24 09:05

You can remove this part (screenshot omitted). Then test inserts only.

dailai · May 16 '24 09:05

Same result as with the JDBC source. First, insert five records into an empty table:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        tchar = string
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "AAA"]
      },
      {
        kind = INSERT
        fields = [2, "BBB"]
      },
      {
        kind = INSERT
        fields = [3, "CCC"]
      },
      {
        kind = INSERT
        fields = [4, "DDD"]
      },
      {
        kind = INSERT
        fields = [5, "EEE"]
      }
    ]
  }
}

sink {
  Paimon {
    warehouse = "file:///tmp/paimon"
    database = "seatunnel_test"
    table = "test1"
  }
}

Next, change the field tchar to 'ZZZ' and insert:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        tchar = string
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [2, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [3, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [4, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [5, "ZZZ"]
      }
    ]
  }
}

sink {
  Paimon {
    warehouse = "file:///tmp/paimon"
    database = "seatunnel_test"
    table = "test1"
    paimon.table.primary-keys = "pk_id"
    paimon.table.write-props = {
      bucket = 1
    }
  }
}

Query results for table test1 still show stale values, even though every value of tchar should now be 'ZZZ' (screenshot omitted).

MirrerZu · May 16 '24 11:05

I see what you mean.

dailai · May 16 '24 11:05

@MirrerZu Thanks for your feedback. Please use this PR to build a new jar of the Paimon connector.

dailai · May 17 '24 01:05

It works well for me, thanks!

MirrerZu · May 17 '24 13:05