rabbitmq-stream-dotnet-client
TimestampOffset
I have a stream whose max age is 1 minute and whose segment size is 50 MB; each message is around 25 MB. There are around 30 messages in the stream.
I try to get messages from the stream with a timestamp offset of 1 minute, but I get all messages from the start of the stream (even ones that were added to the stream 1 hour ago).
I am not sure whether this is a problem with the library or whether my configuration is incorrect. Can you help me with that?
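For reference: judging by the values printed later in this thread (for example `1668563788470 ms`), `OffsetTypeTimestamp` takes an absolute point in time in milliseconds since the Unix epoch, not a relative duration. A small value such as 60000 would presumably be read as a moment in January 1970, which predates every chunk. A minimal sketch of computing "1 minute ago" with plain .NET; the helper `StartTimestampFor` is hypothetical, and passing its result to `OffsetTypeTimestamp` is an assumption based on this thread rather than on the library documentation:

```csharp
using System;

// Hypothetical helper: the absolute start time, in Unix milliseconds,
// for "messages from the last `window`", measured back from `now`.
static long StartTimestampFor(DateTimeOffset now, TimeSpan window) =>
    now.Subtract(window).ToUnixTimeMilliseconds();

var startMs = StartTimestampFor(DateTimeOffset.UtcNow, TimeSpan.FromMinutes(1));
Console.WriteLine($"Start timestamp: {startMs} ms");
// Assumption: startMs would then be used as new OffsetTypeTimestamp(startMs)
```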
Hi @suprunka, sorry for the delay. We will look into it as soon as possible.
Hello, I am having this issue too. I set OffsetSpec = new OffsetTypeTimestamp(...) and still get all messages from the beginning.
I am just guessing, but I think the issue may be in the RawConsumer class, in the MaybeDispatch method. I think there should be another branch in its switch statement for OffsetTypeTimestamp.
@Mcpolu would you please prepare a PR?
Sorry, I don't know how to convert from timestamp to offsetValue. Also I would have to ask for written permission from my employer before contributing code, it is not worth the hassle for such a small thing. A thousand apologies, I feel great appreciation for RabbitMQ software and team and I would love to be more useful.
When I retire I promise to come back and write lots of code for you!
Don't worry :)! We are working on it!
I did some tests, and it worked as expected.
@Mcpolu it is important to remember that the timestamp belongs to the chunk, not to the message, so the behaviour is different from an offset. It is not possible to filter in MaybeDispatch.
Suppose you have code like:
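var count = 0;
var consumer = await system.CreateRawConsumer(
    new RawConsumerConfig(stream)
    {
        // 0 is the Unix epoch, earlier than any chunk, so the whole stream is delivered
        OffsetSpec = new OffsetTypeTimestamp(0),
        MessageHandler = async (consumer, ctx, message) =>
        {
            // ctx.Timestamp is the timestamp of the chunk the message belongs to
            Console.WriteLine($"TimeStamp: {ctx.Timestamp.TotalMilliseconds} ms {count++}");
            await Task.CompletedTask;
        }
    }
);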
you get a result like:
TimeStamp: 1668563788470 ms 0
TimeStamp: 1668563788967 ms 1
TimeStamp: 1668563789472 ms 2
TimeStamp: 1668563789977 ms 3
TimeStamp: 1668563790483 ms 4
TimeStamp: 1668563790989 ms 5
If you use one of those timestamps as the start offset, you will see the filtering take effect.
To get offset-like behaviour, you need to implement it client side. You can add a timestamp to the message properties and then exclude older messages on the consumer side, something like:
var message = new Message(new byte[10])
{
    ApplicationProperties = new ApplicationProperties()
    {
        // publishing time as Unix milliseconds (or any other format you prefer)
        ["timestamp"] = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
    }
};
When you consume:
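MessageHandler = async (consumer, ctx, message) =>
{
    // myTime: the Unix-millisecond start time you are interested in
    var publishedAt = Convert.ToInt64(message.ApplicationProperties["timestamp"]);
    if (publishedAt < myTime)
    {
        return; // published before myTime: exclude it
    }
    // process the message here
    await Task.CompletedTask;
}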
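The filtering logic itself can be tried without a broker. This is a minimal sketch, assuming the producer stored the publishing time as Unix milliseconds under the `timestamp` application property as above; delivered messages are modelled as plain tuples instead of the library's `Message` type, and `FilterFrom` is a hypothetical helper. Messages without a timestamp are dropped here, which is one possible policy, not the only one:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the client library. Each tuple stands in for a
// delivered message: its id and the "timestamp" application property
// (Unix milliseconds), or null when the producer did not set it.
static List<int> FilterFrom(IEnumerable<(int Id, long? PublishedAtMs)> delivered, long myTimeMs) =>
    delivered
        .Where(m => m.PublishedAtMs is long t && t >= myTimeMs) // drop older or unstamped messages
        .Select(m => m.Id)
        .ToList();

// Illustrative values: producer-side timestamps 14, 17, 20 and 22, plus one unstamped message.
var received = new List<(int, long?)> { (0, 14), (1, 17), (2, 20), (3, 22), (4, null) };
Console.WriteLine(string.Join(", ", FilterFrom(received, 20))); // 2, 3
```

Dropping unstamped messages is the safe choice when every producer is known to set the property; if some producers do not, letting them through is equally defensible.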
The timestamp used to attach the consumer is the one from the chunk (a chunk contains a batch of messages), which is why you can get messages a bit before the expected timestamp.
Imagine you ask for timestamp 20, and the broker finds that the chunk right before 20 has a timestamp of 15, so it sends that chunk and you get messages from it. Maybe some of them are before 20, so the application code can filter them out. If the broker were to return the next chunk instead, let's say at timestamp 30, you might miss a bunch of messages that actually match the timestamp requirement.
Note again that the timestamp is the one from the chunk (it's the one the broker indexes), because messages may not have any timestamp. To the broker, messages are just opaque arrays of bytes; they don't contain any timestamp information it can use. The client library could try to extract a timestamp from messages, but what if it cannot do it for one: should it let it through? It's hard to come up with a solution for the general case, so the client library dispatches all the messages from the chunk, because it does not have an obvious filtering solution like it does with an offset subscription specification.
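The attach rule described above can be modelled in a few lines. This is a simplified sketch, not broker code: chunk timestamps are assumed to be sorted ascending, the consumer starts at the last chunk whose timestamp is less than or equal to the requested one, and a request older than every chunk starts at the first chunk (consistent with `OffsetTypeTimestamp(0)` returning the whole stream). `AttachChunkIndex` is a hypothetical name, and for a request newer than the last chunk this sketch simply returns the last chunk, which may not match the broker:

```csharp
using System;

// Simplified model of the attach rule (not broker code). chunkTimestamps must be
// non-empty and sorted ascending; returns the index of the chunk to start from.
static int AttachChunkIndex(long[] chunkTimestamps, long requested)
{
    if (chunkTimestamps.Length == 0)
        throw new ArgumentException("need at least one chunk", nameof(chunkTimestamps));

    var index = 0; // a request older than every chunk starts at the first one
    for (var i = 0; i < chunkTimestamps.Length; i++)
    {
        if (chunkTimestamps[i] > requested)
            break; // ascending order: no later chunk can qualify
        index = i; // last chunk stamped at or before the request so far
    }
    return index;
}

long[] chunks = { 10, 15, 30 };
Console.WriteLine(AttachChunkIndex(chunks, 20)); // 1, the chunk stamped 15
```

With the six distinct chunk timestamps printed earlier in this thread, requesting `1668563789977` gives index 3, so delivery would start at the message numbered 3 in that output.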
Hello, thank you for promptly looking into this issue.
Gsantomaggio said: "the TimeStamp is for chunk and not for message"
acogoluegnes said: "The timestamp used to attach the consumer to is the one from the chunk"
This behavior is surprising to me. Even if it were documented prominently, I think users would be confused, and you may receive the same question again and again over time.
acogoluegnes said: "messages may not have any timestamp"
But doesn't MessageContext have a TimeSpan Timestamp property? I understood this property to be the Unix timestamp of the message. In which circumstances will this property have a valid value, and in which won't it?
new ConsumerConfig(system, stream)
{
    OffsetSpec = new OffsetTypeTimestamp(myTimestamp),
    MessageHandler = async (sourceStream, consumer, ctx, message) =>
    {
        // I expect only messages where ctx.Timestamp >= myTimestamp
        Debug.Assert(ctx.Timestamp.TotalMilliseconds >= myTimestamp);
        await Task.CompletedTask;
    }
}
Thank you.
It is the MessageContext that has the Timestamp (a TimeSpan), not the Message. That Timestamp is the chunk's.
See the protocol specification for the deliver frame: https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc#deliver
Timestamp => int64 // erlang system time in milliseconds, since epoch
....
Messages => [Message] // no int32 for the size for this array; the size is defined by NumEntries field above
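Since that field is an int64 count of milliseconds since the epoch, and the outputs in this thread print it through `ctx.Timestamp.TotalMilliseconds`, it can be turned into a readable UTC time with standard .NET APIs. A minimal sketch; `ChunkTimeUtc` is a hypothetical helper, and treating `ctx.Timestamp` as a `TimeSpan` since the Unix epoch is an assumption based on the printed values:

```csharp
using System;

// Hypothetical helper: converts a chunk timestamp, expressed as a TimeSpan since
// the Unix epoch, into a UTC DateTimeOffset.
static DateTimeOffset ChunkTimeUtc(TimeSpan sinceEpoch) =>
    DateTimeOffset.FromUnixTimeMilliseconds((long)sinceEpoch.TotalMilliseconds);

var ts = TimeSpan.FromMilliseconds(1668563788470); // first value printed earlier in the thread
Console.WriteLine(ChunkTimeUtc(ts).ToString("o")); // 2022-11-16T01:56:28.4700000+00:00
```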
When I run the code I get a different value on MessageContext.Timestamp for every message. If this timestamp was of the chunk I would be getting the same timestamp for a couple million messages, then a different timestamp for the next couple million messages and so on.
It depends on the rate at which you publish. For example:
for (ulong i = 0; i < numberOfMessages; i++)
{
    var message = new Message(new byte[10]);
    // sent back-to-back, so many messages can end up in the same chunk
    await rawProducer.Send(i, message);
    // Thread.Sleep(100);
}
you get:
MessageHandler = async (consumer, ctx, message) =>
{
Console.WriteLine($"TimeStamp: {ctx.Timestamp.TotalMilliseconds} ms {count++}");
.....
Output:
TimeStamp: 1668621168446 ms 0
TimeStamp: 1668621168446 ms 1
TimeStamp: 1668621168446 ms 2
TimeStamp: 1668621168446 ms 3
TimeStamp: 1668621168446 ms 4
TimeStamp: 1668621168446 ms 5
TimeStamp: 1668621168446 ms 6
TimeStamp: 1668621168446 ms 7
TimeStamp: 1668621168446 ms 8
TimeStamp: 1668621168446 ms 9
Uncommenting the sleep is enough to see the timestamps differ.
A chunk can contain at most 8192 messages.
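Because every message of a chunk reports the same `ctx.Timestamp`, a consumer can roughly see where chunks begin by looking for changes in that value. A minimal sketch, assuming the timestamps have been collected into an array (for example from `ctx.Timestamp.TotalMilliseconds`); `ChunkSizes` is hypothetical, and two distinct chunks written in the same millisecond would be merged by this heuristic:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: runs of equal consecutive timestamps approximate chunk
// boundaries as seen by the consumer. Returns the size of each run, in order.
static List<int> ChunkSizes(IReadOnlyList<long> timestampsMs)
{
    var sizes = new List<int>();
    for (var i = 0; i < timestampsMs.Count; i++)
    {
        if (i == 0 || timestampsMs[i] != timestampsMs[i - 1])
            sizes.Add(1); // timestamp changed: a new chunk starts here
        else
            sizes[sizes.Count - 1]++; // same chunk as the previous message
    }
    return sizes;
}

// Ten messages published without a pause, as in the output above.
var burst = new long[10];
Array.Fill(burst, 1668621168446);
Console.WriteLine(string.Join(", ", ChunkSizes(burst))); // 10
```

For the back-to-back run above this reports a single chunk of 10 messages; with the sleep uncommented, each message would presumably show up as its own chunk of size 1.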
Closing as this is working as expected. Please use discussions to ask additional questions. Thanks!
https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/discussions