
Configuring s3-sink connector with kafka-connect on Amazon EC2 instance

utkarshsaraf19 opened this issue 6 years ago • 3 comments

My EC2 instance and S3 bucket are in the same region, and I have explicitly defined the keys as follows:

export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=

I have built the kafka-connect-storage-cloud JAR from source and supplied it to Kafka Connect by bind-mounting the JAR directory as follows:

docker run -d \
  --name=kafka-connect \
  --net=host \
  -e CONNECT_BOOTSTRAP_SERVERS=localhost:29092 \
  -e CONNECT_REST_PORT=28083 \
  -e CONNECT_GROUP_ID="quickstart-avro" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-avro-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-avro-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-avro-status" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
  -e CONNECT_LOG4J_LOGGERS=org.reflections=ERROR \
  -e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars \
  -v /home/utkarsh.saraf/quickstart/file:/tmp/quickstart \
  -v /home/utkarsh.saraf/quickstart/jars:/etc/kafka-connect/jars \
  confluentinc/cp-kafka-connect:3.3.0

Now I am configuring the s3-sink connector via the REST API as follows:

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "s3-sink-docker",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": 1,
      "topics": "order",
      "s3.region": "ap-south-1",
      "s3.bucket.name": "test-order",
      "s3.part.size": 5242880,
      "flush.size": 10000,
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "schema.compatibility": "NONE",
      "partition.duration.ms": 2000,
      "path.format": "YYYY/M/d/h",
      "locale": "US",
      "timezone": "UTC",
      "rotate.schedule.interval.ms": 60000
    }
  }' \
  http://localhost:28083/connectors

When I check the status of the connector, it reports the task as FAILED with the following error:
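The status check itself uses the standard Kafka Connect REST status endpoint (port 28083 per the worker configuration above):

```shell
# Query connector and task status from the Connect worker's REST API
curl -s http://localhost:28083/connectors/s3-sink-docker/status
```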

{"name":"s3-sink-docker","connector":{"state":"RUNNING","worker_id":"localhost:28083"},"tasks":[{"state":"FAILED","id":0,"worker_id":"localhost:28083"}]}

with the failed task's trace:

org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain
	at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:115)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain
	at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:131)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1115)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:764)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:728)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4185)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4132)
	at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1302)
	at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1259)
	at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:110)
	at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:102)
	... 9 more

utkarshsaraf19 avatar Apr 13 '18 08:04 utkarshsaraf19

  1. The issue here is that `export` on the host does not expose variables to processes running inside the Docker container.
  2. Why did you build from source? The S3 connector is already included in cp-kafka-connect.
  3. CONNECT_PLUGIN_PATH only applies to Confluent 4.0+
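On point 1: a host-side export can be forwarded into the container explicitly. A minimal sketch, adding to the docker run flags above (note that values passed this way still appear in `docker inspect` output, so a mounted credentials file is safer for real keys):

```shell
  -e AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID" \
  -e AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY" \
```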

I suggest you volume-mount a file at /root/.aws/credentials instead. Otherwise, you would be exposing your credentials on the command line in the docker run command.

https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-file-format

More info: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
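A sketch of that approach (file path and key values here are placeholders, not real credentials):

```shell
# Write a credentials file in the INI format the AWS SDK expects
# (placeholder values, not real keys)
mkdir -p aws-creds
cat > aws-creds/credentials <<'EOF'
[default]
aws_access_key_id = AKIAEXAMPLEKEY
aws_secret_access_key = exampleSecretKey123
EOF

# Then mount it read-only into the container so the SDK's default
# provider chain finds it at /root/.aws/credentials, e.g.:
#   docker run ... -v "$PWD/aws-creds:/root/.aws:ro" confluentinc/cp-kafka-connect:3.3.0
```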

OneCricketeer avatar Apr 16 '18 20:04 OneCricketeer

Instead of AWS S3, I want to connect my Kafka pod(s) to Minio. Where and how should I pass the Minio URL?

tirelibirefe avatar May 18 '20 04:05 tirelibirefe

@tirelibirefe Google is your friend https://blog.minio.io/journaling-kafka-messages-with-s3-connector-and-minio-83651a51045d?gi=5f2c7fde948d
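The key setting is the S3 sink connector's `store.url` property, which points the S3 client at an alternative S3-compatible endpoint. A minimal sketch, assuming a Minio endpoint at http://minio:9000, a bucket named test-order, and the Connect REST port from the setup above:

```shell
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "s3-sink-minio",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": 1,
      "topics": "order",
      "s3.bucket.name": "test-order",
      "store.url": "http://minio:9000",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "flush.size": 10000
    }
  }' \
  http://localhost:28083/connectors
```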

OneCricketeer avatar May 18 '20 04:05 OneCricketeer