kafka-connect-storage-cloud
Added support for AWS IAM Roles in the S3Storage class
Hey guys,
I noticed that support for per-connector AWS credentials was recently added to master, which is awesome; it's a feature I'm missing in the currently released version!
However, I also noticed that only basic AWS credentials are supported, not IAM roles, which would allow cross-account setups and are, in my opinion, also preferable from a security point of view.
This PR introduces a new config value aws.iam.role.arn which, if provided, configures an STSAssumeRoleSessionCredentialsProvider instead of the AWSStaticCredentialsProvider to fetch temporary session credentials in order to make requests to S3.
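To make the selection logic concrete, here is a rough sketch of the branching, not the actual code in this PR. CredentialsSource and CredentialsSelector are hypothetical stand-ins so the example runs without the AWS SDK; in the real change the two branches would return an AWSStaticCredentialsProvider and an STSAssumeRoleSessionCredentialsProvider. The sketch also assumes that the static keys, when present, are what sign the AssumeRole call.

```java
import java.util.Map;

/** Hypothetical stand-in for the SDK's credentials-provider interface (not an AWS type). */
interface CredentialsSource {
    String describe();
}

/** Hypothetical helper: picks a credentials source based on the connector config. */
final class CredentialsSelector {
    static final String ROLE_ARN_CONFIG = "aws.iam.role.arn";
    static final String ACCESS_KEY_CONFIG = "aws.access.key.id";

    private CredentialsSelector() {
    }

    static CredentialsSource select(Map<String, String> config) {
        String accessKey = config.get(ACCESS_KEY_CONFIG);
        // Stands in for AWSStaticCredentialsProvider built from the configured keys.
        CredentialsSource base = () -> "static:" + accessKey;

        String roleArn = config.get(ROLE_ARN_CONFIG);
        if (roleArn == null || roleArn.trim().isEmpty()) {
            // No role configured: keep the existing static-credentials behaviour.
            return base;
        }
        // Stands in for STSAssumeRoleSessionCredentialsProvider, which would fetch
        // temporary session credentials for the role using the base credentials.
        return () -> "assume-role:" + roleArn + " via " + base.describe();
    }
}
```

With only aws.access.key.id set, select returns the static source; adding aws.iam.role.arn switches it to the assume-role source wrapping the same keys.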
I've tested it with the following S3 sink connector:
name=iad_ad_s3_sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=iad.ad
file.delim=-
s3.region=us-east-1
s3.bucket.name=wh-test-bucket-us-east-1/ad-content/local/version=1
s3.part.size=5242880
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
format.class.schemas.enable=false
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
flush.size=100000
rotate.interval.ms=900000
partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner
timestamp.extractor=RecordField
timestamp.field=auditTimestamp
locale=de-AT
timezone=Europe/Vienna
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
key.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=https://sr-dev.willhaben.at/
key.converter.schema.registry.url=http://localhost:8081
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
#value.converter.schema.registry.url=https://sr-dev.willhaben.at/
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
aws.iam.role.arn=arn:aws:iam::123456789012:role/wh-test-iam-role
aws.access.key.id=AKI.......I
aws.secret.access.key=dk......pF
From the logs, we can see that the IAM role is successfully assumed:
[2020-04-20 08:43:16,675] DEBUG Sending Request: POST https://sts.amazonaws.com / Parameters: ({"Action":["AssumeRole"],"Version":["2011-06-15"],"RoleArn":["arn:aws:iam::123456789012:role/wh-test-iam-role"],"RoleSessionName":["iad_ad_s3_sink-S3SinkConnector"],"DurationSeconds":["900"]}Headers: (amz-sdk-invocation-id: 01f97706-0d14-67cb-973f-25db5d1c877e, User-Agent: aws-sdk-java/1.11.725 Linux/4.15.0-96-generic OpenJDK_64-Bit_Server_VM/11.0.5+10 java/11.0.5 scala/2.12.11 vendor/AdoptOpenJDK, ) (com.amazonaws.request:1270)
[2020-04-20 08:43:16,717] DEBUG AWS4 Canonical Request: '"POST
/
amz-sdk-invocation-id:01f97706-0d14-67cb-973f-25db5d1c877e
amz-sdk-retry:0/0/500
host:sts.amazonaws.com
user-agent:aws-sdk-java/1.11.725 Linux/4.15.0-96-generic OpenJDK_64-Bit_Server_VM/11.0.5+10 java/11.0.5 scala/2.12.11 vendor/AdoptOpenJDK
x-amz-date:20200420T064316Z
amz-sdk-invocation-id;amz-sdk-retry;host;user-agent;x-amz-date
de49ed5d98ce394d4c8fd0ab5e0282fbb54b9c4190fa4fe26339283e2c099b02" (com.amazonaws.auth.AWS4Signer:33)
[2020-04-20 08:43:16,717] DEBUG AWS4 String to Sign: '"AWS4-HMAC-SHA256
20200420T064316Z
20200420/us-east-1/sts/aws4_request
582e261b5bee5a038e52653f95d433f29cd6c65aa883ec31949e344ffdcb4926" (com.amazonaws.auth.AWS4Signer:33)
[2020-04-20 08:43:17,911] DEBUG Received successful response: 200, AWS Request ID: 101c336d-c360-437e-8529-1510b4d1f81f (com.amazonaws.request:86)
And requests to S3 are succeeding:
[2020-04-20 08:43:22,606] DEBUG Sending Request: PUT https://s3.amazonaws.com /wh-test-bucket-us-east-1/ad-content/us-east-1-test/version%3D1/topics/iad.ad/year%3D2020/month%3D04/day%3D10/hour%3D15/iad.ad-0-0000159561.snappy.parquet Parameters: ({"uploadId":["XVNkbI7R.ZWD36SJLgCANLiAKOjTjm6x9FGoB18h_I50O76BMthmA7aTbobhIZDfZpOxzjDoyEBaHeOEsawk2QCAA_J3c8HYBY9HAIuHvib6.LrF9ee4fpis8ke_0LasgJjFIv4rR5_KaSmkf85b96AOf3587tobhv5HCclLkis-"],"partNumber":["1"]}Headers: (amz-sdk-invocation-id: f054694e-694c-87ee-acb7-d6cc7eea5d78, Content-Length: 1383754, Content-Type: application/octet-stream, User-Agent: APN/1.0 Confluent/1.0 KafkaS3Connector/6.0.0-SNAPSHOT, aws-sdk-java/1.11.725 Linux/4.15.0-96-generic OpenJDK_64-Bit_Server_VM/11.0.5+10 java/11.0.5 scala/2.12.11 vendor/AdoptOpenJDK, ) (com.amazonaws.request:1270)
[2020-04-20 08:43:22,607] DEBUG AWS4 Canonical Request: '"PUT
/wh-test-bucket-us-east-1/ad-content/us-east-1-test/version%3D1/topics/iad.ad/year%3D2020/month%3D04/day%3D10/hour%3D15/iad.ad-0-0000159561.snappy.parquet
partNumber=1&uploadId=XVNkbI7R.ZWD36SJLgCANLiAKOjTjm6x9FGoB18h_I50O76BMthmA7aTbobhIZDfZpOxzjDoyEBaHeOEsawk2QCAA_J3c8HYBY9HAIuHvib6.LrF9ee4fpis8ke_0LasgJjFIv4rR5_KaSmkf85b96AOf3587tobhv5HCclLkis-
amz-sdk-invocation-id:f054694e-694c-87ee-acb7-d6cc7eea5d78
amz-sdk-retry:0/0/500
content-length:1383754
content-type:application/octet-stream
host:s3.amazonaws.com
user-agent:APN/1.0 Confluent/1.0 KafkaS3Connector/6.0.0-SNAPSHOT, aws-sdk-java/1.11.725 Linux/4.15.0-96-generic OpenJDK_64-Bit_Server_VM/11.0.5+10 java/11.0.5 scala/2.12.11 vendor/AdoptOpenJDK
x-amz-content-sha256:UNSIGNED-PAYLOAD
x-amz-date:20200420T064322Z
x-amz-security-token:FwoGZXIvYXdzEPj//////////wEaDBoI82HQon2kJJbu+iLCAfVc221RRKyRZS5QRwrrTYI+EW1yH/7Qe2ldNdAqDdxDnAeDodg7oVz995/c4xUwklfJo2IdmMiD9Qpom7pP0p2U4MTlSt7YLExZxs/KgYD7/Wfc55xms+joY6GkvlqTfu80FMxNj9OXHRmShogOCoA3Uzz9Hu9p0u1Frb02kXhW4rhuE0Y1mVULqdmZNZzCuGix4N7o7F2UFz/90oC8SrvOKZMic+3j7QulbCgaixdSQXDs8QyoGDiuiJ5NAUGqq20TKIWJ9fQFMi02HfU6lnjEVLN3ItmUV7rz6CFLGKCre4rJ36L+Rc9i37hkzGuC3H6bFFu0s6U=
amz-sdk-invocation-id;amz-sdk-retry;content-length;content-type;host;user-agent;x-amz-content-sha256;x-amz-date;x-amz-security-token
UNSIGNED-PAYLOAD" (com.amazonaws.auth.AWS4Signer:33)
[2020-04-20 08:43:22,607] DEBUG AWS4 String to Sign: '"AWS4-HMAC-SHA256
20200420T064322Z
20200420/us-east-1/s3/aws4_request
6c67cea3c506d0316329cad26c4af6a3f82654d7f78f58388fca815a67b734f8" (com.amazonaws.auth.AWS4Signer:33)
[2020-04-20 08:43:25,169] DEBUG Received successful response: 200, AWS Request ID: 26A3BADAC4F56787 (com.amazonaws.request:86)
Also files are actually showing up in S3:
aws s3 ls wh-test-bucket-us-east-1/ad-content/us-east-1-test/version=1/topics/iad.ad/year=2020/month=04/day=10/hour=15/ --region us-east-1
2020-04-20 08:43:23 1383754 iad.ad-0-0000159561.snappy.parquet
There are a couple of things I'd like your feedback on before proceeding:
- I only implemented a very basic test. I consider the test coverage insufficient, but I'd like your feedback first on what is expected and how tests should be implemented. The credentials provider and the STS client are instantiated inside the class and are not mocked out, so it's difficult to unit test. My suggestion would be to either 1) create protected factory classes in the S3Storage class as well as a protected constructor to provide mocks, or 2) use PowerMock (ugly).
- I removed the scope-down policy (which reduces the requested permissions when assuming the IAM role) in the STSAssumeRoleSessionCredentialsProvider because I only tested the S3SinkConnector and haven't tried the S3 source connector, which (I assume) needs different permissions than the S3 sink. If you provide me with the exact permissions that both types of connectors require (I think I got the S3 sink one [1]), I'm happy to add it.
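For the first point, option 1 could look roughly like the sketch below. All names here (TokenService, RoleAwareStorage, defaultTokenService) are made up for illustration and are not part of S3Storage or the AWS SDK; the point is only the shape of the test seam: the default constructor wires in the real dependency, and a protected constructor lets a test pass a fake instead.

```java
/** Hypothetical stand-in for the STS client dependency (not an AWS type). */
interface TokenService {
    String assumeRole(String roleArn);
}

/** Hypothetical storage class showing a protected constructor as a test seam. */
class RoleAwareStorage {
    private final String roleArn;
    private final TokenService tokenService;

    /** Production path: builds the default dependency itself. */
    RoleAwareStorage(String roleArn) {
        this(roleArn, defaultTokenService());
    }

    /** Test seam: subclasses and same-package tests can inject a fake. */
    protected RoleAwareStorage(String roleArn, TokenService tokenService) {
        this.roleArn = roleArn;
        this.tokenService = tokenService;
    }

    /** Factory for the real dependency; the actual STS call is omitted in this sketch. */
    static TokenService defaultTokenService() {
        return arn -> {
            throw new UnsupportedOperationException("real STS call omitted in this sketch");
        };
    }

    /** Delegates to the injected service, so tests can observe the requested role. */
    String sessionToken() {
        return tokenService.assumeRole(roleArn);
    }
}
```

A test can then construct the class with a lambda that records the requested role ARN and returns a canned token, without PowerMock and without touching the network.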
Looking forward to your feedback on this!
[1] https://github.com/Step2Web/kafka-connect-storage-cloud/commit/17e179610afbf052bf07cee721355b1443c7e97e#diff-c5c2842b25e9f1f10f73571553d1b336R74-R94
It looks like @Step2Web hasn't signed our Contributor License Agreement yet.
The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. Wikipedia
You can read and sign our full Contributor License Agreement here.
Once you've signed, reply with [clabot:check] to prove it.
Appreciation of efforts,
clabot
[clabot:check]
@confluentinc It looks like @Step2Web just signed our Contributor License Agreement. :+1:
Always at your service,
clabot
Why has this not been reviewed? Does it mean the S3 sink currently still can't use an IAM role?
We are looking for this as well, please.