rust-rdkafka

BaseProducer Poll and Flush no longer clearing queue after `Move to Event-based API` commit

Open brentmjohnson opened this issue 9 months ago • 0 comments

After upgrading to 0.36.2, rdkafka::error::RDKafkaErrorCode::QueueFull is returned on every send, even when BaseProducer::poll() is called explicitly.

Additionally, explicit calls to BaseProducer::flush() leave unpublished events in the queue. These can be observed by flushing, ending the thread, and then starting a new thread that reuses the same thread-safe BaseProducer.

Related code:

loop {
    let producer_future = kafka_producer.lock().unwrap().send(
        BaseRecord::to(topic_name.as_ref().unwrap())
            .key(&())
            .payload(&interval_subscription),
    );
    match producer_future {
        Ok(_) => break,
        Err((KafkaError::MessageProduction(rdkafka::error::RDKafkaErrorCode::QueueFull), _)) => {
            let poll_amt = kafka_producer.lock().unwrap().poll(Timeout::Never);
            println!("poll: {:?}", poll_amt);
        },
        Err((e, _)) => {
            println!("Error {:?}", e);
            break;
        }
    }
}
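For reference, the behaviour this loop relies on can be modelled without a broker. The sketch below is a self-contained stand-in, not rdkafka's API: `MockProducer`, `SendError`, `send_with_retry`, and `max_attempts` are all hypothetical names, and the model only assumes that polling frees queue slots and that flushing leaves the queue empty. It also bounds the number of retries, so a producer whose queue never drains surfaces as an error rather than looping indefinitely.

```rust
use std::collections::VecDeque;

/// Hypothetical error type for the model; not rdkafka's error enum.
#[derive(Debug, PartialEq)]
enum SendError {
    QueueFull,
}

/// Hypothetical stand-in for a producer with a bounded in-memory queue.
struct MockProducer {
    capacity: usize,
    in_flight: VecDeque<String>,
}

impl MockProducer {
    fn new(capacity: usize) -> Self {
        Self { capacity, in_flight: VecDeque::new() }
    }

    /// Enqueue a message, or hand it back with QueueFull when at capacity.
    fn send(&mut self, msg: String) -> Result<(), (SendError, String)> {
        if self.in_flight.len() >= self.capacity {
            return Err((SendError::QueueFull, msg));
        }
        self.in_flight.push_back(msg);
        Ok(())
    }

    /// Serve up to `max_events` queued messages, freeing their slots.
    /// Returns how many were served.
    fn poll(&mut self, max_events: usize) -> usize {
        let n = max_events.min(self.in_flight.len());
        self.in_flight.drain(..n).count()
    }

    /// Serve everything outstanding; the queue is empty afterwards.
    fn flush(&mut self) {
        self.in_flight.clear();
    }

    fn in_flight_count(&self) -> usize {
        self.in_flight.len()
    }
}

/// Retry on QueueFull, polling once between attempts, and give up after
/// `max_attempts`. Returns the attempt number that succeeded.
fn send_with_retry(
    producer: &mut MockProducer,
    mut msg: String,
    max_attempts: usize,
) -> Result<usize, SendError> {
    for attempt in 1..=max_attempts {
        match producer.send(msg) {
            Ok(()) => return Ok(attempt),
            Err((SendError::QueueFull, returned)) => {
                // Take the message back and free one slot before retrying.
                msg = returned;
                producer.poll(1);
            }
        }
    }
    Err(SendError::QueueFull)
}
```

In this model a full queue accepts the next message on the second attempt, because the intervening poll frees a slot. The behaviour reported above corresponds to the case where polling frees nothing, which the model can only imitate with a zero-capacity queue: every attempt fails and the helper returns QueueFull once the attempts are exhausted.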

Downgrading to 0.35.0 to avoid https://github.com/fede1024/rust-rdkafka/commit/19f32bf824f5c5bd4268d6dcc6adb69a7afc0b08 resolves this.

I have seen a few other issues that may be related, such as https://github.com/fede1024/rust-rdkafka/issues/638

Let me know if more information would be helpful, but I believe this can be observed with some minor changes to the kafka-benchmark project: https://github.com/fede1024/kafka-benchmark/blob/c04d8cee98b0aa47e1580a27d4db5ba90e6318b6/src/producer/mod.rs#L67-L87

brentmjohnson, Apr 30 '24 15:04