broadway_kafka

No rejoin after "payload connection down {:shutdown, :tcp_closed}"

Open: anoskov opened this issue 2 years ago

Hello! We have an issue with brod and broadway_kafka in Kubernetes. When the service starts, all consumers join the group and work fine. But after some time we get:

2022-07-24 16:05:04.207 pid=<0.7117.24> [info] Group member (admin,coor=#PID<0.7117.24>,cb=#PID<0.20534.22>,generation=172):
re-joining group, reason:{:connection_down, {:shutdown, :tcp_closed}}
2022-07-24 16:05:04.207 pid=<0.7119.24> [info] client Client: payload connection down kafka-host:9092
reason:{:shutdown, :tcp_closed}

After this, brod doesn't reconnect, and kafka-consumer-groups.sh reports that the consumer group has no active members.

We have been using plain brod for quite a long time, and it usually reconnected after network issues.

What could be wrong? Could it be that broadway_kafka doesn't rejoin the group after the brod client goes down?

anoskov, Jul 25 '22 16:07

Hi @anoskov! I recommend reaching out to ElixirForum for general help/questions, as someone with an issue similar to yours is more likely to see your report there.

josevalim, Jul 25 '22 16:07

Thank you for the answer. I asked the question there.

I noticed that the disconnect happens 10 minutes after the connection is established. This matches connections.max.idle.ms (the broker default is 600000 ms, i.e. 10 minutes), so it looks like Kafka closes idle connections. I know that some Kafka clients support reconnecting after an idle disconnect. Do brod/broadway_kafka support it?

UPD: the brod_client process is alive, BroadwayKafka.BrodClient.connected?/1 reports that it is connected, and :brod.fetch/4 also returns messages. But its state contains a dead_since entry:

iex(admin@)32> :sys.get_state(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client))
{:state, Admin.Kafka.Consumer.Broadway.Producer_0.Client, [{"kafka", 9092}],
 #PID<0.31736.27>,
 [
   {:conn, {"kafka-0", 9092},
    {:dead_since, {1658, 257147, 463027}, {:shutdown, :tcp_closed}}}
 ], #PID<0.5428.4>, #PID<0.5430.4>, [connect_timeout: 10000],
 Admin.Kafka.Consumer.Broadway.Producer_0.Client}
iex(admin@7)33> BroadwayKafka.BrodClient.connected?(Admin.Kafka.Consumer.Broadway.Producer_0.Client)
true

It looks like the brod client reconnects, but the consumer doesn't rejoin.
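A minimal sketch for pulling the dead_since entries out of a state dump like the one above and turning the timestamp into a readable UTC time. It assumes the internal brod_client state layout shown in the dump (the payload connection list is the fifth tuple element, and each entry is {:conn, endpoint, pid} or {:conn, endpoint, {:dead_since, timestamp, reason}}). That record is private to brod and may change between versions; the module name BrodStateInspect is hypothetical.

```elixir
defmodule BrodStateInspect do
  # Hypothetical helper. ASSUMPTION: `state` is the internal brod_client
  # record as printed by :sys.get_state/1, i.e.
  # {:state, client_id, bootstrap_endpoints, meta_conn, payload_conns, ...}.
  # This layout is private to brod and may change between versions.

  @doc "Returns {endpoint, dead_since_utc, reason} for each dead payload connection."
  def dead_connections(state) when is_tuple(state) and elem(state, 0) == :state do
    state
    |> elem(4)
    |> Enum.flat_map(fn
      {:conn, endpoint, {:dead_since, ts, reason}} -> [{endpoint, to_datetime(ts), reason}]
      # a live connection holds a pid instead of a :dead_since tuple
      {:conn, _endpoint, _pid} -> []
    end)
  end

  # {MegaSecs, Secs, MicroSecs}, the erlang:timestamp() format
  defp to_datetime({mega, sec, micro}) do
    DateTime.from_unix!((mega * 1_000_000 + sec) * 1_000_000 + micro, :microsecond)
  end
end
```

In iex this could be used as :sys.get_state(Process.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client)) |> BrodStateInspect.dead_connections(), which makes it easy to compare when a connection died with the time of the rejoin log line.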

UPD2:

I checked the Producer state, and :sys.get_state timed out because the producer is stuck in its handle_info callback, in BrodClient.stop_group_coordinator -> :brod_group_coordinator.stop:

iex(admin@)13> :erlang.process_info(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_1))
[
  registered_name: Admin.Kafka.Consumer.Broadway.Producer_1,
  current_function: {:brod_group_coordinator, :stop, 1},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 4,
  links: [#PID<0.3880.0>],
  dictionary: [
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {:"$initial_call", {GenStage, :init, 1}},
    {48, []},
    {:"$ancestors",
     [Admin.Kafka.Consumer.Broadway.ProducerSupervisor,
      Admin.Kafka.Consumer.Broadway.Supervisor, Admin.Kafka.Consumer,
      Admin.Supervisor, #PID<0.3569.0>]},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {39, []},
    {38, []},
    {37, []},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.3568.0>,
  total_heap_size: 20338,
  heap_size: 2586,
  stack_size: 29,
  reductions: 20656517,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 65535,
    minor_gcs: 2858
  ],
  suspending: []
]

anoskov, Jul 25 '22 17:07

@josevalim @slashmili Hello! It still occurs on the main branch. If the consumer doesn't receive any messages for a while, Kafka closes the idle connections. After that, the Broadway producer and the brod group coordinator deadlock each other.

consumer group info

I have no name!@kafka-0:/opt/bitnami/kafka$ ./bin/kafka-consumer-groups.sh --describe --group messaging --bootstrap-server localhost:9092
Consumer group 'messaging' has no active members.

brod client state

{:state, Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client,
 [{"kafka", 9092}], :undefined,
 [
   {:conn, {"kafka-0.cluster.local", 9092},
    {:dead_since, {1675, 174014, 110349}, {:shutdown, :tcp_closed}}}
 ], #PID<0.12743.1>, #PID<0.12744.1>,
 [connect_timeout: 10000, request_timeout: 240000],
 Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client}

Trying to get the producer state:

** (exit) exited in: :sys.get_state(#PID<0.12240.1>)
    ** (EXIT) time out

but it is stuck in :brod_group_coordinator.stop:

[
  registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_0,
  current_function: {:brod_group_coordinator, :stop, 1},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 3,
  links: [#PID<0.5133.0>],
  dictionary: [
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {48, []},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {:"$initial_call", {GenStage, :init, 1}},
    {39, []},
    {38, []},
    {37, []},
    {:"$ancestors",
     [Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
      Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
      Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4876.0>,
  total_heap_size: 32885,
  heap_size: 4185,
  stack_size: 29,
  reductions: 3299758,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 7
  ],
  suspending: []
]

The coordinator is stuck in the assignments_revoked call:

[
  current_function: {:gen, :do_call, 4},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 3,
  links: [],
  dictionary: [
    "$initial_call": {:brod_group_coordinator, :init, 1},
    "$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
     Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
     Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
     Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4876.0>,
  total_heap_size: 17734,
  heap_size: 6772,
  stack_size: 48,
  reductions: 19501,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 3
  ],
  suspending: []
]

coordinator stacktrace

{:current_stacktrace,
 [
   {:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
   {GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
   {BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 532]},
   {:telemetry, :span, 3,
    [file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
   {:brod_group_coordinator, :stabilize, 3,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 502
    ]},
   {:brod_group_coordinator, :handle_info, 2,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 372
    ]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
 ]}

producer stacktrace

{:current_stacktrace,
 [
   {:brod_group_coordinator, :stop, 1,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 311
    ]},
   {BroadwayKafka.Producer, :terminate, 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 540]},
   {:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 1158]},
   {:gen_server, :terminate, 10, [file: 'gen_server.erl', line: 1348]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
 ]}
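When :sys.get_state/1 times out, the process is blocked inside a callback and cannot answer system messages, whereas Process.info/2 is answered by the runtime and still works. Below is a hypothetical helper (the module name StuckProcessDiag is made up) that returns a process's current stacktrace and finds processes whose proc_lib "$ancestors" list starts with a given registered name or pid. That is one way to locate the brod_group_coordinator started by a given producer, since the coordinator dumps above list the producer first in "$ancestors".

```elixir
defmodule StuckProcessDiag do
  # Process.info/2 does not need the target's cooperation, so it works
  # even when the process is blocked in a receive inside a callback.
  def stacktrace(pid) when is_pid(pid) do
    case Process.info(pid, :current_stacktrace) do
      {:current_stacktrace, trace} -> trace
      # Process.info/2 returns nil for a local process that is no longer alive
      nil -> :dead
    end
  end

  # Pids whose "$ancestors" (set by proc_lib) starts with `parent`,
  # which may be a registered name or a pid.
  def started_by(parent) do
    for pid <- Process.list(),
        {:dictionary, dict} <- [Process.info(pid, :dictionary)],
        {:"$ancestors", [^parent | _]} <- [List.keyfind(dict, :"$ancestors", 0)],
        do: pid
  end
end
```

For example, StuckProcessDiag.stacktrace(Process.whereis(Messaging.Events.Kafka.Consumer.Broadway.Producer_1)) for the producer, and StuckProcessDiag.started_by(Messaging.Events.Kafka.Consumer.Broadway.Producer_1) to get the coordinator pid(s) to pass to stacktrace/1. Scanning Process.list/0 is linear in the number of processes, which is fine for one-off debugging.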

anoskov, Feb 03 '23 21:02

@anoskov can you please try the main branch again?

josevalim, Feb 04 '23 09:02

Thank you! I'll check

anoskov, Feb 04 '23 12:02

@josevalim Hello! We tried the fix for a week and unfortunately it didn't help. It looks like it's stuck on Process.exit: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L430

** (exit) exited in: :sys.get_state(Messaging.Events.Kafka.Consumer.Broadway.Producer_1)
    ** (EXIT) time out
    (stdlib 4.0.1) sys.erl:338: :sys.send_system_msg/2
    (stdlib 4.0.1) sys.erl:139: :sys.get_state/1
    iex:4: (file)

producer stacktrace

{:current_stacktrace,
 [
   {BroadwayKafka.Producer, :handle_info, 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 430]},
   {Broadway.Topology.ProducerStage, :handle_info, 2,
    [file: 'lib/broadway/topology/producer_stage.ex', line: 229]},
   {GenStage, :noreply_callback, 3, [file: 'lib/gen_stage.ex', line: 2117]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
 ]}

coordinator stacktrace

{:current_stacktrace,
 [
   {:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
   {GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
   {BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 539]},
   {:telemetry, :span, 3,
    [file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 321]},
   {:brod_group_coordinator, :stabilize, 3,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 502
    ]},
   {:brod_group_coordinator, :handle_info, 2,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 372
    ]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
 ]}

additional info

[
  current_function: {:gen, :do_call, 4},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 2,
  links: [],
  dictionary: [
    "$initial_call": {:brod_group_coordinator, :init, 1},
    "$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_8,
     Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
     Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
     Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4768.0>,
  total_heap_size: 10961,
  heap_size: 4185,
  stack_size: 48,
  reductions: 19899,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 7
  ],
  suspending: []
]
[
  registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
  current_function: {BroadwayKafka.Producer, :handle_info, 2},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 7,
  links: [#PID<0.5024.0>],
  dictionary: [
    {:"$initial_call", {GenStage, :init, 1}},
    {:"$ancestors",
     [Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
      Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
      Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]},
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {48, []},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {39, []},
    {38, []},
    {37, []},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4768.0>,
  total_heap_size: 35914,
  heap_size: 6772,
  stack_size: 28,
  reductions: 6413879,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 8
  ],
  suspending: []
]

anoskov, Mar 03 '23 20:03

This is very unexpected because Process.exit is, afaik, asynchronous.

josevalim, Mar 14 '23 17:03

@josevalim Maybe the stacktrace is not accurate and it is stuck at https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L432? Although I don't understand how that could be.

anoskov, Mar 14 '23 20:03

Can you consistently reproduce it now, or does it only happen from time to time?

josevalim, Mar 14 '23 20:03

I can't reproduce it manually, but it happens a few times a week when some of our servers are idle and Kafka closes the connection.

I tried to emulate it with two GenServers: the second one calls the first while the first is processing a message in handle_info and calls Process.exit on the second. That works fine: the first server receives the message in its receive block after Process.exit.

If you need any additional information, I can collect it next time.

anoskov, Mar 14 '23 20:03

@josevalim I seem to have found the problem. brod_group_coordinator sets trap_exit: true on start: https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L320

So, from the Process.exit(pid, reason) documentation:

"If pid is trapping exits, the exit signal is transformed into a message {:EXIT, from, reason} and delivered to the message queue of pid."

The coordinator should handle that message here: https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L374-L386, but it is stuck in a call to the Producer, which in turn is waiting for {:DOWN, _, _, ^coord, _} from the coordinator.
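The cycle described above can be reproduced with two stand-in processes. This is a minimal sketch, not BroadwayKafka's or brod's actual code; the module, message and function names are hypothetical. RevokeDeadlock plays the producer: it sends the coordinator an exit signal and then blocks in a receive until :DOWN arrives (the pattern being discussed). The spawned process plays brod_group_coordinator: it optionally traps exits and is blocked in a GenServer.call to the producer. To keep the race deterministic, the stand-in producer defers its reply to :assignments_revoked, an assumption standing in for a revoke that has not completed yet. With trapping enabled, the exit signal only becomes an {:EXIT, ...} message that the blocked coordinator never reads, so no :DOWN is ever sent.

```elixir
defmodule RevokeDeadlock do
  # Hypothetical stand-ins, not BroadwayKafka's or brod's real code.
  use GenServer

  @impl true
  def init(test_pid), do: {:ok, %{test: test_pid, revoke_from: nil}}

  # Coordinator's revoke callback. The reply is deferred (assumption: the
  # revoke is still in progress), so the coordinator stays blocked in its call.
  @impl true
  def handle_call(:assignments_revoked, from, state) do
    send(state.test, :revoke_pending)
    {:noreply, %{state | revoke_from: from}}
  end

  # Blocking pattern: send an exit signal, then wait in place for :DOWN.
  @impl true
  def handle_info({:stop_coordinator, coord}, state) do
    ref = Process.monitor(coord)
    Process.exit(coord, :shutdown)

    receive do
      {:DOWN, ^ref, :process, ^coord, _reason} -> send(state.test, :coordinator_stopped)
    end

    {:noreply, state}
  end

  # Coordinator stand-in: optionally traps exits (brod_group_coordinator does),
  # then blocks in a synchronous call to the producer.
  def start_coordinator(producer, trap_exit?) do
    spawn(fn ->
      Process.flag(:trap_exit, trap_exit?)
      GenServer.call(producer, :assignments_revoked, :infinity)

      # Reached only after the call returns; a trapped exit waits here.
      receive do
        {:EXIT, _from, reason} -> exit(reason)
      end
    end)
  end

  # Returns :ok if the producer managed to stop the coordinator, or
  # :deadlock if that did not happen within `timeout` ms.
  def run(trap_exit?, timeout \\ 500) do
    {:ok, producer} = GenServer.start(__MODULE__, self())
    coord = start_coordinator(producer, trap_exit?)

    receive do
      :revoke_pending -> :ok
    after
      1_000 -> raise "coordinator never called the producer"
    end

    send(producer, {:stop_coordinator, coord})

    result =
      receive do
        :coordinator_stopped -> :ok
      after
        timeout -> :deadlock
      end

    # :kill cannot be trapped, so this cleans up both stand-ins either way.
    Process.exit(producer, :kill)
    Process.exit(coord, :kill)
    result
  end
end
```

RevokeDeadlock.run(true) ends in :deadlock, while RevokeDeadlock.run(false) returns :ok because a non-trapping coordinator is killed by the signal and its :DOWN reaches the producer; a callee that does not trap exits behaves like the second case. The timeout exists only so the sketch can detect the hang.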

anoskov, Mar 14 '23 21:03

@josevalim Hello. Can we just remove the receive block in the producer's handle_info, or does it need to wait for the exit to complete?

anoskov, Mar 20 '23 12:03

@anoskov I think we can remove it. I have just pushed a commit that does so, please give it a try.
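A minimal sketch of what dropping the blocking receive amounts to, using hypothetical stand-in processes rather than the actual commit. The producer sends the exit signal, returns from handle_info immediately, and handles the coordinator's :DOWN later as an ordinary message. The coordinator stand-in traps exits and is blocked in a call whose reply the producer defers until a hypothetical :drained message (an assumption standing in for the revoke finishing). Because the producer stays responsive, it can process :drained and answer the call, after which the coordinator reads its queued {:EXIT, ...} message and exits.

```elixir
defmodule RevokeNoDeadlock do
  # Hypothetical stand-ins, not the actual broadway_kafka commit.
  use GenServer

  @impl true
  def init(test_pid), do: {:ok, %{test: test_pid, revoke_from: nil, coord_ref: nil}}

  # Coordinator's revoke callback; the reply is deferred until :drained.
  @impl true
  def handle_call(:assignments_revoked, from, state) do
    send(state.test, :revoke_pending)
    {:noreply, %{state | revoke_from: from}}
  end

  # Send the exit signal and return at once; no blocking receive.
  @impl true
  def handle_info({:stop_coordinator, coord}, state) do
    ref = Process.monitor(coord)
    Process.exit(coord, :shutdown)
    {:noreply, %{state | coord_ref: ref}}
  end

  # Hypothetical "revoke finished" event: the producer is free to process it
  # and answer the coordinator's pending call.
  def handle_info(:drained, %{revoke_from: from} = state) when from != nil do
    GenServer.reply(from, :ok)
    {:noreply, %{state | revoke_from: nil}}
  end

  # The coordinator's :DOWN arrives later as an ordinary message.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{coord_ref: ref} = state) do
    send(state.test, :coordinator_stopped)
    {:noreply, %{state | coord_ref: nil}}
  end

  # Coordinator stand-in: traps exits, then blocks in a call to the producer.
  def start_coordinator(producer) do
    spawn(fn ->
      Process.flag(:trap_exit, true)
      GenServer.call(producer, :assignments_revoked, :infinity)

      # Once the call returns, the queued {:EXIT, ...} message is handled.
      receive do
        {:EXIT, _from, reason} -> exit(reason)
      end
    end)
  end

  # Returns :ok once the coordinator is gone, or :deadlock after `timeout` ms.
  def run(timeout \\ 500) do
    {:ok, producer} = GenServer.start(__MODULE__, self())
    coord = start_coordinator(producer)

    receive do
      :revoke_pending -> :ok
    after
      1_000 -> raise "coordinator never called the producer"
    end

    send(producer, {:stop_coordinator, coord})
    send(producer, :drained)

    result =
      receive do
        :coordinator_stopped -> :ok
      after
        timeout -> :deadlock
      end

    Process.exit(producer, :kill)
    result
  end
end
```

The design point is that waiting for the coordinator to go away moves from a selective receive inside one callback into the normal message loop, so a coordinator that traps exits and calls back into the producer can no longer block it.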

josevalim, Mar 24 '23 08:03

@josevalim Hm. I still see it on the main branch: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L428. The commit doesn't include this change: https://github.com/dashbitco/broadway_kafka/commit/6ef6f41fab0fa5bcf8b322ac9129042c8ce45ceb

anoskov, Mar 24 '23 10:03

Apologies, I clearly pushed the wrong commit. It is there now.

josevalim, Mar 24 '23 10:03

Looks like the fix helped. Thank you!

anoskov, May 11 '23 14:05