kafka-connect-storage-common
Add ability to write multiple topics into single output path
Naturally, one might try to use RegexRouter to send multiple topics into a single directory. For example, with data coming from the JDBC Source connector:
```json
"topics": "SQLSERVER-TEST-TABLE_TEST",
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "SQLSERVER-TEST-(.*)",
"transforms.dropPrefix.replacement": "$1"
```
But this throws an NPE:
```
Caused by: java.lang.NullPointerException
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
... 10 more
```
Debugging (the S3 connector, specifically) shows that the data needed to generate the top-level folder is available, but the storage writer cannot be retrieved from the map. The writers are kept in a HashMap keyed by the original topic-partition name (SQLSERVER-TEST-TABLE_TEST-0), while the transform has already been applied to the record (TABLE_TEST-0), so looking up the "new" topic name fails to find the S3 writer for that TopicPartition.
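The mismatch can be sketched in a few lines of Java. This is a simplified model, not the actual S3SinkTask code: the class name, the `lookup()` helper, and the bare `Object` standing in for a TopicPartitionWriter are all illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class WriterLookupSketch {
    // Sketch of the failure: the sink task registers writers under the
    // ORIGINAL topic-partition name when partitions are assigned, but
    // records arriving in put() carry the topic AFTER the SMT has run.
    static Object lookup() {
        Map<String, Object> writers = new HashMap<>();
        // Writer registered under the pre-transform partition name
        writers.put("SQLSERVER-TEST-TABLE_TEST-0", new Object());

        // put() sees the record's topic after RegexRouter has rewritten it
        String transformed = "SQLSERVER-TEST-TABLE_TEST-0"
                .replaceAll("SQLSERVER-TEST-(.*)", "$1"); // "TABLE_TEST-0"

        // Lookup under the rewritten name finds nothing; dereferencing
        // the missing writer later is what surfaces as the NPE
        return writers.get(transformed);
    }

    public static void main(String[] args) {
        System.out.println(lookup());
    }
}
```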

I think adding a separate config to the storage-common module that performs the RegexRouter logic outside of the SMT pipeline would solve this problem, and it could then be patched into the Hadoop, S3, and other storage connectors.
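Such a config might look roughly like the fragment below. The property names (`storage.path.regex`, `storage.path.replacement`) are hypothetical, invented here for illustration; they do not exist in any released connector. The idea is that the connector would apply this regex only when computing the output path, leaving the record's topic name, and therefore the writer map keyed by TopicPartition, untouched.

```json
"storage.path.regex": "SQLSERVER-TEST-(.*)",
"storage.path.replacement": "$1"
```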
Will do. I run this whole thing using Docker directly and EKS (Kubernetes). Now I need to compile the project and set up the local environment, which may take some time 😄