gtrs
gtrs copied to clipboard
How to XREAD last element
As far as I understand, gtrs consumer reads data via xread, I would like to be able to read the last element of the stream. As far as I understand, this cannot be done now, right? https://github.com/redis/redis/issues/7388
Reads all records:
NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "0-0"})
Reads only new records:
NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "$"})
Hello, any update on this?
Hi. Sorry this got lost in the pile of incoming notifications 😓
Wow, this is a really new feature 😮 Currently, gtrs code is as follows:
for {
entry = read(lastId)
chan <- entry
lastId = entry.Id
}
So it works both when you specify $ and id-id. Theoretically it should also work with +, as after reading the last entry you'll fetch its id and then continue reading from it.
that's right, IIUC the user will use + only on the first call, and then use $ on later calls. the first call will only block if the stream is empty.
Yet I also don't understand this comment from oranagra.... If I use $ after +, it means that I can skip an entry that was added in-between reading + and $ 🤷🏻
Can you please try it out? If not, I'll publish a fix to what the issue is 🙂
Hello, sorry for the long delay. I'll try it today or tomorrow. Do you mean I can just specify stream-id like "+-$" and it could work?
So it works both when you specify
$andid-id. Theoretically it should also work with+, as after reading the last entry you'll fetch its id and then continue reading from it. Can you please try it out? If not, I'll publish a fix to what the issue is.
The problem that "+" don't works in xread, only in xrange/xrevrange. So I can read only all records "0-0" or only new records "$". Any other id will result to: ERR Invalid stream ID specified as stream command argument.
I guess it was added later, so you probably need to update your dependencies. https://github.com/redis/redis/pull/13117
I also know I can pass last id from xrevrange to gtrs consumer, but I'm not sure how to make that id inclusive (i.e. include the specified id in stream after start listening).
The only way it will work right now. @dranikpg Please check if https://github.com/redis/redis/pull/13117 has any impact on the go-redis dependency.
stream := "main"
id := LastMessageID(ctx, redis, stream)
stream := gtrs.NewConsumer[Data](ctx, redis, gtrs.StreamIDs{stream: id})
func LastMessageID(ctx context.Context, client redis.Cmdable, stream string) string {
messages, err := client.XRevRangeN(ctx, stream, "+", "-", 1).Result()
if err != nil {
return "$", nil
}
// If there are no messages, we can only wait for the next one
if len(messages) == 0 {
return "$", nil
}
id, err := strconv.Atoi(str.LeftPart(messages[0].ID, "-"))
if err != nil {
return "$", nil
}
// We are looking for the closest entry before this one.
// We assume that no one will have time to send 1 million records in 1 ms.
return fmt.Sprintf("%d-%d", id-1, 99999), nil
}