stream-reactor
Feat/InfluxDB - Increase time precision to microseconds for timestamp fields
#626 assumes a Unix timestamp in seconds if the type is double and coerces it to a Long in milliseconds. In this PR we increase the time precision to microseconds for timestamp fields.
I think a mechanism to set the time precision, as proposed in #850, would be a better solution, though.
Hi, thanks for the pull request.
However, I do not believe it is necessary.
You should be able to achieve what you are looking to do by adding TIMESTAMPUNIT to the KCQL:
INSERT INTO table SELECT col1,col2 FROM topic TIMESTAMPUNIT=MICROSECONDS
We would prefer to keep this user-configurable via KCQL rather than have it hard-coded to a particular unit.
Having looked at the documentation, it does seem this is missing, so I will make a note to add it in the near future.
Thanks @davidsloan!
Indeed, I just tested using TIMESTAMPUNIT to specify the precision of the timestamp field and it worked, so I reverted the following:
@@ -72,7 +72,7 @@ object InfluxPoint {
.flatMap { path =>
record
.field(path)
- .map(coerceTimeStamp(_, path.value).map(details.timestampUnit -> _))
+ .map(coerceTimeStamp(_, path.value).map(TimeUnit.MICROSECONDS -> _))
}
.getOrElse(Try(TimeUnit.NANOSECONDS -> nanoClock.getEpochNanos))
But I'm still multiplying the double timestamp in seconds by 1E6 to convert it to microseconds for this to work. My Scala is not good enough to make this more generic, but I think the idea is to use the TIMESTAMPUNIT value to multiply the double timestamp by the correct factor before coercing it to Long.
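The factor itself could be derived from the configured unit rather than hard-coded. A minimal sketch of the idea, assuming the timestamp arrives as a Double in Unix seconds (`TimestampCoercion` and `secondsToUnit` are hypothetical names, not part of the connector):

```scala
import java.util.concurrent.TimeUnit

object TimestampCoercion {
  // Convert a Unix timestamp given as a Double in seconds
  // (e.g. 1652209485.367887) to a Long in the target TimeUnit,
  // deriving the multiplier from the unit instead of hard-coding 1E6.
  def secondsToUnit(seconds: Double, unit: TimeUnit): Long = {
    // Ticks of the target unit per second, e.g. 1000000 for MICROSECONDS
    val factor = unit.convert(1, TimeUnit.SECONDS)
    math.round(seconds * factor)
  }
}
```

This way the same code path works for MILLISECONDS, MICROSECONDS, or NANOSECONDS without special-casing any unit.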
Here's the connector configuration I've used with this PR
{
"connect.influx.db": "mydb",
"connect.influx.error.policy": "THROW",
"connect.influx.kcql": "INSERT INTO Test.logevent_heartbeat SELECT * FROM Test.logevent_heartbeat WITHTIMESTAMP private_efdStamp TIMESTAMPUNIT=MICROSECONDS",
"connect.influx.max.retries": "10",
"connect.influx.password": "",
"connect.influx.retry.interval": "60000",
"connect.influx.url": "http://influxdb:8086",
"connect.influx.username": "-",
"connect.progress.enabled": "false",
"connector.class": "com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector",
"name": "influxdb-sink",
"tasks.max": "1",
"topics": "Test.logevent_heartbeat"
}
docker-compose exec influxdb influx -database mydb -execute 'SELECT private_efdStamp FROM "Test.logevent_heartbeat"'
time                private_efdStamp
----                ----------------
1652209485367887000 1652209485.367887
1652209486389710000 1652209486.3897102
1652209487411820000 1652209487.41182
1652209488434421000 1652209488.4344213
1652209489456561000 1652209489.4565613
1652209490478031000 1652209490.4780314
1652209491500193000 1652209491.500193
1652209492521670000 1652209492.52167
Can I get some help to make this more generic?
I'll edit issue #850 to explain that, per the tests above, TIMESTAMPUNIT currently does not work with double timestamps.
Let me have a little more of a think, try a few scenarios and get back to you.
My first thought is that maybe the TimestampConverter SMT could help here, as it is quite versatile.
@davidsloan I looked at TimestampConverter and it looks like it cannot change the timestamp precision when writing Unix timestamps.
I think the path forward is to use the TIMESTAMPUNIT information to multiply the double timestamp in seconds by the corresponding factor. I'm happy to discuss this idea further if needed.
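End to end, that could look something like the sketch below: parse the TIMESTAMPUNIT value from the KCQL and use it to scale the Double seconds value, returning the `(TimeUnit, Long)` pair the sink's timestamp path expects. `scaleTimestamp` is a hypothetical helper, not the connector's actual API:

```scala
import java.util.concurrent.TimeUnit

// Hypothetical sketch: take the TIMESTAMPUNIT string from the KCQL
// (e.g. "MICROSECONDS") and a Double Unix-seconds timestamp, and
// produce the scaled Long in that unit.
def scaleTimestamp(seconds: Double, timestampUnit: String): (TimeUnit, Long) = {
  val unit = TimeUnit.valueOf(timestampUnit) // "MICROSECONDS" -> TimeUnit.MICROSECONDS
  val ticksPerSecond = unit.convert(1, TimeUnit.SECONDS)
  unit -> math.round(seconds * ticksPerSecond)
}
```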