seatunnel icon indicating copy to clipboard operation
seatunnel copied to clipboard

[Bug2.3.4] [postgresqlCDC]对源表做update操作时 ,同步任务报错停止When updating the source table, the synchronization task stops with an error

Open 2606090723 opened this issue 11 months ago • 2 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

image 2024-02-28 08:45:23,237 ERROR [o.a.s.c.s.SeaTunnel ] [main] - ===============================================================================

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34) Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88) at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222) at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189) at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55) at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110) at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112) at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110) at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150) at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643) at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
    ... 2 more

2024-02-28 08:45:23,260 INFO [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is: INSERT INTO "test_db"."public"."t_user_2" ("id", "name", "age", "update_time") VALUES (?, ?, ?, ?) ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "name"=EXCLUDED."name", "age"=EXCLUDED."age", "update_time"=EXCLUDED."update_time"

2024-02-28 08:45:23,265 INFO [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is: DELETE FROM "test_db"."public"."t_user_2" WHERE "id" = ?

2024-02-28 08:45:23,267 INFO [.JdbcMultiTableResourceManager] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - start close connection poolHikariPool-4 2024-02-28 08:45:23,268 INFO [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Shutdown initiated... 2024-02-28 08:45:23,272 WARN [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Interrupted during closing java.lang.InterruptedException: null at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) ~[?:1.8.0_342] at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariPool.shutdown(HikariPool.java:255) ~[?:?] at org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource.close(HikariDataSource.java:351) ~[?:?] at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.ConnectionPoolManager.close(ConnectionPoolManager.java:66) ~[?:?] at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager.close(JdbcMultiTableResourceManager.java:42) ~[?:?] at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.close(MultiTableSinkWriter.java:251) ~[seatunnel-transforms-v2.jar:2.3.4] at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:153) ~[seatunnel-starter.jar:2.3.4] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.4] at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) [?:1.8.0_342] at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_342] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_342] at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) [?:1.8.0_342] at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_342] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_342] at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) [?:1.8.0_342] at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) [?:1.8.0_342] at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_342] at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_342] at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_342] at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) [?:1.8.0_342] at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) [?:1.8.0_342] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_342] at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) [?:1.8.0_342] at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) [?:1.8.0_342] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.4] at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:660) [seatunnel-starter.jar:2.3.4] at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) [seatunnel-starter.jar:2.3.4] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342] 2024-02-28 08:45:23,899 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher 2024-02-28 08:45:23,900 INFO [i.d.j.JdbcConnection ] [pool-64-thread-1] - Connection gracefully closed 2024-02-28 08:45:23,902 INFO [i.d.j.JdbcConnection ] [pool-65-thread-1] - Connection gracefully closed 2024-02-28 08:45:23,903 INFO [i.d.j.JdbcConnection ] [pool-66-thread-1] - Connection gracefully closed 2024-02-28 08:45:24,304 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited. 2024-02-28 08:45:24,316 INFO [s.c.s.s.c.ClientExecuteCommand] [Thread-105] - run shutdown hook because get close signal

SeaTunnel Version

SeaTunnel2.3.4 pgsqlCDC

SeaTunnel Config

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second=7000000
  read_limit.rows_per_second=400
}

source {
  Postgres-CDC {
    username = "postgres"
    password = "postgres"
    database-names = ["test_db"]
    schema-names = ["public"]
    table-names = ["test_db.public.t_user"]
    base-url = "jdbc:postgresql://192.168.3.46:30028/test_db?loggerLevel=OFF"
  }
}

transform {

}
sink {
  jdbc {
    url = "jdbc:postgresql://192.168.3.46:30028/test_db?loggerLevel=OFF"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "postgres"

    generate_sink_sql = true
    # You need to configure both database and table
    database = test_db
    table = "public.t_user_2"
    primary_keys = ["id"]
   
  }
}

Running Command

root@7366e5930d9f:/opt/apache-seatunnel-2.3.4# ./bin/seatunnel.sh --config ./config/pgsqlcdc.template -e local

Error Exception

