kafka-connect-storage-common

Add ability to write multiple topics into single output path

Open OneCricketeer opened this issue 7 years ago • 1 comments

From StackOverflow

Naturally, one might try to use RegexRouter to send multiple topics to a single directory. For example, with data coming from a JDBC Source connector:

    "topics": "SQLSERVER-TEST-TABLE_TEST",

    "transforms":"dropPrefix",      
    "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",  
    "transforms.dropPrefix.regex":"SQLSERVER-TEST-(.*)",  
    "transforms.dropPrefix.replacement":"$1"
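
To make the effect of this transform concrete, here is a minimal standalone sketch of the renaming it performs, using only `java.util.regex`. The class and method names (`RegexRouterSketch`, `route`) are made up for illustration and this is not the connector's code; it assumes RegexRouter rewrites a topic only when the whole name matches the regex.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexRouterSketch {
    // Hypothetical helper: if the entire topic name matches the regex,
    // apply the replacement; otherwise return the topic unchanged.
    static String route(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        // Same regex and replacement as in the config above.
        System.out.println(route("SQLSERVER-TEST-TABLE_TEST", "SQLSERVER-TEST-(.*)", "$1"));
        // prints "TABLE_TEST"
    }
}
```

So every record from `SQLSERVER-TEST-TABLE_TEST` arrives at the sink task carrying the topic name `TABLE_TEST`.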

But this throws an NPE:

Caused by: java.lang.NullPointerException
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    ... 10 more

Debugging the S3 connector specifically shows that the data needed to generate the top-level folder is available, but the storage writer cannot be found in the map.

The writers are held in a HashMap keyed by the original topic name (SQLSERVER-TEST-TABLE_TEST-0), but by the time a record reaches the sink task the transform has already been applied (TABLE_TEST-0). Looking up the "new" topic name therefore finds no S3 writer for that TopicPartition.
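
The mismatch can be reproduced without Kafka at all. The sketch below is only a model of the situation described above: `Writer`, `key`, and `openWriters` are hypothetical stand-ins, and plain `"topic-partition"` strings take the place of `TopicPartition` keys.

```java
import java.util.HashMap;
import java.util.Map;

public class WriterLookupSketch {
    // Hypothetical stand-in for the connector's per-partition writer.
    static class Writer {
        final String outputDir;
        Writer(String outputDir) { this.outputDir = outputDir; }
    }

    // Key in the "topic-partition" form used in the issue text.
    static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Writers are registered under the ORIGINAL (assigned) topic name.
    static Map<String, Writer> openWriters(String assignedTopic, int partition) {
        Map<String, Writer> writers = new HashMap<>();
        writers.put(key(assignedTopic, partition), new Writer("topics/" + assignedTopic));
        return writers;
    }

    public static void main(String[] args) {
        Map<String, Writer> writers = openWriters("SQLSERVER-TEST-TABLE_TEST", 0);
        // The record was renamed by the transform before reaching the sink,
        // so the lookup uses the NEW topic name and misses.
        Writer w = writers.get(key("TABLE_TEST", 0));
        System.out.println(w == null ? "no writer for renamed topic" : w.outputDir);
    }
}
```

Dereferencing the missing writer at this point is what would surface as the `NullPointerException` in the stack trace.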

I think adding a separate config in the storage-common module that performs the RegexRouter logic outside of the SMT pipeline would solve this problem, and it could then be patched into the Hadoop, S3, and other storage connectors.
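
As a rough sketch of what that could look like: the writer map stays keyed by the original topic, and the regex is applied only when the output directory is built. Everything here is hypothetical, including the `outputDir` helper and the idea that the regex and replacement would come from two new connector properties; no such config exists in storage-common today.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PathTopicRewriteSketch {
    // Hypothetical: regex and replacement would be read from new connector
    // configs. The record's topic is left untouched; only the directory
    // name under topicsDir is rewritten.
    static String outputDir(String topicsDir, String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        String dirName = m.matches() ? m.replaceFirst(replacement) : topic;
        return topicsDir + "/" + dirName;
    }

    public static void main(String[] args) {
        // Two source topics collapse into a single output path.
        System.out.println(outputDir("topics", "SQLSERVER-TEST-A", "SQLSERVER-TEST-.*", "sqlserver"));
        System.out.println(outputDir("topics", "SQLSERVER-TEST-B", "SQLSERVER-TEST-.*", "sqlserver"));
    }
}
```

Because the lookup key never changes, the writer for each assigned TopicPartition is still found, while several topics can share one directory.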

OneCricketeer, Dec 03 '18 07:12

Will do. I run the whole thing using Docker directly and EKS (Kubernetes), so now I need to compile the project and set up a local environment. It may take some time 😄

dongxiaohe, Dec 06 '18 03:12