sea-streamer icon indicating copy to clipboard operation
sea-streamer copied to clipboard

Owned consumer stream

Open carlocorradini opened this issue 1 month ago • 2 comments

Motivation

Since the message is behind a lifetime, it is currently not possible to return impl Stream<Item =?> from a function using consumer stream.

The following cannot be compiled (cannot return value referencing temporary value returns a value referencing data owned by the current function):

async fn my_stream(&self, options: SeaConsumerOptions) -> impl Stream<Item = String> {
    self
        .streamer // SeaStreamer
        .create_consumer(&[StreamKey::new("my_key").unwrap()], options).unwrap()
        .stream()
        .filter_map(|message| {
            message.ok().and_then(|message| {
                message
                    .message()
                    .as_str()
                    .ok()
                    .and_then(|message| Some(message.to_string()))
            })
        })
}

Proposed Solutions

I would want something like the two functions that redis provides for pubsub streams: one that consumes the stream with self and returns only Stream without the lifetime, and another employing &mut self that returns Stream + '_. into_stream or something similar.

carlocorradini avatar May 22 '24 13:05 carlocorradini