
Feat/InfluxDB - Increase time precision to microseconds for timestamp fields

Open afausti opened this issue 2 years ago • 4 comments

#626 assumes a Unix timestamp in seconds when the field type is double and coerces it to a Long in milliseconds. In this PR we increase the time precision to microseconds for timestamp fields.

I think a mechanism to set the time precision, as proposed in #850, would be a better solution, though.

afausti, May 06 '22 22:05

Hi, thanks for the pull request.

However, I do not believe it is necessary.

I would have thought you could achieve what you are looking for by adding TIMESTAMPUNIT to the KCQL:

INSERT INTO table SELECT col1,col2 FROM topic TIMESTAMPUNIT=MICROSECONDS

We would prefer to keep this user-configurable via KCQL rather than hard-code it to a particular unit.

Having looked at the documentation, it does seem this option is not documented, so I will make a note to add it in the near future.

davidsloan, May 09 '22 10:05

Thanks @davidsloan!

Indeed, I just tested TIMESTAMPUNIT to specify the precision of the timestamp field and it worked, so I reverted the following:

@@ -72,7 +72,7 @@ object InfluxPoint {
       .flatMap { path =>
         record
           .field(path)
-          .map(coerceTimeStamp(_, path.value).map(details.timestampUnit -> _))
+          .map(coerceTimeStamp(_, path.value).map(TimeUnit.MICROSECONDS -> _))
       }
       .getOrElse(Try(TimeUnit.NANOSECONDS -> nanoClock.getEpochNanos))

But I'm still multiplying the double timestamp in seconds by 1E6 to convert it to microseconds for this to work.

My Scala is not good enough to make this more generic, but I think the idea is to use the TIMESTAMPUNIT value to multiply the double timestamp by the correct factor before coercing it to Long.
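The idea above (scale the double by the factor implied by TIMESTAMPUNIT, then coerce to Long) could look roughly like the sketch below. It assumes the double field always holds Unix seconds and that the KCQL TIMESTAMPUNIT value is available as a `java.util.concurrent.TimeUnit`, as `details.timestampUnit` appears to be in the diff. `DoubleTimestamp`, `unitsPerSecond` and `coerce` are hypothetical names for illustration, not existing stream-reactor code, and wiring this into `coerceTimeStamp` is not shown.

```scala
import java.util.concurrent.TimeUnit
import scala.util.Try

// Hypothetical helper, not part of stream-reactor: converts a Unix timestamp
// given as (fractional) seconds into whole ticks of the TIMESTAMPUNIT unit.
object DoubleTimestamp {

  // Number of `unit` ticks in one second, e.g. 1e6 for MICROSECONDS.
  // Derived from toNanos so units coarser than a second (e.g. MINUTES) also work.
  def unitsPerSecond(unit: TimeUnit): Double =
    1e9 / unit.toNanos(1L).toDouble

  // Multiply by the factor first, then coerce to Long.
  // math.round is used instead of .toLong so a product that lands just below
  // the intended integer (e.g. ...886.9999) is not truncated down by one tick.
  def coerce(seconds: Double, unit: TimeUnit): Try[Long] = Try {
    require(java.lang.Double.isFinite(seconds), s"not a finite timestamp: $seconds")
    math.round(seconds * unitsPerSecond(unit))
  }
}
```

For example, `DoubleTimestamp.coerce(1652209485.367887, TimeUnit.MICROSECONDS)` should give `Success(1652209485367887)`, consistent with the first row of the query output. Rounding rather than truncating is a judgment call: the double is not exactly representable, so plain `.toLong` could occasionally lose a microsecond. Also, at current epoch values a double resolves only about a quarter of a microsecond, so choosing NANOSECONDS cannot recover real nanosecond precision.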

Here's the connector configuration I've used with this PR:

{
    "connect.influx.db": "mydb",
    "connect.influx.error.policy": "THROW",
    "connect.influx.kcql": "INSERT INTO Test.logevent_heartbeat SELECT * FROM Test.logevent_heartbeat WITHTIMESTAMP private_efdStamp TIMESTAMPUNIT=MICROSECONDS",
    "connect.influx.max.retries": "10",
    "connect.influx.password": "",
    "connect.influx.retry.interval": "60000",
    "connect.influx.url": "http://influxdb:8086",
    "connect.influx.username": "-",
    "connect.progress.enabled": "false",
    "connector.class": "com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector",
    "name": "influxdb-sink",
    "tasks.max": "1",
    "topics": "Test.logevent_heartbeat"
}

docker-compose exec influxdb influx -database mydb -execute 'SELECT private_efdStamp FROM "Test.logevent_heartbeat"'
time                private_efdStamp
----                ----------------
1652209485367887000 1652209485.367887
1652209486389710000 1652209486.3897102
1652209487411820000 1652209487.41182
1652209488434421000 1652209488.4344213
1652209489456561000 1652209489.4565613
1652209490478031000 1652209490.4780314
1652209491500193000 1652209491.500193
1652209492521670000 1652209492.52167


Can I get some help to make this more generic?

I'll edit issue #850 to explain that, based on the tests above, TIMESTAMPUNIT currently does not work with double timestamps.

afausti, May 10 '22 19:05

Let me have a little more of a think, try a few scenarios and get back to you.

My first thought is that maybe the TimestampConverter SMT could help here, as it is quite versatile.

davidsloan, May 11 '22 16:05

@davidsloan I looked at TimestampConverter, and it looks like it cannot change the timestamp precision when writing Unix timestamps.

I think the path forward is to use the TIMESTAMPUNIT information to multiply the double timestamp in seconds by the corresponding factor. I'm happy to discuss this idea further if needed.

afausti, Jul 01 '22 17:07