kafka-connect-storage-common

Add ability to write multiple topics into single output path

Open · OneCricketeer opened this issue 5 years ago · 1 comment

From StackOverflow

Naturally, one might try to use RegexRouter to send multiple topics to a single directory. For example, with data coming from a JDBC Source connector:

    "topics": "SQLSERVER-TEST-TABLE_TEST",

    "transforms":"dropPrefix",      
    "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",  
    "transforms.dropPrefix.regex":"SQLSERVER-TEST-(.*)",  
    "transforms.dropPrefix.replacement":"$1"

But this will throw an NPE:

Caused by: java.lang.NullPointerException
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    ... 10 more

And when debugging (the S3 connector, specifically), we can see that the data needed to generate the top-level folder is available, but the storage writer for it cannot be found in the map.

There is a HashMap keyed by the original TopicPartition (SQLSERVER-TEST-TABLE_TEST-0), but by the time put() runs the transform has already been applied to the record's topic (TABLE_TEST-0), so looking up the "new" topic name cannot find the S3 writer for that TopicPartition.
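
A minimal, illustrative sketch of that mismatch (the class and variable names below are not the actual connector code; they only mirror the lookup described above):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative only: shows why the writer lookup returns null
    public class WriterLookupSketch {
        public static void main(String[] args) {
            Map<TopicPartition, Object> writers = new HashMap<>();

            // open() registers writers under the ORIGINAL topic name
            writers.put(new TopicPartition("SQLSERVER-TEST-TABLE_TEST", 0), new Object());

            // put() builds its key from the record, whose topic was already
            // rewritten by RegexRouter before reaching the sink task
            TopicPartition fromRecord = new TopicPartition("TABLE_TEST", 0);

            Object writer = writers.get(fromRecord); // null
            writer.toString();                       // NullPointerException, as in the stack trace
        }
    }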

[debugger screenshot omitted]

I think adding a separate config in the storage-common module that performs the RegexRouter logic outside of the SMT pipeline would solve this problem, and it could then be patched into the Hadoop, S3, and other storage connectors. A sketch of the idea is below.
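
A rough sketch of what that could look like, assuming hypothetical config names topics.dir.regex and topics.dir.replacement (neither exists in storage-common today). The regex would be applied only when generating the output directory, so the writer map keeps its original TopicPartition keys:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Hypothetical sketch: apply RegexRouter-style renaming only when building
    // the output path, instead of renaming the topic in the SMT pipeline.
    public class TopicDirMapper {
        private final Pattern regex;      // e.g. topics.dir.regex = SQLSERVER-TEST-(.*)
        private final String replacement; // e.g. topics.dir.replacement = $1

        public TopicDirMapper(String regex, String replacement) {
            this.regex = Pattern.compile(regex);
            this.replacement = replacement;
        }

        // Called by the partitioner when generating the top-level folder,
        // while the sink task keeps looking up writers by the original topic
        public String directoryFor(String originalTopic) {
            Matcher m = regex.matcher(originalTopic);
            return m.matches() ? m.replaceFirst(replacement) : originalTopic;
        }
    }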

OneCricketeer · Dec 03 '18 07:12