fluentd: support client mode when using `in_unix`
Is your feature request related to a problem? Please describe.
If we need to get information from a socket created by another server, it doesn't work, because fluentd recreates the socket file.

Describe the solution you'd like
I found that logstash supports both server and client modes in its unix input, but fluentd's `in_unix` supports only server mode.
> If we need to get information from a socket created by another server, fluentd recreates the socket file
Hmm, you are right: https://github.com/fluent/fluentd/blob/2cbb650b748c38602898af58c2d0eb251946a230/lib/fluent/plugin/in_unix.rb#L69
> I found that logstash can support both server and client modes when using in_unix:
> https://www.elastic.co/guide/en/logstash/current/plugins-inputs-unix.html#plugins-inputs-unix-mode
I suppose `in_unix` creates a listening UNIX socket mostly because `out_forward` has no support for creating a UNIX socket.
- Basically, `in_unix` is `in_forward` for UNIX sockets.
- In other words, `in_unix` can only handle Fluentd's custom forward protocol.
- So it's not exactly a general UNIX socket adapter (unlike `in_tcp` or `in_udp`).
So I think it would not be very useful even if we added a client mode to `in_unix`, because neither Fluentd's nor Fluent Bit's `out_forward` supports such a mode. In short, no one would be able to make use of that mode!
I can get logs from the ceph rgw ops log socket (an admin socket created by ceph radosgw) using logstash's client mode, but how can I get logs from a UNIX socket with fluentd if there is no client mode? I want to read logs from the ceph admin socket with fluentd and then send them to elasticsearch.
@ashie It just occurs to me: won't this make a good assignment for @daipom?
Basically, the task is to create an input plugin that:
- can listen on an existing UNIX domain socket
- can parse arbitrary incoming data via `<parse>`

I'm fine with either creating a new plugin (something like `in_unix_client`) or extending our `in_unix` to cover this use case.
It seems like a clear, well-defined task with a real-world usage scenario (as pointed out by @chenxianpao), so I suppose it makes a great fit here.
> @ashie It just occurs to me: won't this make a good assignment for @daipom?

Ah, that's a nice idea!
@ashie @fujimotos Thank you for considering assigning me the implementation of such an interesting feature.
I looked through the current `in_unix.rb` code and the code of similar plugins, and considered the direction of the implementation.
I am thinking of creating a new plugin like `in_unix_client` that works as follows:
- Simply connect to an existing socket and keep receiving.
- Use the parser plugin helper of PluginHelper to parse the received data.
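The two steps above can be sketched in plain Ruby. This is a hypothetical illustration of the client-mode idea, not the actual plugin code: the `UnixSocketReader` class and its names are my own, and a real fluentd input would wrap this in the plugin API and hand each record to a parser.

```ruby
require "socket"

# Hypothetical sketch: connect to an existing UNIX socket as a client
# and yield each delimiter-terminated record to the caller.
class UnixSocketReader
  def initialize(path, delimiter: "\n")
    @path = path
    @delimiter = delimiter
  end

  # Reads until the server closes the socket, yielding one record
  # (without its delimiter) per complete chunk.
  def each_record
    UNIXSocket.open(@path) do |sock|
      buffer = +""
      loop do
        buffer << sock.readpartial(4096)
        # Emit every complete record currently sitting in the buffer.
        while (idx = buffer.index(@delimiter))
          yield buffer.slice!(0, idx + @delimiter.size).chomp(@delimiter)
        end
      end
    rescue EOFError
      yield buffer unless buffer.empty? # trailing partial record
    end
  end
end
```

Buffering on the delimiter matters because a single `readpartial` may return half a record or several records at once.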
Thanks :smiley: @daipom has created a new repository for it: https://github.com/daipom/fluent-plugin-unix-client
Thanks ^_^
@daipom I'm also checking your code. WFM.
@chenxianpao I have released `fluent-plugin-unix-client`.
- Git repository: https://github.com/daipom/fluent-plugin-unix-client
- Ruby gem: https://rubygems.org/gems/fluent-plugin-unix-client

I have confirmed that it can receive JSON-formatted data as a client, but I don't know whether it will receive data from ceph successfully. So I would like you to try it and see whether you can receive data successfully. You can install it with `gem install fluent-plugin-unix-client`.
Here is a sample config.
```
<source>
  @type unix_client
  tag debug.unix_client
  path /tmp/unix.sock
  <parse>
    @type json
  </parse>
  delimiter "\n"
</source>
<match debug.**>
  @type stdout
</match>
```
@daipom Thanks. There has been a lot going on recently and I can't test right away; I will test as soon as possible.
@daipom Sorry to test so late. I tried it, but I couldn't seem to read the data from the ceph rgw socket; stdout is empty. My log is:

```
2021-09-09 15:10:21 +0800 [info]: fluent/log.rb:330:info: Worker 0 finished with status 0
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: parsing config file is succeeded path="/etc/td-agent/td-agent.conf"
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-elasticsearch' version '5.0.3'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-elasticsearch' version '4.3.3'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-flowcounter-simple' version '0.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-kafka' version '0.16.1'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-kafka' version '0.16.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-prometheus' version '1.8.5'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-prometheus_pushgateway' version '0.0.2'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-record-modifier' version '2.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-rewrite-tag-filter' version '2.3.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-s3' version '1.5.1'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-sd-dns' version '0.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-systemd' version '1.0.2'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-td' version '1.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-unix-client' version '0.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-webhdfs' version '1.4.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluentd' version '1.14.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluentd' version '1.12.1'
2021-09-09 15:10:22 +0800 [trace]: fluent/log.rb:287:trace: registered output plugin 'stdout'
2021-09-09 15:10:22 +0800 [trace]: fluent/log.rb:287:trace: registered metrics plugin 'local'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered buffer plugin 'memory'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered formatter plugin 'stdout'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered formatter plugin 'json'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered input plugin 'unix_client'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered parser plugin 'json'
2021-09-09 15:10:23 +0800 [debug]: fluent/log.rb:309:debug: No fluent logger for internal event
2021-09-09 15:10:23 +0800 [info]: fluent/log.rb:330:info: using configuration file:
<ROOT>
  <source>
    @type unix_client
    tag "debug.unix_client"
    path "/var/run/ceph/rgw-opslog.asok"
    <parse>
      @type "json"
    </parse>
  </source>
  <match debug.**>
    @type stdout
  </match>
</ROOT>
2021-09-09 15:10:23 +0800 [info]: fluent/log.rb:330:info: starting fluentd-1.14.0 pid=4022411 ruby="2.7.2"
2021-09-09 15:10:23 +0800 [info]: fluent/log.rb:330:info: spawn command to main: cmdline=["/opt/td-agent/bin/ruby", "-Eascii-8bit:ascii-8bit", "/opt/td-agent/bin/fluentd", "-vv", "--log", "/var/log/td-agent/td-agent.log", "--daemon", "/var/run/td-agent/td-agent.pid", "--under-supervisor"]
2021-09-09 15:10:24 +0800 [info]: fluent/log.rb:330:info: adding match pattern="debug.**" type="stdout"
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered output plugin 'stdout'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered metrics plugin 'local'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered buffer plugin 'memory'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered formatter plugin 'stdout'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered formatter plugin 'json'
2021-09-09 15:10:24 +0800 [info]: fluent/log.rb:330:info: adding source type="unix_client"
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered input plugin 'unix_client'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered parser plugin 'json'
2021-09-09 15:10:24 +0800 [debug]: #0 fluent/log.rb:309:debug: No fluent logger for internal event
2021-09-09 15:10:24 +0800 [info]: #0 fluent/log.rb:330:info: starting fluentd worker pid=4022423 ppid=4022420 worker=0
2021-09-09 15:10:24 +0800 [info]: #0 fluent/log.rb:330:info: fluentd worker is now running worker=0
2021-09-09 15:10:24 +0800 [info]: #0 fluent/log.rb:330:info: in_unix_client: opened socket: /var/run/ceph/rgw-opslog.asok
```
My socket data is:

```
[root@helloworld plugin]# nc -U /var/run/ceph/rgw-opslog.asok
[{"bucket":"bucket_for_test","time":"2021-09-09 07:19:16.964098Z","time_local":"2021-09-09 15:19:16.964098","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":314271,"bytes_received":0,"object_size":0,"total_time":60,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.055099Z","time_local":"2021-09-09 15:19:17.055099","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&marker=79d5b270-a983-11eb-a0ae-00fffef4ca80.jpg&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":297599,"bytes_received":0,"object_size":0,"total_time":37,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.564102Z","time_local":"2021-09-09 15:19:17.564102","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":314271,"bytes_received":0,"object_size":0,"total_time":53,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.652103Z","time_local":"2021-09-09 15:19:17.652103","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&marker=79d5b270-a983-11eb-a0ae-00fffef4ca80.jpg&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":297599,"bytes_received":0,"object_size":0,"total_time":47,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.696103Z","time_local":"2021-09-09 15:19:17.696103","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":314271,"bytes_received":0,"object_size":0,"total_time":66,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.791103Z","time_local":"2021-09-09 15:19:17.791103","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&marker=79d5b270-a983-11eb-a0ae-00fffef4ca80.jpg&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":297599,"bytes_received":0,"object_size":0,"total_time":46,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.999105Z","time_local":"2021-09-09 15:19:17.999105","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":314271,"bytes_received":0,"object_size":0,"total_time":43,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:18.079105Z","time_local":"2021-09-09 15:19:18.079105","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&marker=79d5b270-a983-11eb-a0ae-00fffef4ca80.jpg&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":297599,"bytes_received":0,"object_size":0,"total_time":21,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
```
If I run `nc -U /var/run/ceph/rgw-opslog.asok` before starting fluentd, I can see data on the screen; but if I then restart fluentd, I get nothing when I operate ceph rgw. I'm not sure if this is normal. I'm not familiar with ruby and don't know how to debug this; can you give me some suggestions?
@chenxianpao Thank you for the test.
This plugin is designed to recognize JSON objects separated by a `\n` delimiter, as follows:

```
{"A":"x","B":"y"}\n
{"A":"x","B":"y"}\n
{"A":"x","B":"y"}\n
...
```
However, this socket data seems to be a single JSON list, as follows:

```
[{"A":"x","B":"y"},
{"A":"x","B":"y"},
{"A":"x","B":"y"},
...]
```

I think this difference causes the parsing to fail. The delimiter can be changed, but we can't use `,`, because it also appears inside the JSON itself. I will try to think about how to solve this.
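The mismatch above can be demonstrated in a few lines. This is an illustrative sketch under the assumption (from the discussion) that ceph separates objects with `,\n`: splitting on `,\n` yields valid JSON for every chunk except the first, which still carries the leading `[` of the list.

```ruby
require "json"

# Assumed framing: a JSON list streamed with ",\n" between elements.
stream = "[{\"a\":1},\n{\"a\":2},\n{\"a\":3},\n"
chunks = stream.split(",\n")

parsed = chunks.map do |c|
  begin
    JSON.parse(c)
  rescue JSON::ParserError
    nil # the first chunk '[{"a":1}' fails because of the stray "["
  end
end
# parsed => [nil, {"a"=>2}, {"a"=>3}]
```

This is exactly the behavior discussed below: with `,\n` as the delimiter, every record parses except the first.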
@chenxianpao I'm sorry for the late reply.
How about setting `delimiter` to `",\n"`?
If `,\n` is used as the separator between records in ceph, then this setting might work.
```diff
 <ROOT>
   <source>
     @type unix_client
     tag "debug.unix_client"
     path "/var/run/ceph/rgw-opslog.asok"
     <parse>
       @type "json"
     </parse>
+    delimiter ",\n"
   </source>
   <match debug.**>
     @type stdout
   </match>
 </ROOT>
```
I'm afraid the first record can't be received, because the leading `[` will make its parsing fail, but all subsequent records can be received correctly if `,\n` is used as the delimiter.
Additional implementation may be required to receive the first record as well, and that may take some time.
Anyway, I would appreciate it if you could try this setting.
@chenxianpao I assume the format of the ceph data is either A or B.
- A: one big list

```
[{"A":"x","B":"y"},\n
{"A":"x","B":"y"},\n
{"A":"x","B":"y"},\n
]
```

- B: multiple clusters

```
[{"A":"x","B":"y"},\n
{"A":"x","B":"y"},\n
],
[{"A":"x","B":"y"},\n
{"A":"x","B":"y"},\n
]
```

If it is B, using `,\n` as the delimiter is not good, because the first record of each cluster cannot be received.
@daipom It works when I use `,\n`; I can see the information in the log. Thank you.
After the National Day holiday, I will continue testing output to elasticsearch and give you feedback.
@chenxianpao I'm glad it works. I'm waiting for your feedback.
@daipom Hi, I tested it and it works. Thank you very much, and sorry for taking so long to test. I think it should be merged into the master branch.
td-agent.conf:

```
<source>
  @type unix_client
  tag debug.unix_client
  path /var/run/ceph/rgw-opslog.asok
  <parse>
    @type json
  </parse>
  delimiter ",\n"
</source>
<match debug.unix_client>
  @type elasticsearch
  host "myhostip"
  port 9200
  request_timeout 60s
  logstash_format true
</match>
```
ElasticSearch data: (screenshot omitted)
@chenxianpao Thank you for testing! I'm glad it works! However, please note that the first record is lost in the current version, as I said before:

> I'm afraid the first record can't be received, because the leading `[` will make its parsing fail, but all subsequent records can be received correctly if `,\n` is used as the delimiter.

I will fix this problem in the next version!
@chenxianpao I'm sorry for taking so long.
I have released `fluent-plugin-unix-client` v1.0.0.
- https://rubygems.org/gems/fluent-plugin-unix-client

> However, please note that the first record is lost in the current version, as I said before.
> I'm afraid the first record can't be received, because the leading `[` will make its parsing fail, but all subsequent records can be received correctly if `,\n` is used as the delimiter. I will fix this problem in the next version!

I added a new option, `format_json`, which you can use as follows.
```diff
 <ROOT>
   <source>
     @type unix_client
     tag "debug.unix_client"
     path "/var/run/ceph/rgw-opslog.asok"
     <parse>
       @type "json"
     </parse>
-    delimiter ",\n"
+    delimiter "\n"
+    format_json true
   </source>
   <match debug.**>
     @type stdout
   </match>
 </ROOT>
```
With this, you can receive both the first and the last record in the JSON list! I'm sorry for taking so long!
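One way to read the `format_json` idea is: treat the stream as one JSON list, strip the surrounding brackets and the trailing comma from each newline-delimited chunk, and then each element (including the first and last) parses on its own. The sketch below is my interpretation for illustration, not the plugin's actual implementation; `normalize_json_list_chunk` is a made-up name.

```ruby
require "json"

# Hypothetical sketch of the format_json normalization: remove the
# list framing ("[", "]") and the trailing "," from a "\n"-delimited
# chunk, then parse what remains as a standalone JSON object.
def normalize_json_list_chunk(chunk)
  chunk = chunk.strip
  chunk = chunk.delete_prefix("[") # first chunk carries the list opener
  chunk = chunk.delete_suffix("]") # last chunk carries the list closer
  chunk = chunk.delete_suffix(",") # inner chunks end with a separator
  chunk.empty? ? nil : JSON.parse(chunk)
end

stream = "[{\"a\":1},\n{\"a\":2},\n{\"a\":3}]\n"
records = stream.split("\n").map { |c| normalize_json_list_chunk(c) }.compact
# records => [{"a"=>1}, {"a"=>2}, {"a"=>3}]
```

Unlike the plain `,\n` delimiter, this recovers all three records, including the first one behind the `[` and the last one before the `]`.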