librdkafka
Quadratic behavior in timer scheduling
Description
Timer scheduling is O(n^2) in the number of timers (under a reasonable assumption about the interval distribution).
How to reproduce
Consume from any topic with 20k partitions with a non-zero statistics interval configured. Note the large amount of time spent in rd_kafka_timer_schedule_next (usually actually inside rd_kafka_timer_cmp, which is inlined), both during initial creation of the topic and even during idle (with poll being called periodically). A typical stack for the idle case is shown below:
```
rdk:main 905626 339264.203807: 32365636 cycles:
        55648d5c00fc rd_kafka_timer_cmp+0x4c (inlined)
        55648d5c00fc rd_kafka_timer_schedule_next+0x4c (/home/tdowns/dev/librdkafka-travis/examples/many_consumers)
        55648d5c0908 rd_kafka_timer_schedule+0x1e8 (inlined)
        55648d5c0908 rd_kafka_timer_schedule+0x1e8 (inlined)
        55648d5c0908 rd_kafka_timers_run+0x1e8 (/home/tdowns/dev/librdkafka-travis/examples/many_consumers)
        55648d5947e8 rd_kafka_thread_main+0x228 (/home/tdowns/dev/librdkafka-travis/examples/many_consumers)
        7f8f785c2809 start_thread+0x139 (/usr/lib/x86_64-linux-gnu/libc.so.6)
```
The problem is that timer scheduling is quadratic: each time a timer is popped from the front, a linear search through the list is done to find its new position, which in this case is always the end of the list (the timers all have the same interval, so a just-popped timer reschedules with a fire time later than all the others). So in one interval 20k timers will be rescheduled, each of which will look through 20k entries.
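To make the pattern concrete, here is a stripped-down sketch of inserting into a list kept sorted by next-fire time; this is an illustration of the behaviour described above, not the actual code in rdkafka_timer.c. With identical intervals the scan always runs to the tail, so n reschedules per interval cost on the order of n^2 comparisons.

```c
/* Illustrative sketch only -- a simplified stand-in for a sorted
 * timer list, not librdkafka's real implementation. */
struct timer {
        long long next_fire;      /* absolute time of the next firing */
        struct timer *next;
};

/* Insert 'tmr' into the list, kept sorted ascending by next_fire.
 * This is the linear scan: when all n timers share the same interval,
 * a just-popped timer always belongs at the tail, so every reschedule
 * walks all n entries. */
static void timer_schedule(struct timer **head, struct timer *tmr) {
        struct timer **pp = head;
        while (*pp && (*pp)->next_fire <= tmr->next_fire)
                pp = &(*pp)->next;
        tmr->next = *pp;
        *pp       = tmr;
}
```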
The overwhelming majority of the time is spent in a hot assembly loop that just does the linked-list traversal, looking for the end of the list or the position at which to insert the element. At best, this loop can execute one iteration every 4 cycles, so 20k reschedules cost 20,000^2 * 4 = 1.6 billion cycles, or half a second of CPU time on a 3.2 GHz machine. With a higher partition count or more clients, the CPU can be saturated just running timers.
In this case the timers were the statistics callbacks (rktp_consumer_lag_tmr), but the problem could apply to any timer. As a workaround for this specific high-partition-count + stats case, we can disable statistics.
This occurs on current master f7f527d8f2ff7f5bd86856ddc43115eb4dfbba97.
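For reference, a minimal consumer along the lines described above could look like the sketch below; the broker address, group id and topic name are placeholders, the topic is assumed to already have the ~20k partitions, and error handling of the conf_set results is omitted for brevity.

```c
/* Reproduction sketch: consume from a topic that already has ~20k
 * partitions with a non-zero statistics interval, then profile the
 * client thread. Broker/topic names below are placeholders. */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len,
                    void *opaque) {
        (void)rk; (void)json; (void)json_len; (void)opaque;
        return 0;   /* 0 = librdkafka frees the json buffer */
}

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "g2", errstr, sizeof(errstr));
        /* A non-zero statistics interval is what arms one consumer-lag
         * timer per assigned partition. */
        rd_kafka_conf_set(conf, "statistics.interval.ms", "1000",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set_stats_cb(conf, stats_cb);

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
                                      sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "rd_kafka_new failed: %s\n", errstr);
                return 1;
        }
        rd_kafka_poll_set_consumer(rk);

        rd_kafka_topic_partition_list_t *topics =
                rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, "many-partitions-topic",
                                          RD_KAFKA_PARTITION_UA);
        rd_kafka_subscribe(rk, topics);
        rd_kafka_topic_partition_list_destroy(topics);

        while (1) {  /* idle poll loop; CPU still burns in the timer code */
                rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 100);
                if (msg)
                        rd_kafka_message_destroy(msg);
        }
}
```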
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka version (release number or git tag): f7f527d8f2ff7f5bd86856ddc43115eb4dfbba97
- [x] Broker version: Redpanda 22
- [x] librdkafka client configuration:
```
# Global config
builtin.features = gzip,snappy,sasl,regex,lz4,sasl_plain,plugins
client.id = rdkafka
client.software.name = librdkafka
message.max.bytes = 1000000
message.copy.max.bytes = 65535
receive.message.max.bytes = 100000000
max.in.flight.requests.per.connection = 1000000
metadata.request.timeout.ms = 10
topic.metadata.refresh.interval.ms = 300000
metadata.max.age.ms = 900000
topic.metadata.refresh.fast.interval.ms = 250
topic.metadata.refresh.fast.cnt = 10
topic.metadata.refresh.sparse = true
topic.metadata.propagation.max.ms = 30000
debug =
socket.timeout.ms = 60000
socket.blocking.max.ms = 1000
socket.send.buffer.bytes = 0
socket.receive.buffer.bytes = 0
socket.keepalive.enable = false
socket.nagle.disable = false
socket.max.fails = 1
broker.address.ttl = 1000
broker.address.family = any
socket.connection.setup.timeout.ms = 30000
connections.max.idle.ms = 0
enable.sparse.connections = true
reconnect.backoff.jitter.ms = 0
reconnect.backoff.ms = 100
reconnect.backoff.max.ms = 10000
statistics.interval.ms = 1000
enabled_events = 0
error_cb = 0x5643d7876600
throttle_cb = 0x5643d7876250
stats_cb = 0x5643d78767d0
log_cb = 0x5643d7878fd0
log_level = 6
log.queue = false
log.thread.name = true
enable.random.seed = true
log.connection.close = true
socket_cb = 0x5643d788e800
open_cb = 0x5643d78b1770
default_topic_conf = 0x5643d85407d0
internal.termination.signal = 29
api.version.request = true
api.version.request.timeout.ms = 10000
api.version.fallback.ms = 0
broker.version.fallback = 0.10.0
security.protocol = plaintext
ssl.ca.certificate.stores = Root
ssl.engine.id = dynamic
enable.ssl.certificate.verification = true
ssl.endpoint.identification.algorithm = none
sasl.mechanisms = GSSAPI
sasl.kerberos.service.name = kafka
sasl.kerberos.principal = kafkaclient
sasl.kerberos.kinit.cmd = kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}
sasl.kerberos.min.time.before.relogin = 60000
enable.sasl.oauthbearer.unsecure.jwt = false
enable_sasl_queue = false
sasl.oauthbearer.method = default
test.mock.num.brokers = 0
test.mock.broker.rtt = 0
group.id = g2
partition.assignment.strategy = roundrobin
session.timeout.ms = 45000
heartbeat.interval.ms = 3000
group.protocol.type = consumer
coordinator.query.interval.ms = 600000
max.poll.interval.ms = 300000
enable.auto.commit = true
auto.commit.interval.ms = 5000
enable.auto.offset.store = false
queued.min.messages = 100000
queued.max.messages.kbytes = 65536
fetch.wait.max.ms = 500
fetch.message.max.bytes = 1048576
fetch.max.bytes = 52428800
fetch.min.bytes = 1
fetch.error.backoff.ms = 500
offset.store.method = broker
isolation.level = read_committed
offset_commit_cb = 0x5643d7876290
enable.partition.eof = false
check.crcs = false
allow.auto.create.topics = false
client.rack =
transaction.timeout.ms = 60000
enable.idempotence = false
enable.gapless.guarantee = false
queue.buffering.max.messages = 100000
queue.buffering.max.kbytes = 1048576
queue.buffering.max.ms = 5
message.send.max.retries = 2147483647
retry.backoff.ms = 100
queue.buffering.backpressure.threshold = 1
compression.codec = none
batch.num.messages = 10000
batch.size = 1000000
delivery.report.only.error = false
sticky.partitioning.linger.ms = 10
# Topic config
request.required.acks = -1
request.timeout.ms = 30000
message.timeout.ms = 300000
queuing.strategy = fifo
produce.offset.report = false
partitioner = consistent_random
compression.codec = inherit
compression.level = -1
auto.commit.enable = true
auto.commit.interval.ms = 60000
auto.offset.reset = smallest
offset.store.path = .
offset.store.sync.interval.ms = -1
offset.store.method = broker
consume.callback.max.messages = 0
```
- [x] Operating system: Ubuntu 21.10
This is a great bug report, @travisdowns!
Having that many partitions for a small number of topics is a bit of a special use case which librdkafka is not optimized for (obviously), but this should be fixed nonetheless. Another problem with consumer lag monitoring is that it creates one OffsetsRequest per partition, even though the same broker may be leader for multiple partitions, so in your case that is also a very large number of protocol requests that will be sent, which in turn may cause head-of-line blocking for more necessary requests.
So, two action points:
- reimplement timers, perhaps using a timer wheel like https://25thandclement.com/~william/projects/timeout.c.html
- try to consolidate consumer_lag requests (a grouping sketch follows below)
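For illustration, a rough sketch of the consolidation idea: group the lag partitions by their current leader and send one offsets request per broker instead of one per partition. All names here (lag_partition, offsets_request_send, ...) are made up for the example; this is not the librdkafka API.

```c
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

/* Hypothetical description of a partition whose lag we want to query. */
struct lag_partition {
        const char *topic;
        int32_t partition;
        int32_t leader_broker_id;   /* leader known from metadata */
};

static int cmp_by_leader(const void *a, const void *b) {
        const struct lag_partition *pa = a, *pb = b;
        return (pa->leader_broker_id > pb->leader_broker_id) -
               (pa->leader_broker_id < pb->leader_broker_id);
}

/* Stub: in a real client this would build a single offsets request
 * covering parts[0..cnt-1] (all led by broker_id) and send it. */
static void offsets_request_send(int32_t broker_id,
                                 const struct lag_partition *parts,
                                 size_t cnt) {
        (void)parts;
        printf("broker %d: 1 request covering %zu partitions\n",
               broker_id, cnt);
}

static void send_consumer_lag_requests(struct lag_partition *parts,
                                       size_t cnt) {
        /* Sort by leader so partitions for the same broker are adjacent,
         * then emit one request per run instead of one per partition. */
        qsort(parts, cnt, sizeof(*parts), cmp_by_leader);
        size_t start = 0;
        for (size_t i = 1; i <= cnt; i++) {
                if (i == cnt ||
                    parts[i].leader_broker_id !=
                            parts[start].leader_broker_id) {
                        offsets_request_send(parts[start].leader_broker_id,
                                             &parts[start], i - start);
                        start = i;
                }
        }
}
```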
> Having that many partitions for a small number of topics is a bit of a special use case which librdkafka is not optimized for (obviously), but this should be fixed nonetheless.
Yes, it is a bit special, but unfortunately we cannot always control the number of partitions someone chooses to use, and there may be some valid reasons for such a high number (real-world case: the need to distribute among 10,000s of clients, with at least one partition per client if using a dynamic consumer group).
Your suggestions sound great. I also think even something O(log(n)) like a priority heap would be fine, but a timer wheel looks even more specialized to this problem (I'm not sure about its overhead at very small counts; a priority heap has overhead similar to a linked list at small counts, which is nice).
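For what it's worth, here is a rough, array-backed binary min-heap keyed on the next-fire time; a sketch only (names like timer_heap are invented, capacity growth is omitted), just to show that both the push on reschedule and the pop of the soonest timer are O(log n).

```c
#include <stddef.h>

struct heap_timer {
        long long next_fire;
        /* ... callback, interval, etc. ... */
};

struct timer_heap {
        struct heap_timer **a;  /* a[0..cnt-1], heap-ordered by next_fire */
        size_t cnt, cap;
};

static void heap_swap(struct heap_timer **x, struct heap_timer **y) {
        struct heap_timer *t = *x; *x = *y; *y = t;
}

/* O(log n): append at the end and sift up. Assumes cnt < cap. */
static void heap_push(struct timer_heap *h, struct heap_timer *t) {
        size_t i = h->cnt++;
        h->a[i] = t;
        while (i > 0 &&
               h->a[(i - 1) / 2]->next_fire > h->a[i]->next_fire) {
                heap_swap(&h->a[(i - 1) / 2], &h->a[i]);
                i = (i - 1) / 2;
        }
}

/* O(log n): remove the soonest timer, move the last element to the
 * root and sift down. */
static struct heap_timer *heap_pop(struct timer_heap *h) {
        if (h->cnt == 0)
                return NULL;
        struct heap_timer *min = h->a[0];
        h->a[0] = h->a[--h->cnt];
        size_t i = 0;
        for (;;) {
                size_t l = 2 * i + 1, r = 2 * i + 2, s = i;
                if (l < h->cnt && h->a[l]->next_fire < h->a[s]->next_fire)
                        s = l;
                if (r < h->cnt && h->a[r]->next_fire < h->a[s]->next_fire)
                        s = r;
                if (s == i)
                        break;
                heap_swap(&h->a[i], &h->a[s]);
                i = s;
        }
        return min;
}
```

With 20k timers that turns the ~20,000-entry scan per reschedule into roughly 15 comparisons. The trade-off versus the linked list is that removing an arbitrary timer is no longer a simple pointer unlink: the timer would need to remember its heap index, or deletions would have to be handled lazily.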