kafka-connect-storage-common
Add ability to write multiple topics into single output path
Naturally, one might try to use `RegexRouter` to send multiple topics into a single directory. Say the data is coming from a JDBC Source connector:
```json
"topics": "SQLSERVER-TEST-TABLE_TEST",
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "SQLSERVER-TEST-(.*)",
"transforms.dropPrefix.replacement": "$1"
```
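The intent of the transform above is a simple rename: strip the `SQLSERVER-TEST-` prefix from the topic name before the sink sees the record. As a minimal illustration of what `RegexRouter` computes (using plain `String.replaceAll` to stand in for the SMT):

```java
public class RenameDemo {
    public static void main(String[] args) {
        // Same regex and replacement as the dropPrefix transform config.
        String original = "SQLSERVER-TEST-TABLE_TEST";
        String renamed = original.replaceAll("SQLSERVER-TEST-(.*)", "$1");
        System.out.println(renamed); // prints "TABLE_TEST"
    }
}
```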
But this throws an NPE:
```
Caused by: java.lang.NullPointerException
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    ... 10 more
```
And when debugging (the S3 connector, specifically), we see that the data needed to generate the top-level folder is available, but the storage writer cannot be retrieved from the map. There is a `HashMap` keyed by the original topic name (`SQLSERVER-TEST-TABLE_TEST-0`), but the transform has already been applied to the record (`TABLE_TEST-0`), so looking up the "new" topic name finds no S3 writer for that `TopicPartition`.
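The mismatch can be sketched with plain collections. This is not the connector's actual code, just a hypothetical stand-in (`TopicPartition` and `writerFor` are simplified for illustration) showing why the lookup misses once the topic has been renamed:

```java
import java.util.HashMap;
import java.util.Map;

public class WriterLookup {
    // Simplified stand-in for Kafka's TopicPartition, used only for this sketch.
    record TopicPartition(String topic, int partition) {}

    // Writers are registered under the ORIGINAL topic name at partition assignment.
    static final Map<TopicPartition, String> writers = new HashMap<>();
    static {
        writers.put(new TopicPartition("SQLSERVER-TEST-TABLE_TEST", 0), "s3Writer");
    }

    // put() later looks up the writer using the record's (already transformed) topic.
    static String writerFor(String topic, int partition) {
        return writers.get(new TopicPartition(topic, partition));
    }

    public static void main(String[] args) {
        // RegexRouter has already renamed the topic, so the lookup returns null,
        // and dereferencing that null writer is what surfaces as the NPE.
        System.out.println(writerFor("TABLE_TEST", 0));                // prints "null"
        System.out.println(writerFor("SQLSERVER-TEST-TABLE_TEST", 0)); // prints "s3Writer"
    }
}
```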
I think adding a separate config to the storage-common module that performs the `RegexRouter` logic outside of the SMT pipeline would solve this problem, and it could be patched into the Hadoop, S3, and other storage connectors.
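As a rough sketch of what that config might look like (the property names below are hypothetical, chosen purely for illustration), the connector would apply the regex only when computing the output path, while internal writer bookkeeping continues to use the original `TopicPartition`:

```json
"topics": "SQLSERVER-TEST-TABLE_TEST",
"topic.dir.regex": "SQLSERVER-TEST-(.*)",
"topic.dir.replacement": "$1"
```

With something like this, records from `SQLSERVER-TEST-TABLE_TEST` would land under `TABLE_TEST/` in the output path, but the writer map would still be keyed by the unmodified topic, avoiding the NPE.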