kafka-connect-jdbc
JDBC Sink Connector getting 401 unauthorized while connecting confluent cloud schema registry
I am running Kafka Connect from docker-compose, connecting to a Confluent Cloud instance. My topic contains Avro messages with a registered schema, and I am trying to run the JDBC sink connector to dump the topic data into a PostgreSQL DB. The same setup works perfectly against a dockerized Kafka cluster, but here it gets an unauthorized error when connecting to the Schema Registry in Confluent Cloud.
I have followed all the required steps described here:
https://rmoff.net/2021/01/11/running-a-self-managed-kafka-connect-worker-for-confluent-cloud/
and also explained in this video:
https://www.youtube.com/watch?v=GOCpD2aJyTE
docker-compose
connect:
  image: confluentinc/cp-server-connect:latest
  #image: cnfldemos/kafka-connect-datagen:0.6.0-7.3.0
  hostname: connect
  container_name: connect
  ports:
    - "8083:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
    CONNECT_REST_ADVERTISED_HOST_NAME: "self-managed-connect"
    CONNECT_GROUP_ID: "self-managed-connect"
    CONNECT_CONFIG_STORAGE_TOPIC: "self-managed-connect-configs"
    CONNECT_OFFSET_STORAGE_TOPIC: "self-managed-connect-offsets"
    CONNECT_STATUS_STORAGE_TOPIC: "self-managed-connect-status"
    CONNECT_REPLICATION_FACTOR: 3
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
    CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
    CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
    CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
    CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
    CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
    CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
    CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
    CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=DEBUG,org.reflections=ERROR'
    CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.0.jar
    CONNECT_REQUEST_TIMEOUT_MS: "20000"
    CONNECT_RETRY_BACKOFF_MS: "500"
    CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
    CONNECT_SECURITY_PROTOCOL: SASL_SSL
    CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
    CONNECT_SASL_MECHANISM: PLAIN
    CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
    CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
    CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
    CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
    CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
    CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
  command:
    - bash
    - -c
    - |
      echo "Installing Kafka Confluent JDBC Connector"
      confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.0.0
      echo "Installing Kafka Confluent S3 Connector"
      confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest
      echo "Launching Kafka Connect worker"
      /etc/confluent/docker/run &
      #
      echo "Waiting for Kafka Connect to start listening on 0.0.0.0:8083 ⏳"
      while : ; do
        curl_status=$$(curl -s -o /dev/null -w %{http_code} http://0.0.0.0:8083/connectors)
        echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
        if [ $$curl_status -eq 200 ] ; then
          break
        fi
        sleep 5
      done
      sleep infinity
Environment values for docker compose
BOOTSTRAP_SERVERS="pkc-xrnwx.asia-south2.gcp.confluent.cloud:9092"
SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='KYKEFWOILSILAB55' password='D/xiKbDaGwt6vOP03HRvrswK4H4Aii674IKrfBo+9F3A57mmBp5ryEpVAt/9TUtl';"
SCHEMA_REGISTRY_URL="https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud"
BASIC_AUTH_CREDENTIALS_SOURCE="USER_INFO"
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="'SQ3VYMMIK7W3HCIB':'PwuON9QhE5HE8XdoohebQEfXiIDHVAG973UXx2ZXNzdTpYDifxf/sFwyr32iI8au'"
I have also tried supplying the schema registry credential in these formats, but that did not help either.
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=SQ3VYMMIK7W3HCIB:PwuON9QhE5HE8XdoohebQEfXiIDHVAG973UXx2ZXNzdTpYDifxf/sFwyr32iI8au
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="SQ3VYMMIK7W3HCIB:PwuON9QhE5HE8XdoohebQEfXiIDHVAG973UXx2ZXNzdTpYDifxf/sFwyr32iI8au"
The same schema registry credential works fine when used from a curl command or from a Python Avro producer.
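For reference, the kind of check that succeeds directly against the registry looks roughly like this (a sketch using the standard Schema Registry /subjects endpoint, with the same URL and key as in the env file above):

# Sketch: verify the Schema Registry API key directly; this returns the subject list rather than a 401
curl -s -u 'SQ3VYMMIK7W3HCIB:PwuON9QhE5HE8XdoohebQEfXiIDHVAG973UXx2ZXNzdTpYDifxf/sFwyr32iI8au' \
  https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud/subjects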
I have included the actual security keys here since the cluster no longer exists; it might help to see whether I am providing the secrets in the wrong format.
Curl Payload used to create connector
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "jdbc-media-events-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 1,
    "connection.url": "jdbc:xxxxxxx://xxxxxxxxxxx.xxxxxxxxxxx.ap-south-1.rds.amazonaws.com:5432/xxxxxxxx",
    "connection.user": "xxxxxxxxx",
    "connection.password": "xxxxxx",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://psrc-p6o1m.eu-central-1.aws.confluent.cloud",
    "value.converter.schemas.enable": "true",
    "topics": "media_events",
    "table.name.format": "${topic} ",
    "auto.create": "true",
    "poll.interval.ms": 1000,
    "pk.mode": "kafka",
    "pk.fields": "",
    "insert.mode": "INSERT",
    "transforms": "renameTopic",
    "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.renameTopic.regex": "media_events",
    "transforms.renameTopic.replacement": "media_events"
  }
}'
Exception in connect
connect | [2023-01-14 20:33:31,332] INFO [Consumer clientId=connector-consumer-jdbc-media-events-sink-0, groupId=connect-jdbc-media-events-sink] Adding newly assigned partitions: avro_media-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect | [2023-01-14 20:33:31,429] INFO [Consumer clientId=connector-consumer-jdbc-media-events-sink-0, groupId=connect-jdbc-media-events-sink] Found no committed offset for partition avro_media-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect | [2023-01-14 20:33:33,043] ERROR WorkerSinkTask{id=jdbc-media-events-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:519)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: org.apache.kafka.common.config.ConfigException: Failed to access Avro data from topic avro_media : Unauthorized; error code: 401
connect | at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:129)
connect | at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:519)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
connect | ... 13 more
connect | [2023-01-14 20:33:33,044] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
connect | [2023-01-14 20:33:33,044] INFO [Consumer clientId=connector-consumer-jdbc-media-events-sink-0, groupId=connect-jdbc-media-events-sink] Revoke previously assigned partitions avro_media-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect | [2023-01-14 20:33:33,044] INFO [Consumer clientId=connector-consumer-jdbc-media-events-sink-0, groupId=connect-jdbc-media-events-sink] Member connector-consumer-jdbc-media-events-sink-0-564e74dc-455e-43a4-8321-9ad7551ffeb9 sending LeaveGroup request to coordinator b0-pkc-xrnwx.asia-south2.gcp.confluent.cloud:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
How did you solve the problem?
Bumping. I too am curious how this was solved.
Another bump. I'm experiencing the same issue. Any resolution?
Has anyone found a solution to this?
If it helps, I was able to get things working by putting in Schema Registry settings for both the value and the key converter:

CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
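The same credentials can also be supplied per connector rather than only in the worker environment, by overriding the converter properties in the connector config. A minimal sketch, assuming the standard value.converter.basic.auth.credentials.source / value.converter.schema.registry.basic.auth.user.info override names (the connector-level equivalents of the worker settings above) and placeholder database and Schema Registry credentials:

# Sketch: connector-level Schema Registry auth; <...> values are placeholders
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "jdbc-media-events-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 1,
    "topics": "media_events",
    "connection.url": "jdbc:postgresql://<db-host>:5432/<db-name>",
    "connection.user": "<db-user>",
    "connection.password": "<db-password>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "<SR_API_KEY>:<SR_API_SECRET>",
    "auto.create": "true",
    "insert.mode": "INSERT",
    "pk.mode": "kafka"
  }
}'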