parallel-consumer
parallel-consumer copied to clipboard
Different computational results obtained with different max concurrency configurations for the same parallel consumer
Hi @astubbs and PC Team,
I have been facing an issue with the parallel consumer that I recently introduced in one of my applications. The application is a microservice deployed as pod within kubernetes cluster, 6 pods deployed that host consumer that consume from a 2 topics with 6 partitions each, so 1 partition per pod. Earlier used regular kafka consumer and recently switched to parallel consumer (version 0.5.2.2), and used a separate parallel consumer for the 2 topics, with KEY ordering. This is an analytics application so based on the inflow of events, it does certain computations and saves the results into database from where reports are printed.
Results:
-
For topic1, it contains thousands of unique keys, that get divided across 6 partitions, thus hundreds of unique keys per partition. Used maxConcurrency setting to 3 for this PC. No lag seen on the PC and the computation end results are comparable to that of regular kafka consumer.
-
For topic2, it contains around 20 unique that don't get distributed across partitions evenly (due to kafka's default partitioning strategy) where some partition gets 5 unique keys while some hardly get 1 or 2 unique keys. Used maxConcurrency setting to 6 for this PC. No lag seen on the PC but computation end results are lesser than expected to that of regular kafka consumer. Then I reduced the maxConcurrency to 3 for this PC and end results looked much better and much comparable to that of regular kafka consumer. This was surprising that for different maxConcurrency settings, the results were different.
I still need to dig into logs to see if PC caused data loss for some of the events when maxConcurrency was 6, will do that later once we have all the logs available. However, is there any such limitation of parallel consumer wherein the maxConcurrency, if greater than number of unique keys in the topic, would cause any inconsistency issues while processing the incoming events with KEY order guarantee?
What version did you use?
However, is there any such limitation of parallel consumer wherein the maxConcurrency, if greater than number of unique keys in the topic, would cause any inconsistency issues while processing the incoming events with KEY order guarantee?
No. Can you show your code?
What version did you use?
0.5.2.2
However, is there any such limitation of parallel consumer wherein the maxConcurrency, if greater than number of unique keys in the topic, would cause any inconsistency issues while processing the incoming events with KEY order guarantee?
No. Can you show your code?
//parallel consumer flag enabled then use Confluent Parallel Consumer, else go with old regular kafka consumer
if (kafkaTuningConfig.isParallelConsumer()) {
LOG.info("parallel consumer is enabled for measure processor events, hence starting parallel consumer instead of regular kafka consumer");
Set<String> subscriptionTopics = subscriptionManager().getSubscriptionTopics(analyticsProperties.getInputTopic().getName());
Map<String, Object> parallelConsumerProperties = getParallelConsumerProperties();
MeasureProcessorEventsReceiverListener eventsReceiverListener = new MeasureProcessorEventsReceiverListener(measureProcessorEventHandler());
List<String> heavyDutyTopics = getHeavyDutyTopics();
List<String> lowDutyTopics = new ArrayList<String>();
for (String subscriptionTopic : subscriptionTopics) {
if (heavyDutyTopics.contains(subscriptionTopic)) {//this is heavy duty topic hence need a separate parallel consumer
//special attention to Queue and QueueByChannel dimension for concurrency since these seem to be lag during performance runs
ParallelConsumerOptions<String, MeasureInputEvent> options = ParallelConsumerOptions.<String, MeasureInputEvent>builder()
.ordering(KEY)
.maxConcurrency((subscriptionTopic.equalsIgnoreCase(QUEUE_TOPIC) || subscriptionTopic.equalsIgnoreCase(QUEUEBYCHANNEL_TOPIC))
? kafkaTuningConfig.getQueueTopicMaxConcurrency() : kafkaTuningConfig.getParallelConsumerMaxConcurrency())
.consumer(new KafkaConsumer<>(getParallelConsumerProperties()))
.retryDelayProvider(workContainer -> Duration.ofSeconds((long)(10 + Math.pow(Math.E, workContainer.getNumberOfFailedAttempts()))))
.build();
ParallelStreamProcessor<String, MeasureInputEvent> eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options);
eosStreamProcessor.subscribe(List.of(subscriptionTopic));
separateParallelConsumers.add(eosStreamProcessor);
new ParallelReceiverMode(eosStreamProcessor, eventsReceiverListener).execute();
LOG.info("Started separate parallel consumer for the heavy duty topic {}", subscriptionTopic);
} else {//not a heavy duty topic hence add it to common parallel consumer instead of separate consumer
lowDutyTopics.add(subscriptionTopic);
}
}
the topic that i am talking about above is QUEUE_TOPIC, for which maxConcurrency reduced down to 3 from 6 and it gave better results, comparable to regular kafka consumer
is it possible for you to test this scenario specifically in your test bed, does you test bed test and verify the ordering and end result of PC for such scenarios?
Hi @astubbs can you comment on above request regarding the scenario test in your test bed? we want to enable the PC in production however want to make sure that this is tested before doing that
Hi @pdeole , Current threw a spanner in the works! I'm just finished working on a couple other things for PC, and will look more closely at this, this week.
Is it possible for you to test this scenario specifically in your test bed, does you test bed test and verify the ordering and end result of PC for such scenarios?
Yes, def I intend to make sure there is good coverage for this scenario. I'm pretty sure there already is, but I will do an audit.
However, is there any such limitation of parallel consumer wherein the maxConcurrency, if greater than number of unique keys in the topic, would cause any inconsistency issues while processing the incoming events with KEY order guarantee?
No, it's specifically written to allow for this scenario while enforcing KEY ordering. It's quite a simple part of the code which does this. But as I said, I'll double check.
Clone my fork if you're curious where current efforts are going.
When you say:
No lag seen on the PC but computation end results are lesser than expected to that of regular kafka consumer.
What exactly do you mean by computation end results?
Hi @pdeole , I wrote this test, can you take a look at it? Is this the situation you describe?
- #450
If you like, check out the branch and try playing with the values...
Thanks @astubbs i was on vacation, and just got back. I will try it out however while doing mvn build for the repository, I get below error: seems this library is something that is not accessible/downloadable from my companies maven repository:
Ah yes, I really must get that published to central. Work around - clone that repo and install the version it needs locally?
https://github.com/astubbs/truth-generator
I cloned the above repo and did a build. It installed the truthgenerator plugin, however it added snapshot to the jar. Also, the parallelconsumer is referencing 0.1.1 version in its pom whereas the plugin jar version is generated 0.1.2-snapshot. Not sure which one should be used. Should i update parallel consumer pom to use 0.1.2-snapshot instead of 0.1.1?
After cloning, you should check out the release tag, instead of installing from master.
@astubbs I will try the work around, however will you be able get the jars published to central so they will be fetched during maven build? how long will it take to publish the libraries to central?
Since i resolved the truthgenerator error however getting a new error now while building parallelconsumercore:
Maven does fetch them, but not from maven central. I don't have an estimate on when truth generator will be published to maven central.
Seems your proxy is also blocking that dependency too.
I got past the truthgenerator error however got into another one... any suggestions on this error, how can i get this library:
Hi @astubbs I got past above error by adding the jar manually in my local m2 directory, and the build is running with tests. I see some failures though, which i didn't expect. When I keep below configuration in the lessKeysThanThreads test method in ParallelEoSStreamProcessorTest, then following test failures occur:
void lessKeysThanThreads() {
setupParallelConsumerInstance(ParallelConsumerOptions.<String, String>builder()
.ordering(KEY)
// use many more threads than keys
.maxConcurrency(3)
.build());
// use a small set of keys, over a large set of records
final int keySetSize = 5;
var keys = range(keySetSize).list();
final int total = 20_000;
If I just make a change to total = 50_000;
I get following failures in the build:
In both above runs, there are not failures in ParallelEoSStreamProcessorTest.
However, when I increased total to 100_000, I did get failures in the lessKeysThanThreads() method, wherein it timed out on condition. I did increase timeout to 300 seconds, still no luck, which means we probably losing some events here?
// count how many we've received so far
await().atMost(300, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(counter.get()).isEqualTo(total));
Are you running a docker host locally? It needs a docker host to run.
I should add a check for that...
No, I am not running docker host locally, however I still have a passing test with first 2 configurations I shared. I would like you to try the third configuration I shared above.
ok, I'll take a look at it. The BrokerIntegrationTests need a docker host to run.
On my laptop, with total= 100k, test passes in 1m22s (with the timeouts turned up).
Which all timeouts did you turn up and where, apart from the one I changed to 300 second, since the error message in failure says "timed out after 1 min", so not sure which timeout it is referring to?
That will be the class level timeout setting:
@Timeout(value = 10, unit = MINUTES)
which I increased from 1 to 10 minutes
thanks @astubbs that worked for me... I would also like to understand more on this test, so as I understand you have used mock classes here for kafka testing. The ktu is basically a KafkaTestUtil class that produces onto a test topic INPUT_TOPIC that has is single partition topic while consumerSpy is an instance of LongPollingMockConsumer. What do these mocks represent? Do they simulate the real world producers and consumers and can we rely that they would produce same results as real world consumers?
I would also like to try the topic with more than 1 partition, so is that possible with this test?
What do these mocks represent?
AK Consumer
Do they simulate the real world producers and consumers
Yes, but just the api. They don't talk to brokers. They focus on testing how PC handles data, as opposed to what the broker does.
and can we rely that they would produce same results as real world consumers?
Depends what you're testing. For what is being tested here, it should suffice.
If you want to run the same test on the real things - port the test to a new test class that extends BrokerIntegrationTest - that will use real everything. You will need docker.
I would also like to try the topic with more than 1 partition, so is that possible with this test?
Yes, but might need some editing. Just hard code the topic construction to whatever partition count you want - look for the line:
topicPartition = new TopicPartition(INPUT_TOPIC, 0);
in the super class.
I am just trying to get acquainted with those API, and looking into javadoc for TopicPartition API. So, as I understand the parameter 0 there is partition number and I will need to create another instance of TopicPartition with parameter 1, assigning that to another variable e.g. topicPartition1 or something, is that correct?
That is just a reference to a specific partition, but generally yes
Let me know how you get on, can reopen if we find an issue.
@astubbs I discussed this within my organization since we have to take a decision on whether to enable the parallel consumer in production however there were concerns raised by our executive management about the test coverage of this library and whether this library has enough test coverage in BrokerIntegrationTest for various scenarios that I described above. The reason of going into this discussion was the results with parallel consumer were bit different than for regular consumer. Having said that, can this library include additional test coverage to include additional tests for the scenarios I mentioned above, and also the test should include multiple partitions, order of processing of events, no data loss etc. Without this test coverage, the management thinks that it is risk to enable this in production since there is no official support from Confluent on this library.
All of the scenarios you describe have coverage