
feat: close producer when topic is deleted

Open fraidev opened this issue 1 year ago • 4 comments

Related: https://github.com/infinyon/fluvio/issues/3836

We already close the consumer when the topic is deleted; with this change, the producer is closed too.

Given:

fluvio produce my-topic   

When:

fluvio topic delete my-topic   

Then:

Topic "my-topic" was deleted

fraidev avatar Aug 21 '24 23:08 fraidev

I don't think this is the right approach. This should work for all clients, not just the CLI. The client should detect that the topic is closed and then close the connection.

Makes sense! But the producer is never what blocks. In the CLI case the blocker is stdin, and for connectors it is the source stream:

#[connector(source)]
async fn start(config: CustomConfig, producer: TopicProducerPool) -> Result<()> {
    let source = TestJsonSource::new(&config)?;
    let mut stream = source.connect(None).await?;
    while let Some(item) = stream.next().await {
        println!("producing a value: {}", &item);
        producer.send(RecordKey::NULL, item).await?;
    }
    Ok(())
}

Maybe we can change TopicProducerPool to detect an error when the topic is deleted and also add a method to detect it, but we will also need to add something like select! and change uses of the producer.
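
To make the `select!` idea concrete, here is a rough sketch that uses only the standard library, so it runs without Fluvio or an async runtime. Both the input and the deletion notice are funneled into one channel, which plays the role `select!` would play in async code. `Event`, `run_until_deleted`, and `demo` are hypothetical names for illustration, not Fluvio APIs, and how the client would actually learn about the deletion is left open.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Everything the produce loop can wake up for (hypothetical type).
#[derive(Debug)]
enum Event {
    Input(String),
    TopicDeleted,
}

/// Handles events until the topic is deleted or every sender is gone.
/// Returns the records that would have been produced.
fn run_until_deleted(events: mpsc::Receiver<Event>) -> Vec<String> {
    let mut produced = Vec::new();
    // `recv` blocks until either kind of event arrives, whichever comes first.
    while let Ok(event) = events.recv() {
        match event {
            // Stand-in for `producer.send(RecordKey::NULL, record)`.
            Event::Input(record) => produced.push(record),
            // Stop right away instead of waiting for the next input.
            Event::TopicDeleted => break,
        }
    }
    produced
}

/// One record arrives, the source then goes idle, and the topic is deleted.
fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    // Input side: one record, then silence. Keeping `input_tx` alive models
    // an idle but still open source such as stdin.
    let input_tx = tx.clone();
    input_tx.send(Event::Input("first".to_string())).unwrap();

    // Deletion notice sent from another thread while the source is idle.
    let watcher = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        tx.send(Event::TopicDeleted).unwrap();
    });

    let produced = run_until_deleted(rx);
    watcher.join().unwrap();
    drop(input_tx);
    produced
}

fn main() {
    println!("produced before shutdown: {:?}", demo());
}
```

The loop exits on the deletion notice even though the input side never closes, which is the behavior the plain "await input, then send" loop cannot provide.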

Another solution I had in mind is a method that receives a stream and returns a wrapped stream that handles the deletion.

async fn from_stream(stream: GenericStream) -> ProducerStream

But we will need to change implementations too:

#[connector(source)]
async fn start(config: CustomConfig, producer: TopicProducerPool) -> Result<()> {
    let source = TestJsonSource::new(&config)?;
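    let stream = source.connect(None).await?;
    // Wrap the source once, outside the loop, so `stream` is not moved on every iteration.
    let mut stream = producer.from_stream(stream).await;
    while let Some(item) = stream.next().await {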
        println!("producing a value: {}", &item);
        producer.send(RecordKey::NULL, item).await?;
    }
    Ok(())
}

What do you think?

fraidev avatar Aug 22 '24 17:08 fraidev

Is the cluster-side socket already closed by the SPU? I think that's the first thing to handle for this case. In addition, if possible, it would be nice for the SPU to send a message that the produce stream is being closed due to topic deletion, and then close the SPU-side socket.

digikata avatar Aug 26 '24 15:08 digikata

Is the cluster-side socket already closed by the SPU? I think that's the first thing to handle for this case. In addition, if possible, it would be nice for the SPU to send a message that the produce stream is being closed due to topic deletion, and then close the SPU-side socket.

Yeah, I think so. The SPU already sends a TopicNotFound error when it does not find the topic.

But I think we will have the same problem. All uses of the producer seem to be:

  • Await an input
  • Produce it

The "Produce it" step will return the SPU error, but it is triggered only after an input arrives.

I think this issue aims for the producer to be notified before the next input arrives.
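
A small std-only sketch of that timing problem, under stated assumptions: `MockProducer`, `ProduceError`, `produce_all`, and `demo` are hypothetical stand-ins, not Fluvio types, and the only name taken from this thread is `TopicNotFound`. The topic is "deleted" while the loop is waiting for the second input, yet the error surfaces only once that input has been read and `send` is called.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, PartialEq)]
enum ProduceError {
    TopicNotFound,
}

/// Hypothetical stand-in for a producer; not the real Fluvio type.
struct MockProducer<'a> {
    topic_deleted: &'a AtomicBool,
    sent: Vec<String>,
}

impl MockProducer<'_> {
    /// Fails once the topic is gone, mimicking the SPU's error response.
    fn send(&mut self, record: String) -> Result<(), ProduceError> {
        if self.topic_deleted.load(Ordering::SeqCst) {
            return Err(ProduceError::TopicNotFound);
        }
        self.sent.push(record);
        Ok(())
    }
}

/// The usual loop: await an input, then produce it.
/// Returns how many inputs were read and the final outcome.
fn produce_all<I>(inputs: I, producer: &mut MockProducer<'_>) -> (usize, Result<(), ProduceError>)
where
    I: Iterator<Item = String>,
{
    let mut read = 0;
    for record in inputs {
        read += 1;
        if let Err(err) = producer.send(record) {
            return (read, Err(err));
        }
    }
    (read, Ok(()))
}

/// Returns (inputs read, records sent, outcome).
fn demo() -> (usize, usize, Result<(), ProduceError>) {
    let topic_deleted = AtomicBool::new(false);
    let mut producer = MockProducer { topic_deleted: &topic_deleted, sent: Vec::new() };

    let inputs = vec!["a", "b", "c"]
        .into_iter()
        .map(|s| s.to_string())
        .inspect(|record| {
            // Simulate the topic being deleted while the loop waits for "b".
            if record.as_str() == "b" {
                topic_deleted.store(true, Ordering::SeqCst);
            }
        });

    let (read, outcome) = produce_all(inputs, &mut producer);
    (read, producer.sent.len(), outcome)
}

fn main() {
    println!("{:?}", demo());
}
```

Here the second input has to arrive before the loop learns the topic is gone; with an idle source, that could take arbitrarily long.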

fraidev avatar Aug 26 '24 16:08 fraidev

Stale pull request message

github-actions[bot] avatar Oct 26 '24 11:10 github-actions[bot]