Postgres-CDC to Hive

Open yqyqyqyqyqyq opened this issue 1 year ago • 1 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)

SeaTunnel Version

2.3.4

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 15000
}

source {
  Postgres-CDC {
    result_table_name = "fake"
    base-url = "jdbc:postgresql:///test"
    source.reader.close.timeout = 120000
    username = ""
    password = ""
    driver = "org.postgresql.Driver"
    slot.name = "seatunnel1716623306008"
    database-names = [""]
    schema-names = [""]
    table-names = [".."]
  }
}

transform {
  Sql {
    source_table_name = "fake"
    query = "select id,name,create_time,'$[yyyy-MM-dd]' as pt from fake "
    result_table_name = "fake1"
  }
  FieldMapper {
    source_table_name = "fake1"
    result_table_name = "fake2"
    field_mapper = {
      id = id
      name = name
      create_time = create_time
      pt = pt
    }
  }
}

sink {
  Hive {
    source_table_name = "fake2"
    table_name = "."
  }
}

Running Command

(The reporter pasted the same configuration as in SeaTunnel Config above; no separate command was given.)

Error Exception

Postgres-CDC to Hive: after the job starts, inserts and deletes on the source table are processed without problems, but as soon as a source row is updated, for example with update table set name ='name2' where id =2, the following exception appears:
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
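
The symptom described here (inserts and deletes work, updates fail) would fit an update event that arrives without a "before" row image. The following is a minimal sketch of that failure mode, not SeaTunnel's actual code: the class BeforeImageSketch and the methods convertUnsafe and convertSafe are hypothetical names invented for illustration, and a plain Map stands in for the row image. It only shows how dereferencing a missing image raises a NullPointerException and how a null check avoids it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in SeaTunnel.
final class BeforeImageSketch {

    // Unsafe conversion: assumes the row image is always present.
    static Object[] convertUnsafe(Map<String, Object> image) {
        return image.values().toArray(); // throws NullPointerException when image is null
    }

    // Null-safe conversion: returns null when the row image is missing.
    static Object[] convertSafe(Map<String, Object> image) {
        return image == null ? null : image.values().toArray();
    }

    public static void main(String[] args) {
        // "After" image of an update such as: set name = 'name2' where id = 2
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("id", 2);
        after.put("name", "name2");

        // Assumption: the update event carries no "before" image.
        Map<String, Object> before = null;

        System.out.println("after columns: " + convertSafe(after).length);
        System.out.println("before missing: " + (convertSafe(before) == null));
        try {
            convertUnsafe(before);
        } catch (NullPointerException e) {
            System.out.println("unsafe conversion failed with NullPointerException");
        }
    }
}
```

Whether a missing "before" image is really the cause in this job is not confirmed anywhere in this thread; the sketch only makes the suspected mechanism concrete.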

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

yqyqyqyqyqyq • May 25 '24 08:05

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] • Jun 25 '24 00:06

I encountered the same exception with SeaTunnel version 2.3.5:

java.lang.NullPointerException
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:109)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:111)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:113)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:111)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
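
The trace above shows the NullPointerException being raised beneath extractBeforeRow, that is, while the "before" image of a change record is converted. One possible cause, offered as an assumption and not confirmed in this thread, is that the source table uses PostgreSQL's default REPLICA IDENTITY, under which the old row of an UPDATE is generally not written to the WAL in full. A commonly suggested remedy for CDC pipelines is to switch the table to REPLICA IDENTITY FULL. The sketch below only builds that statement; ReplicaIdentitySketch, quoteIdent and replicaIdentityFullSql are hypothetical helper names, and my_schema and my_table are placeholders because the issue does not give the real names.

```java
// Hypothetical helper; not part of SeaTunnel or the PostgreSQL JDBC driver.
final class ReplicaIdentitySketch {

    // Quotes an SQL identifier by wrapping it in double quotes and
    // doubling any embedded double quote.
    static String quoteIdent(String ident) {
        return "\"" + ident.replace("\"", "\"\"") + "\"";
    }

    // Builds the statement that asks PostgreSQL to log the full old row
    // for UPDATE and DELETE on the given table.
    static String replicaIdentityFullSql(String schema, String table) {
        return "ALTER TABLE " + quoteIdent(schema) + "." + quoteIdent(table)
                + " REPLICA IDENTITY FULL";
    }

    public static void main(String[] args) {
        // Placeholder names; replace with the table captured by Postgres-CDC.
        System.out.println(replicaIdentityFullSql("my_schema", "my_table"));
    }
}
```

The generated statement would be run once against the source database, for example through psql or a JDBC Statement. Note that REPLICA IDENTITY FULL increases WAL volume for the affected table.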

Asura7969 • Jul 05 '24 01:07