feat: close producer when topic is deleted
Related: https://github.com/infinyon/fluvio/issues/3836
We already close the consumer when the topic is deleted; with this change we close the producer too.
Given:
fluvio produce my-topic
When:
fluvio topic delete my-topic
Then:
Topic "my-topic" was deleted
I don't think this is the right approach. This should work for all clients, not just the CLI. The client should detect that the topic has been deleted and then close the connection.
Makes sense! But the producer is never what blocks. In the CLI case the blocker is stdin, and for connectors it is the source stream:
#[connector(source)]
async fn start(config: CustomConfig, producer: TopicProducerPool) -> Result<()> {
    let source = TestJsonSource::new(&config)?;
    let mut stream = source.connect(None).await?;
    while let Some(item) = stream.next().await {
        println!("producing a value: {}", &item);
        producer.send(RecordKey::NULL, item).await?;
    }
    Ok(())
}
Maybe we can change TopicProducerPool to return an error when the topic is deleted and also add a method to detect the deletion, but then we would also need something like select! and would have to change every use of the producer.
Another solution I had in mind is a method that receives a stream and returns a stream that handles topic deletion.
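To make the select! idea concrete, here is a minimal sketch using only the standard library instead of an async runtime. It assumes a hypothetical "topic deleted" notification exists; `Event`, `spawn_source`, and `produce_until_deleted` are illustrative names, not part of the Fluvio API. The blocking input and the deletion signal are merged into one channel, so the loop can stop even while the input is idle.

```rust
use std::sync::mpsc::{Receiver, Sender};
use std::thread;

/// Events the producing loop can react to (hypothetical, for illustration).
#[derive(Debug, PartialEq)]
enum Event {
    /// A record read from the (possibly blocking) input source.
    Item(String),
    /// Assumed notification that the topic no longer exists.
    TopicDeleted,
}

/// Forwards every item from a blocking source into the shared event channel,
/// so the blocking read happens off the producing loop's thread.
fn spawn_source(items: Receiver<String>, events: Sender<Event>) {
    thread::spawn(move || {
        for item in items {
            if events.send(Event::Item(item)).is_err() {
                break; // the producing loop has already exited
            }
        }
    });
}

/// Consumes events until every sender is gone or the topic is deleted.
/// Returns the records that would have been produced.
fn produce_until_deleted(events: Receiver<Event>) -> Vec<String> {
    let mut produced = Vec::new();
    for event in events {
        match event {
            // Stand-in for `producer.send(RecordKey::NULL, item)`.
            Event::Item(item) => produced.push(item),
            // Stop right away, without waiting for another input.
            Event::TopicDeleted => break,
        }
    }
    produced
}
```

With tokio, the same shape would be a select! over `stream.next()` and the deletion signal; either way, every loop that uses the producer has to change.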
async fn from_stream(stream: GenericStream) -> ProducerStream
But we will need to change implementations too:
#[connector(source)]
async fn start(config: CustomConfig, producer: TopicProducerPool) -> Result<()> {
    let source = TestJsonSource::new(&config)?;
    let stream = source.connect(None).await?;
    // `from_stream` is the proposed method; it does not exist yet.
    let mut stream = producer.from_stream(stream).await;
    while let Some(item) = stream.next().await {
        println!("producing a value: {}", &item);
        producer.send(RecordKey::NULL, item).await?;
    }
    Ok(())
}
What do you think?
Is the cluster-side socket already closed by the SPU? I think that's the first thing to handle for this case. In addition, if possible, it would be nice for the SPU to send a message saying that the produce stream is being closed due to topic deletion, and then close the SPU-side socket.
Yeah, I think so. The SPU already sends a TopicNotFound error when it does not find the topic.
But I think we will have the same problem. All usages of the producer seem to be:
- Await an input
- Produce it
The "Produce it" step will return the SPU error, but it is triggered only after an input arrives.
I think this issue is about being notified before the next input arrives.
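A small simulation of that ordering, as a sketch only: `MockProducer`, `ProduceError`, and `run_loop` are hypothetical stand-ins, not Fluvio types. It shows that the await-then-produce loop only observes TopicNotFound after it has read one more input, and never observes it if no input arrives.

```rust
/// Error a send can return (hypothetical, mirrors the SPU's TopicNotFound).
#[derive(Debug, PartialEq)]
enum ProduceError {
    TopicNotFound,
}

/// Stand-in for a producer whose topic may already be deleted.
struct MockProducer {
    topic_deleted: bool,
}

impl MockProducer {
    /// Fails with TopicNotFound once the topic is gone.
    fn send(&self, _record: &str) -> Result<(), ProduceError> {
        if self.topic_deleted {
            Err(ProduceError::TopicNotFound)
        } else {
            Ok(())
        }
    }
}

/// The await-then-produce loop. Returns how many inputs were read before
/// the loop stopped, plus the error that stopped it, if any.
fn run_loop<I>(inputs: I, producer: &MockProducer) -> (usize, Option<ProduceError>)
where
    I: Iterator<Item = String>,
{
    let mut read = 0;
    for item in inputs {
        // Step 1: await an input (a real source may block here indefinitely).
        read += 1;
        // Step 2: produce it; only here can the deletion be noticed.
        if let Err(err) = producer.send(&item) {
            return (read, Some(err));
        }
    }
    (read, None)
}
```

With the topic already deleted, `run_loop` over a single input returns `(1, Some(ProduceError::TopicNotFound))`, while an empty input returns `(0, None)`: the deletion goes unnoticed until something is read.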