stream-reactor
Formatting/Converting columns of cassandra source connector
Hi,
I have been using the Cassandra source connector, and so far I have it working: it pushes data from a Cassandra table to a Kafka topic using KCQL.
In one case I have a column of Cassandra type timestamp, and when sending it to Kafka I want to send it as a Unix timestamp (the number of milliseconds or seconds since the epoch). CQL has a function, TOUNIXTIMESTAMP, that does this.
Is it possible to do this with KCQL, and if so, how?
Thanks Sachin
@sjmittal this would require an update to KCQL, since its parser does not handle functions yet.
@sjmittal The Cassandra source fires a CQL query. Have you tried putting the TOUNIXTIMESTAMP function in the select statement?
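To make that suggestion concrete, here is a rough Java sketch of the kind of CQL statement the source would have to fire. The table and column names (`demo_ks.events`, `event_id`, `created_at`) and the `CqlSelectBuilder` helper are hypothetical; it only assembles a query string and is not how the connector actually builds its queries. In CQL, `toUnixTimestamp` returns a `bigint` holding milliseconds since the epoch.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds a CQL SELECT that converts the chosen timestamp
// columns to epoch milliseconds with Cassandra's toUnixTimestamp() function.
final class CqlSelectBuilder {
    static String select(String table, List<String> plainColumns, List<String> timestampColumns) {
        // Plain columns are selected unchanged.
        String plain = String.join(", ", plainColumns);
        // Each timestamp column becomes "toUnixTimestamp(col) AS col_ms".
        String converted = timestampColumns.stream()
                .map(c -> "toUnixTimestamp(" + c + ") AS " + c + "_ms")
                .collect(Collectors.joining(", "));
        // Join the two selector lists, skipping whichever one is empty.
        String selectors = plain.isEmpty() ? converted
                : converted.isEmpty() ? plain
                : plain + ", " + converted;
        return "SELECT " + selectors + " FROM " + table;
    }
}
```

For example, `CqlSelectBuilder.select("demo_ks.events", List.of("event_id"), List.of("created_at"))` yields `SELECT event_id, toUnixTimestamp(created_at) AS created_at_ms FROM demo_ks.events`.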
No. I have not changed the parser to process TOUNIXTIMESTAMP. Instead, I have modified https://github.com/lensesio/stream-reactor/blob/master/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraTableReader.scala to use row.getTimestamp(d.getName).getTime when the field is of type DataType.timestamp.
Since in my case I need to push milliseconds instead of a date-time, this workaround works for me.
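The workaround above boils down to a per-column conversion step. Here is a minimal Java sketch of it, assuming (as in the DataStax Java driver 3.x, where `Row.getTimestamp` returns `java.util.Date`) that timestamp columns arrive as `java.util.Date`. `TimestampToEpoch` and `ColumnKind` are hypothetical stand-ins so the example runs without the driver.

```java
import java.util.Date;

// Sketch of the workaround's conversion step, independent of the driver.
// Assumption: timestamp column values arrive as java.util.Date.
final class TimestampToEpoch {
    // Hypothetical stand-in for the driver's column type information.
    enum ColumnKind { TIMESTAMP, OTHER }

    static Object toConnectValue(ColumnKind kind, Object value) {
        if (kind == ColumnKind.TIMESTAMP && value instanceof Date) {
            // Date.getTime() returns milliseconds since 1970-01-01T00:00:00Z.
            return ((Date) value).getTime();
        }
        // Null values and all other column types pass through unchanged.
        return value;
    }
}
```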
@sjmittal are you up for contributing this back? Probably with a configuration option to control the behaviour?
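One possible shape for such an option, sketched in Java. The property key `timestamp.mode` and its values `DATE`, `MILLIS` and `SECONDS` are hypothetical and are not existing stream-reactor settings; the default `DATE` leaves the value unconverted.

```java
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical option: "timestamp.mode" is an illustrative key only,
// not a real stream-reactor configuration property.
final class TimestampModeOption {
    static final String KEY = "timestamp.mode";

    enum Mode { DATE, MILLIS, SECONDS }

    // Reads the mode from connector properties; defaults to DATE (no conversion).
    // Mode.valueOf throws IllegalArgumentException for unknown values.
    static Mode fromConfig(Map<String, String> props) {
        String raw = props.getOrDefault(KEY, "DATE");
        return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    // Applies the configured mode to a timestamp column value.
    static Object apply(Mode mode, Date value) {
        if (value == null) return null;
        switch (mode) {
            case MILLIS:
                return value.getTime();
            case SECONDS:
                // Truncates the millisecond part.
                return TimeUnit.MILLISECONDS.toSeconds(value.getTime());
            default:
                return value;
        }
    }
}
```

An enum keeps the accepted values explicit and fails fast on a typo in the configuration.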
I have created a PR: https://github.com/lensesio/stream-reactor/pull/680/. However, it should not be merged as it stands; I need to work out how best to make this generic. If we add a configuration option for every function, this may become hard to manage.
I suppose we could update the parser to support the Cassandra functions defined in https://cassandra.apache.org/doc/latest/cql/functions.html
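A rough idea of what recognising such functions in a select field could look like, written in Java with a regular expression rather than by extending the actual KCQL grammar. The `FunctionField` class, the accepted `FUNC(column) [AS alias]` form and the whitelist are hypothetical; the whitelist holds only CQL's time conversion functions `toUnixTimestamp`, `toTimestamp` and `toDate`.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: recognises "FUNC(column) [AS alias]" in a select field.
// Not the real KCQL grammar; it only illustrates passing functions through.
final class FunctionField {
    // Whitelisted single-argument CQL time conversion functions (lower-cased).
    static final Set<String> SUPPORTED = Set.of("tounixtimestamp", "totimestamp", "todate");

    private static final Pattern CALL = Pattern.compile(
            "^\\s*(\\w+)\\s*\\(\\s*(\\w+)\\s*\\)\\s*(?:AS\\s+(\\w+))?\\s*$",
            Pattern.CASE_INSENSITIVE);

    final String function;
    final String column;
    final String alias;

    private FunctionField(String function, String column, String alias) {
        this.function = function;
        this.column = column;
        this.alias = alias;
    }

    // Returns empty for plain columns and for functions not on the whitelist.
    static Optional<FunctionField> parse(String field) {
        Matcher m = CALL.matcher(field);
        if (!m.matches()) return Optional.empty();
        String fn = m.group(1).toLowerCase(Locale.ROOT);
        if (!SUPPORTED.contains(fn)) return Optional.empty();
        // Without an explicit alias, keep the column name as the output field name.
        String alias = m.group(3) != null ? m.group(3) : m.group(2);
        return Optional.of(new FunctionField(fn, m.group(2), alias));
    }

    // Renders the selector as CQL (unquoted CQL function names are case-insensitive).
    String toCql() {
        return function + "(" + column + ") AS " + alias;
    }
}
```

For example, parsing `TOUNIXTIMESTAMP(created_at) AS created_ms` gives a selector that renders as `tounixtimestamp(created_at) AS created_ms`, while a plain `created_at` is left to the existing column handling.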
I will update the PR if I make further changes.