Producer timestamp

Open lud opened this issue 2 years ago • 11 comments

Hi,

I have been asked to add a timestamp to messages when producing them. I am currently using brod but would like to migrate to erlkaf eventually.

I would like to know if it is supported by erlkaf or if support is planned.

Thank you!

lud avatar Dec 21 '21 17:12 lud

Why don't you add the timestamp in the message headers? You can add whatever metadata you want there.

silviucpp avatar Dec 21 '21 18:12 silviucpp

More specifically: https://github.com/silviucpp/erlkaf/blob/master/src/erlkaf.erl#L131

silviucpp avatar Dec 21 '21 18:12 silviucpp

Well, the timestamp is a specific piece of data that (from what I understand) is used to order messages across partitions. It is not custom metadata that would fit in some header but an attribute of the Kafka event, just like the key, the value, the headers or the offset.

lud avatar Dec 21 '21 20:12 lud

Hello,

I see what you mean. It is certainly not used to order the messages, because for that you have the offset. I see there is a property, from broker version v0.10 onwards, where each message has a timestamp for when it was received by the broker. I am not sure what they use it for internally.

I can expose this info, even if I'm a bit reluctant, because it will decrease performance a bit for those who are not using this property. Honestly, I don't see why you cannot set a timestamp yourself when the message is actually produced, which is a bit more correct than using this one.

To give you an example: you produce message X at T1, and at that moment let's suppose the Kafka broker is down (or the connection between your client and the broker is). erlkaf will queue the message (by default) until the connection is up again and send it at that moment. The gap between T1 and the timestamp at which the broker got the message can be very big.
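The gap described above can be illustrated with a toy model. This is only a sketch: `BufferingProducer` and its methods are hypothetical names made up for illustration and are not part of erlkaf or librdkafka. It just records the time at which `produce` was called and the time at which the queued message finally reached the "broker".

```python
from collections import deque


class BufferingProducer:
    """Toy model of a client that queues messages while the broker is unreachable."""

    def __init__(self):
        self.pending = deque()
        # Each delivered entry is (value, produced_at_ms, broker_received_at_ms).
        self.delivered = []

    def produce(self, value, now_ms):
        # Remember the producer-side time T1 at the moment produce is called.
        self.pending.append((value, now_ms))

    def flush(self, now_ms):
        # The connection is back: every queued message reaches the broker now.
        while self.pending:
            value, produced_at_ms = self.pending.popleft()
            self.delivered.append((value, produced_at_ms, now_ms))


producer = BufferingProducer()
producer.produce("X", now_ms=1_000)   # T1, while the broker is down
producer.flush(now_ms=61_000)         # broker reachable again 60 s later

value, produced_at_ms, received_at_ms = producer.delivered[0]
gap_ms = received_at_ms - produced_at_ms
print(gap_ms)
```

In this model the broker-side timestamp is a full minute later than T1, which is why a timestamp taken at produce time and one assigned on arrival can tell very different stories.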

You know better what you are using this info for, but personally I wouldn't build my logic on it.

silviucpp avatar Dec 21 '21 22:12 silviucpp

From what I have been told by a Java team this timestamp is used to order messages properly using Kafka Streams (the Java library) when data is consolidated from multiple partitions (so the offset is not enough).

> To give you an example: you produce message X at T1, and at that moment let's suppose the Kafka broker is down (or the connection between your client and the broker is). erlkaf will queue the message (by default) until the connection is up again and send it at that moment. The gap between T1 and the timestamp at which the broker got the message can be very big.

Yes, but I think that is the point. The consumers of the message will see the timestamp that you have set and use it as the "official" timestamp for the validity of the message data. So if in the meantime the broker receives newer messages, or receives multiple messages representing the same data (after connection recovery), that timestamp allows the consumer to select the latest one. That is normally the role of the offset, but here we are talking about multi-partition GlobalKTables or Streams (honestly, I only have a basic understanding of Java's Kafka Streams).
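As a rough illustration of that selection, here is a minimal sketch. It is not Kafka Streams code; the function name, the record layout `(entity_id, timestamp_ms, value)` and the sample data are all assumptions made up for the example. Each inner list stands for one partition in offset order, and the consumer keeps the record with the newest producer timestamp per entity, regardless of which partition or offset it arrived at.

```python
def latest_per_entity(partitions):
    """Return {entity_id: (timestamp_ms, value)}, keeping the newest timestamp."""
    latest = {}
    for records in partitions:
        for entity_id, timestamp_ms, value in records:
            # Offsets only order records within one partition, so compare
            # the producer-set timestamps instead.
            if entity_id not in latest or timestamp_ms > latest[entity_id][0]:
                latest[entity_id] = (timestamp_ms, value)
    return latest


# Hypothetical data: in partition_1 a delayed retry ("v2") lands at a higher
# offset than the newer update ("v3").
partition_0 = [("user-1", 1_000, "v1"), ("user-2", 1_500, "a")]
partition_1 = [("user-1", 3_000, "v3"), ("user-1", 2_000, "v2")]

result = latest_per_entity([partition_0, partition_1])
print(result)
```

With a broker-assigned arrival time, the delayed retry would look like the most recent record; with the producer-set timestamp, `v3` still wins.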

> will decrease performance a bit for those who are not using this property

You mean that even if there were a produce/6 function, you would need to support that new value all along the data path towards librdkafka, even from produce/5?

lud avatar Dec 22 '21 00:12 lud

Hello, to be clear: we don't support Kafka Streams. librdkafka is not planning to support it either.

For the timestamp: I will add it when I have some spare time. It's not a big deal. This is not part of the producer API and won't impact producing in any way.

Basically, when you consume a message you will be able to retrieve the timestamp property as populated by the broker (if any; it is not available for brokers older than 0.10), so the erlkaf_msg record will have a new property called timestamp.

silviucpp avatar Dec 22 '21 15:12 silviucpp

> Hello, to be clear: we don't support Kafka Streams. librdkafka is not planning to support it either.

I think that librdkafka does support that feature: https://github.com/edenhill/librdkafka/issues/1016#issuecomment-274038214. But it's fine if you don't want to support it in your own library.

> For the timestamp: I will add it when I have some spare time. It's not a big deal. This is not part of the producer API and won't impact producing in any way.
>
> Basically, when you consume a message you will be able to retrieve the timestamp property as populated by the broker (if any; it is not available for brokers older than 0.10), so the erlkaf_msg record will have a new property called timestamp.

My personal need is to be able to set the timestamp at the producer level, which is supported by Kafka, instead of letting the broker set its own timestamp. So if you're doing that just for me, maybe wait, but otherwise I guess it can be useful for anyone :)

Thanks!

lud avatar Dec 22 '21 16:12 lud

I think you misunderstand this, or maybe I do. The only API in librdkafka is one to get the timestamp assigned by the broker. There is no API to set the timestamp yourself.

silviucpp avatar Dec 22 '21 20:12 silviucpp

Maybe. I don't know C or Kafka very well, but the issue comment I linked above mentions that the function rd_kafka_producev supports it, and looking at the code it seems possible to set a timestamp: https://github.com/edenhill/librdkafka/blob/15d3e7ea8589fcf0a81f5631e9d74f88630fa7bf/tests/0052-msg_timestamps.c#L79

Is that so? Or is that something else?

lud avatar Dec 22 '21 20:12 lud

Hello @lud,

I had some time yesterday and today to look into this and the librdkafka source code, and yes, it can be done; you are right. It seems that if not specified, the timestamp value is internally 0, which translates to the current time.
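The "0 means current time" convention can be sketched as follows. This is a model of the behaviour described in this comment, not librdkafka's actual code; `resolve_timestamp_ms` and its `clock` parameter are hypothetical names used only for illustration.

```python
import time


def resolve_timestamp_ms(timestamp_ms=0, clock=time.time):
    """Return the timestamp a message would carry, in ms since the epoch.

    A value of 0 is treated as "not specified" and replaced by the current time.
    """
    if timestamp_ms == 0:
        return int(clock() * 1000)
    return timestamp_ms


# Caller-supplied timestamp: kept as-is.
explicit = resolve_timestamp_ms(1_640_000_000_000)

# No timestamp supplied: falls back to the clock (fixed here for repeatability).
defaulted = resolve_timestamp_ms(0, clock=lambda: 1_700_000_000.5)

print(explicit, defaulted)
```

One consequence of this convention is that existing callers who never pass a timestamp keep the same behaviour as before.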

I'll allocate some time and implement this feature soon.

silviucpp avatar Jan 13 '22 08:01 silviucpp

Hello, thanks for the update, it is good to hear!

Thanks :+1:

lud avatar Jan 13 '22 17:01 lud

The feature is available in the latest master.

silviucpp avatar Oct 10 '22 13:10 silviucpp

Thank you very much :)

lud avatar Oct 12 '22 15:10 lud