seatunnel
seatunnel copied to clipboard
postgreCDC TO hive
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
SeaTunnel Version
2.3.4
SeaTunnel Config
env {
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 15000
}
source {
Postgres-CDC {
result_table_name ="fake"
base-url="jdbc:postgresql:///test"
source.reader.close.timeout = 120000
username = ""
password = ""
driver = "org.postgresql.Driver"
slot.name = "seatunnel1716623306008"
database-names = [""]
schema-names = [""]
table-names = [".."]
}
}
transform {
Sql{
source_table_name = "fake"
query = "select id,name,create_time,'$[yyyy-MM-dd]' as pt from fake "
result_table_name = "fake1"
}
FieldMapper {
source_table_name = "fake1"
result_table_name = "fake2"
field_mapper = {
id=id
name=name
create_time=create_time
pt = pt
}
}
}
sink {
Hive {
source_table_name = "fake2"
table_name ="."
}
}
Running Command
env {
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 15000
}
source {
Postgres-CDC {
result_table_name ="fake"
base-url="jdbc:postgresql:///test"
source.reader.close.timeout = 120000
username = ""
password = ""
driver = "org.postgresql.Driver"
slot.name = "seatunnel1716623306008"
database-names = [""]
schema-names = [""]
table-names = [".."]
}
}
transform {
Sql{
source_table_name = "fake"
query = "select id,name,create_time,'$[yyyy-MM-dd]' as pt from fake "
result_table_name = "fake1"
}
FieldMapper {
source_table_name = "fake1"
result_table_name = "fake2"
field_mapper = {
id=id
name=name
create_time=create_time
pt = pt
}
}
}
sink {
Hive {
source_table_name = "fake2"
table_name ="."
}
}
Error Exception
postGreCDC to hive:when job start, source data insert or delete is ok,,but once source data updated like update table set name ='name2' where id =2 ,it will appear
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
Zeta or Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
I encountered the same exception
seatunnel version 2.3.5
java.lang.NullPointerException
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:109)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:111)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:113)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:111)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)