[Bug] [Connector-V2-Paimon] Data changes are lost when sinking into Paimon using batch mode.
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
When I use the MySQL JDBC source and the Paimon sink in batch mode, some rows with the same primary key are not updated correctly, even though I'm sure the target is a PK table using the deduplicate merge engine.
If I add checkpoint.interval=1000 to the job's env config, all the data is updated correctly, but the job throws an exception.
I didn't find any errors or warnings in the HDFS logs.
To verify, I used Flink's batch mode with a JDBC external table to write the same data; all rows were updated correctly without any errors.
SeaTunnel Version
2.3.5
SeaTunnel Config
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  jdbc {
    url = "jdbc:mysql://10.1.xxx.xxx:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 180
    user = "root"
    password = "*****"
    table_path = "test.test2"
  }
}

sink {
  Paimon {
    warehouse = "hdfs://10.1.xxx.xxx:9000/paimon"
    database = "mytest"
    table = "test2"
    paimon.table.write-props = {
      bucket = 1
    }
  }
}
Running Command
./bin/seatunnel.sh --config ./config/v2.mysql.config -e local
Error Exception
2024-05-10 17:32:01,333 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@6c0e24be
java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703) [seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) [seatunnel-starter.jar:2.3.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_402]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_402]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_402]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_402]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_402]
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234) ~[seatunnel-transforms-v2.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_402]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_402]
at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232) ~[seatunnel-transforms-v2.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
... 16 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165) ~[connector-paimon-2.3.5.jar:2.3.5]
at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
... 5 more
Caused by: java.lang.IllegalStateException: BatchTableWrite only support one-time committing.
at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182) ~[connector-paimon-2.3.5.jar:2.3.5]
at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201) ~[connector-paimon-2.3.5.jar:2.3.5]
at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157) ~[connector-paimon-2.3.5.jar:2.3.5]
at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
... 5 more
2024-05-10 17:32:01,339 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, taskId = 70000, taskGroup = TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}
2024-05-10 17:32:01,340 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Interrupted task 60000 - org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@49d2a9d8
2024-05-10 17:32:01,340 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, taskId = 60000, taskGroup = TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}
2024-05-10 17:32:01,340 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - Release classloader for job 841246167579754497 with jars [file:/bigdata/seatunnel/connectors/connector-paimon-2.3.5.jar, file:/bigdata/seatunnel/connectors/connector-jdbc-2.3.5.jar]
2024-05-10 17:32:01,349 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread st-multi-table-sink-writer-1
2024-05-10 17:32:01,350 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}
2024-05-10 17:32:01,350 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread st-multi-table-sink-writer-2
2024-05-10 17:32:01,351 INFO [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] Task TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000} complete with state FAILED
2024-05-10 17:32:01,351 INFO [o.a.s.e.s.CoordinatorService ] [hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] Received task end from execution TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}, state FAILED
2024-05-10 17:32:01,353 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] turned from state RUNNING to FAILED.
2024-05-10 17:32:01,353 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] state process is stopped
2024-05-10 17:32:01,353 ERROR [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] end with state FAILED and Exception: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191)
... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232)
... 17 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit
at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165)
at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217)
... 5 more
Caused by: java.lang.IllegalStateException: BatchTableWrite only support one-time committing.
at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182)
at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201)
at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157)
... 6 more
Zeta or Flink or Spark Version
Zeta 2.3.5
Java or Scala Version
jdk 1.8.0_402
Screenshots
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
If you want to use Paimon's upsert feature, your source should be mysql-cdc. In addition, the checkpoint.interval parameter is not supported for batch writes in the Paimon sink, because a batch write in the Paimon sink can be committed only once.
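For context, that one-time-commit limitation comes from Paimon's own batch write API: a `BatchTableWrite` may only be prepared for commit once, which matches the `IllegalStateException: BatchTableWrite only support one-time committing.` at the bottom of the stack trace above. Below is a minimal sketch of that API, assuming a local warehouse path; the database/table names reuse the ones from this issue purely for illustration:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;

import java.util.List;

public class BatchWriteOnce {
    public static void main(String[] args) throws Exception {
        // Illustrative warehouse and table; substitute your own.
        Catalog catalog = CatalogFactory.createCatalog(
                CatalogContext.create(new Path("file:///tmp/paimon")));
        Table table = catalog.getTable(Identifier.create("mytest", "test2"));

        BatchWriteBuilder builder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = builder.newWrite();
             BatchTableCommit commit = builder.newCommit()) {
            write.write(GenericRow.of(1, BinaryString.fromString("ccc")));

            // The first (and only permitted) prepareCommit for this writer.
            List<CommitMessage> messages = write.prepareCommit();
            commit.commit(messages);

            // Calling prepareCommit again on the same BatchTableWrite -- which is
            // what a per-checkpoint prepareCommit amounts to -- would throw:
            // IllegalStateException: BatchTableWrite only support one-time committing.
            // write.prepareCommit();
        }
    }
}
```

So in batch mode the writer can only be prepared and committed once, at the end of the job.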
Thanks for your reply. Actually, I want to use batch mode to synchronize SAP HANA's data to Paimon, but I can't find a CDC tool for HANA. I'm curious about the difference between Flink's batch mode and SeaTunnel's batch mode when paired with Paimon, because Flink's batch insert seems to be capable of writing the data correctly.
The JDBC source can not capture CDC events. I think your test may not be entirely valid.
I tried to explain why I use the JDBC source connector and batch mode, but perhaps I wasn't clear. There's a bug that only occurs in SeaTunnel's batch mode (CDC works well): when I wrote data in batch mode using SeaTunnel's Paimon sink, Paimon didn't keep the latest record, even though the table has a PK and uses the deduplicate merge engine. However, both Flink's batch mode and Spark handle the same table and data correctly. So I think Paimon's deduplicate merge engine should work even when there are only inserts and no update events. It might be helpful to run a simple test to confirm this.
I don't think this has anything to do with the Paimon sink. In batch mode, the JDBC source only reads the data at the moment it executes the JDBC query. After the data is read, it is sent downstream. Whatever is updated or inserted in the source afterwards will not be synchronized downstream.
Let's focus only on the insert behavior of the sink and Paimon.
Take a PK table using the deduplicate merge engine, like `create table test(id int, tchar string, primary key(id) not enforced);`, with some data in the table:
| id | tchar |
|---|---|
| 1 | abc |
| 2 | abc |
| 3 | abc |
If we insert some rows whose primary keys already exist:
| id | tchar |
|---|---|
| 1 | ccc |
| 2 | ccc |
then querying this table (latest snapshot by default) should look like the following (this is the result when using Flink/Spark to insert):
| id | tchar |
|---|---|
| 1 | ccc |
| 2 | ccc |
| 3 | abc |
When using SeaTunnel's sink to insert, it looks like Paimon has not correctly merged all of the data:
| id | tchar |
|---|---|
| 1 | abc |
| 2 | ccc |
| 3 | abc |
I also tried querying with time travel, and none of Paimon's snapshots is correct; it looks like some data was lost. That's why I think something may be wrong with the Paimon sink, OR the Paimon sink can't be used in batch mode to insert rows sharing the same PK.
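To make the expected deduplicate semantics concrete, here is a hedged sketch against Paimon's batch API (the local warehouse path and database/table names are illustrative, not from this issue): two separate batch jobs commit overlapping keys, and a read of the latest snapshot should then return only the newest value per key, i.e. 1=ccc, 2=ccc, 3=abc.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataTypes;

import java.util.List;

public class DeduplicateMergeDemo {
    public static void main(String[] args) throws Exception {
        Catalog catalog = CatalogFactory.createCatalog(
                CatalogContext.create(new Path("file:///tmp/paimon")));
        catalog.createDatabase("demo", true);
        Identifier id = Identifier.create("demo", "test");
        catalog.createTable(id, Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("tchar", DataTypes.STRING())
                .primaryKey("id")
                // deduplicate is the default merge engine for PK tables
                .option("merge-engine", "deduplicate")
                .option("bucket", "1")
                .build(), true);
        Table table = catalog.getTable(id);

        // First batch: keys 1..3 = "abc"; second batch: keys 1..2 = "ccc".
        writeBatch(table, new Object[][] {{1, "abc"}, {2, "abc"}, {3, "abc"}});
        writeBatch(table, new Object[][] {{1, "ccc"}, {2, "ccc"}});

        // Reading the latest snapshot should yield 1=ccc, 2=ccc, 3=abc.
        ReadBuilder readBuilder = table.newReadBuilder();
        List<Split> splits = readBuilder.newScan().plan().splits();
        try (RecordReader<InternalRow> reader =
                readBuilder.newRead().createReader(splits)) {
            reader.forEachRemaining(row ->
                    System.out.println(row.getInt(0) + " -> " + row.getString(1)));
        }
    }

    // One complete batch write: a fresh writer, prepared and committed exactly once.
    private static void writeBatch(Table table, Object[][] rows) throws Exception {
        BatchWriteBuilder builder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = builder.newWrite();
             BatchTableCommit commit = builder.newCommit()) {
            for (Object[] r : rows) {
                write.write(GenericRow.of(r[0], BinaryString.fromString((String) r[1])));
            }
            commit.commit(write.prepareCommit());
        }
    }
}
```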
Which connector does your source use?
JDBC
Does your source table have a primary key?
Yes, the same primary key id, and I would use paimon.table.primary-keys to specify it in the sink.
If your source table has a primary key, is inserting data with the same primary key actually a modification operation?
yes
The JDBC source can not capture update events. I think you can use this test case to verify. You can test only inserts with the same key. https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
You can remove this.
Then test inserts only.
Same result as with the JDBC source. First, insert five records into an empty table:
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        tchar = string
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "AAA"]
      },
      {
        kind = INSERT
        fields = [2, "BBB"]
      },
      {
        kind = INSERT
        fields = [3, "CCC"]
      },
      {
        kind = INSERT
        fields = [4, "DDD"]
      },
      {
        kind = INSERT
        fields = [5, "EEE"]
      }
    ]
  }
}

sink {
  Paimon {
    warehouse = "file:///tmp/paimon"
    database = "seatunnel_test"
    table = "test1"
  }
}
Next, change the tchar field to 'ZZZ' and insert again:
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        tchar = string
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [2, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [3, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [4, "ZZZ"]
      },
      {
        kind = INSERT
        fields = [5, "ZZZ"]
      }
    ]
  }
}

sink {
  Paimon {
    warehouse = "file:///tmp/paimon"
    database = "seatunnel_test"
    table = "test1"
    paimon.table.primary-keys = "pk_id"
    paimon.table.write-props = {
      bucket = 1
    }
  }
}
Query results for table test1 (even though all values of tchar should be 'ZZZ'):
I see what you mean.
@MirrerZu Thanks for your feedback, please use this PR to build a new jar of the Paimon connector.
It works well for me, thanks!