
Does CStream need to have item type Option?

Open nastynaz opened this issue 1 year ago • 0 comments

First off, thank you guys for this repo. I've learned a lot from it.

While implementing my own variant of CStream, I noticed that the item type didn't need to be Option:

impl KConsumer {
    pub fn new<T: AsRef<str>, V: AsRef<str>>(
        config: &Config,
        topic_name: V,
        consumer_group_id: T,
    ) -> Self {
        let consumer_config = config.build_consumer_config(consumer_group_id);
        let consumer: BaseConsumer<_> = consumer_config.create().expect("Consumer creation error");
        consumer
            .subscribe(&[topic_name.as_ref()])
            .expect("Can't subscribe to specified topic");

        Self {
            consumer: Arc::new(consumer),
        }
    }

    pub fn stream(&self) -> KStream {
        let (sender, receiver) = crossbeam::channel::unbounded();
        let consumer = self.consumer.clone();
        Self::gen_stream(sender, receiver, consumer)
    }

    fn gen_stream(
        sender: Sender<Option<OwnedMessage>>,
        receiver: Receiver<Option<OwnedMessage>>,
        consumer: Arc<BaseConsumer>,
    ) -> KStream {
        let _handle = thread::Builder::new()
            .name("kstream-gen".into())
            .spawn(move || {
                for m in consumer.iter() {
                    let msg = match m {
                        Ok(bm) => Some(bm.detach()),
                        Err(e) => {
                            tracing::error!("{}", e);
                            None
                        }
                    };

                    let _ = sender.send(msg);
                }
            });

        KStream { receiver }
    }
}

pin_project! {
    #[derive(Clone)]
    #[must_use = "streams do nothing unless polled"]
    pub struct KStream {
        #[pin]
        receiver: Receiver<Option<OwnedMessage>>,
    }
}

impl Stream for KStream {
    type Item = OwnedMessage; // this is no longer `Option<OwnedMessage>`

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
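        // `recv()` blocks until a message arrives and only errors once every sender is dropped.
        match this.receiver.recv() {
            // A `None` sent after a consumer error ends the stream here.
            Ok(inner) => Poll::Ready(inner),
            // Channel disconnected: end the stream. `Poll::Pending` without a registered waker would hang.
            Err(_) => Poll::Ready(None),
        }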
    }
}

The stream no longer returns a doubly-nested Option. This allows for slightly more ergonomic iteration:

while let Some(msg) = stream.next().await {
    // msg is no longer an Option here
}

I'm wondering whether there was a reason you chose `Option<OwnedMessage>` as the item type instead of just the message (note that `poll_next` still returns an `Option` either way).
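One behavioral difference I can see, which may or may not be the reason: in my version a `None` pushed into the channel after a consumer error comes out of `poll_next` as `Poll::Ready(None)`, which a stream consumer reads as "end of stream". Here is a minimal std-only sketch of that difference. `Msg`, `feed`, `poll_flattened`, `poll_nested` and the two `count_*` helpers are hypothetical stand-ins I made up for illustration (not part of this repo or of rdkafka), `std::sync::mpsc` replaces the crossbeam channel, and ending the stream when the channel disconnects is my own assumption.

```rust
use std::sync::mpsc;
use std::task::Poll;

// Hypothetical stand-in for `OwnedMessage`.
type Msg = &'static str;

// Queue one message, one consumer error (`None`), then another message.
// The sender is dropped on return, so `recv()` errors once the queue is drained.
fn feed() -> mpsc::Receiver<Option<Msg>> {
    let (tx, rx) = mpsc::channel();
    for item in [Some("a"), None, Some("b")] {
        tx.send(item).unwrap();
    }
    rx
}

// Mirrors `Item = OwnedMessage`: the channel payload is the poll result itself.
fn poll_flattened(rx: &mpsc::Receiver<Option<Msg>>) -> Poll<Option<Msg>> {
    match rx.recv() {
        Ok(inner) => Poll::Ready(inner),
        Err(_) => Poll::Ready(None), // assumption: a disconnected channel ends the stream
    }
}

// Mirrors `Item = Option<OwnedMessage>`: the payload is wrapped once more.
fn poll_nested(rx: &mpsc::Receiver<Option<Msg>>) -> Poll<Option<Option<Msg>>> {
    match rx.recv() {
        Ok(inner) => Poll::Ready(Some(inner)),
        Err(_) => Poll::Ready(None), // assumption: a disconnected channel ends the stream
    }
}

// Number of messages observed before the flattened variant reports end of stream.
pub fn count_flattened() -> usize {
    let rx = feed();
    let mut seen = 0;
    while let Poll::Ready(Some(_msg)) = poll_flattened(&rx) {
        seen += 1;
    }
    seen
}

// Number of messages observed before the nested variant reports end of stream.
pub fn count_nested() -> usize {
    let rx = feed();
    let mut seen = 0;
    while let Poll::Ready(Some(item)) = poll_nested(&rx) {
        // An inner `None` is a skipped error, not the end of the stream.
        if item.is_some() {
            seen += 1;
        }
    }
    seen
}
```

With this input the flattened variant stops at the first error and sees one message, while the nested variant skips the error and sees both. So the extra `Option` might be what lets the stream survive a transient consumer error, at the cost of the less ergonomic loop.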

nastynaz, May 02 '24 11:05