cp-docker-images
Configuring s3-sink connector with kafka-connect on Amazon EC2 instance
My EC2 instance and S3 bucket are in the same region, and I have explicitly defined the keys in the following way:
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
I have built a jar of kafka-connect-storage-cloud from source and supplied it to kafka-connect by bind-mounting the jar directory in the following way:
docker run -d \
--name=kafka-connect \
--net=host \
-e CONNECT_BOOTSTRAP_SERVERS=localhost:29092 \
-e CONNECT_REST_PORT=28083 \
-e CONNECT_GROUP_ID="quickstart-avro" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-avro-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-avro-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-avro-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
-e CONNECT_LOG4J_LOGGERS=org.reflections=ERROR \
-e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars \
-v /home/utkarsh.saraf/quickstart/file:/tmp/quickstart \
-v /home/utkarsh.saraf/quickstart/jars:/etc/kafka-connect/jars \
confluentinc/cp-kafka-connect:3.3.0
Now I am configuring the s3-connector via the REST API in the following way:
curl -X POST \
-H "Content-Type: application/json" \
--data '{
  "name": "s3-sink-docker",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": 1,
    "topics": "order",
    "s3.region": "ap-south-1",
    "s3.bucket.name": "test-order",
    "s3.part.size": 5242880,
    "flush.size": 10000,
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "schema.compatibility": "NONE",
    "partition.duration.ms": 2000,
    "path.format": "YYYY/M/d/h",
    "locale": "US",
    "timezone": "UTC",
    "rotate.schedule.interval.ms": 60000
  }
}' \
http://localhost:28083/connectors
When I check the status of the connector, it gives the following error:
_{"name":"s3-sink-docker","connector":{"state":"RUNNING","worker_id":"localhost:28083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain\n\tat io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:115)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain\n\tat com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:131)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1115)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:764)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:728)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)\n\tat 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)\n\tat com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4185)\n\tat com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4132)\n\tat com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1302)\n\tat com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1259)\n\tat io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:110)\n\tat io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:102)\n\t... 9 more\n","id":0,"worker_id":"localhost:28083"}]}_
- The issue here is that `export` doesn't expose variables to the processes inside the Docker container.
- Why did you build from source? The S3 connector is already included in `cp-kafka-connect`.
- `CONNECT_PLUGIN_PATH` only applies to Confluent 4.0+.

I suggest you volume-mount a credentials file at `/root/.aws/credentials`. Otherwise, you would be exposing your credentials on the command line in the `docker run` command.
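As a sketch of that approach (the key values below are placeholders, not taken from this thread), the file follows the AWS SDK's standard credentials-file format and can be mounted read-only into the container:

```
[default]
aws_access_key_id = <your-access-key-id>
aws_secret_access_key = <your-secret-access-key>
```

Then add one more volume flag to the existing `docker run` command, e.g. `-v $HOME/.aws/credentials:/root/.aws/credentials:ro`, and the SDK's default provider chain should pick it up without the keys ever appearing in the container's environment or the shell history.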
Credentials file format: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-file-format
More info: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
Instead of AWS S3, I want to connect my Kafka pod(s) to Minio. Where/how should I pass the Minio URL?
@tirelibirefe Google is your friend https://blog.minio.io/journaling-kafka-messages-with-s3-connector-and-minio-83651a51045d?gi=5f2c7fde948d
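For reference, an untested sketch of what that blog post sets up: the S3 sink connector accepts a `store.url` property that overrides the S3 endpoint, which is how it can be pointed at a Minio server instead of AWS. The endpoint `http://minio:9000` and bucket name below are placeholders:

```json
{
  "name": "minio-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "order",
    "s3.bucket.name": "kafka-archive",
    "s3.region": "us-east-1",
    "store.url": "http://minio:9000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": 10000
  }
}
```

The Minio server would still need credentials supplied the same way as for AWS (credentials file or environment), since the connector authenticates through the AWS SDK either way.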