vertx-kafka-client
auto.offset.reset ignored
Thanks to your awesome project, my consumer was up and running in minutes.
However, I needed to re-consume the entire queue, so I did a consumer-group offset reset and then started my consumer, but it consumed only the last messages from the queue.
This is my config:
import io.vertx.kafka.client.consumer.KafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;
...
@Override
public void start() {
  Map<String, String> config = new HashMap<>();
  config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
  config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  if (!kafkaUsername.isEmpty() && !kafkaPassword.isEmpty()) {
    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    config.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
    config.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
        kafkaUsername,
        kafkaPassword
    ));
  }
  kafkaConsumer = KafkaConsumer.create(vertx, config);
}
I tried disabling auto commit and tried changing the consumer group, but nothing worked.
What am I missing or doing wrong?
I see the question on SO as well, please don't duplicate the stuff :-) Have you already tried what happens when using the native Kafka client as the consumer?
@ppatierno just tried native consumer and it works as expected.
can you post the configuration of the native consumer please?
@ppatierno it's the exact same config I used with vertx-kafka-client.
@owahab the auto.offset.reset config parameter defines what to do when there is no initial offset in Kafka or if the current offset does not exist any more (as stated by the Kafka documentation). It means that when you start the consumer for the first time, there is no committed offset, so the consumer starts getting messages. If you then stop the consumer and start it again with "earliest", no messages are re-read because a committed offset is already available.
Of course, this is related to the fact that the consumer is in the same consumer group as before. If you start the consumer in a different consumer group, it gets the messages from the beginning.
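As a quick illustration, here is a minimal sketch reusing the config map and fields from the snippet above (the "-replay" suffix is just a hypothetical placeholder): changing only the group.id is enough for auto.offset.reset=earliest to apply again, because the new group has no committed offsets.

// hypothetical: use a fresh consumer group that has no committed offsets yet
config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup + "-replay");
// takes effect now, since the new group has no initial offset
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumer = KafkaConsumer.create(vertx, config);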
If you want to keep using the same consumer group, you have to use the seekToBeginning method.
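For reference, a minimal sketch of that approach with the Vert.x consumer created above ("my-topic" is a placeholder topic name; note that seeking inside partitionsAssignedHandler rewinds on every rebalance, so in practice you may want to guard it so it runs only once):

import io.vertx.kafka.client.common.TopicPartition;

import java.util.Set;
...
// rewind every newly assigned partition to its beginning
kafkaConsumer.partitionsAssignedHandler((Set<TopicPartition> partitions) ->
    kafkaConsumer.seekToBeginning(partitions)
);

// process the re-read records
kafkaConsumer.handler(record ->
    System.out.println("key=" + record.key() + ", value=" + record.value())
);

// "my-topic" is a placeholder for the actual topic name
kafkaConsumer.subscribe("my-topic");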
I tried it with the native Kafka client as well, and the behavior is the same, because that's how Kafka works.
How did you do the test with the native Kafka client?