2024-02-28 08:45:23,064 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)] future complete with state FAILED
2024-02-28 08:45:23,064 ERROR [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Task TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000} Failed in Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], Begin to cancel other tasks in this pipeline.
2024-02-28 08:45:23,065 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] turned from state RUNNING to FAILING.
2024-02-28 08:45:23,065 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] state process is start
2024-02-28 08:45:23,066 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] turned from state RUNNING to CANCELING.
2024-02-28 08:45:23,067 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Send cancel Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] operator to member [localhost]:5801
2024-02-28 08:45:23,068 INFO  [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-2] - [localhost]:5801 [seatunnel-347553] [5.1] Task (TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}) need cancel.
2024-02-28 08:45:23,069 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)] state process is start
2024-02-28 08:45:23,069 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - [localhost]:5801 [seatunnel-347553] [5.1] Interrupted task 20000 - org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@1be5e807
2024-02-28 08:45:23,070 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - [localhost]:5801 [seatunnel-347553] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}
2024-02-28 08:45:23,070 INFO  [o.a.s.e.c.l.ClassLoaderUtil   ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - recycle classloader org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader@1c65c15f
2024-02-28 08:45:23,072 INFO  [o.a.s.e.c.l.ClassLoaderUtil   ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}
2024-02-28 08:45:23,073 INFO  [.e.IncrementalSourceEnumerator] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - Closing enumerator...
2024-02-28 08:45:23,073 INFO  [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-2] - [localhost]:5801 [seatunnel-347553] [5.1] Task TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1} complete with state CANCELED
2024-02-28 08:45:23,073 INFO  [o.a.s.e.s.CoordinatorService  ] [hz.main.seaTunnel.task.thread-2] - [localhost]:5801 [seatunnel-347553] [5.1] Received task end from execution TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}, state CANCELED
2024-02-28 08:45:23,075 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] turned from state CANCELING to CANCELED.
2024-02-28 08:45:23,075 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] state process is stopped
2024-02-28 08:45:23,075 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] future complete with state CANCELED
2024-02-28 08:45:23,076 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-2] - start clean pending checkpoint cause Pipeline turn to end state.
2024-02-28 08:45:23,079 INFO  [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Closing Source Reader.
2024-02-28 08:45:23,080 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-2] - Turn checkpoint_state_815141672055734273_1 state from RUNNING to CANCELED
2024-02-28 08:45:23,081 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] will end with state FAILED
2024-02-28 08:45:23,081 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] turned from state FAILING to FAILED.
2024-02-28 08:45:23,085 WARN  [o.a.s.s.c.z.h.HikariConfig    ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - idleTimeout has been set but has no effect because the pool is operating as a fixed size pool.
2024-02-28 08:45:23,085 INFO  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Starting...
2024-02-28 08:45:23,090 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Shutting down split fetcher 0
2024-02-28 08:45:23,102 INFO  [o.a.s.e.s.m.JobMaster         ] [seatunnel-coordinator-service-2] - release the pipeline Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] resource
2024-02-28 08:45:23,102 INFO  [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-46] - received slot release request, jobID: 815141672055734273, slot: SlotProfile{worker=[localhost]:5801, slotID=7, ownerJobID=815141672055734273, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='af5ae013-144f-4faa-937b-8981475315eb'}
2024-02-28 08:45:23,103 INFO  [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-47] - received slot release request, jobID: 815141672055734273, slot: SlotProfile{worker=[localhost]:5801, slotID=8, ownerJobID=815141672055734273, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='af5ae013-144f-4faa-937b-8981475315eb'}
2024-02-28 08:45:23,104 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] state process is stop
2024-02-28 08:45:23,104 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] future complete with state FAILED
2024-02-28 08:45:23,105 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - cancel job Job SeaTunnel_Job (815141672055734273) because makeJobEndWhenPipelineEnded is true
2024-02-28 08:45:23,106 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) turned from state RUNNING to FAILING.
2024-02-28 08:45:23,108 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) turned from state FAILING to FAILED.
2024-02-28 08:45:23,109 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) state process is stop
2024-02-28 08:45:23,137 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Job (815141672055734273) end with state FAILED
2024-02-28 08:45:23,138 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-02-28 08:45:23,142 INFO  [c.h.i.s.t.TcpServerConnection ] [hz.main.IO.thread-in-1] - [localhost]:5801 [seatunnel-347553] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:35765, qualifier=null, endpoint=[127.0.0.1]:35765, remoteUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2024-02-28 08:45:23,143 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel-347553] [5.1] Removed connection to endpoint: [localhost]:5801:6e1d5e93-9d1b-4167-a038-b76888e088fe, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:35765->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2024-02-28 08:45:23.129, lastWriteTime=2024-02-28 08:45:19.110, closedTime=2024-02-28 08:45:23.141, connected server version=5.1}
2024-02-28 08:45:23,144 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-02-28 08:45:23,147 INFO  [c.h.c.i.ClientEndpointManager ] [hz.main.event-1] - [localhost]:5801 [seatunnel-347553] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:35765, qualifier=null, endpoint=[127.0.0.1]:35765, remoteUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1709109719041, latest clientAttributes=lastStatisticsCollectionTime=1709109919108,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1709109719030,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=2709667840,os.freePhysicalMemorySize=320233472,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=1048576,os.openFileDescriptorCount=65,os.processCpuTime=12630000000,os.systemLoadAverage=0.62,os.totalPhysicalMemorySize=8280195072,os.totalSwapSpaceSize=0,runtime.availableProcessors=1,runtime.freeMemory=161719896,runtime.maxMemory=518979584,runtime.totalMemory=260177920,runtime.uptime=203553,runtime.usedMemory=98460344, labels=[]}
2024-02-28 08:45:23,151 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-02-28 08:45:23,152 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed SeaTunnel client......
2024-02-28 08:45:23,153 INFO  [c.h.c.LifecycleService        ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] [localhost]:5801 is SHUTTING_DOWN
2024-02-28 08:45:23,158 INFO  [c.h.i.p.i.MigrationManager    ] [hz.main.cached.thread-6] - [localhost]:5801 [seatunnel-347553] [5.1] Shutdown request of Member [localhost]:5801 - 6e1d5e93-9d1b-4167-a038-b76888e088fe this is handled
2024-02-28 08:45:23,164 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Shutting down connection manager...
2024-02-28 08:45:23,172 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Shutting down node engine...
2024-02-28 08:45:23,211 INFO  [c.h.i.i.NodeExtension         ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Destroying node NodeExtension.
2024-02-28 08:45:23,212 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Hazelcast Shutdown is completed in 56 ms.
2024-02-28 08:45:23,212 INFO  [c.h.c.LifecycleService        ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] [localhost]:5801 is SHUTDOWN
2024-02-28 08:45:23,214 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed HazelcastInstance ......
2024-02-28 08:45:23,216 INFO  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Start completed.
2024-02-28 08:45:23,218 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2024-02-28 08:45:23,219 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 

===============================================================================


2024-02-28 08:45:23,220 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error, 

2024-02-28 08:45:23,221 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-02-28 08:45:23,222 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed 

2024-02-28 08:45:23,223 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        ... 2 more
 
2024-02-28 08:45:23,237 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
===============================================================================



Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        ... 2 more
2024-02-28 08:45:23,260 INFO  [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
INSERT INTO "test_db"."public"."t_user_2" ("id", "name", "age", "update_time") VALUES (?, ?, ?, ?) ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "name"=EXCLUDED."name", "age"=EXCLUDED."age", "update_time"=EXCLUDED."update_time"

2024-02-28 08:45:23,265 INFO  [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
DELETE FROM "test_db"."public"."t_user_2" WHERE "id" = ?

2024-02-28 08:45:23,267 INFO  [.JdbcMultiTableResourceManager] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - start close connection poolHikariPool-4
2024-02-28 08:45:23,268 INFO  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Shutdown initiated...
2024-02-28 08:45:23,272 WARN  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Interrupted during closing
java.lang.InterruptedException: null
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067) ~[?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) ~[?:1.8.0_342]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariPool.shutdown(HikariPool.java:255) ~[?:?]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource.close(HikariDataSource.java:351) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.ConnectionPoolManager.close(ConnectionPoolManager.java:66) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager.close(JdbcMultiTableResourceManager.java:42) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.close(MultiTableSinkWriter.java:251) ~[seatunnel-transforms-v2.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:153) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.4]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) [?:1.8.0_342]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_342]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_342]
        at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) [?:1.8.0_342]
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_342]
        at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) [?:1.8.0_342]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) [?:1.8.0_342]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_342]
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) [?:1.8.0_342]
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) [?:1.8.0_342]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:660) [seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) [seatunnel-starter.jar:2.3.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
2024-02-28 08:45:23,899 INFO  [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
2024-02-28 08:45:23,900 INFO  [i.d.j.JdbcConnection          ] [pool-64-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,902 INFO  [i.d.j.JdbcConnection          ] [pool-65-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,903 INFO  [i.d.j.JdbcConnection          ] [pool-66-thread-1] - Connection gracefully closed
2024-02-28 08:45:24,304 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-02-28 08:45:24,316 INFO  [s.c.s.s.c.ClientExecuteCommand] [Thread-105] - run shutdown hook because get close signal

Zeta or Flink or Spark Version

zeta

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

2606090723 avatar Feb 28 '24 08:02 2606090723

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Mar 31 '24 00:03 github-actions[bot]

+1

xuchangqun avatar Apr 30 '24 03:04 xuchangqun

对源表delete时也有该问题,pgsql版本15, driver版本42.7.3

chess3cake avatar Jun 13 '24 08:06 chess3cake

Please check your database settings https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Postgre-CDC.md#using-dependency

Here are the steps to enable CDC (Change Data Capture) in PostgreSQL:

Ensure the wal_level is set to logical: Modify the postgresql.conf configuration file by adding "wal_level = logical", restart the PostgreSQL server for the changes to take effect. Alternatively, you can use SQL commands to modify the configuration directly:

ALTER SYSTEM SET wal_level TO 'logical';
SELECT pg_reload_conf();

-- Change the REPLICA policy of the specified table to FULL
ALTER TABLE your_table_name REPLICA IDENTITY FULL;

hailin0 avatar Jun 13 '24 08:06 hailin0

Please check your database settings https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Postgre-CDC.md#using-dependency

Here are the steps to enable CDC (Change Data Capture) in PostgreSQL:

Ensure the wal_level is set to logical: Modify the postgresql.conf configuration file by adding "wal_level = logical", restart the PostgreSQL server for the changes to take effect. Alternatively, you can use SQL commands to modify the configuration directly:

ALTER SYSTEM SET wal_level TO 'logical';
SELECT pg_reload_conf();

-- Change the REPLICA policy of the specified table to FULL
ALTER TABLE your_table_name REPLICA IDENTITY FULL;

I guess the settings are correct. lQLPJw2DkNURrWHMts0BvLBCKuSmcRfbXAZVynWC-TEA_444_182

chess3cake avatar Jun 13 '24 08:06 chess3cake

check

ALTER TABLE your_table_name REPLICA IDENTITY FULL;

hailin0 avatar Jun 13 '24 08:06 hailin0