parallel-consumer
Batching not working as expected
Hi Team,
I mainly wanted to use the batching feature of Parallel Consumer, so I started a POC around it.
Currently the Kafka topic has 6 partitions and each partition has around 15k messages.
I want to consume the data in batches of 10 messages each, and I want the data to be consumed in order. Below are code snippets of the current Parallel Consumer configuration.
ParallelConsumerOptions
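import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final Consumer<String, String> consumer = new KafkaConsumer<>(appProperties);
final ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
        .consumer(consumer)
        .ordering(PARTITION) // process records in order within each partition
        .batchSize(10)       // maximum number of records per batch
        .build();
ParallelStreamProcessor<String, String> eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options);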
appProperties
# Consumer properties
bootstrap.servers=localhost:9092
group.id=<consumer group id>
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=300000
enable.auto.commit=false
auto.offset.reset=earliest
# Application-specific properties
input.topic.name=<name of the topic>
Consumer Poll
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public void runConsume(final Properties appProperties) {
    String topic = appProperties.getProperty("input.topic.name");
    LOGGER.info("Subscribing Parallel Consumer to consume from {} topic", topic);
    parallelConsumer.subscribe(Collections.singletonList(topic));
    LOGGER.info("Polling for records. This method blocks");
    parallelConsumer.poll(context -> {
        // context holds the batch handed to this invocation (at most batchSize records)
        List<String> payload = context.stream().map(this::preparePayload).collect(Collectors.toList());
        System.out.println("********* " + payload.size() + " **********");
    });
}
preparePayload
import static io.confluent.csid.utils.StringUtils.msg;
import io.confluent.parallelconsumer.RecordContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;

private String preparePayload(RecordContext<String, String> recordContext) {
    ConsumerRecord<String, String> consumerRecord = recordContext.getConsumerRecord();
    int failureCount = recordContext.getNumberOfFailedAttempts();
    System.out.println("Key: " + consumerRecord.key() + " - Partition: " + consumerRecord.partition() + " Offset: " + consumerRecord.offset());
    return msg("{}, {}", consumerRecord, failureCount);
}
Despite setting the batch size to 10, the data is consumed in batches of seemingly random sizes, all smaller than 10 (1, 2, 3, ...). Could someone please help me out?
Thanks in advance.
Regards, Dixit
Hi,
The batching logic is not designed to wait for batches to fill up before processing. It is more of an optimization for when there is more data to process than the processing can keep up with (i.e. processing is slower than consuming from Kafka) and the processing function supports batching: in that case, records are sent to it in batches.
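To make the "does not wait" behaviour concrete, here is a minimal sketch of a batch builder that drains whatever is already queued, up to the batch size, and returns immediately. NonBlockingBatchSketch and takeUpTo are hypothetical names used only for illustration; this is a simplified model, not Parallel Consumer's actual code.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class NonBlockingBatchSketch {
    // Hypothetical helper: takes records that are already available, up to batchSize,
    // and returns right away instead of waiting for the batch to fill up.
    static <T> List<T> takeUpTo(Deque<T> available, int batchSize) {
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && !available.isEmpty()) {
            batch.add(available.poll()); // removes from the head, preserving arrival order
        }
        return batch;
    }
}
```

With 3 records queued and a batch size of 10, this returns a batch of 3. With 15 queued, it returns 10 and leaves 5 for the next call, which matches the point that full batches only appear once enough records have accumulated.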
So it looks like your processing may be faster than polling from Kafka. In that case you won't get full batches, because not enough records have accumulated in the queues yet.
One thing to check is the consumer options for fetch size, max poll records, etc., to make sure you are feeding enough records into Parallel Consumer per poll. Another is what processing you are doing: if you are only logging the records to test the flow, you will process (log) them faster than you can read them from Kafka. To make the test more realistic, you can add a sleep/wait that mimics the processing you plan to do and the time it takes.
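A minimal sketch of the "add a sleep" suggestion, assuming you want to keep the real handler separate from the simulated cost. SimulatedProcessingSketch and withSimulatedDelay are hypothetical names; the Consumer used here is java.util.function.Consumer, not Kafka's Consumer, and the per-record delay is an assumption you would tune to your expected processing time.

```java
import java.util.List;

class SimulatedProcessingSketch {
    // Hypothetical test helper: wraps a batch handler so that each batch costs roughly
    // millisPerRecord * batch.size() of wall-clock time before the real handler runs.
    static <T> java.util.function.Consumer<List<T>> withSimulatedDelay(
            java.util.function.Consumer<List<T>> handler, long millisPerRecord) {
        return batch -> {
            try {
                Thread.sleep(millisPerRecord * batch.size()); // stand-in for real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            handler.accept(batch);
        };
    }
}
```

Inside the Parallel Consumer poll callback, you could collect the batch (as runConsume already does with context.stream()) and pass the resulting list to the wrapped handler, so that processing speed more closely resembles production.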
Even with fast processing, if the topic partitions already hold enough data and the consumer is configured to return 10+ records per poll, the batch should fill up.
You could test the polling parameters with a plain Kafka consumer and check how many records each poll actually returns, e.g. in a test stub or a simple application.
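For such a test stub, a small helper that summarizes how many records each poll returned can make the effect of the fetch settings easy to see. PollStats is a hypothetical helper for illustration, not part of Kafka or Parallel Consumer.

```java
class PollStats {
    // Hypothetical helper: accumulates the size of every poll and summarizes it.
    private int polls;
    private long total;
    private int min = Integer.MAX_VALUE;
    private int max;

    void record(int recordsInPoll) {
        polls++;
        total += recordsInPoll;
        min = Math.min(min, recordsInPoll);
        max = Math.max(max, recordsInPoll);
    }

    int min() { return polls == 0 ? 0 : min; }

    int max() { return max; }

    double average() { return polls == 0 ? 0.0 : (double) total / polls; }
}
```

In a plain consumer loop, you would call stats.record(records.count()) after each consumer.poll(...) (ConsumerRecords.count() returns the number of records in that poll) and print min(), max() and average() at the end of the run.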
Hi, I think we have the same problem: we have a topic with a lot of pre-produced data and use partition ordering with a batch size of 1000. Still, we get just 1 message per partition per poll. Since we want to process the messages per partition with batched database calls, doing it one by one is much slower. Is there any chance to change the configuration and get real batches per partition? Polling itself does not seem to be the problem: I configured the KafkaConsumer with a batch size of 10,000, and I can see that this many records are fetched in ConsumerManager as well. Something in between reduces the batch to 1 record per partition in the poll() call. Thanks :)
In our case, we dropped the idea of using the batching mechanism in Parallel Consumer, but while analyzing it we made a few observations:
- If you use PARTITION ordering, you end up with 1 message per partition.
- For the other orderings (KEY, UNORDERED), you can play around with these Kafka configs to get more messages in a batch:
max.poll.records, fetch.max.bytes, max.partition.fetch.bytes, fetch.min.bytes, fetch.max.wait.ms
- We also tried the above properties with a plain Kafka consumer, and its batching was more promising.
Consume records like this:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
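The fetch-related properties listed above go into the same consumer properties file as the other settings. The values below are illustrative only (not recommendations) and would need tuning for your record sizes and latency budget; per the observations above, they mainly help the KEY and UNORDERED orderings.

```properties
# Upper bound on records returned by a single poll() (Kafka default: 500)
max.poll.records=1000
# The broker waits until at least this many bytes are available...
fetch.min.bytes=65536
# ...or until this much time has passed, whichever comes first (default: 500 ms)
fetch.max.wait.ms=500
# Per-partition and per-request caps on fetched data (defaults: 1 MiB and 50 MiB)
max.partition.fetch.bytes=1048576
fetch.max.bytes=52428800
```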
HTH!
The problem appears to be that "work" (i.e., polled records) is being queued based on shards:
https://github.com/confluentinc/parallel-consumer/blob/7231f62ba352a26ba67183ebce4a9ca1a60c796f/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java#L250-L263
For processing order PARTITION, each partition is a shard. For processing order KEY, each key is a shard.
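The shard assignment described above can be modeled in a few lines. This is a simplified sketch with hypothetical types (ShardingSketch, Order, Rec), not the ShardManager code linked above; it assumes non-null keys and uses a "topic-partition" string as the partition shard id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ShardingSketch {
    // Hypothetical stand-ins; not Parallel Consumer types.
    enum Order { PARTITION, KEY }

    record Rec(String topic, int partition, String key, long offset) {}

    // One shard per partition for PARTITION, one shard per key for KEY (keys assumed non-null).
    static String shardOf(Order order, Rec r) {
        return order == Order.PARTITION ? r.topic() + "-" + r.partition() : r.key();
    }

    // Groups records into shards, preserving arrival order within each shard.
    static Map<String, List<Rec>> groupIntoShards(Order order, List<Rec> records) {
        Map<String, List<Rec>> shards = new LinkedHashMap<>();
        for (Rec r : records) {
            shards.computeIfAbsent(shardOf(order, r), k -> new ArrayList<>()).add(r);
        }
        return shards;
    }
}
```

The same three records end up in two partition shards or two key shards, depending on the ordering; only the grouping differs.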
For both PARTITION and KEY, shard.getWorkIfAvailable(remainingToGet) can only ever return a single record:
https://github.com/confluentinc/parallel-consumer/blob/7231f62ba352a26ba67183ebce4a9ca1a60c796f/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java#L133-L138
This means that, for all modes except UNORDERED, the maximum batch size is limited by:
- the number of partitions (for PARTITION)
- the number of unique keys (for KEY)
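The consequence can be reproduced with a simplified model: when each ordered shard may contribute at most one record per collection, the batch size is capped by the number of shards, however many records are waiting. ShardBatchSketch and collectBatch are hypothetical names, and the "at most one per ordered shard" rule is taken from the description above rather than from the library's actual code.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

class ShardBatchSketch {
    // Simplified model: collects up to batchSize records from per-shard queues.
    // When ordered is true, each shard contributes at most one record (as described
    // above for PARTITION and KEY); otherwise a shard contributes as many as available.
    static List<String> collectBatch(Map<String, Deque<String>> shards, int batchSize, boolean ordered) {
        List<String> batch = new ArrayList<>();
        for (Deque<String> queue : shards.values()) {
            int takenFromShard = 0;
            while (batch.size() < batchSize && !queue.isEmpty()
                    && (!ordered || takenFromShard < 1)) {
                batch.add(queue.poll());
                takenFromShard++;
            }
            if (batch.size() == batchSize) {
                break; // batch is full, stop visiting shards
            }
        }
        return batch;
    }
}
```

With two shards holding 5 records each and a batch size of 10, the ordered case yields a batch of 2 (one record per shard), while the unordered case yields all 10.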
As @doppelrittberger mentioned, this is counter-intuitive, as batching is most useful for multiple records in the same shard. Batching currently does not help me deal with many records of the same key, or many records in the same partition.
@rkolesnev, are there any major downsides to lifting the above restriction? I think even ordered shards should be able to return multiple records, but I can't fully grasp the impact this may have on the system (e.g. offset tracking).