parallel-consumer icon indicating copy to clipboard operation
parallel-consumer copied to clipboard

MockConsumer does not work

Open jagrutmehta opened this issue 3 years ago • 5 comments

Parallel Consumer wrapper does not work with MockConsumer. It keeps on skipping messages with error message "Record in buffer for a partition no longer assigned. Dropping."

		
            mockconsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
	    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
	    TopicPartition tp = new TopicPartition(topic, 0);
	    startOffsets.put(tp, 0L);
	    mockconsumer.updateBeginningOffsets(startOffsets);
             mockconsumer.schedulePollTask(() -> {
	          mockconsumer.subscribe(Arrays.asList(new String[] {topic}));
                  /* Tried this one as well
                  mockconsumer.subscribe(Arrays.asList(new String[] {topic}), (ParallelEoSStreamProcessor)eosStreamProcessor);
                  */
	    	mockconsumer.rebalance(Collections.singletonList(new TopicPartition(topic, 0)) );
	    });

... doesn't work without scheduled task also

            mockconsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
	    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
	    TopicPartition tp = new TopicPartition(topic, 0);
	    startOffsets.put(tp, 0L);
	    mockconsumer.updateBeginningOffsets(startOffsets);
 	    mockconsumer.subscribe(Arrays.asList(new String[] {topic}));
            /* Tried this one as well
            mockconsumer.subscribe(Arrays.asList(new String[] {topic}), (ParallelEoSStreamProcessor)eosStreamProcessor);
            */
           mockconsumer.rebalance(Collections.singletonList(new TopicPartition(topic, 0)) );


Above code works fine without ParallelStreamProcessor and done polling directly on consumer.

jagrutmehta avatar Nov 09 '21 00:11 jagrutmehta

Oh very interesting. Should be a straight forward fix. Can you sub a PR for a failing test?

astubbs avatar Nov 15 '21 11:11 astubbs

@astubbs You mean PR with new test using mockconsumer which is failing?

jagrutmehta avatar Dec 27 '21 18:12 jagrutmehta

Yes, that shows the sort of use you're looking for.

astubbs avatar Dec 27 '21 18:12 astubbs

Question - do you need a MockConsumer specifically, or would a MockParallelConsumer also work?

astubbs avatar Jul 15 '22 12:07 astubbs

Ideally MockConsumer. In my case actual consumer implementation is controlled by flag (ramp etc). But if it is becoming complicated then separate MockParallelConsumer can also work.

jagrutmehta avatar Jul 18 '22 16:07 jagrutmehta

FYI just pushed #470 - this works fine as far as I can see, as expected. But using MockConsumer is difficult (because it's not a complete implementation). Use LongPollingMockConsumer instead.

Let me know if you still have questions.

astubbs avatar Nov 03 '22 13:11 astubbs