
JDBC Sink Connector getting 401 Unauthorized while connecting to the Confluent Cloud Schema Registry

Open rakibmirza opened this issue 2 years ago • 5 comments

I am running Kafka Connect from docker-compose, connecting to a Confluent Cloud instance. My topic contains Avro messages with the schema registered. I am trying to run the JDBC sink connector to dump the topic data into a PostgreSQL DB. The same setup against a dockerized Kafka cluster runs perfectly, but against Confluent Cloud it gets an unauthorized error when connecting to the Schema Registry.

I have followed all the required steps described here:

https://rmoff.net/2021/01/11/running-a-self-managed-kafka-connect-worker-for-confluent-cloud/

and also explained in this video:

https://www.youtube.com/watch?v=GOCpD2aJyTE

docker-compose

 connect:
    image: confluentinc/cp-server-connect:latest
    #image: cnfldemos/kafka-connect-datagen:0.6.0-7.3.0
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      CONNECT_REST_ADVERTISED_HOST_NAME: "self-managed-connect"
      CONNECT_GROUP_ID: "self-managed-connect"
      CONNECT_CONFIG_STORAGE_TOPIC: "self-managed-connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "self-managed-connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "self-managed-connect-status"
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=DEBUG,org.reflections=ERROR'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.0.jar
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM : "https"
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN    
    command:
      - bash
      - -c
      - |
        echo "Installing Kafka Confluent JDBC Connector"
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.0.0
        echo "Installing Kafka Confluent S3 Connector"
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        echo "Waiting for Kafka Connect to start listening on 0.0.0.0:8083 ⏳"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://0.0.0.0:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
          break
          fi
          sleep 5
        done
        sleep infinity
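
Once the worker responds with 200, the installed plugins can be double-checked before posting the sink config. This is just the standard Connect REST endpoint, shown here for completeness; it should list io.confluent.connect.jdbc.JdbcSinkConnector:

    curl -s http://localhost:8083/connector-plugins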

Environment values for docker compose

BOOTSTRAP_SERVERS="pkc-xrnwx.asia-south2.gcp.confluent.cloud:9092"
SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='KYKEFWOILSILAB55' password='D/xiKbDaGwt6vOP03HRvrswK4H4Aii674IKrfBo+9F3A57mmBp5ryEpVAt/9TUtl';"
SCHEMA_REGISTRY_URL="https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud"
BASIC_AUTH_CREDENTIALS_SOURCE="USER_INFO"
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="'SQ3VYMMIK7W3HCIB':'PwuON9QhE5HE8XdoohebQEfXiIDHVAG973UXx2ZXNzdTpYDifxf/sFwyr32iI8au'"

I have also tried to supply the schema registry credentials in the following formats, but that did not help either.

SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=SQ3VYMMIK7W3HCIB:PwuON9QhE5HE8XdoohebQEfXiIDHVAG973UXx2ZXNzdTpYDifxf/sFwyr32iI8au

SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="SQ3VYMMIK7W3HCIB:PwuON9QhE5HE8XdoohebQEfXiIDHVAG973UXx2ZXNzdTpYDifxf/sFwyr32iI8au"

The same Schema Registry credentials work fine when used from a curl command or from a Python Avro producer.
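
(For reference, the standalone check is roughly the following; /subjects just lists the registered subjects, so getting a JSON array back instead of a 401 confirms the key/secret pair is accepted:)

    curl -s -u "SQ3VYMMIK7W3HCIB:PwuON9QhE5HE8XdoohebQEfXiIDHVAG973UXx2ZXNzdTpYDifxf/sFwyr32iI8au" \
      https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud/subjects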

I have included the actual security keys here since the cluster no longer exists; they might help in spotting whether I am providing the secrets in the wrong format.

Curl payload used to create the connector


curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "jdbc-media-events-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "connection.url": "jdbc:xxxxxxx://xxxxxxxxxxx.xxxxxxxxxxx.ap-south-1.rds.amazonaws.com:5432/xxxxxxxx",
        "connection.user": "xxxxxxxxx",
        "connection.password": "xxxxxx",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "https://psrc-p6o1m.eu-central-1.aws.confluent.cloud",
        "value.converter.schemas.enable": "true",
        "topics": "media_events",
        "table.name.format": "${topic} ",
        "auto.create": "true",
        "poll.interval.ms": 1000,
        "pk.mode": "kafka",
        "pk.fields": "",
        "insert.mode": "INSERT",
        "transforms": "renameTopic",
        "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.renameTopic.regex": "media_events",
        "transforms.renameTopic.replacement": "media_events"
    }
}'
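
One thing that may be relevant here: because value.converter and its schema.registry.url are overridden in the connector config itself, the worker-level CONNECT_VALUE_CONVERTER_* auth settings are not applied to this connector, so the Schema Registry basic auth would also need to be supplied per connector, roughly like this (placeholder credentials, not the real key/secret):

        "value.converter.schema.registry.url": "https://psrc-nx5kv.australia-southeast1.gcp.confluent.cloud",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "value.converter.schema.registry.basic.auth.user.info": "<SR_API_KEY>:<SR_API_SECRET>",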

Exception in connect


connect    | [2023-01-14 20:33:31,332] INFO [Consumer clientId=connector-consumer-jdbc-media-events-sink-0, groupId=connect-jdbc-media-events-sink] Adding newly assigned partitions: avro_media-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect    | [2023-01-14 20:33:31,429] INFO [Consumer clientId=connector-consumer-jdbc-media-events-sink-0, groupId=connect-jdbc-media-events-sink] Found no committed offset for partition avro_media-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect    | [2023-01-14 20:33:33,043] ERROR WorkerSinkTask{id=jdbc-media-events-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect    | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect    | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
connect    | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
connect    | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:519)
connect    | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
connect    | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
connect    | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
connect    | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
connect    | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
connect    | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
connect    | 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect    | 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect    | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect    | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect    | 	at java.base/java.lang.Thread.run(Thread.java:829)
connect    | Caused by: org.apache.kafka.common.config.ConfigException: Failed to access Avro data from topic avro_media : Unauthorized; error code: 401
connect    | 	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:129)
connect    | 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
connect    | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:519)
connect    | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
connect    | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
connect    | 	... 13 more
connect    | [2023-01-14 20:33:33,044] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
connect    | [2023-01-14 20:33:33,044] INFO [Consumer clientId=connector-consumer-jdbc-media-events-sink-0, groupId=connect-jdbc-media-events-sink] Revoke previously assigned partitions avro_media-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect    | [2023-01-14 20:33:33,044] INFO [Consumer clientId=connector-consumer-jdbc-media-events-sink-0, groupId=connect-jdbc-media-events-sink] Member connector-consumer-jdbc-media-events-sink-0-564e74dc-455e-43a4-8321-9ad7551ffeb9 sending LeaveGroup request to coordinator b0-pkc-xrnwx.asia-south2.gcp.confluent.cloud:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

rakibmirza avatar Jan 14 '23 20:01 rakibmirza

How did you solve the problem?

vl-kp avatar Aug 24 '23 09:08 vl-kp

Bumping. I too am curious how this was solved.

KyleL1988 avatar Dec 13 '23 21:12 KyleL1988

Another bump. I'm experiencing the same issue. Any resolution?

djsalty avatar Jan 05 '24 16:01 djsalty

Has anyone found a solution to this?

jdarrahSNT avatar Apr 07 '24 23:04 jdarrahSNT

If it helps, I was able to get things working by putting in schema registry settings for both the value and the key converter:

      CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
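
(For anyone mapping these back to worker properties: the Confluent Connect images translate CONNECT_-prefixed environment variables by dropping the prefix, lowercasing, and replacing underscores with dots, so the relevant entries end up roughly as the following in the worker's generated properties file; placeholder values shown:)

      value.converter.schema.registry.url=<SCHEMA_REGISTRY_URL>
      value.converter.basic.auth.credentials.source=USER_INFO
      value.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>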

jdarrahSNT avatar Apr 08 '24 23:04 jdarrahSNT