emqx_kafka_bridge
How to use emqx_kafka_bridge
I don't know how to use this plugin. How do I publish to EMQX and subscribe from Kafka?
What do I do after installing and loading the plugin?
- Change "kafka.host = 127.0.0.1" and "kafka.port = 9092" to your Kafka server's IP and port.
- Create the "Processing" topic on your Kafka server.
- Start emqx and load the emqx_kafka_bridge plugin.
- Publish a message to emqx; emqx will then produce a JSON message to the "Processing" topic.
- Consume the "Processing" topic.
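The steps above can be sketched as a small Python helper that only builds the command lines, so nothing is executed until you copy them into a shell. This is a rough illustration, not the plugin's documented procedure: the MQTT port 1883, the MQTT topic "test/topic", and the helper names are assumptions, and older Kafka releases expect `--zookeeper` instead of `--bootstrap-server` when creating a topic.

```python
import shlex

# Values taken from the steps above; adjust to your setup.
KAFKA_HOST = "127.0.0.1"     # kafka.host in the plugin config
KAFKA_PORT = 9092            # kafka.port in the plugin config
KAFKA_TOPIC = "Processing"   # topic the bridge produces to


def create_topic_cmd(host=KAFKA_HOST, port=KAFKA_PORT, topic=KAFKA_TOPIC):
    """Command to create the Kafka topic (assumes a Kafka version whose
    kafka-topics.sh accepts --bootstrap-server)."""
    return ["kafka-topics.sh", "--create", "--topic", topic,
            "--bootstrap-server", f"{host}:{port}",
            "--partitions", "1", "--replication-factor", "1"]


def load_plugin_cmd(plugin="emqx_kafka_bridge"):
    """Command to load the plugin into a running emqx node."""
    return ["emqx_ctl", "plugins", "load", plugin]


def publish_cmd(message, mqtt_host="127.0.0.1", mqtt_port=1883,
                mqtt_topic="test/topic"):
    """Command to publish one MQTT message to emqx.
    Host, port and topic defaults are assumptions for illustration."""
    return ["mosquitto_pub", "-h", mqtt_host, "-p", str(mqtt_port),
            "-t", mqtt_topic, "-m", message]


def consume_cmd(host=KAFKA_HOST, port=KAFKA_PORT, topic=KAFKA_TOPIC):
    """Command to read everything the bridge wrote to the Kafka topic."""
    return ["kafka-console-consumer.sh",
            "--bootstrap-server", f"{host}:{port}",
            "--topic", topic, "--from-beginning"]


if __name__ == "__main__":
    # Print the commands in the order of the steps; run them by hand.
    for cmd in (create_topic_cmd(), load_plugin_cmd(),
                publish_cmd("hello"), consume_cmd()):
        print(shlex.join(cmd))
```

If the bridge is working, the consumer should show a JSON message shortly after the publish command runs.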
ekaf_server:171 <0.1637.0>:connected/2 cant handle {metadata,req,#Port<0.42>}2018-11-27 16:00:28.463 [error] mosqpub/[email protected]:47082 ** Generic server <0.1633.0> terminating ** Last message in was {inet_async,#Port<0.25>,50, {ok,<<16,33,0,6,77,81,73,115,100,112,3,2,0,60,0, 19,109,111,115,113,112,117,98,47,53,54,53, 53,45,117,98,117,110,116,117>>}} ** When Server state == {state,esockd_transport,#Port<0.25>, {{10,10,0,31},47082}, undefined,running,true, {pstate,external, #Fun<emqx_connection.0.52215817>, {{10,10,0,31},47082}, nossl,4,<<"MQTT">>,<<>>,false,<0.1633.0>, undefined,undefined,undefined,undefined,false, #{},1048576,undefined,undefined,undefined, undefined,false,false,true,true, #{msg => 0,pkt => 0}, #{msg => 0,pkt => 0}, false,undefined,false}, {none,#{max_packet_size => 1048576,version => 4}}, undefined,true,undefined,undefined,undefined, undefined,undefined,15000} ** Reason for termination == ** {{timeout,{gen_fsm,sync_send_event,[<0.1637.0>,prepare]}}, [{gen_fsm,sync_send_event,2,[{file,"gen_fsm.erl"},{line,247}]}, {ekaf_lib,prepare,2,[{file,"src/ekaf_lib.erl"},{line,55}]}, {ekaf_lib,common_async,3,[{file,"src/ekaf_lib.erl"},{line,98}]}, {emqx_kafka_bridge,on_client_connected,4, [{file,"src/emqx_kafka_bridge.erl"},{line,55}]}, {emqx_hooks,run_,2,[{file,"src/emqx_hooks.erl"},{line,93}]}, {emqx_protocol,connack,1,[{file,"src/emqx_protocol.erl"},{line,466}]}, {emqx_connection,handle_packet,2, [{file,"src/emqx_connection.erl"},{line,328}]}, {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,637}]}]} 2018-11-27 16:00:28.467 [error] mosqpub/5655-ubuntu ** Generic server <0.1635.0> terminating ** Last message in was {'EXIT',<0.1633.0>, {timeout, {gen_fsm,sync_send_event, [<0.1637.0>,prepare]}}} ** When Server state == {state,15000,true,local,<<"mosqpub/5655-ubuntu">>, undefined,undefined,undefined,1,0,#{},false, {emqx_inflight,32,{0,nil}}, 20000,undefined, {mqueue,true,1000,0,0,none,infinity, {queue,[],[],0}}, 
#{},100,300000,undefined,0,undefined,true, undefined,0,0, {1543,309223,461869}, 0,undefined,undefined} ** Reason for termination == ** {timeout,{gen_fsm,sync_send_event,[<0.1637.0>,prepare]}}
ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}} ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}} ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}} ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}} ekaf_server:130 <0.1637.0>:downtime/2 cant handle {metadata,resp, {metadata_response,1, [{broker,0,<<"ubuntu">>, 9092}], [{topic,<<"telehaha">>,0, [{partition,0,0,0, [{replica,0}], [{isr,0}], 0,[]}]}]}}
This is my issue.
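Why does the metadata response list the broker as '<<"ubuntu">>'? Kafka clients connect to whatever host the broker advertises, so if the emqx machine cannot resolve "ubuntu", the connection may fail. Try changing the hostname to an IP address (for example through advertised.listeners in Kafka's server.properties).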
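One way to test this suggestion from the emqx machine is a quick check of whether the advertised host resolves and whether the port accepts connections. The following is a minimal sketch using only Python's standard `socket` module; the function names are made up for illustration, and "ubuntu" and 9092 are simply the values shown in the metadata response above.

```python
import socket


def can_resolve(host):
    """Return True if the host name (or IP literal) resolves on this machine."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        return False


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Broker host and port as advertised in the metadata response.
    host, port = "ubuntu", 9092
    print("resolves:", can_resolve(host))
    print("connects:", can_connect(host, port))
```

If the name does not resolve from the emqx machine, either advertise an IP address on the Kafka side or add a hosts entry for the name on the emqx side.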