sea-streamer
sea-streamer copied to clipboard
Owned consumer stream
Motivation
Since the message is behind a lifetime, it is currently not possible to return impl Stream<Item =?>
from a function using consumer stream.
The following cannot be compiled (cannot return value referencing temporary value returns a value referencing data owned by the current function):
async fn my_stream(&self, options: SeaConsumerOptions) -> impl Stream<Item = String> {
self
.streamer // SeaStreamer
.create_consumer(&[StreamKey::new("my_key").unwrap()], options).unwrap()
.stream()
.filter_map(|message| {
message.ok().and_then(|message| {
message
.message()
.as_str()
.ok()
.and_then(|message| Some(message.to_string()))
})
})
}
Proposed Solutions
I would want something like the two functions that redis provides for pubsub streams: one that consumes the stream with self
and returns only Stream
without the lifetime, and another employing &mut self
that returns Stream + '_
.
into_stream
or something similar.