parallel-consumer
MockConsumer does not work
The Parallel Consumer wrapper does not work with MockConsumer. It keeps skipping messages and logs the error "Record in buffer for a partition no longer assigned. Dropping."
mockconsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
TopicPartition tp = new TopicPartition(topic, 0);
startOffsets.put(tp, 0L);
mockconsumer.updateBeginningOffsets(startOffsets);
mockconsumer.schedulePollTask(() -> {
    mockconsumer.subscribe(Arrays.asList(new String[] {topic}));
    /* Tried this one as well
    mockconsumer.subscribe(Arrays.asList(new String[] {topic}), (ParallelEoSStreamProcessor)eosStreamProcessor);
    */
    mockconsumer.rebalance(Collections.singletonList(new TopicPartition(topic, 0)));
});
It doesn't work without the scheduled poll task either:
mockconsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
TopicPartition tp = new TopicPartition(topic, 0);
startOffsets.put(tp, 0L);
mockconsumer.updateBeginningOffsets(startOffsets);
mockconsumer.subscribe(Arrays.asList(new String[] {topic}));
/* Tried this one as well
mockconsumer.subscribe(Arrays.asList(new String[] {topic}), (ParallelEoSStreamProcessor)eosStreamProcessor);
*/
mockconsumer.rebalance(Collections.singletonList(new TopicPartition(topic, 0)) );
The code above works fine without ParallelStreamProcessor, when polling directly on the consumer.
Oh, very interesting. Should be a straightforward fix. Can you submit a PR with a failing test?
@astubbs You mean a PR with a new test using MockConsumer that fails?
Yes, that shows the sort of use you're looking for.
Question - do you need a MockConsumer specifically, or would a MockParallelConsumer also work?
Ideally MockConsumer. In my case the actual consumer implementation is controlled by a flag (ramp etc.). But if that becomes complicated, a separate MockParallelConsumer can also work.
FYI just pushed #470 - this works fine as far as I can see, as expected. But using MockConsumer is difficult (because it's not a complete implementation). Use LongPollingMockConsumer instead.
Let me know if you still have questions.
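For later readers, one possible reading of the "Record in buffer for a partition no longer assigned. Dropping." message, offered as an assumption and not something confirmed in this thread: a wrapper that learns its assignment only through rebalance callbacks will drop every record if the mock underneath changes the assignment without invoking those callbacks. The sketch below models that idea in plain Java with no Kafka dependency. `AssignmentTrackingBuffer`, `DroppedRecordsSketch`, and their methods are hypothetical names for illustration; they are not part of Parallel Consumer or the Kafka client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical stand-in for a consumer wrapper that tracks its partition
 * assignment only through rebalance callbacks. Not the real implementation.
 */
final class AssignmentTrackingBuffer {
    private final Set<Integer> assignedPartitions = new HashSet<>();
    private final List<String> accepted = new ArrayList<>();
    private final List<String> dropped = new ArrayList<>();

    /** Plays the role of a rebalance-listener "assigned" callback. */
    void onPartitionsAssigned(Set<Integer> partitions) {
        assignedPartitions.addAll(partitions);
    }

    /** Plays the role of a rebalance-listener "revoked" callback. */
    void onPartitionsRevoked(Set<Integer> partitions) {
        assignedPartitions.removeAll(partitions);
    }

    /** Accepts a polled record only if its partition is known to be assigned. */
    void offer(int partition, String value) {
        if (assignedPartitions.contains(partition)) {
            accepted.add(value);
        } else {
            // Corresponds to the "no longer assigned, dropping" branch.
            dropped.add(value);
        }
    }

    List<String> accepted() {
        return accepted;
    }

    List<String> dropped() {
        return dropped;
    }
}

final class DroppedRecordsSketch {
    public static void main(String[] args) {
        // Case 1: the assignment changed, but no callback reached the wrapper.
        AssignmentTrackingBuffer silent = new AssignmentTrackingBuffer();
        silent.offer(0, "a");
        silent.offer(0, "b");
        System.out.println("no callback:   accepted=" + silent.accepted()
                + " dropped=" + silent.dropped());

        // Case 2: the callback fires before records arrive.
        AssignmentTrackingBuffer notified = new AssignmentTrackingBuffer();
        notified.onPartitionsAssigned(Collections.singleton(0));
        notified.offer(0, "a");
        notified.offer(0, "b");
        System.out.println("with callback: accepted=" + notified.accepted()
                + " dropped=" + notified.dropped());
    }
}
```

If this reading applies, a test double has to deliver the assignment through the same path the wrapper listens on, which may be part of why a more complete mock such as LongPollingMockConsumer is recommended above.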