
Upgrade to 0.28.0, receive new message timeout

Open rts-gordon opened this issue 3 years ago • 5 comments

Hi there.

- Kafka server version: AWS MSK 2.7.1, with auto.create.topics.enable=true
- rdkafka version: 0.28.0
- Rust version: 1.59

My program sends messages to topics that may not exist yet; the Kafka server creates the topic first and then delivers the message. With rdkafka 0.26.0 everything is normal: I send a new message, the server creates the topic, and the consumer receives the message, all within 500 ms.

But when I upgraded rdkafka to 0.28.0, I noticed something out of the ordinary: I send a new message, the server creates the topic, and the consumer still receives the message, but only after about 5 MINUTES. A 5-minute delay means 5 minutes of messages are effectively lost.

Was there any big change in rdkafka 0.28.0, and how can I fix this issue? Any help is greatly appreciated.

Kafka producer config code:

pub fn create_producer(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .set("queue.buffering.max.kbytes", "20000000") 
        .set("queue.buffering.max.messages", "5000000") 
        .set("message.send.max.retries", "3")
        .set("request.required.acks", "1")
        .create()
        .expect("Producer creation error")
}

Kafka consumer config:

pub fn create_consumer(
    brokers: &str,
    group_id: &str,
    topics: &[&str],
) -> StreamConsumer<CustomContext> {
    let consumer: StreamConsumer<CustomContext> = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", group_id)
        .set("enable.auto.commit", "false") 
        .set("auto.offset.reset", "largest") 

        .create_with_context(CustomContext)
        .expect("Consumer creation failed");

    consumer
        .subscribe(topics)
        .expect("Can't subscribe to specified topics");
    consumer
}

rts-gordon avatar Apr 05 '22 02:04 rts-gordon

@fede1024 @niedhui I know rdkafka added a new feature, cooperative rebalancing, in version 0.27.0. I'm not sure whether it affects how the consumer receives messages; if it does, how should I configure the client to avoid this issue? I didn't find anything in the examples. Would you mind taking a look? Thank you all very much.

rts-gordon avatar Apr 06 '22 00:04 rts-gordon

I was not able to reproduce this issue. FWIW, I created a consumer subscribed to a non-existent topic, and then a producer instance wrote a single message to that topic. The topic was created on the server side and the consumer received the message within a second.

Could you provide an example (for instance based on the roundtrip example) where this gets consistently reproduced?

scanterog avatar Apr 26 '22 20:04 scanterog

Hi @scanterog Thanks for your reply.

In my scenario, the consumer subscribes to topics using a regex like 1-[a-zA-Z].*$. When the producer sends a message to a non-existent topic 1-testtopic, the consumer only receives that message from 1-testtopic after about 5 minutes.

When the producer sends a message to a non-existent topic, Kafka creates the topic first, and then a rebalance starts for the consumer group (this takes almost 5 minutes). After the rebalance, the consumer re-subscribes to all topics, including the new 1-testtopic. Until the rebalance finishes, messages from the producer are missed by the consumer.

I don't think this is a 0.28.0 issue, but I don't know how to reduce the rebalance time; 5 minutes is too long. Maybe it is just how Kafka behaves?
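One possible explanation (an assumption on my part, not something confirmed in this thread): with a regex subscription, librdkafka only discovers newly created matching topics on its next metadata refresh, and topic.metadata.refresh.interval.ms defaults to 300000 ms, exactly the 5 minutes observed here. A sketch of consumer properties that may shorten the delay (the broker address and interval are illustrative):

```
# librdkafka consumer properties (illustrative sketch)
bootstrap.servers=localhost:9092
group.id=example-group
# Default is 300000 (5 minutes). Lowering it lets a regex subscription
# notice newly created topics sooner, at the cost of more metadata requests.
topic.metadata.refresh.interval.ms=30000
```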

rts-gordon avatar Apr 27 '22 00:04 rts-gordon

then will start a rebalance process for Consumer (this process will take almost 5 minutes),

ah ok, so I'll need to try to reproduce this with a consumer group of N instances. How many members/instances do you have in your consumer group?

Before finishing rebalance, messages will be lost from Producer.

Production and consumption are decoupled. Irrespective of the rebalance, the producer should be able to write to a given topic/partition, unless this is an application that consumes from Kafka and writes back to Kafka after acking some messages (which you won't be able to do during a rebalance). Is that the case?

scanterog avatar Apr 27 '22 14:04 scanterog

1. How many members/instances do you have in your consumer group? There are about 600 topics and fewer than 10 consumers, and new topics are added to Kafka continuously.

2. Production and consumption are decoupled. Yes, the producer can write to any topic, even a new one, but the consumer will not receive messages from a newly created topic until the rebalance finishes.

In my scenario, the consumer doesn't know the new topic names in advance, so I have to subscribe with a regex; maybe that is what causes the issue?

rts-gordon avatar Apr 28 '22 00:04 rts-gordon

@CHCP did you find the solution for this issue?

i-am-chauhan avatar Sep 06 '22 10:09 i-am-chauhan

@i-am-chauhan
I found that this issue is caused by the Kafka rebalance; we have to wait until the rebalance finishes. No solution for this issue.

rts-gordon avatar Sep 08 '22 05:09 rts-gordon