kafka handles keeps topic handles alive and refreshes their metadata beyond subscription
Description
When an rd_kafka_t consumer handle is reused for consuming multiple topics, it keeps references to all topics it ever listened to and keeps refreshing their metadata even after app stopped consuming them and potentially after topics have been deleted.
In my use case, we reuse kafka consumer handle to consume multiple topics, some of which are short lived. After few days, kafka consumer repeated polls metadata for thousands of already deleted topics a second. We have been rolling our consumer processes to reset consumer state to workaround this issue.
Might not be the root case, but the logs shows some rd_kafka_toppar_t instances that keep the topic ref counter positive, but they never get deleted. In my reproduction, there were two toppar created: one for RD_KAFKA_PARTITION_UA and one for the partition we consumer. They don't get freed until the consumer handle is destroyed
How to reproduce
I've used the rdkafka_example to reproduce the problem. The code is in https://github.com/notnoop/librdkafka/commit/ef06a2c0454c20f7c50debc63bdb7e80573b6e61 . In short, it blocks between topic and kafka handle destruction to allow examining state in between.
Reproduction steps:
# prepare code, and provision kafka instance
./configure --debug --enable-refcnt-debug && make
docker-compose up -d
docker-compose exec kafka kafka-topics --bootstrap-server localhost:29092 --create --topic test1
# publish sample messages
echo message1 | ./examples/rdkafka_example -P -t test1 -b localhost:29092 -p 0
echo message2 | ./examples/rdkafka_example -P -t test1 -b localhost:29092 -p 0
# start consuming (100 ms isn't not our production value, and retrospect should have been larger value)
./examples/rdkafka_example -C -t test1 -b localhost:29092 -p 0 -X topic.metadata.refresh.interval.ms=100 -Xdebug=all 2>verboselog
# in another terminal
# after awhile
pkill -SIGUSR1 rdkafka_example
# delete topic
docker-compose exec kafka kafka-topics --bootstrap-server localhost:29092 --delete --topic test1
# eventually kill process before handle destruction
pkill -9 rdkafka_example
My full output is found at https://gist.github.com/notnoop/007b16362b7e8cb5e73d5f16fdcdc300 .
We observe the following:
- Destroying the topic decreases refcnt from 3 to 2 (and app_refcnt from 1 to 0) but toppar isn't removed
#### After destroying topic
rd_kafka_t 0x5612dfea2e90: rdkafka#consumer-1
producer.msg_cnt 0 (0 bytes)
rk_rep reply queue: 0 ops
brokers:
rd_kafka_broker_t 0x5612dfeaaa30: :0/internal NodeId -1 in state INIT (for 0.060s)
refcnt 3
outbuf_cnt: 0 waitresp_cnt: 0
0 messages sent, 0 bytes, 0 errors, 0 timeouts
0 messages received, 0 bytes, 0 errors
0 messageset transmissions were retried
0 toppars:
rd_kafka_broker_t 0x5612dfeab870: localhost:29092/1 NodeId 1 in state UP (for 0.050s)
refcnt 6
outbuf_cnt: 0 waitresp_cnt: 1
4 messages sent, 273 bytes, 0 errors, 0 timeouts
3 messages received, 775 bytes, 0 errors
0 messageset transmissions were retried
1 toppars:
test1 [0] broker localhost:29092/1, leader_id localhost:29092/1
refcnt 4
msgq: 0 messages
xmit_msgq: 0 messages
total: 0 messages, 0 bytes
cgrp:
topics:
test1 with 1 partitions, state exists, refcnt 2
test1 [-1] broker none, leader_id none
refcnt 1
msgq: 0 messages
xmit_msgq: 0 messages
total: 0 messages, 0 bytes
desired partitions:
Metadata cache with 1 entries:
test1 (inserted 49ms ago, expires in 250ms, 1 partition(s), valid)
- After the topic is deleted, the metadata is still getting refreshed (as seen in verboselog and through SIGUSR1 output):
Metadata cache with 1 entries:
test1 (inserted 96ms ago, expires in 3ms, 0 partition(s), valid) error: Broker: Unknown topic or partition
...
Metadata cache with 1 entries:
test1 (inserted 67ms ago, expires in 32ms, 0 partition(s), valid) error: Broker: Unknown topic or partition
1678370040.023 RDKAFKA-7-TOPPARNEW: rdkafka#consumer-1: [thrd:app]: NEW test1 [-1] 0x5612dfeac760 refcnt 0x5612dfeac7f0 (at rd_kafka_topic_new0:468)
1678370040.023 RDKAFKA-7-TOPPARNEW: rdkafka#consumer-1: [thrd:app]: NEW test1 [0] 0x5612dfeacb80 refcnt 0x5612dfeacc10 (at rd_kafka_toppar_desired_add:636)
- confirmed that the topic
rkt_app_refcntgot decremented to 0 after the destruction, but the internalrk_refcntwent to 2.
REFCNT DEBUG: &rkt->rkt_app_refcnt 1 -1: 0x5612dfea2380: rd_kafka_topic_destroy_app:81
REFCNT DEBUG: &rkt->rkt_refcnt 3 -1: 0x5612dfea22b8: rd_kafka_topic_destroy0:198
- After process is killed before handle destruction is called,
verboselogshows two toppar creation without corresponding removal
$ grep TOPPAR ./verboselog
1678370040.023 RDKAFKA-7-TOPPARNEW: rdkafka#consumer-1: [thrd:app]: NEW test1 [-1] 0x5612dfeac760 refcnt 0x5612dfeac7f0 (at rd_kafka_topic_new0:468)
1678370040.023 RDKAFKA-7-TOPPARNEW: rdkafka#consumer-1: [thrd:app]: NEW test1 [0] 0x5612dfeacb80 refcnt 0x5612dfeacc10 (at rd_kafka_toppar_desired_add:636)
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka version (release number or git tag): repro case with https://github.com/notnoop/librdkafka/commit/ef06a2c0454c20f7c50debc63bdb7e80573b6e61 based on latest master c75eae84846b1023422b75798c41d4b6b1f8b0b7 .
- [x] Apache Kafka version: Using latest docker image, currently 7.3.2
- [x] librdkafka client configuration: default example cases with
-X topic.metadata.refresh.interval.ms=100 -Xdebug=allto ease debugging - [x] Operating system: Ubuntu 20.04.5
- [x] Provide logs (with
debug=..as necessary) from librdkafka: https://gist.github.com/notnoop/007b16362b7e8cb5e73d5f16fdcdc300 - [ ] Provide broker log excerpts
- [ ] Critical issue
@notnoop Thanks for reporting this!
Your issue seems very similar to the one fixed in #4187, that also happened when a topic is removed. Could you try it to see if it solves this issue?
Thanks for the pointer. Just re-run my scripts against latest master and it behaved the same way.
One issue I see is circular dependency between the topic (rkt) and its unprovisioned partition (rkt->rkt_ua). They keep their refcnt over 1 until rd_kafka_topic_partitions_remove is called at rd_kafka_t termination: https://github.com/confluentinc/librdkafka/blob/578589db0feb5961f4ca02a0ef2587b659bcab78/src/rdkafka_topic.c#L1446-L1449 .
Yes, it seems a different problem and could be fixed in the same point. We're checking this, thanks!
Check if there is an update? I researched the problem further to understand it, prototyped a workaround that pass my tests in https://github.com/confluentinc/librdkafka/compare/master...notnoop:librdkafka:free_partitions_with_topic .
The issue seems that when app destroys the last topic app reference, there are many loose referencee (some being circular through partitions/ops) that only get freed at rd_kafka_t termination. The change moves the removal of partitions and kicks off RD_KAFKA_OP_PARTITION_LEAVE operation to free the resources.
The workaround PR has two main limitations:
- It doesn't handle subscribers (via rd_kafka_assign) . The assign and unassign APIs don't take a
rd_kafka_topic_treference, and it creates new instances without incrementing the app refcnt. If the app mixes uses of rd_topic_t directly and assign api, the topic queues may be prematurely closed while subscriptions are still valid. We can mitigate this by ensuring assign calls increment the app refcnt. - it doesn't address publishers: many tests and consumers expect to call
rd_kafka_producethen immediately destroys topic without waiting for acknowledgement; so immediately purging queues isn't an option.
@emasab Checking for an update if you have any. We appreciate feedback on the branch linked above - I can turn into it a proper fix with feedback.
Also, thank you for the label - I feel very honored :yay:.
BTW - I've iterated through the branch above and tried different methods for deleting topic handles; however, I ran into some concurrency issues where some operations stalled or failed unexpectedly. My hunch is that unlocked traversals of rk topics (e.g. in rd_kafka_topic_find* invocations with do_lock = 0) is the cause.
Pursuing a smaller/safer change, a change to simply skip querying metadata of deleted topics. I've discovered that deleted topics get their metadata fetched every second without backoff. Such change is good enough for us for now. The change we are running locally is https://github.com/confluentinc/librdkafka/pull/4310 .