Data Mapping between Kafka and InfluxDB in the InfluxDB Sink

Open xuzikun2003 opened this issue 4 years ago • 7 comments

Issue Guidelines

Please review these questions before submitting any issue.

What version of the Stream Reactor are you reporting this issue for?

The latest version

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

No

Have you read the docs?

Yes

What is the expected behavior?

I am trying to use the InfluxDB sink to move data from Kafka to InfluxDB. Here is the correspondence I want between the data in Kafka and the data in InfluxDB:

Kafka topic -> InfluxDB name
key in a topic -> InfluxDB measurement name
values in a topic -> the rows in a measurement

It looks like a KCQL query does not allow me to achieve this type of data mapping between Kafka and InfluxDB. Is what I am looking for currently supported by the InfluxDB sink or not?

What was observed?

What is your Connect cluster configuration (connect-avro-distributed.properties)?

What is your connector properties configuration (my-connector.properties)?

Please provide full log files (redact any sensitive information)

xuzikun2003 avatar Jan 12 '21 09:01 xuzikun2003

@xuzikun2003 No, it's not currently supported if you want each topic to go to a separate InfluxDB. PRs welcome! But how would you handle credentials per DB in KCQL securely?

andrewstevenson avatar Jan 12 '21 11:01 andrewstevenson

Let's make it simple and say we only have one topic, topic1. In this topic, we have 1000 keys, key1, key2, ......, key1000, and we want to map each key to a measurement and map the values corresponding to each key to the rows of this measurement.

xuzikun2003 avatar Jan 12 '21 18:01 xuzikun2003

@xuzikun2003 In your example, you would have too many measurements and only one field for each measurement in InfluxDB, which is not a good schema design.

Trying to make the analogy with a table, the InfluxDB measurement is the table, the InfluxDB field and tag keys are the columns and the InfluxDB field and tag values are the rows.

I use the following correspondence with the InfluxDB Sink connector:

Kafka topic -> InfluxDB measurement
Kafka topic keys -> InfluxDB field keys
Kafka topic values -> InfluxDB field values

Does that seem more reasonable?

You can also add InfluxDB tags using KCQL, depending on the InfluxDB schema you want to achieve.
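To make the correspondence above concrete, here is a minimal sketch that renders one record as an InfluxDB line-protocol line, with the topic name as the measurement and the payload keys and values as field keys and field values. The helper `to_line_protocol` is hypothetical and is not how the connector itself writes data; it also assumes that measurement and field names need no escaping.

```python
def to_line_protocol(measurement, fields):
    """Render one point as 'measurement field1=v1,field2=v2' (no tags, no timestamp)."""
    def fmt(value):
        # bool must be checked before int, because bool is a subclass of int
        if isinstance(value, bool):
            return "true" if value else "false"
        if isinstance(value, int):
            return f"{value}i"  # integer fields carry an 'i' suffix
        if isinstance(value, float):
            return repr(value)
        # everything else is written as a double-quoted string field
        escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    field_set = ",".join(f"{key}={fmt(value)}" for key, value in fields.items())
    return f"{measurement} {field_set}"


# Topic name -> measurement, payload keys -> field keys, payload values -> field values
line = to_line_protocol("topic1", {"a": 1, "b": 1, "c": 1})
print(line)  # topic1 a=1i,b=1i,c=1i
```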

afausti avatar Jan 12 '21 22:01 afausti

Each measurement would have many fields, not just one field. For example, in Kafka topic1, I have the following messages:

key1: {"a": 1, "b": 1, "c": 1}
key2: {"a": 2, "b": 2, "c": 2}
key3: {"a": 3, "b": 3, "c": 3}
key1: {"a": 4, "b": 4, "c": 4}
key2: {"a": 5, "b": 5, "c": 5}
key3: {"a": 6, "b": 6, "c": 6}

Then I expect to have three tables (measurements), key1, key2, key3. These three measurements will look like

key1:
a b c
1 1 1
4 4 4

key2:
a b c
2 2 2
5 5 5

key3:
a b c
3 3 3
6 6 6
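The mapping requested here can be sketched in a few lines of plain Python: group the record values by Kafka key, so that each key becomes one measurement and each value becomes one row. This only illustrates the desired result; `group_by_key` is a hypothetical helper, and nothing here uses the connector or KCQL.

```python
from collections import defaultdict

# The six example messages from topic1, as (key, value) pairs in arrival order.
messages = [
    ("key1", {"a": 1, "b": 1, "c": 1}),
    ("key2", {"a": 2, "b": 2, "c": 2}),
    ("key3", {"a": 3, "b": 3, "c": 3}),
    ("key1", {"a": 4, "b": 4, "c": 4}),
    ("key2", {"a": 5, "b": 5, "c": 5}),
    ("key3", {"a": 6, "b": 6, "c": 6}),
]


def group_by_key(records):
    """Group record values by Kafka key; each key stands for one measurement."""
    measurements = defaultdict(list)
    for key, value in records:
        measurements[key].append(value)
    return dict(measurements)


measurements = group_by_key(messages)
for name, rows in measurements.items():
    print(name, rows)
```

Because the grouping is driven by whatever keys appear in the data, it works for an unbounded set of keys, which is the part a fixed list of statements cannot express.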

xuzikun2003 avatar Jan 12 '21 22:01 xuzikun2003

Right, thanks for the clarification. My understanding from the InfluxDB Sink documentation is that the current version of the connector does not support nested fields in the payload schema.

afausti avatar Jan 12 '21 22:01 afausti

However, see also #329. If the names of the keys (key1, key2, key3) are fixed, you could try a KCQL query like this:

connect.influx.kcql="INSERT INTO key1 SELECT key1.* FROM topic1 WITHTIMESTAMP sys_time(); INSERT INTO key2 SELECT key2.* FROM topic1 WITHTIMESTAMP sys_time(); INSERT INTO key3 SELECT key3.* FROM topic1 WITHTIMESTAMP sys_time()"
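For a fixed but longer list of keys, a property value of this shape could be generated rather than typed by hand. The sketch below only builds the string in the form shown above; `build_kcql` is a hypothetical helper, and whether `SELECT key1.*` resolves the way this suggestion hopes is not confirmed in this thread.

```python
def build_kcql(keys, topic):
    """Join one INSERT statement per key with '; ', matching the shape shown above."""
    statements = [
        f"INSERT INTO {key} SELECT {key}.* FROM {topic} WITHTIMESTAMP sys_time()"
        for key in keys
    ]
    return "; ".join(statements)


kcql = build_kcql(["key1", "key2", "key3"], "topic1")
print(f'connect.influx.kcql="{kcql}"')
```

The key list still has to be known when the connector is configured, so this does not cover keys that only appear later.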

afausti avatar Jan 12 '21 23:01 afausti

But in my case there could be a lot of keys, and the number of keys is not bounded. I need a loop to iterate over all the keys. How hard is it to support nested fields in the payload schema? Do we need to extend the KCQL language to support nested fields?

xuzikun2003 avatar Jan 13 '21 07:01 xuzikun2003