Deadlock caused by a race between the assignments_revoked call and handling of the 'DOWN' message
Hello! Analysis of the previous issue (https://github.com/dashbitco/broadway_kafka/issues/112) showed that it was caused by a deadlock. The deadlock comes from a race between a) the assignments_revoked callback call inside :brod_group_coordinator and b) the handling of the 'DOWN' message inside the BroadwayKafka producer.
In detail:
a) In its stabilize function, :brod_group_coordinator calls assignments_revoked (implemented by the BroadwayKafka producer) https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L502, which makes a GenStage.call with an :infinity timeout to the producer https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L525
b) A moment earlier, the producer received a 'DOWN' message and called :brod_group_coordinator.stop https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L422. That call is stuck waiting for a) to finish, while a) is waiting for the producer to reply.
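The cycle described above can be reproduced in isolation. Below is a minimal sketch, not the actual brod or BroadwayKafka code: the modules `DeadlockSketch.Coordinator` and `DeadlockSketch.Producer`, the messages `:rebalance`, `:drain` and `:down`, and the 200 ms timeout are all hypothetical. It assumes that stopping the coordinator means sending it an exit signal and then waiting for it to go down. The finite timeout is there only so the sketch terminates instead of hanging.

```elixir
defmodule DeadlockSketch do
  defmodule Coordinator do
    use GenServer

    # Hypothetical synchronous stop: signal the process, then wait for it to exit.
    def stop(pid, timeout) do
      ref = Process.monitor(pid)
      Process.exit(pid, :shutdown)

      receive do
        {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
      after
        timeout ->
          Process.demonitor(ref, [:flush])
          {:error, :timeout}
      end
    end

    @impl true
    def init(producer) do
      # Exit signals become messages, so they wait in the mailbox behind :rebalance.
      Process.flag(:trap_exit, true)
      {:ok, producer}
    end

    @impl true
    def handle_info(:rebalance, producer) do
      # Side a): blocking call into the producer with no timeout.
      GenServer.call(producer, :drain, :infinity)
      {:noreply, producer}
    end

    def handle_info({:EXIT, _from, reason}, producer), do: {:stop, reason, producer}
  end

  defmodule Producer do
    use GenServer

    @impl true
    def init(caller) do
      {:ok, coordinator} = GenServer.start(Coordinator, self())
      {:ok, %{coordinator: coordinator, caller: caller}}
    end

    @impl true
    def handle_info(:down, state) do
      # The coordinator starts its callback just before we try to stop it.
      send(state.coordinator, :rebalance)
      # Side b): we wait for the coordinator to exit, but it is waiting for us.
      result = Coordinator.stop(state.coordinator, 200)
      send(state.caller, {:stop_result, result})
      {:noreply, state}
    end

    @impl true
    def handle_call(:drain, _from, state), do: {:reply, :ok, state}
  end

  # Runs the scenario and returns what the producer got back from stop/2.
  def run do
    {:ok, producer} = GenServer.start(Producer, self())
    send(producer, :down)

    result =
      receive do
        {:stop_result, r} -> r
      after
        1_000 -> :no_result
      end

    GenServer.stop(producer)
    result
  end
end
```

In this sketch `DeadlockSketch.run()` returns `{:error, :timeout}`: the producer cannot answer `:drain` while it is blocked in `stop/2`, and the coordinator cannot handle the exit message while it is blocked in `GenServer.call/3`. With an infinite wait on both sides, as in the stacktraces in this report, neither process would ever make progress.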
Group coordinator process info:
[
current_function: {:gen, :do_call, 4},
initial_call: {:proc_lib, :init_p, 5},
status: :waiting,
message_queue_len: 2,
links: [],
dictionary: [
rand_seed: {%{
bits: 58,
jump: #Function<3.92093067/1 in :rand."-fun.exsplus_jump/1-">,
next: #Function<0.92093067/1 in :rand."-fun.exsss_next/1-">,
type: :exsss,
uniform: #Function<1.92093067/1 in :rand."-fun.exsss_uniform/1-">,
uniform_n: #Function<2.92093067/2 in :rand."-fun.exsss_uniform/2-">
}, [23511365307712030 | 235316257979211424]},
"$initial_call": {:brod_group_coordinator, :init, 1},
"$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_4,
Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4609.0>]
],
trap_exit: true,
error_handler: :error_handler,
priority: :normal,
group_leader: #PID<0.4608.0>,
total_heap_size: 3577,
heap_size: 2586,
stack_size: 48,
reductions: 521044,
garbage_collection: [
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422,
min_heap_size: 233,
fullsweep_after: 20,
minor_gcs: 17
],
suspending: []
]
Group coordinator stacktrace:
{:current_stacktrace,
[
{:gen, :do_call, 4, [file: 'gen.erl', line: 214]},
{GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1027]},
{BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
[file: 'lib/broadway_kafka/producer.ex', line: 525]},
{:telemetry, :span, 3,
[file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
{:brod_group_coordinator, :stabilize, 3,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 502
]},
{:brod_group_coordinator, :handle_info, 2,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 376
]},
{:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 695]},
{:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 771]}
]}
Producer stacktrace:
{:current_stacktrace,
[
{:brod_group_coordinator, :stop, 1,
[
file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
line: 311
]},
{BroadwayKafka.Producer, :terminate, 2,
[file: 'lib/broadway_kafka/producer.ex', line: 533]},
{:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 733]},
{:gen_server, :terminate, 10, [file: 'gen_server.erl', line: 918]},
{:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
]}