
S3 Sink Connector: Support for Multiple Topics

Open Alexander-ac opened this issue 2 years ago • 6 comments


Are multiple topics also supported? I've tried several variations of the syntax.

for example: connect.s3.kcql=INSERT INTO kafka-backup SELECT * FROM testtopic, testtopic2 STOREAS JSON WITH_FLUSH_COUNT = 1

I keep getting the following error message:

java.lang.IllegalStateException: fatal: Can't find fileNamingStrategy in config

Can you help me?

Thank you

What version of the Stream Reactor are you reporting this issue for? 2.8

What is your connector properties configuration (my-connector.properties)?

name=S3SinkConnector
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
key.converter.schemas.enable=false
connect.s3.kcql=INSERT INTO kafka-backup SELECT * FROM testtopic, testtopic2 STOREAS JSON WITH_FLUSH_COUNT = 1
connect.s3.aws.region=eu-central-1
topics=testtopic, testopic2
schema.enable=false
errors.log.enable=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Please provide full log files (redact any sensitive information)

[2022-05-20 10:07:46,074] INFO [S3SinkConnector] Seeked offset None for TP TopicPartition(Topic(testtopic),0) (io.lenses.streamreactor.connect.aws.s3.sink.seek.IndexManager:144)
[2022-05-20 10:07:46,103] INFO [S3SinkConnector] Seeked offset None for TP TopicPartition(Topic(testtopic),4) (io.lenses.streamreactor.connect.aws.s3.sink.seek.IndexManager:144)
[2022-05-20 10:07:46,137] INFO [S3SinkConnector] Seeked offset None for TP TopicPartition(Topic(testtopic),1) (io.lenses.streamreactor.connect.aws.s3.sink.seek.IndexManager:144)
[2022-05-20 10:07:46,163] INFO [S3SinkConnector] Seeked offset None for TP TopicPartition(Topic(testtopic),3) (io.lenses.streamreactor.connect.aws.s3.sink.seek.IndexManager:144)
[2022-05-20 10:07:46,199] INFO [S3SinkConnector] Seeked offset None for TP TopicPartition(Topic(testtopic),2) (io.lenses.streamreactor.connect.aws.s3.sink.seek.IndexManager:144)
[2022-05-20 10:07:46,302] ERROR WorkerSinkTask{id=S3SinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
java.lang.IllegalStateException: fatal: Can't find fileNamingStrategy in config

nonFatal:

Fatal TPs: HashSet(Set(TopicPartition(Topic(testtopic2),1)), Set(TopicPartition(Topic(testtopic2),0)), Set(TopicPartition(Topic(testtopic2),3)), Set(TopicPartition(Topic(testtopic2),4)), Set(TopicPartition(Topic(testtopic2),2)))
    at io.lenses.streamreactor.connect.aws.s3.sink.S3SinkTask.handleErrors(S3SinkTask.scala:125)
    at io.lenses.streamreactor.connect.aws.s3.sink.S3SinkTask.open(S3SinkTask.scala:212)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:635)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:71)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:700)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:452)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2022-05-20 10:07:46,307] INFO [Consumer clientId=connector-consumer-S3SinkConnector-0, groupId=connect-S3SinkConnector] Revoke previously assigned partitions testtopic-0, testtopic2-3, testtopic2-4, testtopic-2, testtopic-1, testtopic-4, testtopic-3, testtopic2-0, testtopic2-1, testtopic2-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)

Alexander-ac avatar May 20 '22 10:05 Alexander-ac

Multiple topics support would be nice. Actually, a topic wildcard support would be ideal. Confluent S3 Sink has that feature and it is very handy.

AlexeyRaga avatar Jun 08 '22 02:06 AlexeyRaga

Multiple KCQL statements, separated by semi-colons, can be provided to the connector to support multiple topics. For example:

INSERT INTO kafka-backup:data1 SELECT * FROM testtopic STOREAS JSON WITH_FLUSH_COUNT = 1;
INSERT INTO kafka-backup:data2 SELECT * FROM testtopic2 STOREAS JSON WITH_FLUSH_COUNT = 1

We've not formally tested this behaviour with this connector, so let us know how you get on.

davidsloan avatar Jun 08 '22 08:06 davidsloan

@davidsloan true, but not wildcards... Use case: I want to back up all the "domain" topics, and being able to say domain-.* would be great.

AlexeyRaga avatar Jun 08 '22 10:06 AlexeyRaga

Thanks guys, I tried it and it works. However, the expressions must all be on one line.

Alexander-ac avatar Jun 09 '22 07:06 Alexander-ac

@Alexander-ac you can use \ to continue a statement across multiple lines, as in any Java properties file.
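
For example, the two statements from above could be written as (a sketch; bucket and topic names carried over from the earlier example):

connect.s3.kcql=INSERT INTO kafka-backup:data1 SELECT * FROM testtopic STOREAS JSON WITH_FLUSH_COUNT = 1; \
                INSERT INTO kafka-backup:data2 SELECT * FROM testtopic2 STOREAS JSON WITH_FLUSH_COUNT = 1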

ethanttbui avatar Jun 14 '22 00:06 ethanttbui

@Alexander-ac do you mind closing this ticket if your problem was resolved?

davidsloan avatar Jul 13 '22 09:07 davidsloan

We have a use case where we need wildcard support. We can use topics.regex to have Kafka Connect subscribe to multiple topics, but it seems like the limitation is with KCQL.

What we really would like is to be able to write a single KCQL statement that works with any topic matched by the regex pattern, allowing us to partition based on topic name, known common fields, etc.

In fact, having to specify the topic name in the KCQL itself seems a bit redundant given the topics and topics.regex options. Maybe a special meta topic name could be introduced (e.g. __topic__) to indicate that the KCQL should be applied to any topic?
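
Purely as an illustration of the idea (the __topic__ placeholder is hypothetical, not an existing feature), the configuration could then look like:

topics.regex=domain-.*
connect.s3.kcql=INSERT INTO kafka-backup SELECT * FROM __topic__ STOREAS JSON WITH_FLUSH_COUNT = 1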

jalaziz avatar Oct 03 '22 23:10 jalaziz

tbh, I disagree with using KCQL to start with. It interferes with Kafka Streams' interface design and adds extra complexity.

ethanttbui avatar Oct 12 '22 01:10 ethanttbui

> Multiple KCQL statements, separated by semi-colons, can be provided to the connector to support multiple topics. For example:
>
> INSERT INTO kafka-backup:data1 SELECT * FROM testtopic STOREAS JSON WITH_FLUSH_COUNT = 1;
> INSERT INTO kafka-backup:data2 SELECT * FROM testtopic2 STOREAS JSON WITH_FLUSH_COUNT = 1
>
> We've not formally tested this behaviour with this connector, so let us know how you get on.

@davidsloan Thanks for this tip. It works as expected on backup. On restore, it almost works too, with the following KCQL:

connect.s3.kcql=INSERT INTO target-topic SELECT * FROM msk-backup-source-topic:source-topic STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 1; \
INSERT INTO target-topic-2 SELECT * FROM msk-backup-source-topic:source-topic-2 STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 1; \
INSERT INTO target-topic-3 SELECT * FROM msk-backup-source-topic:source-topic-3 STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 1, \

target-topic-2 and target-topic-3 get their events in the correct order. But target-topic for some reason gets the events of all 3 topics.

Any ideas?

smrutimandal avatar Jan 24 '23 21:01 smrutimandal

### Update

I replicated the data of a single topic to another bucket and the restore worked as expected.

I deliberately didn't copy the .indexes/ folder to the new bucket. Can this folder be the reason for the earlier failures?

smrutimandal avatar Jan 29 '23 20:01 smrutimandal

https://github.com/lensesio/stream-reactor/pull/920

We are adding the ability to use a wildcard so as to avoid having to specify multiple KCQL statements. I will update this thread when this feature is released.

davidsloan avatar Feb 09 '23 17:02 davidsloan

> ### Update
>
> I replicated the data of a single topic to another bucket and the restore worked as expected.
>
> I deliberately didn't copy the .indexes/ folder to the new bucket. Can this folder be the reason for the earlier failures?

Yes. Thanks for the feedback @smrutimandal. I will work on adding a condition to the source so that it ignores the '.indexes' directory whenever it is encountered.

davidsloan avatar Feb 09 '23 17:02 davidsloan

We have now rolled out support for the KCQL

SELECT * FROM `*` 

to avoid having to define the topics multiple times in configuration. This will apply to all topics that are configured in the sink.

Available from version 4.1.0.
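
For example, a sink backing up every subscribed topic can now be configured with a single statement (a sketch; the bucket name and JSON/flush settings are carried over from the earlier examples):

topics.regex=.*
connect.s3.kcql=INSERT INTO kafka-backup SELECT * FROM `*` STOREAS JSON WITH_FLUSH_COUNT = 1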

davidsloan avatar Mar 19 '23 13:03 davidsloan

Hi, I have a problem when running the 4.2.0 connector version on AWS MSK Connect with wildcard support. I get a strange error and I can't find a way to fix it.

[Worker-0cfe00eb343584c62] java.lang.IllegalStateException: Unable to load region from any of the providers in the chain software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain@427ca428: [software.amazon.awssdk.regions.providers.SystemSettingsRegionProvider@36477d3b: Unable to load region from system settings. Region must be specified either via environment variable (AWS_REGION) or system property (aws.region)., software.amazon.awssdk.regions.providers.AwsProfileRegionProvider@7345f597: No region provided in profile: default, software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider@370cb8fa: Unable to retrieve region information from EC2 Metadata service. Please make sure the application is running on EC2.]

The way that I run the connector is: aws kafkaconnect create-connector --cli-input-json file://backupall.json --region eu-central-1

Below is my backupall.json:

{
  "connectorConfiguration": {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "key.converter.schemas.enable": "false",
    "connect.s3.kcql": "INSERT INTO MYBUCKET:backupall SELECT * FROM * WITH_FLUSH_COUNT = 1",
    "aws.region": "eu-central-1",
    "topics.regex": ".*",
    "tasks.max": "2",
    "schema.enable": "false",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "connectorName": "mskS3backupALL",
  "kafkaCluster": {
    "apacheKafkaCluster": {
      "bootstrapServers": "bootstrapServers",
      "vpc": {
        "subnets": ["subnet-1", "subnet-2", "subnet-3"],
        "securityGroups": ["sg-1"]
      }
    }
  },
  "capacity": {
    "autoScaling": {
      "maxWorkerCount": 2,
      "mcuCount": 1,
      "minWorkerCount": 1,
      "scaleInPolicy": { "cpuUtilizationPercentage": 20 },
      "scaleOutPolicy": { "cpuUtilizationPercentage": 80 }
    }
  },
  "kafkaConnectVersion": "2.7.1",
  "serviceExecutionRoleArn": "serviceExecutionRoleArn",
  "plugins": [
    {
      "customPlugin": {
        "customPluginArn": "customPluginArn",
        "revision": 1
      }
    }
  ],
  "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" },
  "kafkaClusterClientAuthentication": { "authenticationType": "IAM" },
  "logDelivery": {
    "workerLogDelivery": {
      "cloudWatchLogs": {
        "enabled": true,
        "logGroup": "connector-log"
      }
    }
  }
}

The EC2 instance used to run the command has an administrator access role. The Service Execution Role has administrator access. All MSK ports are open in the security group.

gwrona85 avatar May 23 '23 07:05 gwrona85

@gwrona85

I would recommend you create a new issue in future rather than commenting on a closed issue; otherwise your response may be missed.

Please see the documentation at https://docs.lenses.io/5.2/connectors/sinks/s3sinkconnector/#options. "aws.region" is not a valid property; you want "connect.s3.aws.region".

Some of the properties you are using are deprecated, too. Please check the link.
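
As a sketch, the region entry in your connectorConfiguration would become:

"connect.s3.aws.region": "eu-central-1"

with the "aws.region" line removed.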

davidsloan avatar May 23 '23 08:05 davidsloan