rust-rdkafka
Mocking doesn't work
We use this library in production and it's been great so far. We noticed that there was a new mocking module and upgraded to help extend test coverage to ensure stability.
I'm getting the error KafkaError (Client creation error: librdkafka failed to create consumer queue), which appears to originate close to the C code in the underlying library. Our mocks, written with examples/mocking.rs as a reference, look like the following.
fn create_mock_cluster<'c>() -> MockCluster<'c, DefaultProducerContext> {
    let cluster = MockCluster::new(3).expect("failed to create mock cluster");
    cluster
        .create_topic("<our topic>", 32, 3)
        .expect("failed to set topic");
    cluster
}

fn create_mock_producer(bootstrap_servers: String) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        .create()
        .expect("failed to create mock producer")
}

fn create_mock_consumer(bootstrap_servers: String) -> StreamingConsumer {
    StreamingConsumer::mock(bootstrap_servers)
}
We use the newtype pattern on StreamConsumer for additional mocking and expectations, such as committing; that's where StreamingConsumer comes from.
Because of the above error, I cloned the repo and tried to run the mocking.rs example locally, which failed with the following output.
Running `target/debug/examples/mocking`
%5|1699901725.314|CONFWARN|rdkafka#producer-1| [thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster
Warming up for 10s...
Recording for 10s...
measurements: 1938
mean latency: 255.35655314757489ms
p50 latency: 255ms
p90 latency: 456ms
p99 latency: 501ms
%3|1699901745.548|FAIL|rdkafka#producer-2| [thrd:127.0.0.1:42977/bootstrap]: 127.0.0.1:42977/1: Connect to ipv4#127.0.0.1:42977 failed: Connection refused (after 0ms in state CONNECT)
%3|1699901745.548|FAIL|rdkafka#producer-2| [thrd:127.0.0.1:33881/bootstrap]: 127.0.0.1:33881/2: Connect to ipv4#127.0.0.1:33881 failed: Connection refused (after 0ms in state CONNECT)
%3|1699901745.548|FAIL|rdkafka#producer-2| [thrd:127.0.0.1:37597/bootstrap]: 127.0.0.1:37597/3: Connect to ipv4#127.0.0.1:37597 failed: Connection refused (after 0ms in state CONNECT)
Because the example failed, I'm optimistic that this is something on my end but I can't find any documentation surrounding this.
Thanks for looking into this
@connormullett The example actually works and finishes as expected, though it is indeed confusing that bootstrap "connection refused" failures are being logged (that might be a bug in librdkafka). If you check the code, the consumer does receive messages and the latency is measured.
Client creation error: librdkafka failed to create consumer queue
This sounds like group.id was not provided to the consumer. What is StreamingConsumer::mock doing?
This sounds like group.id was not provided to the consumer. What is StreamingConsumer::mock doing?
This was the issue and it works!
Does it make sense here to have a message to check configuration in case of these types of errors?
Does it make sense here to have a message to check configuration in case of these types of errors?
Hmm, I need to check, but I think the latest version does it already. Which version are you using?
Which version are you using?
0.35.0
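Until the library reports this more clearly, a pre-flight check in test code can catch the missing key before the client is created. The following is only a minimal sketch using the standard library: `missing_consumer_keys` is a hypothetical helper, not part of rust-rdkafka, and the list of required keys is an assumption based on the error discussed above.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not part of rust-rdkafka): returns the keys this
/// sketch treats as mandatory for a subscribing consumer that are absent
/// from `config`.
fn missing_consumer_keys(config: &HashMap<String, String>) -> Vec<&'static str> {
    // Assumption: these two keys are the minimum for a group consumer.
    const REQUIRED: [&str; 2] = ["bootstrap.servers", "group.id"];
    REQUIRED
        .iter()
        .copied()
        .filter(|key| !config.contains_key(*key))
        .collect()
}
```

Calling it on the map of settings before building the real client lets a test fail with a message such as "missing group.id" instead of the less specific client creation error.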
Something else seems funny about the Mock consumer, given this diff:
git --no-pager diff
diff --git a/examples/mocking.rs b/examples/mocking.rs
index c4a1d1c4..8b967e25 100644
--- a/examples/mocking.rs
+++ b/examples/mocking.rs
@@ -38,7 +38,7 @@ async fn main() {
                 .send_result(
                     FutureRecord::to(TOPIC)
                         .key(&i.to_string())
-                        .payload("dummy")
+                        .payload(&i.to_string())
                         .timestamp(now()),
                 )
                 .unwrap()
@@ -54,6 +54,7 @@ async fn main() {
     println!("Warming up for 10s...");
     loop {
         let message = consumer.recv().await.unwrap();
+        println!("{:?}", message.payload_view::<str>().unwrap());
         let then = message.timestamp().to_millis().unwrap();
         if start.elapsed() < Duration::from_secs(10) {
             // Warming up.
The first message I see via the consumer is at around i=500. That makes the mock useful for benchmarking, but rough for testing messages produced by some bit of code.
If I only send a small handful of events, my consumer never finishes the first recv().await call (e.g., change the example to produce 1 message). I messed with the consumer configuration for a while (I was using the default, exactly like the example), thinking this had to do with fetch.min.bytes or fetch.wait.max.ms or something, but nothing seemed to work.
Plus, I feel like the fact that this example (with my diff) doesn't start by printing 0, but instead only picks up at some random point around ~500, is proof that messages produced to the mock cluster are being lost? It's like some buffer has to be filled, but then a big chunk of the first messages in the topic are lost?
Ahhh, I finally checked out the mocking code and saw the unit test inside, which publishes and consumes 1 item and works. The only difference was .set("auto.offset.reset", "earliest").
I guess there's something about mock topic creation where latest (the default) doesn't work as I'd expect. I created the topic and subscribed with the consumer before I published anything, and I still didn't see anything.
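One plausible explanation, offered as an assumption rather than something confirmed in this thread: subscribing returns before the consumer has actually been assigned its partitions, so with latest the starting position is wherever the log ends once the assignment completes. The toy model below uses only the standard library and no Kafka client. `visible_messages` and `OffsetReset` are hypothetical names that simulate the effect on a single partition.

```rust
/// Where a consumer with no committed offset starts reading.
#[derive(Clone, Copy)]
enum OffsetReset {
    Earliest,
    Latest,
}

/// Toy model of one partition: returns the payloads a consumer would see if
/// its assignment completed when the log already held `assigned_at` messages.
fn visible_messages(log: &[u32], assigned_at: usize, reset: OffsetReset) -> Vec<u32> {
    let start = match reset {
        // Earliest: read from the beginning of the log.
        OffsetReset::Earliest => 0,
        // Latest: skip everything written before the assignment completed.
        OffsetReset::Latest => assigned_at.min(log.len()),
    };
    log[start..].to_vec()
}
```

Under this model, a log of 1000 messages with the assignment completing after 500 of them yields 500 as the first visible payload under latest and 0 under earliest. A log holding a single message written before the assignment yields nothing under latest, which would match a recv().await that never returns.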
Changing to earliest worked, which is good enough for my needs.
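For anyone landing here later, the two settings that came up in this thread can be combined into a consumer configuration along these lines. This is a sketch modelled on the unit test mentioned above. The function name and the group name "mock-test-group" are placeholders, and it assumes the rdkafka crate is a dependency.

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;

// Hypothetical helper name; pass the mock cluster's bootstrap servers.
fn create_test_consumer(bootstrap_servers: &str) -> StreamConsumer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        // Omitting group.id was the cause of the "failed to create consumer
        // queue" error reported at the top of this thread.
        .set("group.id", "mock-test-group")
        // Read from the start of the topic when no offset is committed,
        // so messages produced before the assignment are not skipped.
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("failed to create mock consumer")
}
```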