
Error while consuming Kafka topic

Open manik3pg opened this issue 4 years ago • 16 comments

Description:

We are intermittently getting the following error while consuming a Kafka topic:

*** rdkafka_partition.c:364:rd_kafka_toppar_set_fetch_state: assert: thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread) ***

Once we get the above error, the application crashes.

Kafka topic configuration: Partition Count = 1, Replication Factor = 3, Min In Sync Replicas = 2

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka version (release number or git tag): 1.8.0
  • [x] Apache Kafka version: 2.8.0
  • [x] librdkafka client configuration: StatisticsIntervalMs = 0, SessionTimeoutMs = 10000, EnablePartitionEof = true, AutoOffsetReset = AutoOffsetReset.Latest, EnableAutoOffsetStore = false, SocketKeepaliveEnable = true, SocketTimeoutMs = 1500
  • [x] Operating system: AWS Fargate 1.4.0
  • [x] logs : *** rdkafka_partition.c:364:rd_kafka_toppar_set_fetch_state: assert: thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread) ***
  • [x] Provide broker log excerpts - No broker logs
  • [x] Critical issue
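
For reference, a minimal sketch of the same consumer configuration expressed against librdkafka's C API directly (the property names are librdkafka's documented configuration properties; the broker address and group id are placeholder assumptions, not from the report):

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    int main(void) {
            char errstr[512];
            rd_kafka_conf_t *conf = rd_kafka_conf_new();

            /* Mirrors the reported .NET ConsumerConfig settings using
             * librdkafka property names; broker and group are placeholders. */
            rd_kafka_conf_set(conf, "bootstrap.servers", "broker:9092", errstr, sizeof(errstr));
            rd_kafka_conf_set(conf, "group.id", "example-group", errstr, sizeof(errstr));
            rd_kafka_conf_set(conf, "statistics.interval.ms", "0", errstr, sizeof(errstr));
            rd_kafka_conf_set(conf, "session.timeout.ms", "10000", errstr, sizeof(errstr));
            rd_kafka_conf_set(conf, "enable.partition.eof", "true", errstr, sizeof(errstr));
            rd_kafka_conf_set(conf, "auto.offset.reset", "latest", errstr, sizeof(errstr));
            rd_kafka_conf_set(conf, "enable.auto.offset.store", "false", errstr, sizeof(errstr));
            rd_kafka_conf_set(conf, "socket.keepalive.enable", "true", errstr, sizeof(errstr));
            rd_kafka_conf_set(conf, "socket.timeout.ms", "1500", errstr, sizeof(errstr));

            /* rd_kafka_new() takes ownership of conf on success. */
            rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
            if (!rk) {
                    fprintf(stderr, "failed to create consumer: %s\n", errstr);
                    return 1;
            }
            rd_kafka_destroy(rk);
            return 0;
    }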

manik3pg avatar Oct 04 '21 12:10 manik3pg

Please fill out the checklist instead of removing it.

edenhill avatar Oct 04 '21 18:10 edenhill

I have filled out the checklist. Can you please provide an update on this issue?

manik3pg avatar Oct 13 '21 12:10 manik3pg

Thank you, I haven't seen this assert before. Can I ask you to reproduce this issue with Debug = "all,-fetch" and provide the logs (or at least the 100 or so lines prior to the assert)?
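
At the librdkafka level this corresponds to the debug configuration property; a one-line sketch of setting it, assuming the conf/errstr names from the configuration sketch earlier in the thread:

    rd_kafka_conf_set(conf, "debug", "all,-fetch", errstr, sizeof(errstr));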

edenhill avatar Oct 13 '21 18:10 edenhill

The service also crashes with Debug = "all,-fetch". We reproduced the issue with Debug = "fetch"; please find the logs at the end of this comment.

Some more info regarding our Kafka cluster:

  • Number of AWS MSK Kafka brokers = 3
  • Replication factor = 3
  • Min in sync replicas = 2
  • unclean.leader.election.enable = false
  • replica.selector.class = org.apache.kafka.common.replica.RackAwareReplicaSelector

Logs -

*** rdkafka_partition.c:364:rd_kafka_toppar_set_fetch_state: assert: thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread) ***
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: Topic LeaderElectionTopic_Dev [0]: migrating from broker 2 to 3 (leader is 3): reverting from preferred replica to leader
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch 1/1/1 toppar(s)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch topic tlnTopic_Dev [0] at offset 6715549697 (v4)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Enqueue 167 message(s) (279133 bytes, 167 ops) on tlnTopic_Dev [0] fetch queue (qlen 0, v4, last_offset 6715549696, 0 ctrl msgs, uncompressed)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch 1/1/1 toppar(s)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch topic gpsTopic_Dev [0] at offset 25401963 (v4)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch 1/1/1 toppar(s)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch 1/1/1 toppar(s)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch topic syncTopic_Dev [0] at offset 36290959 (v4)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch topic tlnTopic_Dev [2] at offset 6714519613 (v4)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Enqueue 1 message(s) (10 bytes, 1 ops) on syncTopic_Dev [0] fetch queue (qlen 0, v4, last_offset 36290958, 0 ctrl msgs, uncompressed)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch 1/1/1 toppar(s)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch topic tlnTopic_Dev [1] at offset 6721672948 (v4)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Enqueue 172 message(s) (301426 bytes, 172 ops) on tlnTopic_Dev [1] fetch queue (qlen 0, v4, last_offset 6721672947, 0 ctrl msgs, uncompressed)
[thrd:b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092]: b-2.pillxstack-dev.feos7n.c2.kafka.us-east-2.amazonaws.com:9092/2: Fetch 1/1/1 toppar(s)

manik3pg avatar Oct 14 '21 10:10 manik3pg

Hey,

Is there any update on this?

Is it in any way related to the rack awareness feature of Kafka that we started using?

manik3pg avatar Oct 20 '21 12:10 manik3pg

FWIW, we're getting the same issue, but only when fetch-from-replicas is enabled, on librdkafka 1.8.2.
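
For context, fetch-from-replicas (KIP-392 follower fetching) is opted into on the librdkafka consumer via the client.rack configuration property; a minimal sketch, where the rack id is a placeholder:

    rd_kafka_conf_set(conf, "client.rack", "us-east-2a", errstr, sizeof(errstr));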

scanterog avatar Feb 15 '22 14:02 scanterog

I actually need the full debug log; please repro with Debug = "all".

edenhill avatar Apr 08 '22 11:04 edenhill

I have a similar error with the following log (I removed the topic name/ip):

%7|1649898117.491|OFFSET|rdkafka#consumer-2| [thrd:main]: Topic <topic> [32]: timed offset query for END in state offset-query
%7|1649898117.491|OFFREQ|rdkafka#consumer-2| [thrd:main]: <ip>:9092/10009: Partition <topic> [32]: querying for logical offset END (opv 19) 
%7|1649898117.491|PARTSTATE|rdkafka#consumer-2| [thrd:main]: Partition <topic> [32] changed fetch state offset-query -> offset-wait
%7|1649898117.491|RECV|rdkafka#consumer-2| [thrd:<ip1>:9092/10009]: <ip1>:9092/10009: Received FetchResponse (v11, 89 bytes, CorrId 3686, rtt 500.49ms)
%7|1649898117.491|FETCH|rdkafka#consumer-2| [thrd:<ip1>:9092/10009]: <ip1>:9092/10009: Topic <topic> [40] MessageSet size 0, error "Success", MaxOffset 376, LSO 376, Ver 39/39
%7|1649898117.491|BROKER|rdkafka#consumer-2| [thrd:<ip1>:9092/10009]: Topic <topic> [32]: Reverting from preferred replica 10009 to leader 10005
%7|1649898117.491|TOPICUPD|rdkafka#consumer-2| [thrd:<ip1>:9092/10009]: Topic <topic> [32]: migrating from broker 10009 to 10005 (leader is 10005): reverting from preferred replica to leader
%7|1649898117.491|BRKDELGT|rdkafka#consumer-2| [thrd:<ip1>:9092/10009]: <topic> [32]: delegate to broker <ip2>:9092/10005 (rktp 0x7f5e30025d20, term 0, ref 5)
%7|1649898117.491|BRKDELGT|rdkafka#consumer-2| [thrd:<ip1>:9092/10009]: <topic> [32]: no longer delegated to broker <ip1>:9092/10009
%7|1649898117.491|BRKDELGT|rdkafka#consumer-2| [thrd:<ip1>:9092/10009]: <topic> [32]: delegating to broker <ip2>:9092/10005 for partition with 0 messages (0 bytes) queued
%7|1649898117.491|OFFSET|rdkafka#consumer-2| [thrd:<ip1>:9092/10009]: <topic> [32]: migrating to new broker: (re)starting offset query timer for offset END
*** rdkafka_partition.c:364:rd_kafka_toppar_set_fetch_state: assert: thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread) ***

I have the full log, but it's 40+ MB and I'd have to find a way to anonymize it before sharing, so I thought I'd start with this. The change from offset-query -> offset-wait happened on the right thread (apparently the main thread), but the second time it seemed to attempt the same operation from another thread. If you need more details, let me know.
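
For readers unfamiliar with the assert: it is a thread-ownership guard, since partition fetch-state transitions are only supposed to happen on the client instance's main thread. A simplified illustration of the pattern in standard C11 (names and structure simplified; not the actual librdkafka source, which uses its bundled tinycthread and the thrd_is_current() shorthand seen in the assert text):

    #include <assert.h>
    #include <threads.h>   /* C11 threads; librdkafka bundles an equivalent */

    /* Simplified: each client instance records the thread that is
     * allowed to perform partition fetch-state transitions. */
    struct client_instance {
            thrd_t main_thread;
    };

    static void set_fetch_state(struct client_instance *rk, int new_state) {
            /* The crash in this issue fires when a check like this fails,
             * i.e. when a broker thread rather than the main thread
             * attempts the state transition. */
            assert(thrd_equal(thrd_current(), rk->main_thread));
            /* ... perform the state transition ... */
            (void)new_state;
    }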

acharles1533 avatar Apr 14 '22 18:04 acharles1533

@edenhill Is more information needed or is the above log sufficient to investigate what's going on? Thanks.

acharles1533 avatar Apr 19 '22 01:04 acharles1533

Thanks @acharles1533, that bit of debug log helped me identify the issue; now I just need to figure out how to fix it.

edenhill avatar Apr 19 '22 09:04 edenhill

Any progress on the fix or any ETA? Thanks again.

acharles1533 avatar Apr 26 '22 20:04 acharles1533

@edenhill any news on the fix?

alfa07 avatar May 04 '22 19:05 alfa07

What's actually causing this issue? We are seeing it whenever there are under-replicated partitions.

With rack awareness disabled, it also crashes, but with a different error:

Fatal (Local: Fatal error) error_code="reading_message" error_type="reader_failed" stage="receiving"
thread 'vector-worker' panicked at 'called `Option::unwrap()` on a `None` value', /cargo/registry/src/github.com-1ecc6299db9ec823/rdkafka-0.27.0/src/topic_partition_list.rs:201:43

ref: https://github.com/vectordotdev/vector/issues/13665

fpytloun avatar Jul 21 '22 17:07 fpytloun

I am facing the same error.

*** rdkafka_partition.c:352:rd_kafka_toppar_set_fetch_state: assert: thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread) ***

Kafka 3.0.0, librdkafka 1.9.1

saketmv avatar Oct 31 '22 11:10 saketmv

It'd be interesting to get a backtrace; any chance there's a corefile? If so, open it up with gdb and do bt full.
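
A typical session, with placeholder paths:

    $ gdb /path/to/consumer-binary /path/to/corefile
    (gdb) bt full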

edenhill avatar Nov 01 '22 11:11 edenhill

Ah, I might have found what is causing this issue, at least for me.

I was doing a manual offset commit (on a separate goroutine) without setting enable.auto.commit to false. This probably leads to a race condition that triggers this error on high-traffic topics. I turned off auto commits, and since then I have not observed this error.
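
For reference, the fix described above maps to disabling automatic commits and committing explicitly; a minimal sketch at the librdkafka C level, assuming a conf/consumer set up as earlier in the thread:

    /* Disable automatic commits before creating the consumer ... */
    rd_kafka_conf_set(conf, "enable.auto.commit", "false",
                      errstr, sizeof(errstr));

    /* ... and commit explicitly after processing. Passing NULL commits
     * the current assignment's offsets; the last argument selects
     * async (1) or sync (0) operation. */
    rd_kafka_resp_err_t err = rd_kafka_commit(rk, NULL, 0 /* sync */);
    if (err)
            fprintf(stderr, "commit failed: %s\n", rd_kafka_err2str(err));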

saketmv avatar Nov 02 '22 10:11 saketmv