redpanda
Broker crashes when trying to consume topic through Pandaproxy
Version & Environment
Redpanda version (from rpk version): v22.1.6 (Docker image on K8s 1.20)
What went wrong?
I’ve been experimenting with Pandaproxy consumers. I’m following the blog post, but trying to read from my own topics. Consuming the topic keeps crashing my brokers:
curl -s \
  "http://localhost:8082/consumers/my_group/instances/my_consumer/records?timeout=1000&max_bytes=100000" \
  -H "Accept: application/vnd.kafka.json.v2+json" | jq .
# empty response....
Broker logs:
INFO 2022-08-04 02:24:08,817 [shard 1] kafka/client - consumer.cc:143 - Consumer: type=member, member_id={}, name={my_consumer}: initialize
INFO 2022-08-04 02:24:10,812 [shard 1] kafka/client - consumer.cc:238 - Consumer: type=leader, member_id={test_client-4710509a-5e53-4e47-b29b-472da1ebb01b}, name={my_consumer}: join: members: {{test_client-4710509a-5e53-4e47-b29b-472da1ebb01b}}, topics: {}
INFO 2022-08-04 02:24:10,812 [shard 1] kafka/client - consumer.cc:109 - Consumer: type=leader, member_id={test_client-4710509a-5e53-4e47-b29b-472da1ebb01b}, name={my_consumer}: start
INFO 2022-08-04 02:24:28,772 [shard 1] kafka/client - consumer.cc:238 - Consumer: type=leader, member_id={test_client-4710509a-5e53-4e47-b29b-472da1ebb01b}, name={my_consumer}: join: members: {{test_client-4710509a-5e53-4e47-b29b-472da1ebb01b}}, topics: {{test}}
INFO 2022-08-04 02:24:28,772 [shard 1] kafka/client - consumer.cc:109 - Consumer: type=leader, member_id={test_client-4710509a-5e53-4e47-b29b-472da1ebb01b}, name={my_consumer}: start
ERROR 2022-08-04 02:24:39,531 [shard 1] assert - Assert failure: (/vectorized/include/rapidjson/writer.h:484) 'type == kStringType' Rapidjson
ERROR 2022-08-04 02:24:39,531 [shard 1] assert - Backtrace below:
0x3ff5424 0x1406ad7 0x21dca08 0x21daa4d 0x21d956e 0x21d7a23 0x21d6b8b 0x21a7148 0x21ba8da 0x3d66b2f 0x3d69f87 0x3dba5e8 0x3d1651f /opt/redpanda/lib/libpthread.so.0+0x9298 /opt/redpanda/lib/libc.so.6+0x1006a2
--------
seastar::internal::coroutine_traits_base<pandaproxy::server::reply_t>::promise_type
--------
seastar::continuation<seastar::internal::promise_base_with_type<void>, seastar::smp_message_queue::async_work_item<seastar::future<pandaproxy::server::reply_t> seastar::sharded<kafka::client::client>::invoke_on<pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4, seastar::future<pandaproxy::server::reply_t> >(unsigned int, seastar::smp_submit_to_options, pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4&&)::'lambda'()>::run_and_dispose()::'lambda'(pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4), seastar::futurize<pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4>::type seastar::future<pandaproxy::server::reply_t>::then_wrapped_nrvo<void, seastar::smp_message_queue::async_work_item<seastar::future<pandaproxy::server::reply_t> seastar::sharded<kafka::client::client>::invoke_on<pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4, seastar::future<pandaproxy::server::reply_t> >(unsigned int, seastar::smp_submit_to_options, pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4&&)::'lambda'()>::run_and_dispose()::'lambda'(pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4)>(seastar::smp_message_queue::async_work_item<seastar::future<pandaproxy::server::reply_t> seastar::sharded<kafka::client::client>::invoke_on<pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4, seastar::future<pandaproxy::server::reply_t> >(unsigned int, seastar::smp_submit_to_options, 
pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4&&)::'lambda'()>::run_and_dispose()::'lambda'(pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4)&&)::'lambda'(seastar::internal::promise_base_with_type<void>&&, seastar::smp_message_queue::async_work_item<seastar::future<pandaproxy::server::reply_t> seastar::sharded<kafka::client::client>::invoke_on<pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4, seastar::future<pandaproxy::server::reply_t> >(unsigned int, seastar::smp_submit_to_options, pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4&&)::'lambda'()>::run_and_dispose()::'lambda'(pandaproxy::rest::consumer_fetch(pandaproxy::ctx_server<pandaproxy::rest::proxy>::request_t, pandaproxy::server::reply_t)::$_4)&, seastar::future_state<pandaproxy::server::reply_t>&&), pandaproxy::server::reply_t>
Original discussion: https://redpandacommunity.slack.com/archives/C01AJDUT88N/p1659580193496039
cc @mattschumpert @BenPope
I suspect that your topic contains records that are not valid JSON. Can you share an example of a record on the topic?
Having said that, it would be desirable if we didn't crash.
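To check the suspicion above, here is a rough sketch of how record values could be tested for JSON validity once you have their raw bytes. The helper name `is_valid_json` and the sample byte strings are made up for illustration; how you pull the raw values off the topic (for example with a regular Kafka client) is left out, and this only approximates what the proxy's JSON format expects.

```python
import json


def is_valid_json(raw: bytes) -> bool:
    """Return True if the raw record value parses as a JSON document.

    Hypothetical helper, not part of Redpanda or Pandaproxy.
    """
    try:
        # json.loads accepts bytes and detects UTF-8/16/32 on its own.
        json.loads(raw)
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError,
        # both of which are subclasses of ValueError.
        return False
    return True


# Made-up sample values standing in for records read from the topic.
samples = [
    b'{"id": 1, "name": "panda"}',  # JSON object
    b'"just a string"',              # a bare JSON string is still valid JSON
    b'plain text, no quotes',        # not JSON
    b'\x00\x01binary',               # binary payload, not JSON
]

results = [is_valid_json(value) for value in samples]
for value, ok in zip(samples, results):
    print(f"{value!r}: {'valid' if ok else 'NOT valid'} JSON")
```

If any value on the topic comes back as not valid, that would fit the explanation that the JSON embedded format cannot represent those records.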