rabbitmq-stream-dotnet-client

TimestampOffset

suprunka opened this issue 2 years ago

I have a stream whose max age is 1 minute and whose segment size is 50 MB; each message is around 25 MB, and the stream holds around 30 messages. I try to consume from the stream starting at a timestamp 1 minute in the past, but I get all messages from the start of the stream (even those that were added 1 hour ago). I am not sure whether this is a problem with the library or whether my configuration is incorrect. Can you help me with that?

suprunka, Jul 15 '22

Hi @suprunka, sorry for the delay. We will look into it as soon as possible.

Gsantomaggio, Sep 29 '22

Hello, I am having this issue too. I set `OffsetSpec = new OffsetTypeTimestamp(...)` and get all messages from the beginning.

Mcpolu, Nov 15 '22

I am just guessing, but I think the issue may be in the `MaybeDispatch` method of the `RawConsumer` class. There should probably be another branch in its switch statement for `OffsetTypeTimestamp`.

Mcpolu, Nov 15 '22

@Mcpolu would you please prepare a PR?

Gsantomaggio, Nov 15 '22

Sorry, I don't know how to convert from timestamp to offsetValue. Also I would have to ask for written permission from my employer before contributing code, it is not worth the hassle for such a small thing. A thousand apologies, I feel great appreciation for RabbitMQ software and team and I would love to be more useful.

When I retire I promise to come back and write lots of code for you!

Mcpolu, Nov 15 '22

Don't worry :)! We are working on it!

Gsantomaggio, Nov 15 '22

I did some tests, and it works as expected. @Mcpolu, it is important to remember that the timestamp belongs to the chunk, not to the individual message, so the behaviour differs from an offset subscription. It is not possible to handle this in `MaybeDispatch`.

Suppose you have code like this:

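```csharp
var count = 0;
var consumer = await system.CreateRawConsumer(
    new RawConsumerConfig(stream)
    {
        // Start from timestamp 0 (the Unix epoch), so every chunk qualifies
        OffsetSpec = new OffsetTypeTimestamp(0),
        MessageHandler = async (rawConsumer, ctx, message) =>
        {
            // ctx.Timestamp is the timestamp of the chunk the message belongs to
            Console.WriteLine($"TimeStamp: {ctx.Timestamp.TotalMilliseconds} ms {count++}");
            await Task.CompletedTask;
        }
    }
);
```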

You get a result like:

```text
TimeStamp: 1668563788470 ms 0
TimeStamp: 1668563788967 ms 1
TimeStamp: 1668563789472 ms 2
TimeStamp: 1668563789977 ms 3
TimeStamp: 1668563790483 ms 4
TimeStamp: 1668563790989 ms 5
```

If you use one of those timestamps as the start offset, you will see the filtering take effect.

To get offset-like behaviour, you need to implement it on the client side. You can add a timestamp to the message's application properties and then filter on the consumer side, something like:

```csharp
var message = new Message(new byte[10])
{
    ApplicationProperties = new ApplicationProperties()
    {
        // Publish time as Unix milliseconds (or any other format you prefer)
        ["timestamp"] = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
    }
};
```

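When you consume:

```csharp
MessageHandler = async (consumer, ctx, message) =>
{
    // myTime: your start time in Unix milliseconds (placeholder).
    // Assumes the property was published as a long, as above.
    var publishedAtMs = (long)message.ApplicationProperties["timestamp"];
    if (publishedAtMs < myTime)
    {
        return; // published before the start time: exclude it
    }
    // ... process the message ...
    await Task.CompletedTask;
}
```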

Gsantomaggio, Nov 16 '22

The timestamp used to attach the consumer to is the one from the chunk (a chunk contains a batch of messages), this is why you can get messages a bit before the expected timestamp.

Imagine you ask for timestamp 20, and the broker finds that the chunk right before 20 has a timestamp of 15: it sends that chunk, and you get all the messages in it. Some of them may be from before 20, so the application code can filter them out. If the broker returned the next chunk instead, let's say at timestamp 30, you might miss a bunch of messages that actually match the timestamp requirement.
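The start-chunk rule in the example above can be modelled in a few lines of C#, using the same numbers. This is a sketch of the behaviour as described in this comment, not broker code: `StartChunkIndex` is a hypothetical helper, chunks are reduced to their timestamps (assumed non-decreasing in stream order), and starting at the first chunk when every chunk is newer than the request is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the rule described above, not broker code:
// start at the last chunk whose timestamp is <= the requested timestamp.
// Chunks are reduced to their timestamps, assumed non-decreasing in stream order.
static int StartChunkIndex(IReadOnlyList<long> chunkTimestamps, long requested)
{
    var start = 0; // assumption: if every chunk is newer, start at the first one
    for (var i = 0; i < chunkTimestamps.Count; i++)
    {
        if (chunkTimestamps[i] > requested)
        {
            break; // this chunk and all later ones are newer than the request
        }
        start = i;
    }
    return start;
}

long[] chunks = { 0, 15, 30, 45 };
var first = StartChunkIndex(chunks, 20);
// Every message in the chunk stamped 15 is delivered, including those written before 20.
Console.WriteLine($"Delivery starts at the chunk with timestamp {chunks[first]}");
// prints "Delivery starts at the chunk with timestamp 15"
```

Picking the chunk before the requested time, rather than the one after it, is what guarantees no matching message is skipped; the price is a few early messages that the application has to drop.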

Note again that the timestamp is the one from the chunk (it's the one the broker indexes), because messages may not have any timestamp. Messages are just opaque arrays of bytes for the broker, they don't contain any timestamp information for it. The client library can try to extract a timestamp from messages, but what if it cannot do it for one, should it let it through? It's hard to come up with a solution for the general case, so the client library dispatches all the messages from the chunk because it does not have an obvious filtering solution like with an offset subscription specification.
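The filtering left to the application can be sketched as follows, assuming the publisher stores its own Unix-millisecond `"timestamp"` entry in the application properties, as suggested earlier in this thread. `ShouldProcess` and its `keepIfMissing` flag are hypothetical: the flag makes explicit the decision a general-purpose client library cannot make for everyone, namely what to do with a message that carries no usable timestamp. Application properties are modelled as a plain dictionary so the sketch runs without the client library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: keep a message only if its publisher-set "timestamp"
// application property (Unix milliseconds, stored as a long) is at or after startTimeMs.
static bool ShouldProcess(IReadOnlyDictionary<string, object> applicationProperties,
                          long startTimeMs, bool keepIfMissing)
{
    if (applicationProperties.TryGetValue("timestamp", out var value) && value is long publishedAtMs)
    {
        return publishedAtMs >= startTimeMs;
    }
    // No timestamp, or one of an unexpected type: the application decides.
    return keepIfMissing;
}

var cutoffMs = 1_000L;
var early = new Dictionary<string, object> { ["timestamp"] = 900L };
var onTime = new Dictionary<string, object> { ["timestamp"] = 1_000L };
var unstamped = new Dictionary<string, object>();

Console.WriteLine(ShouldProcess(early, cutoffMs, keepIfMissing: true));      // False
Console.WriteLine(ShouldProcess(onTime, cutoffMs, keepIfMissing: true));     // True
Console.WriteLine(ShouldProcess(unstamped, cutoffMs, keepIfMissing: false)); // False
```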

acogoluegnes, Nov 16 '22

Hello, thank you for promptly looking into this issue.

> Gsantomaggio said: the TimeStamp is for chunk and not for message

> acogoluegnes said: The timestamp used to attach the consumer to is the one from the chunk

This behavior is surprising to me. I think that even if it were documented prominently, users would be confused, and you might receive the same question again and again over time.

> acogoluegnes said: messages may not have any timestamp

But MessageContext has a TimeSpan Timestamp property? I understand this property to be a unix time timestamp of the message? In which circumstances will this property have a valid value and in which it won't?

```csharp
new ConsumerConfig(system, stream)
{
    OffsetSpec = new OffsetTypeTimestamp(myTimestamp),
    MessageHandler = async (sourceStream, consumer, ctx, message) =>
    {
        // I expect only messages where ctx.Timestamp >= myTimestamp
        Debug.Assert(ctx.Timestamp.TotalMilliseconds >= myTimestamp);
        await Task.CompletedTask;
    }
};
```

Thank you.

Mcpolu, Nov 16 '22

> But MessageContext has a TimeSpan Timestamp property? I understand this property to be a unix time timestamp of the message? In which circumstances will this property have a valid value and in which it won't?

It is `MessageContext` that has the `Timestamp` (a `TimeSpan`), not `Message`, and that timestamp is the chunk's.

You can see this in the Deliver frame of the protocol specification: https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc#deliver

```text
Timestamp => int64 // erlang system time in milliseconds, since epoch
....
Messages => [Message] // no int32 for the size for this array; the size is defined by NumEntries field above
```
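In other words, the timestamp arrives once per chunk, and every message decoded from that chunk is reported with it. The C# sketch below models only that part of a Deliver frame; `Dispatch` and its tuple shape are hypothetical simplifications, not the client's actual types or decoding code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical simplification of a Deliver frame: one chunk-level timestamp
// (milliseconds since epoch) followed by the chunk's messages.
static List<(long ChunkTimestampMs, byte[] Body)> Dispatch(long chunkTimestampMs, IEnumerable<byte[]> messages)
{
    // Every message is paired with the same timestamp: the chunk's, not its own.
    return messages.Select(body => (ChunkTimestampMs: chunkTimestampMs, Body: body)).ToList();
}

var delivered = Dispatch(1668621168446, new[] { new byte[10], new byte[10], new byte[10] });
foreach (var (timestampMs, payload) in delivered)
{
    // Prints the same timestamp three times
    Console.WriteLine($"TimeStamp: {timestampMs} ms, {payload.Length} bytes");
}
```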

Gsantomaggio, Nov 17 '22

When I run the code I get a different value on MessageContext.Timestamp for every message. If this timestamp was of the chunk I would be getting the same timestamp for a couple million messages, then a different timestamp for the next couple million messages and so on.

Mcpolu, Nov 17 '22

It depends on the rate at which you publish. For example:

```csharp
for (ulong i = 0; i < numberOfMessages; i++)
{
    var message = new Message(new byte[10]);
    await rawProducer.Send(i, message);
    // Uncomment to slow down publishing so messages are spread across more chunks
    // await Task.Delay(100);
}
```

With a handler like this:

```csharp
MessageHandler = async (consumer, ctx, message) =>
{
    Console.WriteLine($"TimeStamp: {ctx.Timestamp.TotalMilliseconds} ms {count++}");
    // ...
}
```

the output is:

```text
TimeStamp: 1668621168446 ms 0
TimeStamp: 1668621168446 ms 1
TimeStamp: 1668621168446 ms 2
TimeStamp: 1668621168446 ms 3
TimeStamp: 1668621168446 ms 4
TimeStamp: 1668621168446 ms 5
TimeStamp: 1668621168446 ms 6
TimeStamp: 1668621168446 ms 7
TimeStamp: 1668621168446 ms 8
TimeStamp: 1668621168446 ms 9
```

Uncommenting the sleep is enough to see the difference.

A chunk can contain at most 8192 messages.
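To see roughly how many messages ended up in each chunk, you can group the `ctx.Timestamp.TotalMilliseconds` values a consumer logs. The sketch below is a hypothetical diagnostic (`CountPerChunk` is not part of the client) that collapses consecutive equal timestamps into runs. Because it groups by value only, two chunks stamped in the same millisecond would be counted as one, so treat the result as an approximation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical diagnostic: collapse consecutive equal chunk timestamps into
// (timestamp, message count) runs, in delivery order.
static List<(long TimestampMs, int Count)> CountPerChunk(IEnumerable<long> timestamps)
{
    var runs = new List<(long TimestampMs, int Count)>();
    foreach (var ts in timestamps)
    {
        var last = runs.Count - 1;
        if (last >= 0 && runs[last].TimestampMs == ts)
        {
            runs[last] = (ts, runs[last].Count + 1); // same timestamp: extend the current run
        }
        else
        {
            runs.Add((ts, 1)); // new timestamp: start a new run
        }
    }
    return runs;
}

// Timestamps like those printed by the fast-publishing example.
var logged = new long[] { 1668621168446, 1668621168446, 1668621168446, 1668621168446 };
foreach (var (timestampMs, count) in CountPerChunk(logged))
{
    Console.WriteLine($"{timestampMs}: {count} message(s)"); // 1668621168446: 4 message(s)
}
```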

Gsantomaggio, Nov 17 '22

Closing as this is working as expected. Please use discussions to ask additional questions. Thanks!

https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/discussions

lukebakken, Nov 17 '22