No reconnect after "payload connection down {:shutdown, :tcp_closed}"
Hello! We have an issue with brod in Kubernetes. When the service starts, all consumers join the group and everything works fine. But after some time we get:
2022-07-24 16:05:04.207 pid=<0.7117.24> [info] Group member (admin,coor=#PID<0.7117.24>,cb=#PID<0.20534.22>,generation=172):
re-joining group, reason:{:connection_down, {:shutdown, :tcp_closed}}
2022-07-24 16:05:04.207 pid=<0.7119.24> [info] client Client: payload connection down kafka-host:9092
reason:{:shutdown, :tcp_closed}
and after this brod doesn't reconnect, and kafka-consumer-groups.sh reports that the consumer group has no active members.
We have been using brod for quite a long time, and it has usually reconnected after network issues. Now we use broadway_kafka, which is built on brod.
What could be wrong?
I noticed that the disconnect occurs 10 minutes after the connection is established. This value matches connections.max.idle.ms,
so it looks like Kafka disconnects idle connections. I know that some Kafka clients support a reconnect-on-idle
feature. Does brod support it?
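For what it's worth, one userland direction (my own sketch, not a brod feature — the IdleKeepalive module below is hypothetical) would be to fire a cheap request on an interval shorter than the broker's connections.max.idle.ms so the connection never goes idle:

```elixir
# Hypothetical workaround sketch -- not a brod feature. Fire a cheap request
# on an interval shorter than connections.max.idle.ms so the connection
# never goes idle.
defmodule IdleKeepalive do
  use GenServer

  # ping_fun is any zero-arity function, e.g.
  #   fn -> :brod_client.metadata(:my_client, :all) end
  # (check the topic argument against your brod version)
  def start_link(ping_fun, interval_ms) do
    GenServer.start_link(__MODULE__, {ping_fun, interval_ms})
  end

  @impl true
  def init({_ping_fun, interval_ms} = state) do
    schedule(interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_info(:ping, {ping_fun, interval_ms} = state) do
    ping_fun.()
    schedule(interval_ms)
    {:noreply, state}
  end

  defp schedule(ms), do: Process.send_after(self(), :ping, ms)
end
```

Whether metadata traffic actually keeps the payload connections (as opposed to the control connection) alive is an assumption, so this is only a direction to try.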
UPD.
The brod_client process is alive, and :brod_client.metadata/2 and :brod.fetch/4 return data. But in its state we see dead_since:
iex(admin@)35> :sys.get_state(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client))
{:state, Admin.Kafka.Consumer.Broadway.Producer_0.Client, [{"kafka", 9092}],
#PID<0.31736.27>,
[
{:conn, {"kafka-0", 9092},
{:dead_since, {1658, 257147, 463027}, {:shutdown, :tcp_closed}}}
], #PID<0.5428.4>, #PID<0.5430.4>, [connect_timeout: 10000],
Admin.Kafka.Consumer.Broadway.Producer_0.Client}
UPD2:
I checked the Producer state and got a timeout on :sys.get_state
because the process is stuck in a handle_info callback, in BrodClient.stop_group_coordinator -> :brod_group_coordinator.stop:
iex(admin@)13> :erlang.process_info(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_1))
[
registered_name: Admin.Kafka.Consumer.Broadway.Producer_1,
current_function: {:brod_group_coordinator, :stop, 1},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 4,
links: [#PID<0.3880.0>],
dictionary: [
{63, []},
{62, []},
{61, []},
{60, []},
{59, []},
{58, []},
{57, []},
{56, []},
{55, []},
{54, []},
{53, []},
{52, []},
{51, []},
{50, []},
{49, []},
{:"$initial_call", {GenStage, :init, 1}},
{48, []},
{:"$ancestors",
[Admin.Kafka.Consumer.Broadway.ProducerSupervisor,
Admin.Kafka.Consumer.Broadway.Supervisor, Admin.Kafka.Consumer,
Admin.Supervisor, #PID<0.3569.0>]},
{47, []},
{46, []},
{45, []},
{44, []},
{43, []},
{42, []},
{41, []},
{40, []},
{39, []},
{38, []},
{37, []},
{36, []},
{35, []},
{34, []},
{33, []},
{32, []},
{31, []},
{30, []},
{29, []},
{28, []},
{27, []},
{26, []},
{25, []},
{24, ...},
{...},
...
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.3568.0>,
total_heap_size: 20338,
heap_size: 2586,
stack_size: 29,
reductions: 20656517,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 65535,
minor_gcs: 2858
],
suspending: []
]
Any suggestions as to why :brod_group_coordinator.stop is blocked?
The stop call waits for the pid's DOWN message before it returns. My guess is that the fetched batch is too large, causing the workers to be blocked for too long and not react to the assignment revocation; hence the coordinator is unable to react to group rejoin events, so Kafka just marks it as inactive.
Maybe try these:
- Fetch smaller batches
- Increase the group rebalance timeout
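For reference, here is roughly where those knobs live when brod is driven through broadway_kafka (a sketch; the host, group, topic, and values are placeholders — check the option names against your broadway_kafka version):

```elixir
# Sketch only -- placeholders throughout; option names are the ones
# broadway_kafka forwards to brod (:fetch_config / :group_config).
producer: [
  module:
    {BroadwayKafka.Producer,
     [
       hosts: [{"kafka-host", 9092}],
       group_id: "admin",
       topics: ["some-topic"],
       # smaller batches: cap the bytes fetched per request
       fetch_config: [max_bytes: 100_000],
       # give members more time before a rebalance gives up on them
       group_config: [rebalance_timeout_seconds: 120]
     ]}
]
```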
@zmstone thanks for the reply. It happens only when the consumer receives no messages for 10+ minutes (connections.max.idle.ms), so at that point it isn't processing any batches. The connection is closed permanently and does not recover even after a few hours, until we restart the pod.
@zmstone hi! I found out that brod_group_coordinator gets stuck on some call and therefore cannot process the terminate message. Here is the coordinator's process info:
[
current_function: {:gen, :do_call, 4},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 2,
links: [],
dictionary: [
rand_seed: {%{
bits: 58,
jump: #Function<3.92093067/1 in :rand."-fun.exsplus_jump/1-">,
next: #Function<0.92093067/1 in :rand."-fun.exsss_next/1-">,
type: :exsss,
uniform: #Function<1.92093067/1 in :rand."-fun.exsss_uniform/1-">,
uniform_n: #Function<2.92093067/2 in :rand."-fun.exsss_uniform/2-">
}, [76455265455526698 | 144843847514796377]},
"$initial_call": {:brod_group_coordinator, :init, 1},
"$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_8,
Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4609.0>]
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4608.0>,
total_heap_size: 2589,
heap_size: 1598,
stack_size: 48,
reductions: 780336,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 5
],
suspending: []
]
I have a similar problem, but with producer connections.
The first message I try to send always fails with the error: {:connection_down, {:shutdown, :ssl_closed}}
Any tips on how to solve it?
Here is the config:
kafka_hosts = parse_kafka_hosts.()
config :brod,
clients: [
kafka_producer: [
endpoints: kafka_hosts,
restart_delay_seconds: 10,
auto_start_producers: true,
allow_topic_auto_creation: false,
default_producer_config: [
retry_backoff_ms: 50,
max_retries: 3,
required_acks: -1,
ack_timeout: 300,
max_linger_ms: 10,
max_linger_count: 5
],
client_id: "producer-#{System.fetch_env!("POD_NAME")}",
endpoints: kafka_hosts,
ssl: [
verify: :verify_peer,
cacertfile: "/etc/ssl/certs/ca-certificates.crt",
depth: 3,
customize_hostname_check: [
match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
]
],
sasl: {
"KAFKA_sasl_mechanisms" |> System.fetch_env!() |> String.downcase() |> String.to_atom(),
System.fetch_env!("KAFKA_sasl_username"),
System.fetch_env!("KAFKA_sasl_password")
}
]
]
Thanks =D
Hi @anoskov
My guess is the assignment revocation callback.
You can maybe try to get the current stacktrace of the coordinator process ?
in iex, it's Process.info(pid, :current_stacktrace)
Hi @robsonpeixoto Let's move the discussion to the other issue I created for you.
@zmstone Hello! You're right
iex([email protected])5> Process.info pid("0.3283.44"), :current_stacktrace
{:current_stacktrace,
[
{:gen, :do_call, 4, [file: 'gen.erl', line: 214]},
{GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1027]},
{BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
[file: 'lib/broadway_kafka/producer.ex', line: 525]},
{:telemetry, :span, 3,
[file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
{:brod_group_coordinator, :stabilize, 3,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 502
]},
{:brod_group_coordinator, :handle_info, 2,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 376
]},
{:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 695]},
{:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 771]}
]}
Finally got the pieces together. This is a deadlock caused by a race between a) the assignments_revoked
callback inside brod_group_coordinator, which makes an infinite-timeout drain_after_revoke
call into the Producer in the Broadway implementation, and b) the 'DOWN' message handler in the Producer, which calls :brod_group_coordinator.stop
and waits for the result of a).
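The circular wait can be reproduced in isolation with two toy GenServers (a minimal sketch; ToyCoordinator, ToyProducer, and the message names are made up — only the call pattern mirrors what brod_group_coordinator and BroadwayKafka.Producer do):

```elixir
# Toy reproduction of the deadlock -- names are hypothetical, only the
# call pattern mirrors brod_group_coordinator <-> BroadwayKafka.Producer.
defmodule ToyCoordinator do
  use GenServer
  def start_link(_), do: GenServer.start_link(__MODULE__, nil)
  def init(state), do: {:ok, state}

  # like stabilize/3 -> assignments_revoked: a blocking, infinite-timeout
  # call into the producer (drain_after_revoke)
  def handle_info({:rebalance, producer}, state) do
    Process.sleep(50) # let the producer enter its own blocking call first
    GenServer.call(producer, :drain_after_revoke, :infinity)
    {:noreply, state}
  end
end

defmodule ToyProducer do
  use GenServer
  def start_link(_), do: GenServer.start_link(__MODULE__, nil)
  def init(state), do: {:ok, state}

  # like handling 'DOWN': synchronously stop the coordinator and wait forever
  def handle_info({:connection_down, coordinator}, state) do
    Process.sleep(50) # let the coordinator enter its blocking call first
    GenServer.stop(coordinator, :normal, :infinity)
    {:noreply, state}
  end

  # never runs: the producer is itself stuck inside GenServer.stop above
  def handle_call(:drain_after_revoke, _from, state), do: {:reply, :ok, state}
end

{:ok, coord} = ToyCoordinator.start_link([])
{:ok, prod} = ToyProducer.start_link([])
send(coord, {:rebalance, prod})
send(prod, {:connection_down, coord})
Process.sleep(300)
# both processes are now stuck; neither call above ever returns
IO.inspect(Process.info(coord, :current_function))
IO.inspect(Process.info(prod, :current_function))
```

Neither call ever returns: the producer sits in GenServer.stop waiting for the coordinator to terminate, while the coordinator (blocked in its own handle_info and thus deaf to system messages) sits in GenServer.call waiting for the producer to answer.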
I think this is a BroadwayKafka issue, so I'm closing this one. Thanks for the help!