rust-rdkafka
rust-rdkafka copied to clipboard
BaseProducer Poll and Flush no longer clearing queue after `Move to Event-based API` commit
Upgraded to 0.36.2, and identified that rdkafka::error::RDKafkaErrorCode::QueueFull is continually returned after each send, even when explicitly calling BaseProducer.poll().
Additionally, explicit calls to BaseProducer.flush() leaves un-published events in queue - these can be observed when re-using the BaseProducer after flushing - ending thread - starting new thread using the same thread-safe BaseProducer.
Related code:
loop {
let producer_future = kafka_producer.lock().unwrap().send(
BaseRecord::to(topic_name.as_ref().unwrap())
.key(&())
.payload(&interval_subscription),
);
match producer_future {
Ok(_) => break,
Err((KafkaError::MessageProduction(rdkafka::error::RDKafkaErrorCode::QueueFull), _)) => {
let poll_amt = kafka_producer.lock().unwrap().poll(Timeout::Never);
println!("poll: {:?}", poll_amt);
},
Err((e, _)) => {
println!("Error {:?}", e);
break;
}
}
}
Downgrading to 0.35.0 to avoid https://github.com/fede1024/rust-rdkafka/commit/19f32bf824f5c5bd4268d6dcc6adb69a7afc0b08 resolves this.
Have seen a few other issues that may be related, like https://github.com/fede1024/rust-rdkafka/issues/638
Let me know if more information would be helpful, but I believe this can be observed with some minor changes to the kafka-benchmark project: https://github.com/fede1024/kafka-benchmark/blob/c04d8cee98b0aa47e1580a27d4db5ba90e6318b6/src/producer/mod.rs#L67-L87