
"java.lang.RuntimeException: Failed to convert timestamp value, field: __source_ts_ms value: 0" with MongoDB source

Open julianpark90 opened this issue 3 months ago • 9 comments

I'm encountering a problem where the __source_ts_ms field from MongoDB consistently returns 0, resulting in the runtime exception.

Why does __source_ts_ms always return 0, and how can I confirm the validity of MongoDB timestamps? It also seems odd that a value of 0 matches neither isLong() nor isTextual(), resulting in the exception.

Any advice on managing the constant 0 in __source_ts_ms would be helpful.
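My guess at why 0 matches neither check: JSON parsers such as Jackson typically materialize the literal 0 as an int node rather than a long node, so a strict long/text dispatch misses it. A minimal sketch of the difference (the class and method names below are hypothetical, not the actual sink code):

```java
// Illustrative sketch, NOT the sink's actual code: why a numeric 0 can fail a
// strict long/text type check. Jackson, for instance, parses the JSON literal 0
// as an IntNode, so isLong() and isTextual() are both false and the value falls
// through to the error branch.
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class TimestampConversion {

  // Strict dispatch: accepts only Long or String, mirroring the failing check.
  // An Integer-typed 0 hits the error branch.
  static OffsetDateTime convertStrict(Object value) {
    if (value instanceof Long l) {
      return Instant.ofEpochMilli(l).atOffset(ZoneOffset.UTC);
    } else if (value instanceof String s) {
      return OffsetDateTime.parse(s);
    }
    throw new RuntimeException(
        "Failed to convert timestamp value, field: __source_ts_ms value: " + value);
  }

  // Lenient dispatch: accepts any Number, so an int-typed 0 converts to epoch.
  static OffsetDateTime convertLenient(Object value) {
    if (value instanceof Number n) {
      return Instant.ofEpochMilli(n.longValue()).atOffset(ZoneOffset.UTC);
    } else if (value instanceof String s) {
      return OffsetDateTime.parse(s);
    }
    throw new RuntimeException(
        "Failed to convert timestamp value, field: __source_ts_ms value: " + value);
  }
}
```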

julianpark90 avatar Mar 11 '24 20:03 julianpark90

@julianpark90 could you please share stack trace and your config?

ismailsimsek avatar Mar 12 '24 07:03 ismailsimsek

@ismailsimsek The following includes my configuration and the error trace. The issue occurs only when the program runs against pre-existing data in MongoDB, but a ts_ms value of 0 shouldn't cause this problem, I think.

# Use iceberg sink
debezium.sink.type=iceberg

# Iceberg sink config
debezium.sink.iceberg.table-prefix=dbz_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
# Hadoop catalog, you can use other catalog supported by iceberg as well

# S3 config with glue catalog
debezium.sink.iceberg.warehouse=s3://julianpark/database/
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.glue.id=865306278000
debezium.sink.iceberg.table-namespace=julianpark

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# saving debezium state data to destination, iceberg tables
# see https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-namespace=julianpark
debezium.source.offset.storage.iceberg.table-name=dbz_offset_storage_mongo
# see https://debezium.io/documentation/reference/stable/development/engine.html#database-history-properties
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.catalog-name=julianpark
debezium.source.schema.history.internal.iceberg.table-name=dbz_schema_history_storage_mongo

# MongoDB source
debezium.source.poll.interval.ms=500
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.tasks.max=1
debezium.source.mongodb.connection.string=mongodb://172.17.0.1:27117,172.17.0.1:27118/?authSource=admin
debezium.source.database.include.list=test_database
#debezium.source.collection.include.list=inventory.customers
debezium.source.offset.storage.file.filename=/tmp/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.mongodb.connection.mode=sharded
debezium.source.topic.prefix=topic_prefix

# use outbox for mongodb
debezium.transforms=unwrap,renamekeyfield
debezium.transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
debezium.transforms.renamekeyfield.type=org.apache.kafka.connect.transforms.ReplaceField$Key
debezium.transforms.renamekeyfield.renames=id:_id
debezium.transforms.unwrap.add.fields=op,db,source.ts_ms
debezium.transforms.unwrap.delete.tombstone.handling.mode=rewrite
debezium.tombstones.on.delete=false
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
debezium.sink.iceberg.allow-field-addition=false

debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.source.max.batch.size=20480
debezium.source.max.queue.size=81920
debezium.sink.batch.batch-size-wait.max-wait-ms=12000
debezium.sink.batch.batch-size-wait.wait-interval-ms=1000

# ############ SET LOG LEVELS ############
quarkus.log.level=DEBUG
quarkus.log.console.json=false
quarkus.log.file.enable=true
quarkus.log.file.path=application.log
quarkus.log.file.format=%d{yyyy-MM-dd HH:mm:ss} %-5p [%c] (%t) %s%e%n
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
# port
quarkus.http.port=9000
2024-03-13 04:10:21,103 INFO  [io.deb.pip.sig.SignalProcessor] (pool-7-thread-1) SignalProcessor stopped
2024-03-13 04:10:21,104 INFO  [io.deb.ser.DefaultServiceRegistry] (pool-7-thread-1) Debezium ServiceRegistry stopped.
2024-03-13 04:10:21,104 DEBUG [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) Setting task state to 'STOPPED', previous state was 'RUNNING'
2024-03-13 04:10:21,104 DEBUG [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Task stopped
2024-03-13 04:10:21,105 INFO  [io.deb.ser.ice.off.IcebergOffsetBackingStore] (pool-7-thread-1) Stopped IcebergOffsetBackingStore table:flo_reco_dev.dbz_offset_storage_mongo
2024-03-13 04:10:21,105 INFO  [io.deb.con.mon.MongoDbConnector] (pool-7-thread-1) Stopping MongoDB connector
2024-03-13 04:10:21,105 INFO  [io.deb.con.mon.MongoDbConnector] (pool-7-thread-1) Stopped MongoDB connector
2024-03-13 04:10:21,105 DEBUG [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector stopped
2024-03-13 04:10:21,105 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Failed to convert timestamp value, field:
__source_ts_ms value: 0', error = 'java.lang.RuntimeException: Failed to convert timestamp value, field: __source_ts_ms value: 0': java.lang.RuntimeException: Failed to convert timestamp value, field: __source_ts_ms value: 0
	at io.debezium.server.iceberg.IcebergChangeEvent.jsonValToIcebergVal(IcebergChangeEvent.java:157)
	at io.debezium.server.iceberg.IcebergChangeEvent.asIcebergRecord(IcebergChangeEvent.java:85)
	at io.debezium.server.iceberg.IcebergChangeEvent.asIcebergRecord(IcebergChangeEvent.java:71)
	at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTablePerSchema(IcebergTableOperator.java:173)
	at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:146)
	at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:167)
	at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
	at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:728)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
	at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
	at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

2024-03-13 04:10:21,106 DEBUG [io.qua.run.Application] (main) Stopping application
2024-03-13 04:10:21,106 DEBUG [io.qua.run.shu.ShutdownRecorder] (main) Attempting to gracefully shutdown.
2024-03-13 04:10:21,117 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2024-03-13 04:10:21,117 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2024-03-13 04:10:21,131 DEBUG [io.qua.arc.impl] (main) ArC DI container shut down
2024-03-13 04:10:21,142 DEBUG [io.qua.thread-pool] (main) loop: 1, remaining: 60000000000, intervalRemaining: 5000000000, interruptRemaining: 10000000000
2024-03-13 04:10:21,142 INFO  [io.quarkus] (main) debezium-server-iceberg-dist stopped in 0.036s

I've encountered another problem directly linked to issue DBZ-6725, PR. I'm currently using MongoDB versions 4.4 and 5.0, and upgrading to version 6.0 is not feasible for me. Here's the stack trace for reference.

2024-03-13 04:25:05,557 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: null', error = 'java.lang.NullPointerException': java.lang.NullPointerException
	at org.apache.iceberg.parquet.ParquetValueWriters$StringWriter.write(ParquetValueWriters.java:336)
	at org.apache.iceberg.parquet.ParquetValueWriters$StringWriter.write(ParquetValueWriters.java:324)
	at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
	at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
	at org.apache.iceberg.deletes.EqualityDeleteWriter.write(EqualityDeleteWriter.java:67)
	at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:388)
	at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:371)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
	at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.delete(BaseTaskWriter.java:174)
	at io.debezium.server.iceberg.tableoperator.BaseDeltaTaskWriter$RowDataDeltaWriter.delete(BaseDeltaTaskWriter.java:66)
	at io.debezium.server.iceberg.tableoperator.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:54)
	at io.debezium.server.iceberg.tableoperator.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:16)
	at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTablePerSchema(IcebergTableOperator.java:173)
	at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:146)
	at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:167)
	at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
	at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:728)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
	at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
	at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

I've also tried other options (debezium.sink.iceberg.upsert-keep-deletes=false) instead of rewrite mode, and then I get the error below.

2024-03-13 04:30:19,586 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Unexpected data type 'null'', error = 'io.debezium.DebeziumException: Unexpected data type 'null'': io.debezium.DebeziumException: Unexpected data type 'null'
	at io.debezium.server.BaseChangeConsumer.getBytes(BaseChangeConsumer.java:77)
	at io.debezium.server.iceberg.IcebergChangeConsumer.lambda$handleBatch$0(IcebergChangeConsumer.java:153)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
	at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:162)
	at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
	at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:728)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
	at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
	at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

Is there a possible workaround for this issue by adjusting configurations? Any help would be greatly appreciated.

julianpark90 avatar Mar 13 '24 04:03 julianpark90

@julianpark90 latest release now handles 0 __source_ts_ms values.

As far as I can see, for the delete events your best option is to drop/ignore them, or use MongoDB version 6.0; see the config option for dropping them.
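For reference, dropping deletes with the unwrap transform already configured above would look something like this (a sketch, assuming the unified delete.tombstone.handling.mode option of ExtractNewDocumentState that the config already uses):

```properties
# Skip delete events entirely; they will not reach the Iceberg sink.
debezium.transforms.unwrap.delete.tombstone.handling.mode=drop
```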

ismailsimsek avatar Mar 13 '24 11:03 ismailsimsek

@ismailsimsek Thank you for the patch. It seems to be working fine.

As for the MongoDB deletion, what if we do something like

((ObjectNode) value).putIfAbsent(schema.identifierFieldNames().iterator().next(), key);

https://github.com/memiiso/debezium-server-iceberg/blob/1558a7948a95df5663d2547f00c1699bb5edd4ac/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java#L70-L72 so that if the id is missing from the data to be written (in this case strictly for the MongoDB deletion), it fixes the value by adding the id.
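A rough sketch of the idea, modeled with plain maps rather than Jackson nodes (the class and method names are hypothetical, not the sink's actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class DeleteEventPatch {

  // Hypothetical sketch: before converting a delete event, copy the identifier
  // from the event key into the value document if the value lacks it.
  // MongoDB 4.x/5.x delete events carry the id only in the key, which is what
  // breaks the Iceberg equality-delete path with
  // "Debezium key/identifier field `_id` not found in event columns!".
  static Map<String, Object> backfillId(Map<String, Object> value,
                                        Map<String, Object> key,
                                        String idField) {
    Map<String, Object> patched = new HashMap<>(value);
    // putIfAbsent keeps an existing id untouched for normal insert/update events.
    patched.putIfAbsent(idField, key.get(idField));
    return patched;
  }
}
```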

julianpark90 avatar Mar 16 '24 16:03 julianpark90

@julianpark90 I think this fix should be added to the Debezium project: for MongoDB 4.x and 5.x, Debezium should generate delete events with a payload like the following. This would solve the issue and be much closer to the Debezium standard.

"payload": {
        "id": 1004,
        "__op": "d"
        "__deleted": true
}

ismailsimsek avatar Mar 16 '24 21:03 ismailsimsek

@ismailsimsek, I faced the same problem with delete events. The connector works well for insert and update operations, but for delete events it raises the error below:

cdc  | 2024-03-15 04:12:02,323 INFO  [io.deb.con.mon.MongoDbConnector] (pool-7-thread-1) Stopping MongoDB connector
cdc  | 2024-03-15 04:12:02,323 INFO  [io.deb.con.mon.MongoDbConnector] (pool-7-thread-1) Stopped MongoDB connector
cdc  | 2024-03-15 04:12:02,324 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Debezium key/identifier field `_id` not found in event columns!', error = 'org.apache.iceberg.exceptions.ValidationException: Debezium key/identifier field `_id` not found in event columns!': org.apache.iceberg.exceptions.ValidationException: Debezium key/identifier field `_id` not found in event columns!
cdc  |  at io.debezium.server.iceberg.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:344)
cdc  |  at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:155)
cdc  |  at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:167)
cdc  |  at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
cdc  |  at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:728)
cdc  |  at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
cdc  |  at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
cdc  |  at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
cdc  |  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
cdc  |  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
cdc  |  at java.base/java.lang.Thread.run(Unknown Source)
cdc  | 
cdc  | 2024-03-15 04:12:02,347 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
cdc  | 2024-03-15 04:12:02,347 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
cdc  | 2024-03-15 04:12:02,374 INFO  [io.quarkus] (main) debezium-server-iceberg-dist stopped in 0.049s
cdc exited with code 1

If I set the configuration delete.tombstone.handling.mode to "drop" as you suggested, delete events will be skipped. This could result in duplicated data in the target table. Is there any other way to handle this problem?

duc-dn avatar Mar 21 '24 10:03 duc-dn

@julianpark90 I think this fix should be added to the Debezium project: for MongoDB 4.x and 5.x, Debezium should generate delete events with a payload like the following. This would solve the issue and be much closer to the Debezium standard.

"payload": {
        "id": 1004,
        "__op": "d"
        "__deleted": true
}

@duc-dn I believe this should be added to the Debezium project. I don't see any other way.

ismailsimsek avatar Mar 21 '24 10:03 ismailsimsek

@duc-dn @julianpark90 For the delete events issue, I created a ticket for the Debezium project: https://issues.redhat.com/browse/DBZ-7695

Feel free to comment or implement it. FYI

ismailsimsek avatar Mar 21 '24 14:03 ismailsimsek

great, thanks @ismailsimsek

duc-dn avatar Mar 21 '24 14:03 duc-dn