How to XREAD last element

Open sedyh opened this issue 1 year ago • 6 comments

As far as I understand, the gtrs consumer reads data via XREAD. I would like to be able to read the last element of the stream, but as far as I can tell this cannot be done right now. Is that correct? See https://github.com/redis/redis/issues/7388

Reads all records:

NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "0-0"})

Reads only new records:

NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "$"})

sedyh, Feb 28 '24 15:02

Hello, any update on this?

sedyh, Mar 12 '24 16:03

Hi. Sorry this got lost in the pile of incoming notifications 😓

Wow, this is a really new feature 😮 Currently, gtrs code is as follows:

for {
  entry = read(lastId)
  chan <- entry
  lastId = entry.Id
}

So it works both when you specify $ and id-id. Theoretically it should also work with +, as after reading the last entry you'll fetch its id and then continue reading from it.


that's right, IIUC the user will use + only on the first call, and then use $ on later calls. the first call will only block if the stream is empty.

Yet I also don't understand this comment from oranagra. If I use $ after +, I can skip an entry that was added between the + read and the $ read 🤷🏻


Can you please try it out? If not, I'll publish a fix to what the issue is 🙂

dranikpg, Mar 13 '24 07:03

Hello, sorry for the long delay. I'll try it today or tomorrow. Do you mean I can just specify a stream ID like "+-$" and it would work?

sedyh, Mar 18 '24 09:03

So it works both when you specify $ and id-id. Theoretically it should also work with +, as after reading the last entry you'll fetch its id and then continue reading from it. Can you please try it out? If not, I'll publish a fix to what the issue is.

The problem is that "+" doesn't work in XREAD, only in XRANGE/XREVRANGE. So I can read either all records ("0-0") or only new records ("$"). Any other special ID, such as "+", results in: ERR Invalid stream ID specified as stream command argument.

sedyh, Mar 28 '24 13:03

I guess it was added later, so you probably need to update your dependencies. https://github.com/redis/redis/pull/13117

I also know I can pass the last ID from XREVRANGE to the gtrs consumer, but I'm not sure how to make that ID inclusive (i.e. have the consumer deliver the entry with the specified ID once it starts listening).

sedyh, Mar 28 '24 14:03

This is the only way it works right now. @dranikpg, please check whether https://github.com/redis/redis/pull/13117 has any impact on the go-redis dependency.

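stream := "main"
id := LastMessageID(ctx, rdb, stream)
consumer := gtrs.NewConsumer[Data](ctx, rdb, gtrs.StreamIDs{stream: id})

// LastMessageID returns a start ID that makes the consumer deliver the most
// recent entry of the stream first, or "$" if that is not possible.
// Needs the packages context, fmt, strconv, strings, and go-redis (as redis).
func LastMessageID(ctx context.Context, client redis.Cmdable, stream string) string {
	messages, err := client.XRevRangeN(ctx, stream, "+", "-", 1).Result()
	if err != nil {
		return "$"
	}

	// If there are no messages, we can only wait for the next one.
	if len(messages) == 0 {
		return "$"
	}

	// A stream ID has the form "<milliseconds>-<sequence>"; take the first part.
	ms, _, ok := strings.Cut(messages[0].ID, "-")
	if !ok {
		return "$"
	}
	id, err := strconv.ParseInt(ms, 10, 64)
	if err != nil {
		return "$"
	}

	// Start just before the millisecond of the last entry. This assumes that
	// fewer than 100,000 entries were written in the previous millisecond, and
	// it replays every entry that shares the last entry's millisecond.
	return fmt.Sprintf("%d-%d", id-1, 99999)
}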
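
A stricter variant of the same idea, as a sketch: instead of stepping back a whole millisecond, compute the ID that immediately precedes the last entry's ID, so that an exclusive read starts at exactly that entry. `previousID` is a hypothetical helper, not part of gtrs or go-redis. It assumes that both parts of a stream ID are unsigned 64-bit integers and that the server accepts the maximum sequence value in an ID.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// previousID returns the largest stream ID that is strictly smaller than id,
// where id has the form "<milliseconds>-<sequence>". Using the result as an
// exclusive start ID makes the read begin at id itself.
func previousID(id string) (string, error) {
	msPart, seqPart, ok := strings.Cut(id, "-")
	if !ok {
		return "", fmt.Errorf("malformed stream ID %q", id)
	}
	ms, err := strconv.ParseUint(msPart, 10, 64)
	if err != nil {
		return "", fmt.Errorf("bad millisecond part in %q: %w", id, err)
	}
	seq, err := strconv.ParseUint(seqPart, 10, 64)
	if err != nil {
		return "", fmt.Errorf("bad sequence part in %q: %w", id, err)
	}

	switch {
	case seq > 0:
		// Same millisecond, one sequence number earlier.
		return fmt.Sprintf("%d-%d", ms, seq-1), nil
	case ms > 0:
		// First entry of its millisecond: step back to the very end of the
		// previous millisecond.
		return fmt.Sprintf("%d-%d", ms-1, uint64(math.MaxUint64)), nil
	default:
		// "0-0" has no predecessor; it already means "from the beginning".
		return "0-0", nil
	}
}

func main() {
	for _, id := range []string{"1712856000000-5", "1712856000000-0"} {
		prev, err := previousID(id)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(id, "->", prev)
	}
}
```

The ID returned by XREVRANGE could be passed through such a helper before handing it to the consumer, which would avoid both assumptions noted in `LastMessageID`.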

sedyh, Apr 11 '24 17:04