vertx-kafka-client

Lost records on frequent commits and pause/resume

Open Seraksab opened this issue 5 years ago • 10 comments

Hi

We are currently experiencing lost records from Kafka that are not delivered to the registered data handler. We did some further investigating and testing, and there seems to be an issue with how commits are handled.

Our general procedure looks like the following (roughly as sketched in code below):

  • read records up until a certain threshold
  • call pause on the consumer
  • process the batch of records
  • call commit on consumer
  • call resume on consumer
  • repeat

The consumer is subscribed to one topic only.
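
In code, that loop looks roughly like the sketch below (Vert.x 3.x API; the class name, the BATCH_SIZE value and the processing step are placeholders, not our actual code):

import java.util.ArrayList;
import java.util.List;

import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

// Minimal sketch of the batching loop described above, not our production code.
class BatchLoopSketch {

  static final int BATCH_SIZE = 1000; // read threshold, placeholder value

  static void run(KafkaConsumer<String, String> consumer) {
    List<KafkaConsumerRecord<String, String>> batch = new ArrayList<>();
    consumer.handler(record -> {
      batch.add(record);
      if (batch.size() >= BATCH_SIZE) {
        consumer.pause();              // stop delivery while the batch is processed
        // ... process the collected records in 'batch' here ...
        batch.clear();
        consumer.commit();             // commit the processed offsets
        consumer.resume();             // continue with the next batch
      }
    });
  }
}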

If we work our way through a topic with tens of thousands of records, the first few iterations always succeed, but at some point records are skipped and not delivered to the registered data handler (hundreds in some cases!).

Simply adding a delay of 500 ms after calling commit seems to fix the problem, and we then receive all of the records correctly.
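
In other words, the workaround amounts to something like this (a rough sketch, not our exact code):

consumer.commit();
// resuming only after a ~500 ms delay avoids the lost records for us
vertx.setTimer(500, id -> consumer.resume());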

In a further attempt, we tried replacing the added delay by calling commit with a completion handler instead and waiting for the operation to complete before continuing. However, the passed handler is never called. After adding a timeout timer that completes the future in case the commit completion handler is not called in time, we see the following exception right after the timer fires and completes the future (the timeout value seems not to matter at all, as we've tried values from 500 ms up to 30 s and the result is the same):

io.vertx.core.impl.ContextImpl
SEVERE: Unhandled exception
java.lang.IllegalStateException: Result is already complete: succeeded
	at io.vertx.core.impl.FutureImpl.complete(FutureImpl.java:94)
	at com.kpibench.kafka.KafkaTool$1.lambda$commit$0(KafkaTool.java:235)
	at io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl.lambda$commit$5(KafkaConsumerImpl.java:342)
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$null$1(KafkaReadStreamImpl.java:119)
	at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Seraksab avatar Dec 14 '18 10:12 Seraksab

@ppatierno ping

vietj avatar Dec 14 '18 11:12 vietj

Just a clarification on the last part concerning the exception, as reading it back made me realize it was expressed pretty inaccurately.

The exception was a result of the following snippet (as stated, due to our attempt to wait for the commit callback, with a timeout since the callback seems not to be called reliably):

Future<Void> f = Future.future();
consumer.commit(h -> f.complete());
vertx.setTimer(5000, h -> {
  if (!f.isComplete()) {
    f.complete();
  }
});
return f;

As soon as the timer fires and completes the future, the exception occurs (for every call and, as stated, no matter the timeout value). So this would indicate the callback is somehow not triggered until the future is completed, which in turn results in our procedure (as explained above) continuing by calling resume and starting to accept records again.
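
For completeness: the secondary IllegalStateException itself can be avoided by guarding against double completion, e.g. roughly like this (assuming Future#tryComplete() from Vert.x 3.4+), although that obviously does not fix the callback not firing in the first place:

Future<Void> f = Future.future();
consumer.commit(h -> f.tryComplete());       // no-op if the timeout already completed the future
vertx.setTimer(5000, id -> f.tryComplete()); // fallback in case the commit callback never fires
return f;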

Seraksab avatar Dec 17 '18 09:12 Seraksab

Can you provide tests for this?

vietj avatar Dec 17 '18 09:12 vietj

Here's a quick test, representing a stripped-down version of our current setup. In this case, the errors start to appear after about 50 iterations (= processed batches), which equals roughly 50,000 records, so you will need a topic with quite a few records for testing.

// imports added so the test compiles standalone (logging assumed to be SLF4J)
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

@RunWith(VertxUnitRunner.class)
public class ExampleTest {

  private static final String TOPIC = "";
  private static final String GROUP = "Test" + System.currentTimeMillis();
  private static final String SERVERS = ""; // bootstrap.servers

  private static final int BATCH_SIZE = 1000;

  private static final Logger logger = LoggerFactory.getLogger(ExampleTest.class);

  private Long prevOffset = null;
  private Vertx vertx;
  private Async async;
  private TestContext context;

  @Test
  public void test(TestContext context) {
    this.vertx = Vertx.vertx();
    this.context = context;
    this.async = context.async();
    Map<String, String> consumerConfig = createConsumerConfig();
    KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, consumerConfig);
    TestBatchHandler handler = new TestBatchHandler(BATCH_SIZE, 10000, this::handleAndCheckData, consumer);
    consumer.handler(handler::add);
    consumer.exceptionHandler(e -> logger.error("consumer error", e));
    consumer.subscribe(TOPIC);
  }

  private Future<Void> handleAndCheckData(List<KafkaConsumerRecord> lst) {
    logger.info("got {} values", lst.size());
    for (KafkaConsumerRecord record : lst) {
      long offset = record.offset();
      if (prevOffset != null) {
        long expected = prevOffset + 1;
        if (offset != expected) {
          logger.error("offset {}, expected: {}, diff: {}", offset, expected, offset - expected);
          context.assertEquals(expected, offset);
        }
      }
      prevOffset = offset;
    }
    if (lst.size() < BATCH_SIZE) {
      async.complete();
    }
    return Future.succeededFuture();
  }

  private Map<String, String> createConsumerConfig() {
    Map<String, String> consumerConfig = new HashMap<>();
    consumerConfig.put("bootstrap.servers", SERVERS);
    consumerConfig.put("group.id", GROUP);
    consumerConfig.put("session.timeout.ms", "30000");
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("auto.offset.reset", "earliest");
    consumerConfig.put("enable.auto.commit", "false");
    return consumerConfig;
  }

  private class TestBatchHandler<T> {

    private ValueHandler<T> valueHandler;
    private KafkaConsumer consumer;
    private ArrayDeque<T> records;
    private int batchSize;
    private long timeout;
    private long timerId;

    public TestBatchHandler(int batchSize, long timeout, ValueHandler<T> valueHandler, KafkaConsumer consumer) {
      this.consumer = consumer;
      this.batchSize = batchSize;
      this.timeout = timeout;
      this.valueHandler = valueHandler;
      this.records = new ArrayDeque<>(batchSize + 1);
      this.timerId = vertx.setTimer(timeout, t -> stopReceiving());
    }

    public void add(T value) {
      synchronized (records) {
        records.add(value);
        if (records.size() >= batchSize) {
          if (vertx.cancelTimer(timerId)) {
            stopReceiving();
          } else {
            consumer.pause();
          }
        }
      }
    }

    private void startReceiving() {
      consumer.resume();
      timerId = vertx.setTimer(timeout, t -> stopReceiving());
    }

    private void stopReceiving() {
      consumer.pause();
      handleBatch().setHandler(h -> startReceiving());
    }

    private Future<Void> handleBatch() {
      synchronized (records) {
        Future<Void> future = Future.future();
        List<T> lst = new ArrayList<>(records);
        valueHandler.handle(lst).setHandler(r -> {
          try {
            synchronized (records) {
              if (records.size() == lst.size()) {
                records.clear();
              } else {
                lst.forEach(records::remove);
              }
            }
          } finally {
            consumer.commit(); // these 2 lines result in lost entries
            future.complete(); //

            // commit().setHandler(future.completer());
          }
        });
        return future;
      }
    }


    /**
     * Using this block results in the mentioned exception,
     * indicating the handler may not be called correctly?
     */
    private Future<Void> commit() {
      Future<Void> f = Future.future();
      consumer.commit(h -> f.complete());
      vertx.setTimer(5000, h -> {
        if (!f.isComplete()) {
          f.complete();
        }
      });
      return f;
    }
  }

  @FunctionalInterface
  interface ValueHandler<T> {
    Future<Void> handle(List<T> values);
  }
}

EDIT: updated test code

Seraksab avatar Dec 17 '18 12:12 Seraksab

sorry to be late guys, I am taking a look at it right now

ppatierno avatar Dec 20 '18 10:12 ppatierno

@Seraksab I also read ...

In a further attempt, we tried replacing the added delay by calling commit with a completion handler instead and waiting for the operation to complete before continuing. However, the passed handler is never called.

It seems to be a different bug. Did you notice this one in your use case only or in general?

ppatierno avatar Dec 20 '18 10:12 ppatierno

@Seraksab The example is not usable as it is because it has some classes that I guess come from your application (GenericData, Kafka.ValueHandler, ...), but I will try to change it a little bit to reproduce the issue.

ppatierno avatar Dec 20 '18 11:12 ppatierno

@Seraksab I changed my mind; I think it's better to have something from you that we can actually run in a simpler way. I would remove the schema registry dependency as well, using messages with keys and values as plain strings. Is that something you can provide?

ppatierno avatar Dec 20 '18 11:12 ppatierno

@ppatierno

It seems to be a different bug. Did you notice this one in your use case only or in general?

So far I only noticed this in this specific use case. However, we did not make use of this completion handler prior to this issue, so it's hard to tell.

The example is not usable as it is because it has some classes that I guess come from your application (GenericData, Kafka.ValueHandler, ...), but I will try to change it a little bit to reproduce the issue.

Sorry, my fault. I just quickly stripped down our internal test and seem to have missed a few classes. I have removed them now, got rid of the schema registry, and simplified the messages to plain strings, as you requested. Oddly enough, this results in the error occurring sooner, in my case now after about 25,000 records.

I have updated my previously posted example to avoid spamming the comments with code.

Seraksab avatar Dec 20 '18 12:12 Seraksab

@ppatierno

It seems to be a different bug. Did you notice this one in your use case only or in general?

Just a quick follow-up on the commit handler not being called: this can be reproduced quite easily, and I extracted it into the short test below, which does nothing but call 'pause()' and 'commit()' after some time.

In my case this always runs into the set timeout without the specified completionHandler being called.

However, removing the 'pause()' call seems to resolve this issue and the handler is called as expected (a sketch of that working variant follows after the test below).

Is this behaviour intended, as I can't find it in the documentation? If it is, what would you suggest as best practice for the batch-processing use case mentioned above? I need to pause the consumer at some point, as processing may take some time.

// imports added so the test compiles standalone (logging assumed to be SLF4J)
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.consumer.KafkaConsumer;

@RunWith(VertxUnitRunner.class)
public class ExampleTest {
  private static final String TOPIC = "";
  private static final String GROUP = "Test" + System.currentTimeMillis();
  private static final String SERVERS = "";

  private static final Logger logger = LoggerFactory.getLogger(ExampleTest.class);

  private Vertx vertx;
  private Async async;

  @Test
  public void test(TestContext context) {
    this.vertx = Vertx.vertx();
    this.async = context.async();
    Map<String, String> consumerConfig = createConsumerConfig();
    KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, consumerConfig);
    consumer.exceptionHandler(e -> logger.error("consumer error", e));
    consumer.handler(h -> {});
    consumer.subscribe(TOPIC);

    vertx.setTimer(2000, timer -> {
      logger.info("pause");
      consumer.pause();
    });

    vertx.setTimer(3000, timer -> {
      logger.info("commit");
      consumer.commit(commit -> {
        logger.info("committed");
        async.complete();
      });
    });

    async.awaitSuccess(30000);
  }

  private Map<String, String> createConsumerConfig() {
    Map<String, String> consumerConfig = new HashMap<>();
    consumerConfig.put("bootstrap.servers", SERVERS);
    consumerConfig.put("group.id", GROUP);
    consumerConfig.put("session.timeout.ms", "30000");
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("auto.offset.reset", "earliest");
    consumerConfig.put("enable.auto.commit", "false");
    return consumerConfig;
  }
}
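
For comparison, the variant where the completion handler does get called is the same test with the pause() timer removed, i.e. only:

vertx.setTimer(3000, timer -> {
  logger.info("commit");
  consumer.commit(commit -> {
    logger.info("committed"); // with pause() removed, this line is reached as expected
    async.complete();
  });
});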

Seraksab avatar Jan 07 '19 10:01 Seraksab