vertx-kafka-client icon indicating copy to clipboard operation
vertx-kafka-client copied to clipboard

auto.offset.reset ignored

Open owahab opened this issue 6 years ago • 5 comments

Thanks to your awesome project, my consumer was up and running in minutes.

However, I needed to re-consume the entire queue so did a consumer-group offset reset, then started my consumer but it consumed only the last messages from the queue.

This is my config:

import io.vertx.kafka.client.consumer.KafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
...
    @Override
    public void start() {
        Map<String, String> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        if (!kafkaUsername.isEmpty() && !kafkaPassword.isEmpty()) {
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            config.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
            config.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" " + "password=\"%s\";",
                kafkaUsername,
                kafkaPassword
            ));
        }

        kafkaConsumer = KafkaConsumer.create(vertx, config);
}

I tried disabling the auto commit, tried changing the consumer group but nothing worked.

What am I missing or doing wrong?

owahab avatar Jun 05 '18 09:06 owahab

I see the answer on SO as well, please don't duplicate the stuff :-) Have you already tried what happens using the native Kafka client as consumer?

ppatierno avatar Jun 05 '18 16:06 ppatierno

@ppatierno just tried native consumer and it works as expected.

owahab avatar Jun 05 '18 16:06 owahab

can you post the configuration of the native consumer please?

ppatierno avatar Jun 05 '18 16:06 ppatierno

@ppatierno it's the exact same config I used with vertx-kafka-client.

owahab avatar Jun 06 '18 12:06 owahab

@owahab the auto.offset.reset config parameter defines what to do when there is no initial offset in Kafka or if the current offset does not exist (as stated by the Kafka doc). It means that when you start the consumer for the first time, there is no offset committed and it starts to get messages. If you stop the consumer and then you start it again with "earliest", because the committed offset is already available no messages are re-read. Of course it's related to the fact that the consumer is in the same consumer group as before. If you start the consumer in a different consumer group it gets the messages from the beginning. In the case you want to use the same consumer group you have to use the seekAtBeginning method. I tried it with native Kafka client as well, and the behavior is the same because it's how Kafka works. How did you do the test with native Kafka client?

ppatierno avatar Dec 20 '18 11:12 ppatierno