emqtt icon indicating copy to clipboard operation
emqtt copied to clipboard

Unhandled {error,closed} in state connected causes crash

Open rtATvw opened this issue 1 year ago • 2 comments

After a connection has been established and then closed, an attempt is made to re-establish it. Presumably, if that attempt unexpectedly fails, the following occurs:

From a monitor {'EXIT',<0.29920.2>,{error,closed}}
19:35:49.593 [error] gen_statem <0.29920.2> in state connected terminated with reason: {{error,closed},state_functions}
19:35:49.594 [error] CRASH REPORT Process <0.29920.2> with 0 neighbours exited with reason: {error,closed} in gen_statem:loop_state_callback_result/11 line 1548

rebar.lock

 {<<"emqtt">>,                                                                  
   {git,"https://github.com/emqx/emqtt",                                         
        {ref,"d640ee688a5ce510c91bda8ea3610d914f0fa421"}},                       
   0},

rtATvw avatar Jan 23 '24 15:01 rtATvw

To understand your problem, could you give me a code example?

qzhuyan avatar Jan 26 '24 15:01 qzhuyan

%% Deferred set-up steps for the MQTT client, entered via `{continue, Step}`.
%%
%% `{start_emqtt}`   - builds the client options (including a JSON-encoded
%%                     last-will payload), starts the emqtt client, monitors
%%                     it, stores the pid and monitor reference in the state
%%                     and continues with `{connect_emqtt}`.
%% `{connect_emqtt}` - calls emqtt:connect/1 on the stored client pid. On
%%                     `{error, _}` a 5000 ms retry timer is armed; on
%%                     `{ok, _}` nothing further is done.
%%
%% Both clauses return `{noreply, State}` (optionally with a continue term).
handle_continue({start_emqtt}, #state{bin_hostname = BinHostname} = State) ->
  WillPld = thoas:encode(#{event => disconnected,
                           hostname => BinHostname,
                           connect_timestamp => erlang:system_time(seconds)}),
  EmqttOpts =
    [{name, undefined},
     {owner, self()},
     {host, "MqttHost"},
     {port, 8883},
     {username, <<"MqttUsername">>},
     {password, <<"MqttPassword">>},
     {connect_timeout, 60},
     {keepalive, 30},
     {client_id, <<"app_test">>},
     {tcp_opts, []},
     {ssl_opts, []},
     {ssl, true},
     {bridge_mode, false},
     {proto_ver, v4},
     {max_inflight, infinity},
     {retry_interval, 30},
     {auto_ack, true},
     {ack_timeout, 30},
     {force_ping, false},
     {reconnect, infinity},
     {reconnect_timeout, 30},
     {with_qoe_metrics, true},
     {properties, #{}},
     {will_topic, <<"WillTopic">>},
     {will_payload, WillPld},
     {will_retain, true},
     {will_qos, 1},
     {will_props, #{}}
    ],
  %% Assertive match: anything other than {ok, Pid} crashes this process.
  %% NOTE(review): start_link presumably also links the client to this
  %% process; unless exits are trapped, an abnormal client exit would take
  %% this process down before the 'DOWN' message is handled - confirm.
  {ok, EmqttPid} = emqtt:start_link(EmqttOpts),
  EmqttMRef = erlang:monitor(process, EmqttPid),
  {noreply, State#state{conn_mref = EmqttMRef, conn_pid = EmqttPid}, {continue, {connect_emqtt}}};

handle_continue({connect_emqtt}, #state{conn_pid = EmqttPid} = State) ->
  case emqtt:connect(EmqttPid) of
    {error, _Reason} ->
      %% erlang:start_timer/3 delivers {timeout, TimerRef, {retry_connect_emqtt}}
      %% to this process after 5000 ms.
      erlang:start_timer(5000, self(), {retry_connect_emqtt}),
      {noreply, State};
    {ok, _EmqttConnProps} ->
      {noreply, State}
  end.
   
%% Out-of-band messages related to the MQTT client.
%%
%% 'DOWN' for the monitored client (monitor reference and pid must both match
%% the ones held in the state): start a fresh client via `{start_emqtt}`.
%%
%% Retry timer expiry: the timer is armed with erlang:start_timer/3, which
%% delivers `{timeout, TimerRef, Msg}` rather than the bare `Msg`, so the
%% wrapped form is matched here. The continue term is `{connect_emqtt}`
%% (a 1-tuple), the shape the handle_continue/2 clause expects.
handle_info({'DOWN', EmqttMRef, process, EmqttPid, _Reason},
            #state{conn_mref = EmqttMRef, conn_pid = EmqttPid} = State) ->
  {noreply, State, {continue, {start_emqtt}}};
handle_info({timeout, _TimerRef, {retry_connect_emqtt}}, State) ->
  {noreply, State, {continue, {connect_emqtt}}}.

rtATvw avatar Jan 26 '24 19:01 rtATvw