broadway_kafka
No rejoin after "payload connection down {:shutdown, :tcp_closed}"
Hello! We have an issue with brod and broadway_kafka in Kubernetes. When the service starts, all consumers join the group and everything works fine. But after some time we get:
2022-07-24 16:05:04.207 pid=<0.7117.24> [info] Group member (admin,coor=#PID<0.7117.24>,cb=#PID<0.20534.22>,generation=172):
re-joining group, reason:{:connection_down, {:shutdown, :tcp_closed}}
2022-07-24 16:05:04.207 pid=<0.7119.24> [info] client Client: payload connection down kafka-host:9092
reason:{:shutdown, :tcp_closed}
and after this brod doesn't reconnect, and kafka-consumer-groups.sh says that the consumer group has no active members.
We have been using raw brod for quite a long time and it usually reconnected after network issues.
What could be wrong? Maybe broadway_kafka doesn't rejoin after the brod client goes down?
Hi @anoskov! I recommend reaching out to ElixirForum for general help/questions, as it is more likely someone with a similar issue will see your report there.
Thank you for the answer. I asked a question there.
I noticed that the disconnect occurs 10 minutes after the connection is established. This value matches connections.max.idle.ms (the broker default is 600000 ms, i.e. 10 minutes), so it looks like Kafka is disconnecting idle connections. I know that some Kafka clients support a reconnect-on-idle feature. Do brod/broadway_kafka support it?
UPD:
The brod_client process is alive, BroadwayKafka.BrodClient.connected?/1 says that it is connected, and :brod.fetch/4 also returns messages. But in the state we see dead_since:
iex(admin@)32> :sys.get_state(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client))
{:state, Admin.Kafka.Consumer.Broadway.Producer_0.Client, [{"kafka", 9092}],
#PID<0.31736.27>,
[
{:conn, {"kafka-0", 9092},
{:dead_since, {1658, 257147, 463027}, {:shutdown, :tcp_closed}}}
], #PID<0.5428.4>, #PID<0.5430.4>, [connect_timeout: 10000],
Admin.Kafka.Consumer.Broadway.Producer_0.Client}
iex(admin@7)33> BroadwayKafka.BrodClient.connected?(Admin.Kafka.Consumer.Broadway.Producer_0.Client)
true
Looks like the brod client reconnects, but the consumer doesn't rejoin.
UPD2:
I checked the Producer state and got a timeout on :sys.get_state, because the producer is stuck in its handle_info callback on BrodClient.stop_group_coordinator -> :brod_group_coordinator.stop:
iex(admin@)13> :erlang.process_info(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_1))
[
registered_name: Admin.Kafka.Consumer.Broadway.Producer_1,
current_function: {:brod_group_coordinator, :stop, 1},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 4,
links: [#PID<0.3880.0>],
dictionary: [
{63, []},
{62, []},
{61, []},
{60, []},
{59, []},
{58, []},
{57, []},
{56, []},
{55, []},
{54, []},
{53, []},
{52, []},
{51, []},
{50, []},
{49, []},
{:"$initial_call", {GenStage, :init, 1}},
{48, []},
{:"$ancestors",
[Admin.Kafka.Consumer.Broadway.ProducerSupervisor,
Admin.Kafka.Consumer.Broadway.Supervisor, Admin.Kafka.Consumer,
Admin.Supervisor, #PID<0.3569.0>]},
{47, []},
{46, []},
{45, []},
{44, []},
{43, []},
{42, []},
{41, []},
{40, []},
{39, []},
{38, []},
{37, []},
{36, []},
{35, []},
{34, []},
{33, []},
{32, []},
{31, []},
{30, []},
{29, []},
{28, []},
{27, []},
{26, []},
{25, []},
{24, ...},
{...},
...
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.3568.0>,
total_heap_size: 20338,
heap_size: 2586,
stack_size: 29,
reductions: 20656517,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 65535,
minor_gcs: 2858
],
suspending: []
]
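As a side note, the timeout on :sys.get_state is expected here: a GenServer that is blocked inside handle_info/2 (in this case in :brod_group_coordinator.stop/1) cannot process system messages until the callback returns, so :sys.get_state/1 gives up after its default 5 second timeout. A minimal sketch of the same symptom (StuckServer is a hypothetical module, not part of broadway_kafka):

```elixir
defmodule StuckServer do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_info(:block, state) do
    # Simulate waiting forever on a reply that never arrives, like the
    # producer waiting inside :brod_group_coordinator.stop/1.
    receive do
      :never_sent -> :ok
    end

    {:noreply, state}
  end
end

# {:ok, _pid} = StuckServer.start_link([])
# send(StuckServer, :block)
# :sys.get_state(StuckServer)
# ** (exit) exited in: :sys.get_state(StuckServer) ... (EXIT) time out
#
# Process.info(Process.whereis(StuckServer), :current_stacktrace)
# shows exactly where the callback is blocked.
```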
@josevalim @slashmili Hello! It still occurs on the main branch. If the consumer doesn't receive messages for some time, Kafka disconnects the idle connections, and after that the Broadway producer and the brod coordinator deadlock each other.
consumer group info
I have no name!@kafka-0:/opt/bitnami/kafka$ ./bin/kafka-consumer-groups.sh --describe --group messaging --bootstrap-server localhost:9092
Consumer group 'messaging' has no active members.
brod client state
{:state, Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client,
[{"kafka", 9092}], :undefined,
[
{:conn, {"kafka-0.cluster.local", 9092},
{:dead_since, {1675, 174014, 110349}, {:shutdown, :tcp_closed}}}
], #PID<0.12743.1>, #PID<0.12744.1>,
[connect_timeout: 10000, request_timeout: 240000],
Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client}
trying to get the producer state
** (exit) exited in: :sys.get_state(#PID<0.12240.1>)
** (EXIT) time out
but it is stuck on :brod_group_coordinator.stop
[
registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_0,
current_function: {:brod_group_coordinator, :stop, 1},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 3,
links: [#PID<0.5133.0>],
dictionary: [
{63, []},
{62, []},
{61, []},
{60, []},
{59, []},
{58, []},
{57, []},
{56, []},
{55, []},
{54, []},
{53, []},
{52, []},
{51, []},
{50, []},
{49, []},
{48, []},
{47, []},
{46, []},
{45, []},
{44, []},
{43, []},
{42, []},
{41, []},
{40, []},
{:"$initial_call", {GenStage, :init, 1}},
{39, []},
{38, []},
{37, []},
{:"$ancestors",
[Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]},
{36, []},
{35, []},
{34, []},
{33, []},
{32, []},
{31, []},
{30, []},
{29, []},
{28, []},
{27, []},
{26, []},
{25, []},
{24, ...},
{...},
...
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4876.0>,
total_heap_size: 32885,
heap_size: 4185,
stack_size: 29,
reductions: 3299758,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 7
],
suspending: []
]
coordinator stuck on the assignments_revoked call
[
current_function: {:gen, :do_call, 4},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 3,
links: [],
dictionary: [
"$initial_call": {:brod_group_coordinator, :init, 1},
"$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4876.0>,
total_heap_size: 17734,
heap_size: 6772,
stack_size: 48,
reductions: 19501,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 3
],
suspending: []
]
coordinator stacktrace
{:current_stacktrace,
[
{:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
{GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
{BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
[file: 'lib/broadway_kafka/producer.ex', line: 532]},
{:telemetry, :span, 3,
[file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
{:brod_group_coordinator, :stabilize, 3,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 502
]},
{:brod_group_coordinator, :handle_info, 2,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 372
]},
{:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
{:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
]}
producer stacktrace
{:current_stacktrace,
[
{:brod_group_coordinator, :stop, 1,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 311
]},
{BroadwayKafka.Producer, :terminate, 2,
[file: 'lib/broadway_kafka/producer.ex', line: 540]},
{:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 1158]},
{:gen_server, :terminate, 10, [file: 'gen_server.erl', line: 1348]},
{:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
]}
@anoskov can you please try the main branch again?
Thank you! I'll check
@josevalim Hello! We tried the fix for a week and unfortunately it didn't help. Looks like it's stuck on Process.exit:
https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L430
** (exit) exited in: :sys.get_state(Messaging.Events.Kafka.Consumer.Broadway.Producer_1)
** (EXIT) time out
(stdlib 4.0.1) sys.erl:338: :sys.send_system_msg/2
(stdlib 4.0.1) sys.erl:139: :sys.get_state/1
iex:4: (file)
producer stacktrace
{:current_stacktrace,
[
{BroadwayKafka.Producer, :handle_info, 2,
[file: 'lib/broadway_kafka/producer.ex', line: 430]},
{Broadway.Topology.ProducerStage, :handle_info, 2,
[file: 'lib/broadway/topology/producer_stage.ex', line: 229]},
{GenStage, :noreply_callback, 3, [file: 'lib/gen_stage.ex', line: 2117]},
{:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
{:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]},
{:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
]}
coordinator stacktrace
{:current_stacktrace,
[
{:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
{GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
{BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
[file: 'lib/broadway_kafka/producer.ex', line: 539]},
{:telemetry, :span, 3,
[file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 321]},
{:brod_group_coordinator, :stabilize, 3,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 502
]},
{:brod_group_coordinator, :handle_info, 2,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 372
]},
{:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
{:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
]}
additional info
[
current_function: {:gen, :do_call, 4},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 2,
links: [],
dictionary: [
"$initial_call": {:brod_group_coordinator, :init, 1},
"$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_8,
Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4768.0>,
total_heap_size: 10961,
heap_size: 4185,
stack_size: 48,
reductions: 19899,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 7
],
suspending: []
]
[
registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
current_function: {BroadwayKafka.Producer, :handle_info, 2},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 7,
links: [#PID<0.5024.0>],
dictionary: [
{:"$initial_call", {GenStage, :init, 1}},
{:"$ancestors",
[Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]},
{63, []},
{62, []},
{61, []},
{60, []},
{59, []},
{58, []},
{57, []},
{56, []},
{55, []},
{54, []},
{53, []},
{52, []},
{51, []},
{50, []},
{49, []},
{48, []},
{47, []},
{46, []},
{45, []},
{44, []},
{43, []},
{42, []},
{41, []},
{40, []},
{39, []},
{38, []},
{37, []},
{36, []},
{35, []},
{34, []},
{33, []},
{32, []},
{31, []},
{30, []},
{29, []},
{28, []},
{27, []},
{26, []},
{25, []},
{24, ...},
{...},
...
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4768.0>,
total_heap_size: 35914,
heap_size: 6772,
stack_size: 28,
reductions: 6413879,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 8
],
suspending: []
]
This is very unexpected because Process.exit is, afaik, asynchronous.
@josevalim maybe the stacktrace is not accurate and it is actually stuck on https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L432 ? Although I don't understand how that could be.
Can you consistently reproduce it now, or does it only happen from time to time?
I can't reproduce this manually, but it happens a few times a week when some of our servers are idle and Kafka closes the connection.
I tried to emulate this with two GenServers, where the second calls the first while the first is processing a message in handle_info, and the first then does Process.exit on the second. And it works fine: the first server gets the message in its receive block after Process.exit (see the sketch below).
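For reference, a minimal sketch of that kind of experiment under the same assumptions (FirstServer and the :ping call are made up for illustration; note the second process does not trap exits):

```elixir
defmodule FirstServer do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_info(:run, state) do
    # A "second" process that calls back into us while we are busy here.
    second = spawn(fn -> GenServer.call(FirstServer, :ping, :infinity) end)
    ref = Process.monitor(second)

    Process.sleep(200)               # give it time to block in the call
    Process.exit(second, :shutdown)

    # `second` is NOT trapping exits, so the exit signal terminates it even
    # though it is blocked in the call, and the :DOWN message arrives here.
    receive do
      {:DOWN, ^ref, :process, _pid, reason} -> IO.inspect(reason, label: "second down")
    end

    {:noreply, state}
  end

  @impl true
  def handle_call(:ping, _from, state), do: {:reply, :pong, state}
end

# {:ok, _} = FirstServer.start_link([])
# send(FirstServer, :run)   # prints "second down: :shutdown" and keeps running
```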
If you need any additional information, I can collect it next time.
@josevalim It seems I have found the problem. brod_group_coordinator sets trap_exit: true on start:
https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L320
So, per the Process.exit(pid, reason) docs: if pid is trapping exits, the exit signal is transformed into a message {:EXIT, from, reason} and delivered to the message queue of pid.
The coordinator should handle it here:
https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L374-L386
but it is stuck on a call to the Producer, which in turn is waiting for {:DOWN, _, _, ^coord, _} from the coordinator, so the two deadlock each other.
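A minimal sketch of the deadlock itself under those assumptions (Producer, Coordinator and the :assignments_revoked call are simplified stand-ins, not the actual broadway_kafka/brod code): the coordinator traps exits and is blocked in a call back into the producer, so Process.exit/2 only enqueues an {:EXIT, ...} message it never gets to, while the producer waits for a :DOWN that never arrives:

```elixir
defmodule Producer do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_info({:stop_coordinator, coord}, state) do
    ref = Process.monitor(coord)

    # Make the coordinator call back into us while we are still busy in this
    # callback, mirroring brod_group_coordinator's stabilize/assignments_revoked.
    send(coord, :stabilize)
    Process.sleep(200)

    Process.exit(coord, :shutdown)

    # The coordinator traps exits, so :shutdown is only an {:EXIT, ...} message
    # queued behind the call it is blocked in. It never terminates, we never
    # receive :DOWN, and both processes hang.
    receive do
      {:DOWN, ^ref, :process, _pid, _reason} -> :ok
    end

    {:noreply, state}
  end

  @impl true
  def handle_call(:assignments_revoked, _from, state), do: {:reply, :ok, state}
end

defmodule Coordinator do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok) do
    # The same flag brod_group_coordinator sets on start.
    Process.flag(:trap_exit, true)
    {:ok, nil}
  end

  @impl true
  def handle_info(:stabilize, state) do
    GenServer.call(Producer, :assignments_revoked, :infinity)
    {:noreply, state}
  end
end

# {:ok, _} = Producer.start_link([])
# {:ok, coord} = Coordinator.start_link([])
# send(Producer, {:stop_coordinator, coord})
# From here on :sys.get_state/1 times out for both processes,
# just like in the dumps above.
```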
@josevalim Hello. Can we just remove the receive block in the producer's handle_info, or should it wait for the exit to complete?
@anoskov I think we can remove it. I have just pushed a commit that does so, please give it a try.
@josevalim hm, I still see it in the main branch: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L428. The commit https://github.com/dashbitco/broadway_kafka/commit/6ef6f41fab0fa5bcf8b322ac9129042c8ce45ceb doesn't include this change.
Apologies, I clearly pushed the wrong commit. It is there now.
Looks like the fix helped. Thank you!