seatunnel
seatunnel copied to clipboard
[Bug2.3.4] [postgresqlCDC]对源表做update操作时 ,同步任务报错停止When updating the source table, the synchronization task stops with an error
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34) Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88) at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222) at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189) at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55) at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110) at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110) at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150) at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643) at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
... 2 more
2024-02-28 08:45:23,260 INFO [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is: INSERT INTO "test_db"."public"."t_user_2" ("id", "name", "age", "update_time") VALUES (?, ?, ?, ?) ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "name"=EXCLUDED."name", "age"=EXCLUDED."age", "update_time"=EXCLUDED."update_time"
2024-02-28 08:45:23,265 INFO [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is: DELETE FROM "test_db"."public"."t_user_2" WHERE "id" = ?
2024-02-28 08:45:23,267 INFO [.JdbcMultiTableResourceManager] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - start close connection poolHikariPool-4 2024-02-28 08:45:23,268 INFO [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Shutdown initiated... 2024-02-28 08:45:23,272 WARN [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Interrupted during closing java.lang.InterruptedException: null at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) ~[?:1.8.0_342] at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariPool.shutdown(HikariPool.java:255) ~[?:?] at org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource.close(HikariDataSource.java:351) ~[?:?] at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.ConnectionPoolManager.close(ConnectionPoolManager.java:66) ~[?:?] at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager.close(JdbcMultiTableResourceManager.java:42) ~[?:?] at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.close(MultiTableSinkWriter.java:251) ~[seatunnel-transforms-v2.jar:2.3.4] at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:153) ~[seatunnel-starter.jar:2.3.4] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.4] at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) [?:1.8.0_342] at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_342] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_342] at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) [?:1.8.0_342] at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_342] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_342] at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) [?:1.8.0_342] at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) [?:1.8.0_342] at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_342] at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_342] at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_342] at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) [?:1.8.0_342] at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) [?:1.8.0_342] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_342] at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) [?:1.8.0_342] at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) [?:1.8.0_342] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.4] at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:660) [seatunnel-starter.jar:2.3.4] at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) [seatunnel-starter.jar:2.3.4] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342] 2024-02-28 08:45:23,899 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher 2024-02-28 08:45:23,900 INFO [i.d.j.JdbcConnection ] [pool-64-thread-1] - Connection gracefully closed 2024-02-28 08:45:23,902 INFO [i.d.j.JdbcConnection ] [pool-65-thread-1] - Connection gracefully closed 2024-02-28 08:45:23,903 INFO [i.d.j.JdbcConnection ] [pool-66-thread-1] - Connection gracefully closed 2024-02-28 08:45:24,304 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited. 2024-02-28 08:45:24,316 INFO [s.c.s.s.c.ClientExecuteCommand] [Thread-105] - run shutdown hook because get close signal
SeaTunnel Version
SeaTunnel2.3.4 pgsqlCDC
SeaTunnel Config
env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}
source {
Postgres-CDC {
username = "postgres"
password = "postgres"
database-names = ["test_db"]
schema-names = ["public"]
table-names = ["test_db.public.t_user"]
base-url = "jdbc:postgresql://192.168.3.46:30028/test_db?loggerLevel=OFF"
}
}
transform {
}
sink {
jdbc {
url = "jdbc:postgresql://192.168.3.46:30028/test_db?loggerLevel=OFF"
driver = "org.postgresql.Driver"
user = "postgres"
password = "postgres"
generate_sink_sql = true
# You need to configure both database and table
database = test_db
table = "public.t_user_2"
primary_keys = ["id"]
}
}
Running Command
root@7366e5930d9f:/opt/apache-seatunnel-2.3.4# ./bin/seatunnel.sh --config ./config/pgsqlcdc.template -e local
Error Exception
2024-02-28 08:45:23,064 INFO [o.a.s.e.s.d.p.SubPlan ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)] future complete with state FAILED
2024-02-28 08:45:23,064 ERROR [o.a.s.e.s.d.p.SubPlan ] [seatunnel-coordinator-service-2] - Task TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000} Failed in Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], Begin to cancel other tasks in this pipeline.
2024-02-28 08:45:23,065 INFO [o.a.s.e.s.d.p.SubPlan ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] turned from state RUNNING to FAILING.
2024-02-28 08:45:23,065 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] state process is start
2024-02-28 08:45:23,066 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] turned from state RUNNING to CANCELING.
2024-02-28 08:45:23,067 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-2] - Send cancel Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] operator to member [localhost]:5801
2024-02-28 08:45:23,068 INFO [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-2] - [localhost]:5801 [seatunnel-347553] [5.1] Task (TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}) need cancel.
2024-02-28 08:45:23,069 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)] state process is start
2024-02-28 08:45:23,069 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - [localhost]:5801 [seatunnel-347553] [5.1] Interrupted task 20000 - org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@1be5e807
2024-02-28 08:45:23,070 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - [localhost]:5801 [seatunnel-347553] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}
2024-02-28 08:45:23,070 INFO [o.a.s.e.c.l.ClassLoaderUtil ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - recycle classloader org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader@1c65c15f
2024-02-28 08:45:23,072 INFO [o.a.s.e.c.l.ClassLoaderUtil ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}
2024-02-28 08:45:23,073 INFO [.e.IncrementalSourceEnumerator] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - Closing enumerator...
2024-02-28 08:45:23,073 INFO [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-2] - [localhost]:5801 [seatunnel-347553] [5.1] Task TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1} complete with state CANCELED
2024-02-28 08:45:23,073 INFO [o.a.s.e.s.CoordinatorService ] [hz.main.seaTunnel.task.thread-2] - [localhost]:5801 [seatunnel-347553] [5.1] Received task end from execution TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}, state CANCELED
2024-02-28 08:45:23,075 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] turned from state CANCELING to CANCELED.
2024-02-28 08:45:23,075 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] state process is stopped
2024-02-28 08:45:23,075 INFO [o.a.s.e.s.d.p.SubPlan ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] future complete with state CANCELED
2024-02-28 08:45:23,076 INFO [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-2] - start clean pending checkpoint cause Pipeline turn to end state.
2024-02-28 08:45:23,079 INFO [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Closing Source Reader.
2024-02-28 08:45:23,080 INFO [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-2] - Turn checkpoint_state_815141672055734273_1 state from RUNNING to CANCELED
2024-02-28 08:45:23,081 INFO [o.a.s.e.s.d.p.SubPlan ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] will end with state FAILED
2024-02-28 08:45:23,081 INFO [o.a.s.e.s.d.p.SubPlan ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] turned from state FAILING to FAILED.
2024-02-28 08:45:23,085 WARN [o.a.s.s.c.z.h.HikariConfig ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - idleTimeout has been set but has no effect because the pool is operating as a fixed size pool.
2024-02-28 08:45:23,085 INFO [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Starting...
2024-02-28 08:45:23,090 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Shutting down split fetcher 0
2024-02-28 08:45:23,102 INFO [o.a.s.e.s.m.JobMaster ] [seatunnel-coordinator-service-2] - release the pipeline Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] resource
2024-02-28 08:45:23,102 INFO [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-46] - received slot release request, jobID: 815141672055734273, slot: SlotProfile{worker=[localhost]:5801, slotID=7, ownerJobID=815141672055734273, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='af5ae013-144f-4faa-937b-8981475315eb'}
2024-02-28 08:45:23,103 INFO [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-47] - received slot release request, jobID: 815141672055734273, slot: SlotProfile{worker=[localhost]:5801, slotID=8, ownerJobID=815141672055734273, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='af5ae013-144f-4faa-937b-8981475315eb'}
2024-02-28 08:45:23,104 INFO [o.a.s.e.s.d.p.SubPlan ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] state process is stop
2024-02-28 08:45:23,104 INFO [o.a.s.e.s.d.p.PhysicalPlan ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] future complete with state FAILED
2024-02-28 08:45:23,105 INFO [o.a.s.e.s.d.p.PhysicalPlan ] [seatunnel-coordinator-service-3] - cancel job Job SeaTunnel_Job (815141672055734273) because makeJobEndWhenPipelineEnded is true
2024-02-28 08:45:23,106 INFO [o.a.s.e.s.d.p.PhysicalPlan ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) turned from state RUNNING to FAILING.
2024-02-28 08:45:23,108 INFO [o.a.s.e.s.d.p.PhysicalPlan ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) turned from state FAILING to FAILED.
2024-02-28 08:45:23,109 INFO [o.a.s.e.s.d.p.PhysicalPlan ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) state process is stop
2024-02-28 08:45:23,137 INFO [o.a.s.e.c.j.ClientJobProxy ] [main] - Job (815141672055734273) end with state FAILED
2024-02-28 08:45:23,138 INFO [c.h.c.LifecycleService ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-02-28 08:45:23,142 INFO [c.h.i.s.t.TcpServerConnection ] [hz.main.IO.thread-in-1] - [localhost]:5801 [seatunnel-347553] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:35765, qualifier=null, endpoint=[127.0.0.1]:35765, remoteUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2024-02-28 08:45:23,143 INFO [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel-347553] [5.1] Removed connection to endpoint: [localhost]:5801:6e1d5e93-9d1b-4167-a038-b76888e088fe, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:35765->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2024-02-28 08:45:23.129, lastWriteTime=2024-02-28 08:45:19.110, closedTime=2024-02-28 08:45:23.141, connected server version=5.1}
2024-02-28 08:45:23,144 INFO [c.h.c.LifecycleService ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-02-28 08:45:23,147 INFO [c.h.c.i.ClientEndpointManager ] [hz.main.event-1] - [localhost]:5801 [seatunnel-347553] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:35765, qualifier=null, endpoint=[127.0.0.1]:35765, remoteUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1709109719041, latest clientAttributes=lastStatisticsCollectionTime=1709109919108,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1709109719030,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=2709667840,os.freePhysicalMemorySize=320233472,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=1048576,os.openFileDescriptorCount=65,os.processCpuTime=12630000000,os.systemLoadAverage=0.62,os.totalPhysicalMemorySize=8280195072,os.totalSwapSpaceSize=0,runtime.availableProcessors=1,runtime.freeMemory=161719896,runtime.maxMemory=518979584,runtime.totalMemory=260177920,runtime.uptime=203553,runtime.usedMemory=98460344, labels=[]}
2024-02-28 08:45:23,151 INFO [c.h.c.LifecycleService ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-02-28 08:45:23,152 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - Closed SeaTunnel client......
2024-02-28 08:45:23,153 INFO [c.h.c.LifecycleService ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] [localhost]:5801 is SHUTTING_DOWN
2024-02-28 08:45:23,158 INFO [c.h.i.p.i.MigrationManager ] [hz.main.cached.thread-6] - [localhost]:5801 [seatunnel-347553] [5.1] Shutdown request of Member [localhost]:5801 - 6e1d5e93-9d1b-4167-a038-b76888e088fe this is handled
2024-02-28 08:45:23,164 INFO [c.h.i.i.Node ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Shutting down connection manager...
2024-02-28 08:45:23,172 INFO [c.h.i.i.Node ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Shutting down node engine...
2024-02-28 08:45:23,211 INFO [c.h.i.i.NodeExtension ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Destroying node NodeExtension.
2024-02-28 08:45:23,212 INFO [c.h.i.i.Node ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Hazelcast Shutdown is completed in 56 ms.
2024-02-28 08:45:23,212 INFO [c.h.c.LifecycleService ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] [localhost]:5801 is SHUTDOWN
2024-02-28 08:45:23,214 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - Closed HazelcastInstance ......
2024-02-28 08:45:23,216 INFO [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Start completed.
2024-02-28 08:45:23,218 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2024-02-28 08:45:23,219 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
===============================================================================
2024-02-28 08:45:23,220 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,
2024-02-28 08:45:23,221 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2024-02-28 08:45:23,222 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed
2024-02-28 08:45:23,223 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
... 2 more
2024-02-28 08:45:23,237 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
... 2 more
2024-02-28 08:45:23,260 INFO [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
INSERT INTO "test_db"."public"."t_user_2" ("id", "name", "age", "update_time") VALUES (?, ?, ?, ?) ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "name"=EXCLUDED."name", "age"=EXCLUDED."age", "update_time"=EXCLUDED."update_time"
2024-02-28 08:45:23,265 INFO [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
DELETE FROM "test_db"."public"."t_user_2" WHERE "id" = ?
2024-02-28 08:45:23,267 INFO [.JdbcMultiTableResourceManager] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - start close connection poolHikariPool-4
2024-02-28 08:45:23,268 INFO [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Shutdown initiated...
2024-02-28 08:45:23,272 WARN [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Interrupted during closing
java.lang.InterruptedException: null
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) ~[?:1.8.0_342]
at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariPool.shutdown(HikariPool.java:255) ~[?:?]
at org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource.close(HikariDataSource.java:351) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.ConnectionPoolManager.close(ConnectionPoolManager.java:66) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager.close(JdbcMultiTableResourceManager.java:42) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.close(MultiTableSinkWriter.java:251) ~[seatunnel-transforms-v2.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:153) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.4]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) [?:1.8.0_342]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_342]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_342]
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) [?:1.8.0_342]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_342]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_342]
at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) [?:1.8.0_342]
at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) [?:1.8.0_342]
at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_342]
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_342]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_342]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) [?:1.8.0_342]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) [?:1.8.0_342]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_342]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) [?:1.8.0_342]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) [?:1.8.0_342]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:660) [seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) [seatunnel-starter.jar:2.3.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
2024-02-28 08:45:23,899 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
2024-02-28 08:45:23,900 INFO [i.d.j.JdbcConnection ] [pool-64-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,902 INFO [i.d.j.JdbcConnection ] [pool-65-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,903 INFO [i.d.j.JdbcConnection ] [pool-66-thread-1] - Connection gracefully closed
2024-02-28 08:45:24,304 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-02-28 08:45:24,316 INFO [s.c.s.s.c.ClientExecuteCommand] [Thread-105] - run shutdown hook because get close signal
Zeta or Flink or Spark Version
zeta
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
+1
对源表delete时也有该问题,pgsql版本15, driver版本42.7.3
Please check your database settings https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Postgre-CDC.md#using-dependency
Here are the steps to enable CDC (Change Data Capture) in PostgreSQL:
Ensure the wal_level is set to logical: Modify the postgresql.conf configuration file by adding "wal_level = logical", restart the PostgreSQL server for the changes to take effect. Alternatively, you can use SQL commands to modify the configuration directly:
ALTER SYSTEM SET wal_level TO 'logical';
SELECT pg_reload_conf();
-- Change the REPLICA policy of the specified table to FULL
ALTER TABLE your_table_name REPLICA IDENTITY FULL;
Please check your database settings https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Postgre-CDC.md#using-dependency
Here are the steps to enable CDC (Change Data Capture) in PostgreSQL:
Ensure the wal_level is set to logical: Modify the postgresql.conf configuration file by adding "wal_level = logical", restart the PostgreSQL server for the changes to take effect. Alternatively, you can use SQL commands to modify the configuration directly:
ALTER SYSTEM SET wal_level TO 'logical'; SELECT pg_reload_conf(); -- Change the REPLICA policy of the specified table to FULL ALTER TABLE your_table_name REPLICA IDENTITY FULL;
I guess the settings are correct.
check
ALTER TABLE your_table_name REPLICA IDENTITY FULL;