debezium-timestamp-converter
Converter class is not found upon connector creation.
First, thanks so much for your work on this! It definitely addresses a need we have, specifically in going from a Debezium CDC stream to a JDBC connector, where inserts are currently failing because the Debezium stream represents dates as Int32 days since the epoch.
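For anyone unfamiliar with that encoding, here is a minimal Python sketch of what "Int32 days since the epoch" means. The function name epoch_days_to_date is my own for illustration and is not part of Debezium or the converter.

```python
from datetime import date, timedelta

# The Unix epoch, which the day count is measured from.
EPOCH = date(1970, 1, 1)


def epoch_days_to_date(days: int) -> date:
    """Turn a count of days since 1970-01-01 into a calendar date."""
    return EPOCH + timedelta(days=days)


print(epoch_days_to_date(18418).isoformat())  # 2020-06-05
```

A sink that expects a DATE column cannot use the raw integer 18418 directly, which is why a conversion step of this kind is needed somewhere between the CDC stream and the JDBC connector.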
Unfortunately, neither a colleague nor I have been successful in actually deploying the converter. I'll try to describe how to reproduce the issue without revealing proprietary information.
First, I have plain Apache Kafka 2.2.1 on my laptop, and run it in my terminal with:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
Because we add a few connectors and converters to our Kafka Connect deployment, we customize Debezium's Docker image to do so. Our Dockerfile has:
FROM debezium/connect-base:1.1
and the relevant line, after installing the MySQL connector:
RUN ... curl 'https://pkg.githubusercontent.com/256272685/822be400-856f-11ea-98b6-bd0196c9afa5?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20200605%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20200605T134430Z&X-Amz-Expires=300&X-Amz-Signature=b313e062d9f14b595ab07665681d676d08c640adba757fb43b0bc78042947201&X-Amz-SignedHeaders=host&actor_id=27828429&repo_id=0&response-content-disposition=filename%3DTimestampConverter-1.2.0-20200423.143415-1.jar&response-content-type=application%2Foctet-stream' -o $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-mysql/TimestampConverter-1.2.0.jar
After building the image, I run it with:
docker run --network host -e BOOTSTRAP_SERVERS=localhost:9092 -e OFFSET_STORAGE_TOPIC=offset_store -e CONFIG_STORAGE_TOPIC=config_store connect:1.1-compstak &
This presents me with a lot of normal-looking log output. However, your converter doesn't appear to be found. And when I try to create a connector using your converter:
curl -s -X POST -H "Accept:application/json" -H "Content-Type:application/json" [email protected] http://172.26.0.1:8083/connectors/ | jq
The result is:
2020-06-08 17:08:10,602 ERROR || Uncaught exception in REST call to /connectors/ [org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper]
java.lang.IllegalArgumentException: Unable to find class oryanmoshe.kafka.connect.util.TimestampConverter
at io.debezium.config.Instantiator.getInstance(Instantiator.java:37)
at io.debezium.config.Configuration.getInstance(Configuration.java:1423)
at io.debezium.config.Configuration.getInstance(Configuration.java:1409)
at io.debezium.config.CommonConnectorConfig.lambda$getCustomConverters$1(CommonConnectorConfig.java:368)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1654)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at io.debezium.config.CommonConnectorConfig.getCustomConverters(CommonConnectorConfig.java:372)
at io.debezium.config.CommonConnectorConfig.<init>(CommonConnectorConfig.java:304)
at io.debezium.relational.RelationalDatabaseConnectorConfig.<init>(RelationalDatabaseConnectorConfig.java:272)
at io.debezium.connector.mysql.MySqlConnectorConfig.<init>(MySqlConnectorConfig.java:1010)
at io.debezium.connector.mysql.MySqlConnector.validate(MySqlConnector.java:97)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:313)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:745)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:742)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: oryanmoshe.kafka.connect.util.TimestampConverter
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at io.debezium.config.Instantiator.getInstance(Instantiator.java:32)
... 25 more
{
"error_code": 500,
"message": "Unable to find class oryanmoshe.kafka.connect.util.TimestampConverter"
}
I think this completely describes the process, but please let me know if I've overlooked anything.
Thanks so much!
Update: if I move the .jar to just /kafka/connect, against the explicit instructions in the README, I see:
2020-06-09 18:34:41,746 INFO || Loading plugin from: /kafka/connect/TimestampConverter-1.2.0.jar [org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader]
2020-06-09 18:34:41,750 INFO || Registered loader: PluginClassLoader{pluginLocation=file:/kafka/connect/TimestampConverter-1.2.0.jar} [org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader]
in the log output. What's striking is the lack of an "Added plugin" line like the one I see for all other plugins.
Something appears to be off with plugin discovery and registration, but it's proving difficult to determine just what.
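One thing that may be worth ruling out is whether the downloaded file is a valid jar that actually contains the class. The download URL above is a signed link carrying X-Amz-Expires=300, so an expired link could plausibly leave something other than a jar on disk. That is a guess, not a confirmed cause. Below is a minimal Python sketch of the check; jar_contains_class is a hypothetical helper name, not something from Kafka Connect or Debezium.

```python
import zipfile


def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if jar_path is a readable zip archive holding class_name."""
    # A jar is a zip archive; an HTML or XML error page saved as .jar fails here.
    if not zipfile.is_zipfile(jar_path):
        return False
    # Fully qualified class names map to slash-separated entries ending in .class.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

Inside the container it could be called with the jar's path (for example /kafka/connect/debezium-connector-mysql/TimestampConverter-1.2.0.jar, assuming /kafka/connect is the plugins directory) and the class name oryanmoshe.kafka.connect.util.TimestampConverter. A False result would point at the download or the packaging rather than at plugin discovery.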
@paul-snively you need to move the converter jar inside the directory of the connector that you're using. In my case I moved it to /kafka/connect/debezium-connector-postgres. If you're using the MySQL connector, the same applies.
Here's what that directory looks like:
CHANGELOG.md
COPYRIGHT.txt
LICENSE.txt
TimestampConverter.jar
debezium-connector-postgres-1.2.0.Final.jar
postgresql-42.2.12.jar
CONTRIBUTE.md
LICENSE-3rd-PARTIES.txt
README.md
debezium-api-1.2.0.Final.jar
debezium-core-1.2.0.Final.jar
protobuf-java-3.8.0.jar
I am getting a similar error, even though the jar file is placed inside the connector directory:
azureuser@zookeeper1:~$ cat timereg.json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "xxxxxx",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "secret",
    "database.dbname" : "demo",
    "database.server.name": "debezium1",
    "schema.whitelist": "public",
    "plugin.name": "pgoutput",
    "snapshot.mode": "always",
    "tombstones.on.delete": "false",
    "time.precision.mode": "connect",
    "decimal.handling.mode": "double",
    "converters": "timestampConverter",
    "timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter"
  }
}
azureuser@zookeeper1:~$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @timereg.json
HTTP/1.1 500 Server Error
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: application/json
Content-Length: 137
Connection: close
Server: Jetty(9.4.24.v20191120)

{
"servlet":"org.glassfish.jersey.servlet.ServletContainer-248deced",
"message":"Request failed.",
"url":"/connectors/",
"status":"500"
}
azureuser@zookeeper1:~$ ls -lrth kafka/connectors/debezium-connector-postgres/TimestampConverter-1.2.0-SNAPSHOT.jar
-rw-rw-r-- 1 azureuser azureuser 9.3M Dec 14 09:41 kafka/connectors/debezium-connector-postgres/TimestampConverter-1.2.0-SNAPSHOT.jar
I can confirm this problem. My solution was to rename the class. As I understand it, there is a conflict with org.apache.kafka.connect.transforms.TimestampConverter. After the renaming, everything worked.
https://docs.confluent.io/5.5.1/connect/transforms/timestampconverter.html
Has anyone been able to fix this? I am facing the same issue. I have renamed the class to CustomDatetimeConverter.jar, but I am still facing the issue.
Below are the files in debezium-connector-postgres:
CHANGELOG.md
CustomDatetimeConverter.jar
README.md
debezium-api-1.9.0.Beta1.jar
failureaccess-1.0.1.jar
postgresql-42.3.2.jar
CONTRIBUTE.md
LICENSE-3rd-PARTIES.txt
README_JA.md
debezium-connector-postgres-1.9.0.Beta1.jar
guava-30.0-jre.jar
protobuf-java-3.19.2.jar
COPYRIGHT.txt
LICENSE.txt
README_ZH.md
debezium-core-1.9.0.Beta1.jar
postgres.json