Large Number of Consumers Prevents Test Completion (Kafka Driver)
Hello OpenMessaging maintainers - thank you for your work on this project!
I'm running some very large benchmarks (20k consumers) spread across 8 very large machines to simulate a large-scale test. As the test nears completion (I suspect when results are aggregated), there are numerous consumer errors, which cause the WorkloadGenerator to time out.
Even with smaller tests, I see the timeouts due to consumer errors at the end of the run.
For example, the aggregate high-level stats look fine:

```
15:30:11.050 [main] INFO WorkloadGenerator - Pub rate 8983.0 msg/s / 8.8 MB/s | Pub err 0.0 err/s | Cons rate 8983.0 msg/s / 8.8 MB/s | Backlog: 0.0 K | Pub Latency (ms) avg: 2.0 - 50%: 2.0 - 99%: 2.5 - 99.9%: 6.7 - Max: 31.0 | Pub Delay Latency (us) avg: 57.1 - 50%: 57.0 - 99%: 61.0 - 99.9%: 65.0 - Max: 1462.0
15:30:21.169 [main] INFO WorkloadGenerator - Pub rate 8988.9 msg/s / 8.8 MB/s | Pub err 0.0 err/s | Cons rate 8988.5 msg/s / 8.8 MB/s | Backlog: 0.0 K | Pub Latency (ms) avg: 2.0 - 50%: 2.0 - 99%: 2.5 - 99.9%: 4.6 - Max: 18.3 | Pub Delay Latency (us) avg: 57.3 - 50%: 57.0 - 99%: 64.0 - 99.9%: 67.0 - Max: 124.0
15:30:21.255 [main] INFO WorkloadGenerator - ----- Aggregated Pub Latency (ms) avg: 2.0 - 50%: 2.0 - 95%: 2.3 - 99%: 2.5 - 99.9%: 6.7 - 99.99%: 18.3 - Max: 61.4 | Pub Delay (us) avg: 57.9 - 50%: 57.0 - 95%: 60.0 - 99%: 64.0 - 99.9%: 70.0 - 99.99%: 1462.0 - Max: 20531.0
```
There are no errors, no backlog, and steady throughput at the rate I've specified. However, the test times out and the consumer logs show errors. Note that with high-throughput tests and fewer consumers, I do not see the issue.
Example Test Setup

Driver:

```yaml
name: MyTestDriver
driverClass: io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver

# Kafka client-specific configuration
replicationFactor: 3
reset: true

topicConfig: |
  min.insync.replicas=2

commonConfig: |
  bootstrap.servers=myserver:9092
  # add additional time for large-scale tests
  request.timeout.ms=480000
  socket.connection.setup.timeout.ms=30000
  socket.connection.setup.timeout.max.ms=60000

producerConfig: |
  acks=all
  linger.ms=1
  batch.size=1048576

consumerConfig: |
  auto.offset.reset=earliest
  enable.auto.commit=false
  max.partition.fetch.bytes=10485760
  # try to fix the issue of consumers not consuming messages
  max.poll.records=50
  max.poll.interval.ms=300000
  #session.timeout.ms=90000
  #group.min.session.timeout.ms=6000
  #group.max.session.timeout.ms=92000
```
Workload:

```yaml
name: services-1
# overprovision partitions to rule out contention
partitionsPerTopic: 15
messageSize: 1024
payloadFile: "payload/payload-1Kb.data"
subscriptionsPerTopic: 1
consumerPerSubscription: 12
producersPerTopic: 1
producerRate: 9000
consumerBacklogSizeGB: 0
warmupDurationMinutes: 1
testDurationMinutes: 3
```
Consumer Errors

Some of the consumer errors include:

```
15:24:15.049 [pool-116-thread-1] ERROR ConsumerCoordinator - [Consumer clientId=consumer-sub-000-Pp8hAjg-115, groupId=sub-000-Pp8hAjg] Offset commit with offsets {test-topic-0000022-tQSNfvA-0=OffsetAndMetadata{offset=2082, leaderEpoch=null, metadata=''}} failed
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException

15:24:14.921 [pool-13-thread-1] ERROR ConsumerCoordinator - [Consumer clientId=consumer-sub-000-trBz_PI-12, groupId=sub-000-trBz_PI] Offset commit with offsets {test-topic-0000141-ZzmWoJA-0=OffsetAndMetadata{offset=2080, leaderEpoch=null, metadata=''}} failed
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
```
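The exception message says to retry committing the latest consumed offsets. As a generic illustration only (stdlib-only, not the driver's actual code, and I'm not sure retrying at shutdown would actually help here), a bounded retry with backoff around a commit call could look like:

```java
import java.util.concurrent.Callable;

public class BoundedRetry {
    // Retry an operation up to maxAttempts times, backing off between
    // attempts; rethrows the last failure if every attempt fails.
    // The driver could wrap its offset-commit call in something like this.
    static <T> T retry(Callable<T> op, int maxAttempts, long backoffMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Linear backoff; real code might cap this or use jitter.
                    Thread.sleep(backoffMs * attempt);
                }
            }
        }
        throw last;
    }
}
```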
I've attached the output and logs of a test that exhibits these symptoms: `benchmark-output.txt`, `sanitized-benchmark-worker.log`
Would you have any suggestions for running the benchmarks with extremely large numbers of consumers? Happy to provide more information if you need it.
Edit: Perhaps all of the rebalancing is blocking some requests around shutdown/closing consumers?
Thanks!
FYI, I tried setting the internal property `internal.leave.group.on.close=false`, which didn't seem to make a difference on the system I was benchmarking.
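For reference, this is roughly how I set it, via the driver's `consumerConfig` block (an illustrative fragment; the property is internal and undocumented, so it may not be supported long-term):

```yaml
consumerConfig: |
  auto.offset.reset=earliest
  enable.auto.commit=false
  # internal/undocumented: skip the LeaveGroup request when closing
  internal.leave.group.on.close=false
```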
Adding some instrumentation showed that the Kafka driver's close API was taking up to 800 milliseconds to complete, so waiting for all consumers to close would have taken quite a long time.
One more data point: as an experiment, I commented out `consumer.close()` in the Kafka driver, and the test made it to completion.
I'm wondering if gathering stats and writing the output file before closing the consumers would work, followed by invoking the `consumer.close()` calls from an executor to parallelize the work that happens during close.
Another option might be to use the `close` overload that accepts a `Duration`, but with 20k consumers I'm not sure that would help enough on its own.
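A rough sketch of what I mean, written against plain `AutoCloseable` so it's self-contained (the pool size, timeouts, and class name are illustrative, not the driver's actual code):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelClose {
    // Close many clients concurrently instead of one at a time on a single
    // thread. With a real KafkaConsumer, the task body could instead call
    // consumer.close(Duration.ofSeconds(5)) to also bound each close.
    static void closeAll(List<? extends AutoCloseable> consumers) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (AutoCloseable consumer : consumers) {
            pool.submit(() -> {
                try {
                    consumer.close();
                } catch (Exception e) {
                    // Log and keep going; one slow or failed close should
                    // not block teardown of the remaining consumers.
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(2, TimeUnit.MINUTES);
    }
}
```

Back-of-envelope: at ~800 ms per close, the ~2,500 consumers per worker (20k across 8 machines) would take over half an hour serially, so dividing that by the pool size seems worthwhile.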
wdyt?
I notice the `worker.stopAll()` API is called in `WorkloadGenerator.run()` before the method exits (and before the results file is generated). This prevents the results from being generated, because `worker.stopAll()` takes a very long time to complete and the benchmark times out. Note that `worker.stopAll()` is also called later, during workload shutdown.
Removing this line allows me to generate the results file. The benchmark still times out when the subsequent `worker.stopAll()` call is made, but at least I can get results.
Is the `worker.stopAll()` call in `WorkloadGenerator.run()` necessary?