
No reconnect after "payload connection down {:shutdown, :tcp_closed}"

anoskov opened this issue on Jul 25 '22

Hello! We have an issue with brod in Kubernetes. When the service starts, all consumers join the group and work fine. But after some time we get:

2022-07-24 16:05:04.207 pid=<0.7117.24> [info] Group member (admin,coor=#PID<0.7117.24>,cb=#PID<0.20534.22>,generation=172):
re-joining group, reason:{:connection_down, {:shutdown, :tcp_closed}}
2022-07-24 16:05:04.207 pid=<0.7119.24> [info] client Client: payload connection down kafka-host:9092
reason:{:shutdown, :tcp_closed}

After this, brod doesn't reconnect, and kafka-consumer-groups.sh reports that the consumer group has no active members.

We have been using brod for quite a long time, and it usually reconnected after network issues. We now use broadway_kafka, which is built on brod.

What could be wrong?

anoskov · Jul 25 '22 16:07

I noticed that the disconnect occurs 10 minutes after the connection is established. This value matches connections.max.idle.ms, so it looks like Kafka disconnects idle connections. I know that some Kafka clients support a reconnect-on-idle feature. Does brod support it?

Update: the brod_client process is alive, and :brod_client.metadata/2 and :brod.fetch/4 return data. But in its state we see dead_since:

iex(admin@)35> :sys.get_state(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client))
{:state, Admin.Kafka.Consumer.Broadway.Producer_0.Client, [{"kafka", 9092}],
 #PID<0.31736.27>,
 [
   {:conn, {"kafka-0", 9092},
    {:dead_since, {1658, 257147, 463027}, {:shutdown, :tcp_closed}}}
 ], #PID<0.5428.4>, #PID<0.5430.4>, [connect_timeout: 10000],
 Admin.Kafka.Consumer.Broadway.Producer_0.Client}
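
For reference, the {1658, 257147, 463027} value in dead_since is an Erlang timestamp in {MegaSecs, Secs, MicroSecs} form; converting it in iex shows when the client marked the connection dead:

iex> :calendar.now_to_universal_time({1658, 257147, 463027})
{{2022, 7, 19}, {18, 59, 7}}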

Update 2:

I checked the Producer state and got a timeout on :sys.get_state, because the process is stuck in its handle_info callback, in BrodClient.stop_group_coordinator -> :brod_group_coordinator.stop:

iex(admin@)13> :erlang.process_info(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_1))
[
  registered_name: Admin.Kafka.Consumer.Broadway.Producer_1,
  current_function: {:brod_group_coordinator, :stop, 1},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 4,
  links: [#PID<0.3880.0>],
  dictionary: [
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {:"$initial_call", {GenStage, :init, 1}},
    {48, []},
    {:"$ancestors",
     [Admin.Kafka.Consumer.Broadway.ProducerSupervisor,
      Admin.Kafka.Consumer.Broadway.Supervisor, Admin.Kafka.Consumer,
      Admin.Supervisor, #PID<0.3569.0>]},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {39, []},
    {38, []},
    {37, []},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.3568.0>,
  total_heap_size: 20338,
  heap_size: 2586,
  stack_size: 29,
  reductions: 20656517,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 65535,
    minor_gcs: 2858
  ],
  suspending: []
]

Any suggestions why :brod_group_coordinator.stop is blocked?
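
A general note that may explain both the :sys.get_state timeout and the hanging stop: a GenServer that is blocked inside a callback cannot serve calls or system messages (and cannot be stopped synchronously) until that callback returns. Below is a minimal, self-contained sketch of the effect; the module and message names are invented and are not brod/broadway_kafka code:

# blocked_server_sketch.exs
defmodule BlockedServerSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(_opts), do: {:ok, %{}}

  # Simulates a callback that blocks indefinitely, e.g. a GenServer.call
  # with an :infinity timeout that never gets a reply.
  @impl true
  def handle_info(:block, state) do
    Process.sleep(:infinity)
    {:noreply, state}
  end
end

{:ok, pid} = BlockedServerSketch.start_link()
send(pid, :block)
Process.sleep(100)

# System messages queue up behind the blocked callback, so this times out,
# just like :sys.get_state on the stuck Producer above.
try do
  :sys.get_state(pid, 1_000)
catch
  :exit, reason -> IO.inspect(reason, label: ":sys.get_state exited")
end

# The process is alive, merely stuck inside its callback.
IO.inspect(Process.info(pid, :current_function), label: "current_function")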

anoskov · Jul 25 '22 17:07

The stop call waits for the pid's DOWN message before it returns. My guess is that the fetched batch is too large, causing the workers to be blocked for too long and not react to the assignment revocation; hence the coordinator is unable to react to group rejoin events, so Kafka just marks the member as inactive.

Maybe try these (a config sketch follows below):

  • Fetch smaller batches
  • Increase the group rebalance timeout
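
With broadway_kafka, both knobs should be reachable through the producer options: :fetch_config is forwarded to the brod consumer (max_bytes caps how much is fetched per batch) and :group_config is forwarded to the brod group coordinator (rebalance_timeout_seconds controls how long a rebalance may take, if your brod/broadway_kafka versions expose it). This is a hedged sketch only; the module, host, group and topic names and the values are made up for illustration:

# MyApp.Pipeline is assumed to `use Broadway` and implement handle_message/3.
Broadway.start_link(MyApp.Pipeline,
  name: MyApp.Pipeline,
  producer: [
    module:
      {BroadwayKafka.Producer,
       hosts: [{"kafka-host", 9092}],
       group_id: "my_group",
       topics: ["my_topic"],
       # smaller fetches -> workers are blocked for less time per batch
       fetch_config: [max_bytes: 1_048_576],
       # give the group more time to finish revocation before the member is kicked out
       group_config: [
         rebalance_timeout_seconds: 120,
         session_timeout_seconds: 30
       ]},
    concurrency: 1
  ],
  processors: [default: [concurrency: 10]]
)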

zmstone · Aug 20 '22 06:08

@zmstone thanks for the reply. It happens only when the consumer has not received messages for 10+ minutes (connections.max.idle.ms), so at that point it isn't processing any batches. The connection stays closed and does not recover even after a few hours, until we restart the pod.

anoskov · Aug 20 '22 10:08

@zmstone hi! I found out that brod_group_coordinator gets stuck on some call and therefore cannot process the terminate message. Here is the coordinator's process info:

[
  current_function: {:gen, :do_call, 4},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 2,
  links: [],
  dictionary: [
    rand_seed: {%{
       bits: 58,
       jump: #Function<3.92093067/1 in :rand."-fun.exsplus_jump/1-">,
       next: #Function<0.92093067/1 in :rand."-fun.exsss_next/1-">,
       type: :exsss,
       uniform: #Function<1.92093067/1 in :rand."-fun.exsss_uniform/1-">,
       uniform_n: #Function<2.92093067/2 in :rand."-fun.exsss_uniform/2-">
     }, [76455265455526698 | 144843847514796377]},
    "$initial_call": {:brod_group_coordinator, :init, 1},
    "$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_8,
     Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
     Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
     Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4609.0>]
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4608.0>,
  total_heap_size: 2589,
  heap_size: 1598,
  stack_size: 48,
  reductions: 780336,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 5
  ],
  suspending: []
]

anoskov · Aug 22 '22 14:08

I have a similar problem, but with the producer connections.

The first message I try to send always fails with: {:connection_down, {:shutdown, :ssl_closed}}

Any tips on how to solve it?

Here is the config:

  kafka_hosts = parse_kafka_hosts.()
  config :brod,
    clients: [
      kafka_producer: [
        endpoints: kafka_hosts,
        restart_delay_seconds: 10,
        auto_start_producers: true,
        allow_topic_auto_creation: false,
        default_producer_config: [
          retry_backoff_ms: 50,
          max_retries: 3,
          required_acks: -1,
          ack_timeout: 300,
          max_linger_ms: 10,
          max_linger_count: 5
        ],
        client_id: "producer-#{System.fetch_env!("POD_NAME")}",
        ssl: [
          verify: :verify_peer,
          cacertfile: "/etc/ssl/certs/ca-certificates.crt",
          depth: 3,
          customize_hostname_check: [
            match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
          ]
        ],
        sasl: {
          "KAFKA_sasl_mechanisms" |> System.fetch_env!() |> String.downcase() |> String.to_atom(),
          System.fetch_env!("KAFKA_sasl_username"),
          System.fetch_env!("KAFKA_sasl_password")
        }
      ]
    ]

Thanks =D

robsonpeixoto · Aug 25 '22 12:08

Hi @anoskov. My guess is the assignment revocation callback. You could try to get the current stacktrace of the coordinator process; in iex, it's Process.info(pid, :current_stacktrace).

zmstone · Aug 26 '22 12:08

Hi @robsonpeixoto Let's move the discussion to the other issue I created for you.

zmstone · Aug 26 '22 12:08

@zmstone Hello! You're right:

iex(admin@)5> Process.info pid("0.3283.44"), :current_stacktrace
{:current_stacktrace,
 [
   {:gen, :do_call, 4, [file: 'gen.erl', line: 214]},
   {GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1027]},
   {BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 525]},
   {:telemetry, :span, 3,
    [file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
   {:brod_group_coordinator, :stabilize, 3,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 502
    ]},
   {:brod_group_coordinator, :handle_info, 2,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 376
    ]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 695]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 771]}
 ]}

anoskov · Aug 30 '22 07:08

Finally got the pieces together. This is a deadlock caused by a race between a) the assignments_revoked call inside brod_group_coordinator, which makes a drain_after_revoke call with an :infinity timeout into the Producer in the Broadway implementation, and b) the handling of the 'DOWN' message in the Producer, which calls :brod_group_coordinator.stop and waits for the result of a). Each side is blocked waiting on the other, so neither ever returns.

I think it is a BroadwayKafka issue, so I'm closing this one. Thanks for the help!

anoskov · Aug 30 '22 08:08