
camel-cassandra-sink-kafka-connector - Codec not found for requested operation: [TIMESTAMP <-> java.lang.Long]

Open camwardy opened this issue 2 years ago • 8 comments

Hi @oscerd & @orpiske,

I'm trying to insert a timestamp value into one of my Cassandra columns as an epoch-millisecond Long. This works when prepareStatements is set to false, but I receive a codec-not-found error when it's set to true. Here is the error log:

2023-02-01 17:42:31,259 ERROR [my-topic-sink|task-0] Error encountered in task my-topic-sink-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter) [task-thread-my-topic-sink-0]
com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [TIMESTAMP <-> java.lang.Long]
        at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:609)
        at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:95)
        at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:92)
        at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
        at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2276)
        at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
        at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
        at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.get(LocalCache.java:3951)
        at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.getOrLoad(LocalCache.java:3973)
        at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957)
        at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4963)
        at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry.getCachedCodec(DefaultCodecRegistry.java:117)
        at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.codecFor(CachingCodecRegistry.java:258)
        at com.datastax.oss.driver.internal.core.data.ValuesHelper.encodePreparedValues(ValuesHelper.java:112)
        at com.datastax.oss.driver.internal.core.cql.DefaultPreparedStatement.bind(DefaultPreparedStatement.java:159)
        at org.apache.camel.component.cassandra.CassandraProducer.executePreparedStatement(CassandraProducer.java:132)
        at org.apache.camel.component.cassandra.CassandraProducer.execute(CassandraProducer.java:104)
        at org.apache.camel.component.cassandra.CassandraProducer.process(CassandraProducer.java:172)
        at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
        at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
        at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
        at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
        at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
        at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:214)
        at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
        at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
        at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
        at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:199)
        at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
        at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
        at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
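
Reading the trace above, the failure happens while DefaultPreparedStatement.bind encodes the bound values: the driver looks up a codec for the pair (column CQL type, runtime Java class of the value) and finds none for TIMESTAMP and java.lang.Long. The sketch below is a toy model of that kind of lookup, written only to illustrate why the pair matters. It is not the DataStax driver's API; the class CodecLookupSketch and its methods are hypothetical names.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model only: NOT the DataStax driver's API. All names are hypothetical.
public class CodecLookupSketch {
    // Encoders are keyed by the pair "CQL type <-> Java class name".
    private final Map<String, Function<Object, Long>> codecs = new HashMap<>();

    private static String key(String cqlType, Class<?> javaType) {
        return cqlType + " <-> " + javaType.getName();
    }

    // Register an encoder that turns a value of javaType into epoch millis.
    public <T> void register(String cqlType, Class<T> javaType, Function<T, Long> encoder) {
        codecs.put(key(cqlType, javaType), value -> encoder.apply(javaType.cast(value)));
    }

    // Encode a bound value for a column of the given CQL type.
    public long encode(String cqlType, Object value) {
        String k = key(cqlType, value.getClass());
        Function<Object, Long> codec = codecs.get(k);
        if (codec == null) {
            throw new IllegalStateException(
                "Codec not found for requested operation: [" + k + "]");
        }
        return codec.apply(value);
    }

    public static void main(String[] args) {
        CodecLookupSketch registry = new CodecLookupSketch();
        // Only an Instant mapping is registered at first.
        registry.register("TIMESTAMP", Instant.class, Instant::toEpochMilli);

        try {
            registry.encode("TIMESTAMP", 1670428382089L); // value is a java.lang.Long
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // After registering a Long mapping, the same call succeeds.
        registry.register("TIMESTAMP", Long.class, millis -> millis);
        System.out.println(registry.encode("TIMESTAMP", 1670428382089L));
    }
}
```

In this model the first encode call fails because only the Instant mapping exists, and it succeeds once a Long mapping is added, which mirrors the idea of registering an extra codec.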

After some digging, I assume I need to register the TIMESTAMP_MILLIS_UTC codec found in ExtraTypeCodecs. How can this be done, or is there an alternative way to insert timestamps via the connector? I've also tried using the built-in TimestampConverter SMT to convert the epoch millis to a timestamp string and inserting that instead; however, this returns a similar error: Codec not found for requested operation: [TIMESTAMP <-> java.lang.String]. It seems a codec is required for this as well. Any hints would be greatly appreciated.
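
For context, the Java driver 4.x built-in TIMESTAMP codec maps to java.time.Instant, which is why neither Long nor String is accepted when binding a prepared statement. Where the bound values can be adjusted in code before they reach the statement, converting the epoch millis to an Instant avoids the need for an extra codec. The following is a minimal sketch of that conversion in plain Java; the class EpochMillisToInstant and the method withInstantAt are hypothetical helpers, and whether such a step can be plugged into the connector configuration shown in this issue is an assumption not confirmed here.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
public class EpochMillisToInstant {
    // Return a copy of params where a Long at the given index is replaced
    // by the equivalent java.time.Instant. Other values are left untouched.
    public static List<Object> withInstantAt(List<Object> params, int index) {
        List<Object> out = new ArrayList<>(params);
        Object value = out.get(index);
        if (value instanceof Long) {
            out.set(index, Instant.ofEpochMilli((Long) value));
        }
        return out;
    }

    public static void main(String[] args) {
        // Same shape as the Kafka message in this issue: id, name, created_at.
        List<Object> params = List.of("1", "John", 1670428382089L);
        System.out.println(withInstantAt(params, 2));
    }
}
```

The conversion is lossless at millisecond precision, so calling toEpochMilli() on the resulting Instant returns the original value.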


For reference, here is the table I'm trying to insert to:

USE connect;
CREATE TABLE person (
    id          TEXT PRIMARY KEY, 
    name        TEXT, 
    created_at  TIMESTAMP
);

The connector properties:

name=my-topic-sink
topics=my-topic
tasks.max=1
connector.class=org.apache.camel.kafkaconnector.cassandrasink.CamelCassandrasinkSinkConnector
value.converter=org.apache.kafka.connect.storage.StringConverter

camel.kamelet.cassandra-sink.connectionHost=cassandra
camel.kamelet.cassandra-sink.connectionPort=9042
camel.kamelet.cassandra-sink.keyspace=connect
camel.kamelet.cassandra-sink.query=insert into person (id, name, created_at) values (?, ?, ?)
camel.kamelet.cassandra-sink.prepareStatements=true

And the message that is being consumed from Kafka:

["1","John",1670428382089]

camwardy avatar Feb 01 '23 17:02 camwardy

I guess this will need to be done at component level in Camel: https://github.com/apache/camel/blob/main/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java#L139-L163

oscerd avatar Feb 01 '23 18:02 oscerd

We'll need to open an issue on Camel core JIRA.

oscerd avatar Feb 01 '23 18:02 oscerd

Created this https://issues.apache.org/jira/browse/CAMEL-18996

oscerd avatar Feb 01 '23 18:02 oscerd

@oscerd Can we prioritise this bug please? We've also come across the same error message: Codec not found for requested operation: [TIMESTAMP <-> java.lang.Long]. It seems like a simple case of adding codec support within the CQL session builder?

aozmen121 avatar Feb 02 '23 08:02 aozmen121

It's not that straightforward. We need to do this in Camel, release Camel, update the Camel Kamelets, release the Camel Kamelets, update the Camel Kafka Connector, and release the Camel Kafka Connector.

I'll work on the bug and I'll do what I can, but it won't be tomorrow I guess.

oscerd avatar Feb 02 '23 08:02 oscerd

@oscerd thank you so much for picking this up.

aozmen121 avatar Feb 02 '23 08:02 aozmen121

The fix on Camel has been done: https://issues.apache.org/jira/browse/CAMEL-18996

Now we need to align the kamelet https://github.com/apache/camel-kamelets/issues/1256

oscerd avatar Feb 02 '23 13:02 oscerd

@oscerd excellent, thank you for the quick response!

camwardy avatar Feb 02 '23 14:02 camwardy