fluent-plugin-mqtt-io
fluent-plugin-mqtt-io copied to clipboard
MQTT output with ssl not working
Hi Toyokazu,
It's been a long time. Hope you are doing well.
We are facing an issue while using SSL certificates with the MQTT output plugin.
Below is the td-agent.conf file.
<source>
@type mqtt
host 127.0.0.1
port 1883
topic testsan
<parse>
@type none
</parse>
@label @awsiot
</source>
#Output mqtt to sanku_san topic
<label @awsiot>
<match **>
@type mqtt
host appsu1758md74-ats.iot.ap-south-1.amazonaws.com
# host a27v4oxaj38gth-ats.iot.ap-south-1.amazonaws.com
port 8883
topic sankujain
<format>
@type json
</format>
<security>
use_tls true
<tls>
ca_file /home/sanku/Intellyzen/Practice/AwsIoTCore/awsroot1.pem
key_file /home/sanku/Intellyzen/Practice/AwsIoTCore/aa43d35f3f-private.pem.key
cert_file /home/sanku/Intellyzen/Practice/AwsIoTCore/aa43d35f3f-certificate.pem.crt
</tls>
</security>
topic_rewrite_pattern '^([\w\/]+)$'
topic_rewrite_replacement 'sankujain'
retry_inc_ratio 2
retry_forever true
keep_alive 5s
<monitor>
send_time true
</monitor>
<buffer>
@type file
path /var/log/td-agent/mqtt/topic/enrichportal/log
flush_interval 10s
</buffer>
</match>
</label>
I have also attached the td-agent log for your reference.
2019-01-30 13:13:42 +0000 [info]: #0 starting fluentd worker pid=3876 ppid=3864 worker=0
2019-01-30 13:13:42 +0000 [info]: #0 [input_debug_agent] listening dRuby uri="druby://127.0.0.1:24230" object="Fluent::Engine" worker=0
2019-01-30 13:13:42 +0000 [info]: #0 [input_forward] listening port port=24224 bind="0.0.0.0"
2019-01-30 13:13:42 +0000 [info]: #0 fluentd worker is now running worker=0
2019-01-30 13:13:42 +0000 [error]: #0 Protocol error occurs in out_mqtt.,MQTT::ProtocolException,Failed to read byte from socket
2019-01-30 13:13:42 +0000 [error]: #0 Retry in 1 sec
2019-01-30 13:13:48 +0000 [error]: #0 Timeout error occurs in out_mqtt.,Timeout::Error,execution expired
2019-01-30 13:13:48 +0000 [error]: #0 Retry in 1 sec
2019-01-30 13:13:48 +0000 [warn]: #0 failed to flush the buffer. retry_time=0 next_retry_seconds=2019-01-30 13:13:49 +0000 chunk="580acaaab3d26b169b6ca7c2fd760fa1" error_class=Timeout::Error error="Timeout error occurs."
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mqtt-io-0.4.2/lib/fluent/plugin/mqtt_proxy.rb:123:in `rescue in rescue_disconnection'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mqtt-io-0.4.2/lib/fluent/plugin/mqtt_proxy.rb:106:in `rescue_disconnection'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mqtt-io-0.4.2/lib/fluent/plugin/out_mqtt.rb:163:in `block in write'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/event.rb:323:in `each'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/event.rb:323:in `block in each'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin/buffer/file_chunk.rb:172:in `open'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/event.rb:322:in `each'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mqtt-io-0.4.2/lib/fluent/plugin/out_mqtt.rb:162:in `write'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin/output.rb:1123:in `try_flush'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin/output.rb:1423:in `flush_thread_run'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin/output.rb:452:in `block (2 levels) in start'
2019-01-30 13:13:48 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2019-01-30 13:13:49 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2019-01-30 13:13:50 +0000 chunk="580acaaab3d26b169b6ca7c2fd760fa1" error_class=Fluent::Plugin::MqttProxy::MqttError error="MQTT not connected exception occurs in out_mqtt."
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mqtt-io-0.4.2/lib/fluent/plugin/mqtt_proxy.rb:137:in `rescue in rescue_disconnection'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mqtt-io-0.4.2/lib/fluent/plugin/mqtt_proxy.rb:106:in `rescue_disconnection'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mqtt-io-0.4.2/lib/fluent/plugin/out_mqtt.rb:163:in `block in write'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/event.rb:323:in `each'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/event.rb:323:in `block in each'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin/buffer/file_chunk.rb:172:in `open'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/event.rb:322:in `each'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mqtt-io-0.4.2/lib/fluent/plugin/out_mqtt.rb:162:in `write'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin/output.rb:1123:in `try_flush'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin/output.rb:1423:in `flush_thread_run'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin/output.rb:452:in `block (2 levels) in start'
2019-01-30 13:13:49 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.3.3/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2019-01-30 13:13:49 +0000 [error]: #0 Protocol error occurs in out_mqtt.,MQTT::ProtocolException,Failed to read byte from socket
2019-01-30 13:13:49 +0000 [error]: #0 Retry in 2 sec
Hello. I've tested a TLS connection using easyrsa, and it does not show any errors. I'd like to share the configuration details; I hope they help.
Your certificates may have some issues, e.g., a missing key usage extension or subject alternative name.
I think that easyrsa is an easy tool for creating certificates. Please try it.
# clone easyrsa
pwd
/path/to
git clone https://github.com/OpenVPN/easy-rsa.git
cd easy-rsa/easyrsa3
Instructions for using easy-rsa are given in the following document.
https://github.com/OpenVPN/easy-rsa/blob/v3.0.6/README.quickstart.md
# init CA
./easyrsa init-pki
# generate certificate signing request for server (localhost)
./easyrsa gen-req localhost
# if you don't want to set a passphrase, add the option "nopass"
# ./easyrsa gen-req localhost nopass
# sign certificate signing request
./easyrsa sign-req server localhost
# generate certificate signing request for client (client)
./easyrsa gen-req client nopass
# sign certificate signing request
./easyrsa sign-req client client
You will now find the certificates localhost.crt and client.crt under /path/to/easy-rsa/easyrsa3/pki/issued, and the keys localhost.key and client.key under /path/to/easy-rsa/easyrsa3/pki/private.
If you use mosquitto, you can configure the server as follows:
vi /usr/local/etc/mosquitto/mosquitto.conf
---
…
port 8883
...
cafile /path/to/easy-rsa/easyrsa3/pki/ca.crt
certfile /path/to/easy-rsa/easyrsa3/pki/issued/localhost.crt
keyfile /path/to/easy-rsa/easyrsa3/pki/private/localhost.key
...
---
/usr/local/sbin/mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf
# configure fluent.conf for testing in_mqtt
vi fluent-in_mqtt.conf
---
<source>
@type mqtt
host localhost
port 8883
topic test
<parse>
@type none
</parse>
<security>
use_tls true
<tls>
cafile /path/to/easy-rsa/easyrsa3/pki/ca.crt
certfile /path/to/easy-rsa/easyrsa3/pki/issued/client.crt
keyfile /path/to/easy-rsa/easyrsa3/pki/private/client.key
</tls>
</security>
@label @test
</source>
<label @test>
<match **>
@type stdout
</match>
</label>
---
# start fluentd
fluentd -v -c fluent-in_mqtt.conf
# test publish to the topic “test”
mosquitto_pub -L mqtts://localhost:8883/test -m "hoge" --cert /path/to/easy-rsa/easyrsa3/pki/issued/client.crt --key /path/to/easy-rsa/easyrsa3/pki/private/client.key --cafile /path/to/easy-rsa/easyrsa3/pki/ca.crt
# configure fluent.conf for testing out_mqtt
vi fluent-out_mqtt.conf
---
<source>
@type mqtt
host localhost
port 8883
topic test
<parse>
@type none
</parse>
<security>
use_tls true
<tls>
cafile /path/to/easy-rsa/easyrsa3/pki/ca.crt
certfile /path/to/easy-rsa/easyrsa3/pki/issued/client.crt
keyfile /path/to/easy-rsa/easyrsa3/pki/private/client.key
</tls>
</security>
@label @test
</source>
<label @test>
<match **>
@type mqtt
host localhost
port 8883
topic_rewrite_pattern '^([\w\/]+)$'
topic_rewrite_replacement '\1/rewritten'
<security>
use_tls true
<tls>
cafile /path/to/easy-rsa/easyrsa3/pki/ca.crt
certfile /path/to/easy-rsa/easyrsa3/pki/issued/client.crt
keyfile /path/to/easy-rsa/easyrsa3/pki/private/client.key
</tls>
</security>
<buffer>
flush_interval 10s
</buffer>
</match>
</label>
---
# subscribe to the topic test/rewritten with mosquitto_sub
mosquitto_sub -L mqtts://localhost:8883/test/rewritten --cert /path/to/easy-rsa/easyrsa3/pki/issued/client.crt --key /path/to/easy-rsa/easyrsa3/pki/private/client.key --cafile /path/to/easy-rsa/easyrsa3/pki/ca.crt
# test publish to the topic “test”
mosquitto_pub -L mqtts://localhost:8883/test -m "hoge" --cert /path/to/easy-rsa/easyrsa3/pki/issued/client.crt --key /path/to/easy-rsa/easyrsa3/pki/private/client.key --cafile /path/to/easy-rsa/easyrsa3/pki/ca.crt