stream-reactor

Formatting/converting columns in the Cassandra source connector

Open · sjmittal opened this issue 5 years ago · 5 comments

Hi, I have been using the Cassandra source connector, and so far I have it working: it pushes data from a Cassandra table to a Kafka topic using KCQL. In one case I have a column of Cassandra type timestamp, and when sending it to Kafka I want to send it as a Unix timestamp (the number of milliseconds since the epoch). CQL has a function, TOUNIXTIMESTAMP, that does this. Can this be done with KCQL, and if so, how?

Thanks, Sachin

sjmittal · Jan 20 '20 04:01
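
For reference, TOUNIXTIMESTAMP applied to a timestamp returns a bigint of milliseconds since the Unix epoch. On the JVM the same number is available from `java.util.Date.getTime` or `java.time.Instant.toEpochMilli`. The small Scala sketch below (the object name and sample timestamp are illustrative only) shows the equivalence:

```scala
import java.time.Instant
import java.util.Date

object UnixTimestampDemo {
  // Same value CQL's toUnixTimestamp(timestamp) yields: milliseconds since 1970-01-01T00:00:00Z.
  def fromDate(d: Date): Long = d.getTime
  def fromInstant(i: Instant): Long = i.toEpochMilli

  def main(args: Array[String]): Unit = {
    val ts = Instant.parse("2020-01-20T04:01:00Z")
    println(fromInstant(ts))         // 1579492860000
    println(fromDate(Date.from(ts))) // 1579492860000
  }
}
```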

@sjmittal this would require an update to KCQL, since its parser does not handle functions yet.

andrewstevenson · Jan 20 '20 11:01

@sjmittal The Cassandra source fires a CQL query. Have you tried putting the TOUNIXTIMESTAMP function in the select statement?

andrewstevenson · Apr 27 '20 08:04
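
As an illustration of the suggestion above, the select the connector issues would need to look something like `SELECT id, toUnixTimestamp(created) AS created_ms FROM ks.events`. The Scala helper below is hypothetical (not part of the connector), and the column names, the `_ms` alias and `ks.events` are placeholders. Since KCQL does not parse functions, it assumes the expression can reach the generated CQL some other way.

```scala
object SelectBuilder {
  // Hypothetical helper: wraps each timestamp column in toUnixTimestamp(...) and
  // aliases it as <column>_ms, so the query returns bigint epoch milliseconds.
  def selectClause(columns: Seq[String], timestampColumns: Set[String]): String =
    columns
      .map(c => if (timestampColumns.contains(c)) s"toUnixTimestamp($c) AS ${c}_ms" else c)
      .mkString("SELECT ", ", ", "")

  def main(args: Array[String]): Unit =
    // Prints: SELECT id, toUnixTimestamp(created) AS created_ms FROM ks.events
    println(selectClause(Seq("id", "created"), Set("created")) + " FROM ks.events")
}
```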

No. I have not changed the parser to process TOUNIXTIMESTAMP; instead I modified https://github.com/lensesio/stream-reactor/blob/master/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraTableReader.scala to use row.getTimestamp(d.getName).getTime when the field is of type DataType.timestamp. Since in my case I need to push milliseconds rather than a date-time, this workaround works for me.

sjmittal · Apr 27 '20 09:04
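
The workaround described above can be sketched as follows. This is not the actual CassandraTableReader change: to keep it self-contained, the driver's `Row` and `DataType` are replaced by hypothetical stand-ins (`ColumnType`, `Column`, and a `Map` for the row), and only the idea is kept, namely that timestamp columns are emitted as `getTime` (epoch milliseconds).

```scala
import java.util.Date

// Hypothetical stand-ins for the driver's DataType and Row; the real reader
// works on com.datastax.driver.core types.
sealed trait ColumnType
case object TimestampType extends ColumnType
case object TextType extends ColumnType

final case class Column(name: String, dataType: ColumnType)

object TimestampWorkaround {
  // For timestamp columns emit epoch milliseconds (Date.getTime), mirroring
  // row.getTimestamp(name).getTime; every other column passes through unchanged.
  // A null timestamp does not match the Date pattern and stays null.
  def convertRow(columns: Seq[Column], row: Map[String, Any]): Map[String, Any] =
    columns.map { col =>
      val value = row(col.name) match {
        case d: Date if col.dataType == TimestampType => d.getTime
        case other                                    => other
      }
      col.name -> value
    }.toMap

  def main(args: Array[String]): Unit = {
    val cols = Seq(Column("id", TextType), Column("created", TimestampType))
    val row  = Map[String, Any]("id" -> "a1", "created" -> new Date(1579492860000L))
    println(convertRow(cols, row))
  }
}
```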

@sjmittal are you up for contributing this back? Probably with a configuration option to control the behaviour?

andrewstevenson · Apr 27 '20 09:04
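
One possible shape for the configuration option suggested above is sketched here. The key `connect.cassandra.timestamp.format` and its values `date` / `epoch_millis` are hypothetical and are not existing connector settings. When the key is absent, the sketch defaults to emitting the `Date` unchanged, so the existing output stays the same unless the option is set.

```scala
import java.util.Date

// Hypothetical setting: neither the key nor its values exist in the connector.
sealed trait TimestampFormat
object TimestampFormat {
  case object AsDate extends TimestampFormat
  case object AsEpochMillis extends TimestampFormat

  def parse(s: String): TimestampFormat = s.trim.toLowerCase match {
    case "date"         => AsDate
    case "epoch_millis" => AsEpochMillis
    case other          => throw new IllegalArgumentException(s"Unknown timestamp format: $other")
  }
}

object TimestampSetting {
  val Key = "connect.cassandra.timestamp.format" // hypothetical key

  // Absent key: keep emitting the Date as-is.
  def fromProps(props: Map[String, String]): TimestampFormat =
    props.get(Key).map(TimestampFormat.parse).getOrElse(TimestampFormat.AsDate)

  def render(d: Date, fmt: TimestampFormat): Any = fmt match {
    case TimestampFormat.AsDate        => d
    case TimestampFormat.AsEpochMillis => d.getTime
  }

  def main(args: Array[String]): Unit = {
    val fmt = fromProps(Map(Key -> "epoch_millis"))
    println(render(new Date(1579492860000L), fmt))
  }
}
```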

I have created a PR: https://github.com/lensesio/stream-reactor/pull/680/. However, it should not be merged as it stands; I need to work out how best to make this generic. If we add a configuration option for each and every function, this may become hard to manage.

I suppose we could update the parser to support the Cassandra functions defined in https://cassandra.apache.org/doc/latest/cql/functions.html

I will update the PR if I make further changes.

sjmittal · Apr 27 '20 12:04
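
A generic alternative to one option per function is a registry keyed by function name, which the parser consults for select items written as function calls. The sketch below is hypothetical throughout (`FunctionRegistry`, `SelectItem`, the single-argument regex). It registers only TOUNIXTIMESTAMP and applies it on the JVM side after the row is read, rather than sending it to Cassandra.

```scala
import java.util.Date

// Hypothetical sketch: each supported function is one registry entry
// instead of one configuration option.
object FunctionRegistry {
  final case class SelectItem(column: String, function: Option[String])

  // JVM-side TOUNIXTIMESTAMP: Date -> epoch milliseconds; other values untouched.
  val toUnixTimestamp: Any => Any = {
    case d: Date => d.getTime
    case other   => other
  }

  val functions: Map[String, Any => Any] = Map("TOUNIXTIMESTAMP" -> toUnixTimestamp)

  // Matches a single-argument call such as "toUnixTimestamp(created)".
  private val Call = """(\w+)\s*\(\s*(\w+)\s*\)""".r

  // Function names are upper-cased so registry lookups ignore case.
  def parseItem(s: String): SelectItem = s.trim match {
    case Call(fn, col) => SelectItem(col, Some(fn.toUpperCase))
    case col           => SelectItem(col, None)
  }

  def evaluate(item: SelectItem, value: Any): Any = item.function match {
    case None => value
    case Some(fn) =>
      functions.get(fn) match {
        case Some(f) => f(value)
        case None    => throw new IllegalArgumentException(s"Unsupported function: $fn")
      }
  }

  def main(args: Array[String]): Unit = {
    val item = parseItem("toUnixTimestamp(created)")
    println(item)
    println(evaluate(item, new Date(1579492860000L)))
  }
}
```

Adding another function then means adding one entry to `functions`, with no new configuration key.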