camel-kafka-connector
camel-cassandra-sink-kafka-connector - Codec not found for requested operation: [TIMESTAMP <-> java.lang.Long]
Hi @oscerd & @orpiske,
I'm trying to insert a timestamp value into one of my Cassandra columns as an epoch millisecond Long. This works when prepareStatements is set to false, however I'm receiving a codec not found error when it's set to true. Here is the error log:
2023-02-01 17:42:31,259 ERROR [my-topic-sink|task-0] Error encountered in task my-topic-sink-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter) [task-thread-my-topic-sink-0]
com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [TIMESTAMP <-> java.lang.Long]
at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:609)
at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:95)
at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:92)
at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2276)
at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.get(LocalCache.java:3951)
at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.getOrLoad(LocalCache.java:3973)
at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957)
at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4963)
at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry.getCachedCodec(DefaultCodecRegistry.java:117)
at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.codecFor(CachingCodecRegistry.java:258)
at com.datastax.oss.driver.internal.core.data.ValuesHelper.encodePreparedValues(ValuesHelper.java:112)
at com.datastax.oss.driver.internal.core.cql.DefaultPreparedStatement.bind(DefaultPreparedStatement.java:159)
at org.apache.camel.component.cassandra.CassandraProducer.executePreparedStatement(CassandraProducer.java:132)
at org.apache.camel.component.cassandra.CassandraProducer.execute(CassandraProducer.java:104)
at org.apache.camel.component.cassandra.CassandraProducer.process(CassandraProducer.java:172)
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:214)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:199)
at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
After some digging, I assume I need to register the TIMESTAMP_MILLIS_UTC codec found in ExtraTypeCodecs. How can this be done, or is there an alternative way to insert timestamps via the connector? I've also tried using the built-in TimestampConverter SMT to convert the epoch millis to a timestamp string and inserting that, but this returns a similar error: Codec not found for requested operation: [TIMESTAMP <-> java.lang.String]. It seems that a codec is required for this too. Any hints would be greatly appreciated.
For reference, here is the table I'm trying to insert to:
USE connect;
CREATE TABLE person (
    id TEXT PRIMARY KEY,
    name TEXT,
    created_at TIMESTAMP
);
The connector properties:
name=my-topic-sink
topics=my-topic
tasks.max=1
connector.class=org.apache.camel.kafkaconnector.cassandrasink.CamelCassandrasinkSinkConnector
value.converter=org.apache.kafka.connect.storage.StringConverter
camel.kamelet.cassandra-sink.connectionHost=cassandra
camel.kamelet.cassandra-sink.connectionPort=9042
camel.kamelet.cassandra-sink.keyspace=connect
camel.kamelet.cassandra-sink.query=insert into person (id, name, created_at) values (?, ?, ?)
camel.kamelet.cassandra-sink.prepareStatements=true
And the message that is being consumed from Kafka:
["1","John",1670428382089]
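For context on the error: with the DataStax Java driver 4.x shown in the stack trace, the default codec for a CQL TIMESTAMP column maps to java.time.Instant, which is why binding a java.lang.Long or a java.lang.String to a prepared statement fails the codec lookup. The sketch below only illustrates the conversion a value would need before binding. The class name EpochMillisToInstant is hypothetical, and the code is not something the connector exposes a hook for; it is a minimal illustration under those assumptions, not the fix that was applied in Camel.

```java
import java.time.Instant;

// Hypothetical helper: not part of Camel or the connector.
public class EpochMillisToInstant {

    // Convert epoch milliseconds to java.time.Instant, the Java type
    // that the driver's default TIMESTAMP codec accepts.
    public static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        // created_at value taken from the sample Kafka message above.
        long createdAt = 1670428382089L;
        Instant instant = toInstant(createdAt);
        // Prints the ISO-8601 form of the instant in UTC.
        System.out.println(instant);
    }
}
```

A value converted this way binds against the default registry, whereas keeping the raw Long requires an extra codec such as TIMESTAMP_MILLIS_UTC to be registered on the session.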
I guess this will need to be done at component level in Camel: https://github.com/apache/camel/blob/main/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java#L139-L163
We'll need to open an issue on Camel core JIRA.
Created this https://issues.apache.org/jira/browse/CAMEL-18996
@oscerd Can we prioritise this bug, please? We've also come across the same error message: Codec not found for requested operation: [TIMESTAMP <-> java.lang.Long]. It seems like a simple case of adding codec support within the CQL session builder?
It's not that straightforward. We need to do this in Camel, release Camel, update Camel Kamelets, release Camel Kamelets, update Camel Kafka Connector, and release Camel Kafka Connector.
I'll work on the bug and I'll do what I can, but it won't be tomorrow I guess.
@oscerd thank you so much for picking this up.
The fix on Camel has been done: https://issues.apache.org/jira/browse/CAMEL-18996
Now we need to align the kamelet https://github.com/apache/camel-kamelets/issues/1256
@oscerd excellent, thank you for the quick response!