confluent-kafka-python
Assertion failed: rd_slice_abs_offset
Description
While reading data from a consumer and writing to a sink, I get the following error:
Assertion failed: rd_slice_abs_offset(new_slice) <= new_slice->end (rdbuf.c: rd_slice_narrow_copy: 1027)
No additional information is provided. One thing I've noticed is that the failing message is not always the last one. I've upgraded the client and librdkafka to 1.2.0 and the issue still occurs.
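For context, the consuming side follows roughly this pattern (a minimal sketch only; the broker list, group id and write_to_sink() are placeholders rather than the actual code):
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',  # placeholder
    'group.id': 'my-consumer-group',                                # placeholder
    'enable.auto.commit': False,
})
consumer.subscribe(['my_topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # the assertion above fires inside librdkafka's fetch handling,
            # before a message ever reaches this point
            print('Consumer error: {}'.format(msg.error()))
            continue
        write_to_sink(msg.value())            # placeholder for the sink write
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()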
Thank you
Checklist
Please provide the following information:
- [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
- [ ] Apache Kafka broker version:
- [ ] Client configuration: {...}
- [ ] Operating system:
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
Can you reproduce this with 'debug': 'protocol,msg,fetch,broker' and provide the logs?
Please also fill out the issue checklist (consumer config, broker version)
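That is, something roughly like this in your consumer configuration (only the 'debug' key matters here; the other values are placeholders):
conf = {
    'bootstrap.servers': '...',   # placeholder
    'group.id': '...',            # placeholder
    # verbose librdkafka debugging for the relevant subsystems
    'debug': 'protocol,msg,fetch,broker',
}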
Thank you for the quick response! Below are the requested logs; the three brokers are all running Kafka 2.3.0.
%7|1570700466.709|PROTOUFLOW|rdkafka#consumer-1| [thrd:10.133.33.153:9092/bootstrap]: 10.133.33.153:9092/1: Protocol read buffer underflow at 16777700/16777736 (rd_kafka_msgset_reader_v2:1028): expected 225 bytes > 36 remaining bytes (truncated response from broker (ok)): my_topic [45] MessageSet at offset 1400062 payload size 225
%7|1570700466.798|CONSUME|rdkafka#consumer-1| [thrd:10.133.33.153:9092/bootstrap]: 10.133.33.153:9092/1: Enqueue 8323 message(s) (622215 bytes, 8323 ops) on my_topic [45] fetch queue (qlen 8323, v6, last_offset 1400061, 0 ctrl msgs)
%7|1570700466.798|FETCH|rdkafka#consumer-1| [thrd:10.133.33.153:9092/bootstrap]: 10.133.33.153:9092/1: Topic my_topic [48] MessageSet size 1048576, error "Success", MaxOffset 948711, LSO 948711, Ver 6/6
Assertion failed: rd_slice_abs_offset(new_slice) <= new_slice->end (rdbuf.c: rd_slice_narrow_copy: 1027)
Thank you.
It would be very useful to know exactly where in the code this is happening. Could you try the following?
$ ulimit -c unlimited
$ <reproduce the issue>
$ gdb $(/usr/bin/env python) core # on osx the core file will be in /cores/core.<pid>
(gdb) bt
(gdb) exit
and provide us with the output?
This originally was running in a Docker container in our K8s cluster. I created a VM and ran it in there with the same parameters. The output for gdb is:
(gdb) bt
#0 0x00007f79af2227a1 in abort () from /lib/ld-musl-x86_64.so.1
#1 0x0000558b0b03f7e0 in ?? ()
#2 0x0000000000000020 in ?? ()
#3 0x0000000000000000 in ?? ()
Okay, that backtrace isn't very helpful unfortunately :(
Another way that might work is this:
$ gdb $(/usr/bin/env python)
(gdb) run /path/to/your/script.py and its arguments
# wait for crash
(gdb) bt full
If this method works, please stay in gdb so I can walk you through some more commands.
Unfortunately, it looks pretty much the same:
(gdb) bt full
#0 0x00007f5fa9f217a1 in abort () from /lib/ld-musl-x86_64.so.1
No symbol table info available.
#1 0x000056447b8607e0 in ?? ()
No symbol table info available.
#2 0x0000000000000020 in ?? ()
No symbol table info available.
#3 0x0000000000000000 in ?? ()
No symbol table info available.
Ouch, okay, too bad :(
Can you provide your full consumer configuration?
Do you know if the topic has transactional messages on it (e.g., from Kafka Streams or KSQL)?
I think I got it fixed by adding api.version.request and broker.version.fallback to the Consumer arguments:
args = {'bootstrap.servers': self.brokers,
'group.id': self.group_id,
'enable.auto.commit': False,
'debug': 'protocol,msg,fetch,broker',
'api.version.request': False,
'broker.version.fallback': '0.9.0.0'}
Nobody is producing transactional messages on the topic. One thing worth mentioning is that we are actually consuming data from two topics.
Well, it actually stopped crashing, but now there's a more visible error:
KafkaError{code=_BAD_MSG,val=-199,str="Local: Bad message format"}
What versions of librdkafka have you tried?
So far only these two: 1.1.0 and 1.2.0. We are reading messages in batches of 1000 at a time. Could this be related? Maybe the buffer is not large enough?
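Roughly, the batch read looks like this (a sketch only; consumer is the Consumer configured as above and write_to_sink() is a placeholder):
while True:
    # consume() returns up to num_messages Message objects per call
    msgs = consumer.consume(num_messages=1000, timeout=1.0)
    for msg in msgs:
        if msg.error():
            # local errors such as the _BAD_MSG above come back as
            # message error events here
            print('Consumer error: {}'.format(msg.error()))
            continue
        write_to_sink(msg.value())   # placeholder sink write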
This is in the FetchResponse parsing code; it runs before the application asks for the messages, so it is not likely to be related.
Are you building librdkafka separately or using the binary wheels of the confluent python client?
Do you know if these messages are compressed in the topic?
We're using the librdkafka package from Alpine and the messages are not compressed.
Would you be ok with building your own version of librdkafka, to get some debug symbols?
$ apk del librdkafka librdkafka-dev
$ apk add gcc make openssl-dev zlib-dev git # and possibly others
$
$ git clone https://github.com/edenhill/librdkafka
$ cd librdkafka
$ ./configure --disable-optimization --enable-devel --prefix=/usr
$ make
$ sudo make install
$
$ gdb $(/usr/bin/env python)
(gdb) run /your/py/script.py
# wait for crash
(gdb) bt full
At this point, please keep the terminal open, as I might want to extract some object fields.
Has the root cause of this issue been identified?
I am getting the same error with librdkafka 1.2.2-r0 from Alpine 3.11:
Assertion failed: rd_slice_abs_offset(new_slice) <= new_slice->end (rdbuf.c: rd_slice_narrow_copy: 980)
confluent-kafka-go version 100
Hello! Same issue with version 1.3.0. Here is the backtrace for the failed thread:
#0 0x00007f922a510337 in raise () from /lib64/libc.so.6
#1 0x00007f922a511a28 in abort () from /lib64/libc.so.6
#2 0x00007f922a509156 in __assert_fail_base () from /lib64/libc.so.6
#3 0x00007f922a509202 in __assert_fail () from /lib64/libc.so.6
#4 0x00007f9209c2af8a in rd_slice_narrow_copy (orig=<optimized out>, new_slice=<optimized out>, size=<optimized out>) at rdbuf.c:1027
#5 0x00007f9209c2afad in rd_slice_narrow_copy_relative (orig=orig@entry=0x7f91ec0029d0, new_slice=new_slice@entry=0x7f9201ff92f0, relsize=<optimized out>) at rdbuf.c:1038
#6 0x00007f9209c10f05 in rd_kafka_msgset_reader_msg_v0_1 (msetr=0x7f9201ff9890) at rdkafka_msgset_reader.c:571
#7 0x00007f9209c0ce02 in rd_kafka_msgset_reader (msetr=<optimized out>) at rdkafka_msgset_reader.c:1256
#8 rd_kafka_msgset_reader_run (msetr=msetr@entry=0x7f9201ff9890) at rdkafka_msgset_reader.c:1316
#9 0x00007f9209c137ff in rd_kafka_msgset_parse (rkbuf=<optimized out>, request=<optimized out>, rktp=0x7f91f00060b0, aborted_txns=<optimized out>, tver=<optimized out>) at rdkafka_msgset_reader.c:1424
#10 0x00007f9209ba4b49 in rd_kafka_fetch_reply_handle (rkb=rkb@entry=0x7f91f00030d0, rkbuf=0x7f91ec002960, request=0x7f91e54ea4e0) at rdkafka_broker.c:4166
#11 0x00007f9209ba76c3 in rd_kafka_broker_fetch_reply (rk=<optimized out>, rkb=0x7f91f00030d0, err=RD_KAFKA_RESP_ERR_NO_ERROR, reply=<optimized out>, request=<optimized out>, opaque=<optimized out>)
at rdkafka_broker.c:4226
#12 0x00007f9209bbcf9b in rd_kafka_buf_callback (rk=<optimized out>, rkb=<optimized out>, err=<optimized out>, response=0x7f91ec002960, request=0x7f91e54ea4e0) at rdkafka_buf.c:456
#13 0x00007f9209b9946c in rd_kafka_recv (rkb=rkb@entry=0x7f91f00030d0) at rdkafka_broker.c:1525
#14 0x00007f9209bbaa60 in rd_kafka_transport_io_event (rktrans=rktrans@entry=0x7f91ec002890, events=events@entry=1) at rdkafka_transport.c:759
#15 0x00007f9209bbb4ab in rd_kafka_transport_io_serve (rktrans=0x7f91ec002890, timeout_ms=235) at rdkafka_transport.c:818
#16 0x00007f9209ba042c in rd_kafka_broker_ops_io_serve (rkb=0x7f91f00030d0, abs_timeout=15121557049788) at rdkafka_broker.c:3029
#17 0x00007f9209ba0a8f in rd_kafka_broker_serve (rkb=0x7f91f00030d0, timeout_ms=<optimized out>) at rdkafka_broker.c:4544
#18 0x00007f9209ba1fc5 in rd_kafka_broker_thread_main (arg=arg@entry=0x7f91f00030d0) at rdkafka_broker.c:4798
#19 0x00007f9209bfae17 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#20 0x00007f922b0cee65 in start_thread () from /lib64/libpthread.so.0
#21 0x00007f922a5d888d in clone () from /lib64/libc.so.6
Please upgrade to v1.7.0 and try to reproduce.
@edenhill, thank you, I'll try to test our application with 1.7.0.
I've analyzed the coredump from 1.3.0 and found that the thread was reading a slice with length seg_size = 1048662 at position rof = 542333:
(gdb) p rkbuf->rkbuf_reader
$141 = {buf = 0x7f91ec002988, seg = 0x7f91ecebef88, rof = 542333, start = 8, end = 1048670}
(gdb) p *rkbuf->rkbuf_reader.seg // 0x7f91ecebef88
$145 = {seg_link = {tqe_next = 0x0, tqe_prev = 0x7f91ecebef40}, seg_p = 0x7f91ec72dae0 "", seg_of = 1048662, seg_size = 1048662, seg_absof = 8, seg_free = 0x7f9209c27240 <rd_free>, seg_flags = 0}
But there is zero-filled memory at this offset, right after a perfectly valid previous message:
(gdb) x/32bx rkbuf->rkbuf_reader->seg->seg_p + 542333 - 16
0x7f91ec7b214d: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
0x7f91ec7b2155: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
0x7f91ec7b215d: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
0x7f91ec7b2165: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
As a result, we fell back to the MagicByte=0 handler and tried to copy a zero-length slice.
I've measured the extent of the zero-filled memory. It's about 400 bytes of zeroes, then a partial record with a corrupted start, then valid records up to the end of the slice. The last valid record before the zeroes has offset 12183895 and an empty offset delta; the first valid record after the zeroes has offset 12183899.
FWIW, we're seeing this same assertion failure in both librdkafka 1.8.2 and librdkafka 2.3.0 (in C++, not in Python).
rdbuf.c:1099: rd_slice_narrow_copy: Assertion `rd_slice_abs_offset(new_slice) <= new_slice->end' failed.
Either as a cause of the problem or as an effect of it (we're not sure yet), at the time of the crash there were roughly 2500 threads running, all starving each other out. This caused us to see the following message in the logs prior to the crash:
ApiVersionRequest failed: Local: Timed out: probably due to broker version < 0.10 (see api.version.request configuration)
But we weren't actually running a particularly old broker version; in fact our two brokers were running Apache Kafka 2.6.1 and 2.6.3 respectively. So my working hypothesis is that "Timed out" here had nothing to do with the broker version; it had to do with the thread getting starved out. We've seen librdkafka suffer from "misbehavior on unexpected CPU starvation" before; see https://github.com/confluentinc/librdkafka/issues/4100. Is it possible that, under unexpected CPU starvation, we reach a "this should never happen" code path which then goes on to read uninitialized memory or otherwise act on garbage data, and that's how we end up in rd_slice_narrow_copy with bad data?
Here's our 1.8.2 backtrace (if I've picked the right thread), up to where it starts showing as ?? (presumably due to mismatched debug symbols). Compare to @Stolb27's backtrace above.
#32 0x00000000007e455b in rd_slice_read (slice=0x36a0c6a01dd42989, slice@entry=0x7f6319aacd30, dst=0xee799387bc7faa9f, dst@entry=0x7f6319aacdc7, size=7216559512305723901, size@e>
#33 0x00000000007e5117 in rd_slice_peek (slice=<optimized out>, offset=<optimized out>, dst=0x7f6319aacdc7, size=1) at rdbuf.c:940
#34 0x00000000007b1fd4 in rd_kafka_msgset_reader (msetr=0x674bf) at rdkafka_msgset_reader.c:1279
#35 rd_kafka_msgset_reader_run (msetr=0x674bf, msetr@entry=0x7f6319aad030) at rdkafka_msgset_reader.c:1339
#36 0x00000000007bcde9 in rd_kafka_msgset_parse (rkbuf=rkbuf@entry=0x7f60f831a860, request=request@entry=0x7f60f85e1cb0, rktp=rktp@entry=0x22df080, aborted_txns=aborted_txns@ent>
#37 0x000000000072df4a in rd_kafka_fetch_reply_handle (rkb=rkb@entry=0x7f65c81e6a90, rkbuf=0x7f60f831a860, request=<optimized out>) at rdkafka_broker.c:4664
#38 0x00000000007322a1 in rd_kafka_broker_fetch_reply (rk=<optimized out>, rkb=0x7f65c81e6a90, err=RD_KAFKA_RESP_ERR_NO_ERROR, reply=<optimized out>, request=<optimized out>, op>
#39 0x000000000074d604 in rd_kafka_buf_callback (rk=0x7f65c804be20, rkb=0x7f65c81e6a90, err=RD_KAFKA_RESP_ERR_NO_ERROR, response=0x7f60f831a860, request=0x7f60f85e1cb0) at rdkaf>
#40 0x000000000072a6c3 in rd_kafka_req_response (rkbuf=0x7f60f831a860, rkb=0x7f60f831a860) at rdkafka_broker.c:1822
#41 rd_kafka_recv (rkb=rkb@entry=0x7f65c81e6a90) at rdkafka_broker.c:1942
#42 0x000000000074b208 in rd_kafka_transport_io_event (rktrans=rktrans@entry=0x7f65c42ef570, events=events@entry=1) at rdkafka_transport.c:759
#43 0x000000000074c1df in rd_kafka_transport_io_serve (rktrans=0x7f65c42ef570, timeout_ms=<optimized out>) at rdkafka_transport.c:818
#44 0x0000000000735d76 in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x7f65c81e6a90, abs_timeout=1005503178131) at rdkafka_broker.c:3444
#45 0x00000000007362c9 in rd_kafka_broker_consumer_serve (rkb=rkb@entry=0x7f65c81e6a90, abs_timeout=abs_timeout@entry=1005503178131) at rdkafka_broker.c:5042
#46 0x0000000000737927 in rd_kafka_broker_serve (rkb=rkb@entry=0x7f65c81e6a90, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:5185
#47 0x0000000000737fcd in rd_kafka_broker_thread_main (arg=arg@entry=0x7f65c81e6a90) at rdkafka_broker.c:5358
#48 0x00000000007a03b7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#49 0x00007f6608f9aea5 in start_thread () from /lib64/libpthread.so.0