emqtt
emqtt copied to clipboard
Unhandled {error,closed} in state connected causes crash
After having established a connection and it's having been closed an attempt is made to re-establish that connection presumably if that attempt unexpectedly fails the following occurs: From a monitor {'EXIT',<0.29920.2>,{error,closed}} 19:35:49.593 [error] gen_statem <0.29920.2> in state connected terminated with reason: {{error,closed},state_functions} 19:35:49.594 [error] CRASH REPORT Process <0.29920.2> with 0 neighbours exited with reason: {error,closed} in gen_statem:loop_state_callback_result/11 line 1548
rebar.lock
{<<"emqtt">>,
{git,"https://github.com/emqx/emqtt",
{ref,"d640ee688a5ce510c91bda8ea3610d914f0fa421"}},
0},
to understand your trouble, could you give me code example?
handle_continue({start_emqtt}, #state{bin_hostname = BinHostname} = State) ->
WillPld = thoas:encode(#{event => disconnected, hostname => BinHostname, connect_timestamp => erlang:system_time(seconds)}),
EmqttOpts =
[{name, undefined},
{owner, self()},
{host, "MqttHost"},
{port, 8883},
{username, <<"MqttUsername">>},
{password, <<"MqttPassword">>},
{connect_timeout, 60},
{keepalive, 30},
{client_id, <<"app_test">>},
{tcp_opts, []},
{ssl_opts, []},
{ssl, true},
{bridge_mode, false},
{proto_ver, v4},
{max_inflight, infinity},
{retry_interval, 30},
{auto_ack, true},
{ack_timeout, 30},
{force_ping, false},
{reconnect, infinity},
{reconnect_timeout, 30},
{with_qoe_metrics, true},
{properties, #{}},
{will_topic, <<"WillTopic">>},
{will_payload, WillPld},
{will_retain, true},
{will_qos, 1},
{will_props, #{}}
],
{ok, EmqttPid} = emqtt:start_link(EmqttOpts),
EmqttMRef = erlang:monitor(process, EmqttPid),
{noreply, State#state{conn_mref = EmqttMRef, conn_pid = EmqttPid}, {continue, {connect_emqtt}}};
handle_continue({connect_emqtt}, #state{conn_pid = EmqttPid} = State) ->
case emqtt:connect(EmqttPid) of
{error, Error} ->
erlang:start_timer(5000, self(), {retry_connect_emqtt}),
{noreply, State}
{ok, _EmqttConnProps} ->
{noreply, State}
end;
handle_info({'DOWN', EmqttMRef, process, EmqttPid, _}, #state{conn_mref = EmqttMRef, conn_pid = EmqttPid} = State) ->
{noreply, State, {continue, {start_emqtt}}};
handle_info({retry_connect_emqtt}, State) ->
{noreply, State, {continue, connect_emqtt}};