
fluentd support client mode when use in_unix

Open chenxianpao opened this issue 3 years ago • 19 comments

Is your feature request related to a problem? Please describe.

If we need to get information from a socket created by another server, fluentd recreates the socket file, so it doesn't seem to work.

Describe the solution you'd like

I found that logstash supports both server and client modes for its unix input, but fluentd's in_unix supports only server mode.

chenxianpao avatar Apr 27 '21 03:04 chenxianpao

If we need to get information from a socket created by another server, fluentd recreates the socket file

Hmm, you are right: https://github.com/fluent/fluentd/blob/2cbb650b748c38602898af58c2d0eb251946a230/lib/fluent/plugin/in_unix.rb#L69

I found that logstash supports both server and client modes for its unix input,

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-unix.html#plugins-inputs-unix-mode

ashie avatar Apr 28 '21 09:04 ashie

I suppose in_unix creates a listening UNIX socket mostly because out_forward does not support creating a UNIX socket.

  • Basically in_unix is in_forward for unix sockets.
  • In other words, in_unix can only handle Fluentd's custom forward protocol.
  • So it's not quite exactly a general UNIX socket adapter (unlike in_tcp or in_udp).

So I think it's not very useful even if we add client mode to in_unix, because neither Fluentd's nor Fluent Bit's out_forward supports such a mode.

In short, no one will be able to make use of that mode!
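To make the two modes concrete, here is a minimal Python sketch using plain sockets. It is not Fluentd code, and the names `serve_once`, `read_as_client`, and `demo` are made up for illustration. The point is only that a server calls bind(), which creates the socket file, while a client calls connect() on a file that some other process already owns.

```python
import os
import socket
import tempfile
import threading


def serve_once(path, payload, ready):
    """Server mode: create the socket file, accept one client, send payload."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as srv:
        srv.bind(path)  # bind() creates the socket file at `path`
        srv.listen(1)
        ready.set()  # tell the client side that it may connect now
        conn, _ = srv.accept()
        with conn:
            conn.sendall(payload)


def read_as_client(path):
    """Client mode: connect to a socket file that another process created."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as cli:
        cli.connect(path)  # connect() does not create or replace the file
        chunks = []
        while True:
            data = cli.recv(4096)
            if not data:  # empty read means the server closed the connection
                break
            chunks.append(data)
        return b"".join(chunks)


def demo():
    """Run a throwaway server in a thread and read from it as a client."""
    path = os.path.join(tempfile.mkdtemp(), "demo.sock")
    ready = threading.Event()
    server = threading.Thread(
        target=serve_once, args=(path, b'{"k":"v"}\n', ready)
    )
    server.start()
    ready.wait()
    received = read_as_client(path)
    server.join()
    os.unlink(path)  # remove the socket file left behind by bind()
    return received
```

In this picture, an application that exposes its own socket plays the `serve_once` role, so a collector has to behave like `read_as_client`.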

fujimotos avatar Apr 30 '21 02:04 fujimotos

I suppose in_unix creates a listening UNIX socket mostly because out_forward does not support creating a UNIX socket.

  • Basically in_unix is in_forward for unix sockets.
  • In other words, in_unix can only handle Fluentd's custom forward protocol.
  • It's not quite exactly a general UNIX socket adapter (unlike in_tcp or in_udp).

So I think there is not much use in adding client mode to in_unix, considering that neither Fluentd's nor Fluent Bit's out_forward supports creating a UNIX socket. No one will be able to actually make use of the mode!

I can get logs from the ceph rgw ops log socket (an admin socket created by ceph radosgw) using the client mode of logstash. However, how can I get logs from a unix socket with fluentd if there is no client mode? I want to get logs from the ceph admin socket with fluentd, then send them to elasticsearch.

chenxianpao avatar Apr 30 '21 02:04 chenxianpao

@ashie It just occurs to me: won't this make a good assignment for @daipom?

Basically, the task is to create an input plugin that:

  • Can connect to an existing UNIX domain socket and read from it
  • Can parse arbitrary incoming data via <parse>

I'm fine with either creating a new plugin (something like in_unix_client) or extending our in_unix to cover this use case.

It seems like a clear, well-defined task with a real-world usage scenario (as pointed out by @chenxianpao ), so I suppose it makes a great fit here.

fujimotos avatar Apr 30 '21 06:04 fujimotos

@ashie It just occurs to me: won't this make a good assignment for @daipom?

Ah, it's a nice idea!

ashie avatar Apr 30 '21 06:04 ashie

@ashie @fujimotos Thank you for considering assigning me the implementation of such an interesting feature.

I looked through the current in_unix.rb code and code of similar plugins, and considered the direction of implementation.

I am thinking of creating a new plugin like in_unix_client as follows.

  • Simply connect to an existing socket and keep receiving.
  • Use parser plugin of PluginHelper to parse received data.
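The "keep receiving, then parse" part of this plan could be sketched roughly as follows. This is Python rather than an actual Fluentd plugin, and `DelimitedBuffer` is a hypothetical name, not something taken from the plugin. It shows one way to turn arbitrary stream chunks into complete records: buffer the input, cut it at a delimiter, and hand only complete pieces to a JSON parser.

```python
import json


class DelimitedBuffer:
    """Accumulate stream chunks and emit records that end with a delimiter."""

    def __init__(self, delimiter="\n"):
        self.delimiter = delimiter
        self.pending = ""  # trailing data that has no delimiter yet

    def feed(self, chunk):
        """Add a chunk and return the records completed by it."""
        self.pending += chunk
        # Everything before the last delimiter is complete; the final piece
        # (possibly empty) stays buffered until more data arrives.
        *complete, self.pending = self.pending.split(self.delimiter)
        return [json.loads(part) for part in complete if part.strip()]
```

A record that arrives split across two reads is emitted only once its delimiter has been seen:

```python
buf = DelimitedBuffer()
buf.feed('{"a": 1}\n{"b"')  # first record is complete, second is partial
buf.feed(': 2}\n')          # second record is completed by this chunk
```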

daipom avatar May 18 '21 06:05 daipom

Thanks :smiley: @daipom has created a new repository for it: https://github.com/daipom/fluent-plugin-unix-client

ashie avatar May 20 '21 06:05 ashie

Thanks ^_^

chenxianpao avatar May 20 '21 09:05 chenxianpao

@daipom I'm also checking your code. WFM.

fujimotos avatar May 20 '21 11:05 fujimotos

@chenxianpao

I have released fluent-plugin-unix-client.

  • Git repository: https://github.com/daipom/fluent-plugin-unix-client
  • Ruby gem: https://rubygems.org/gems/fluent-plugin-unix-client

I have confirmed that this can receive JSON-format data as a client, but I don't know whether it will be able to receive data successfully from ceph.

So could you try it and see whether you can receive data successfully?

You can install it with gem install fluent-plugin-unix-client. Here is a sample config.

<source>
  @type unix_client
  tag debug.unix_client
  path /tmp/unix.sock
  <parse>
    @type json
  </parse>
  delimiter "\n"
</source>

<match debug.**>
  @type stdout
</match>

daipom avatar May 31 '21 05:05 daipom

@daipom Thanks. I have had a lot going on recently and couldn't test in time. I will test as soon as possible.

chenxianpao avatar Jun 03 '21 02:06 chenxianpao

@daipom Sorry for testing so late. I tried it, but I can't seem to read the data from the ceph rgw socket; the stdout is empty. My log is:

2021-09-09 15:10:21 +0800 [info]: fluent/log.rb:330:info: Worker 0 finished with status 0
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: parsing config file is succeeded path="/etc/td-agent/td-agent.conf"
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-elasticsearch' version '5.0.3'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-elasticsearch' version '4.3.3'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-flowcounter-simple' version '0.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-kafka' version '0.16.1'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-kafka' version '0.16.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-prometheus' version '1.8.5'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-prometheus_pushgateway' version '0.0.2'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-record-modifier' version '2.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-rewrite-tag-filter' version '2.3.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-s3' version '1.5.1'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-sd-dns' version '0.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-systemd' version '1.0.2'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-td' version '1.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-unix-client' version '0.1.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluent-plugin-webhdfs' version '1.4.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluentd' version '1.14.0'
2021-09-09 15:10:22 +0800 [info]: fluent/log.rb:330:info: gem 'fluentd' version '1.12.1'
2021-09-09 15:10:22 +0800 [trace]: fluent/log.rb:287:trace: registered output plugin 'stdout'
2021-09-09 15:10:22 +0800 [trace]: fluent/log.rb:287:trace: registered metrics plugin 'local'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered buffer plugin 'memory'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered formatter plugin 'stdout'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered formatter plugin 'json'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered input plugin 'unix_client'
2021-09-09 15:10:23 +0800 [trace]: fluent/log.rb:287:trace: registered parser plugin 'json'
2021-09-09 15:10:23 +0800 [debug]: fluent/log.rb:309:debug: No fluent logger for internal event
2021-09-09 15:10:23 +0800 [info]: fluent/log.rb:330:info: using configuration file:

<ROOT>
  <source>
    @type unix_client
    tag "debug.unix_client"
    path "/var/run/ceph/rgw-opslog.asok"
    <parse>
      @type "json"
    </parse>
  </source>
  <match debug.**>
    @type stdout
  </match>
</ROOT>

2021-09-09 15:10:23 +0800 [info]: fluent/log.rb:330:info: starting fluentd-1.14.0 pid=4022411 ruby="2.7.2"
2021-09-09 15:10:23 +0800 [info]: fluent/log.rb:330:info: spawn command to main: cmdline=["/opt/td-agent/bin/ruby", "-Eascii-8bit:ascii-8bit", "/opt/td-agent/bin/fluentd", "-vv", "--log", "/var/log/td-agent/td-agent.log", "--daemon", "/var/run/td-agent/td-agent.pid", "--under-supervisor"]
2021-09-09 15:10:24 +0800 [info]: fluent/log.rb:330:info: adding match pattern="debug.**" type="stdout"
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered output plugin 'stdout'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered metrics plugin 'local'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered buffer plugin 'memory'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered formatter plugin 'stdout'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered formatter plugin 'json'
2021-09-09 15:10:24 +0800 [info]: fluent/log.rb:330:info: adding source type="unix_client"
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered input plugin 'unix_client'
2021-09-09 15:10:24 +0800 [trace]: #0 fluent/log.rb:287:trace: registered parser plugin 'json'
2021-09-09 15:10:24 +0800 [debug]: #0 fluent/log.rb:309:debug: No fluent logger for internal event
2021-09-09 15:10:24 +0800 [info]: #0 fluent/log.rb:330:info: starting fluentd worker pid=4022423 ppid=4022420 worker=0
2021-09-09 15:10:24 +0800 [info]: #0 fluent/log.rb:330:info: fluentd worker is now running worker=0
2021-09-09 15:10:24 +0800 [info]: #0 fluent/log.rb:330:info: in_unix_client: opened socket: /var/run/ceph/rgw-opslog.asok.

My socket data is:

[root@helloworld plugin]# nc -U /var/run/ceph/rgw-opslog.asok
[{"bucket":"bucket_for_test","time":"2021-09-09 07:19:16.964098Z","time_local":"2021-09-09 15:19:16.964098","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":314271,"bytes_received":0,"object_size":0,"total_time":60,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.055099Z","time_local":"2021-09-09 15:19:17.055099","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&marker=79d5b270-a983-11eb-a0ae-00fffef4ca80.jpg&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":297599,"bytes_received":0,"object_size":0,"total_time":37,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.564102Z","time_local":"2021-09-09 15:19:17.564102","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":314271,"bytes_received":0,"object_size":0,"total_time":53,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.652103Z","time_local":"2021-09-09 15:19:17.652103","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&marker=79d5b270-a983-11eb-a0ae-00fffef4ca80.jpg&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":297599,"bytes_received":0,"object_size":0,"total_time":47,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.696103Z","time_local":"2021-09-09 15:19:17.696103","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":314271,"bytes_received":0,"object_size":0,"total_time":66,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.791103Z","time_local":"2021-09-09 15:19:17.791103","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&marker=79d5b270-a983-11eb-a0ae-00fffef4ca80.jpg&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":297599,"bytes_received":0,"object_size":0,"total_time":46,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:17.999105Z","time_local":"2021-09-09 15:19:17.999105","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":314271,"bytes_received":0,"object_size":0,"total_time":43,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},
{"bucket":"bucket_for_test","time":"2021-09-09 07:19:18.079105Z","time_local":"2021-09-09 15:19:18.079105","remote_addr":"172.21.114.101","user":"cxp","operation":"list_bucket","uri":"GET /bucket_for_test/?delimiter=%2F&marker=79d5b270-a983-11eb-a0ae-00fffef4ca80.jpg&max-keys=1000&prefix= HTTP/1.1","http_status":"200","error_code":"","bytes_sent":297599,"bytes_received":0,"object_size":0,"total_time":21,"user_agent":"S3 Browser 9.5.5 https://s3browser.com","referrer":""},

If I run "nc -U /var/run/ceph/rgw-opslog.asok" before starting fluentd, I can see data on the screen. Then, if I restart fluentd, I get nothing when I operate on ceph rgw. I'm not sure if this is normal. I'm not familiar with ruby and don't know how to debug this. Can you give me some suggestions?

chenxianpao avatar Sep 09 '21 07:09 chenxianpao

@chenxianpao Thank you for the test.

This plugin is designed to recognize JSON objects with a delimiter \n as follows.

{"A":"x","B":"y"}\n
{"A":"x","B":"y"}\n
{"A":"x","B":"y"}\n
...

However, this socket data seems to be a list of JSON as follows.

[{"A":"x","B":"y"},
{"A":"x","B":"y"},
{"A":"x","B":"y"},
...]

I think this difference causes the parsing to fail.

The delimiter can be changed, but we can't use , because it is used inside of JSON. I will try to think about how to solve it.
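The failure described here can be reproduced outside Fluentd with a few lines of Python. This is only an illustration: Python's `json` module stands in for the plugin's parser, whose exact strictness I am assuming rather than quoting, and `try_parse` is a made-up helper. Splitting the list-style stream on \n leaves a stray [ or a trailing , on every piece, and splitting on , cuts the objects themselves apart.

```python
import json

# Two records in the list-style layout seen on the ceph socket.
STREAM = '[{"A":"x","B":"y"},\n{"A":"x","B":"y"},\n'


def try_parse(fragments):
    """Return (parsed_records, failed_fragments) for a list of fragments."""
    parsed, failed = [], []
    for fragment in fragments:
        if not fragment.strip():
            continue  # ignore empty or whitespace-only pieces
        try:
            parsed.append(json.loads(fragment))
        except json.JSONDecodeError:
            failed.append(fragment)
    return parsed, failed


# Delimiter "\n": each piece keeps a leading "[" or a trailing ",".
by_newline = try_parse(STREAM.split("\n"))

# Delimiter ",": the commas inside each object split the object itself.
by_comma = try_parse(STREAM.split(","))
```

In both cases `parsed` ends up empty, which matches the empty stdout reported above.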

daipom avatar Sep 11 '21 11:09 daipom

@chenxianpao I'm sorry to reply so late. How about setting delimiter to ",\n" ? If ,\n is used as a delimiter for each data in ceph, then this setting might work.

<ROOT>
  <source>
    @type unix_client
    tag "debug.unix_client"
    path "/var/run/ceph/rgw-opslog.asok"
    <parse>
      @type "json"
    </parse>
+   delimiter ",\n"
  </source>
  <match debug.**>
    @type stdout
  </match>
</ROOT>

I'm afraid the first record can't be received, because its leading [ will make parsing fail, but all subsequent records should be received correctly if ,\n is used as the delimiter.

Additional implementation may be required to be able to receive the first record as well, and it may take some time.

Anyway, I would appreciate it if you could try this setting.
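As a rough check of this reasoning, the following Python sketch splits a list-style stream on ,\n and records which fragments parse. It uses Python's `json` module as a stand-in for the plugin's parser, which is an assumption, and `split_records` is a hypothetical helper name.

```python
import json

# Three distinguishable records in the list-style layout.
STREAM = '[{"A":"x","B":"1"},\n{"A":"x","B":"2"},\n{"A":"x","B":"3"},\n'


def split_records(stream, delimiter=",\n"):
    """Split on the delimiter; return (parsed_records, dropped_fragments)."""
    parsed, dropped = [], []
    for fragment in stream.split(delimiter):
        if not fragment.strip():
            continue  # the piece after the final delimiter is empty
        try:
            parsed.append(json.loads(fragment))
        except json.JSONDecodeError:
            dropped.append(fragment)  # e.g. the record that starts with "["
    return parsed, dropped


parsed, dropped = split_records(STREAM)
```

Here `dropped` holds only the first fragment, the one that still carries the opening [, while the second and third records parse cleanly.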

daipom avatar Sep 23 '21 07:09 daipom

@chenxianpao I assume the format of the data of ceph is A or B.

  • A: One big list
[{"A":"x","B":"y"},\n
{"A":"x","B":"y"},\n
{"A":"x","B":"y"},\n
]
  • B: Multiple clusters
[{"A":"x","B":"y"},\n
{"A":"x","B":"y"},\n
],
[{"A":"x","B":"y"},\n
{"A":"x","B":"y"},\n
]

If it is B, using ,\n as the delimiter is not good enough, because the first record of each cluster cannot be received.

daipom avatar Sep 23 '21 08:09 daipom

@daipom It works when I use ,\n; I can see the information in the log. Thank you.

After the National Day holiday, I will continue to test and output to elasticsearch, and give you feedback.

chenxianpao avatar Sep 30 '21 10:09 chenxianpao

@chenxianpao I'm glad it works. I'm waiting for your feedback.

daipom avatar Oct 01 '21 07:10 daipom

@daipom Hi, I tested it and it works. Thank you very much, and sorry for taking so long to test. I think it should be merged into the master branch.

td-agent.conf

<source>
  @type unix_client
  tag debug.unix_client
  path /var/run/ceph/rgw-opslog.asok
  <parse>
    @type json
  </parse>
  delimiter ",\n"
</source>
<match debug.unix_client>
  @type elasticsearch
  host "myhostip"
  port 9200
  request_timeout 60s
  logstash_format true
</match>

ElasticSearch Data: (two screenshots)

chenxianpao avatar Oct 09 '21 07:10 chenxianpao

@chenxianpao, Thank you for testing! I'm glad it works! However, please note that the first data is lost in the current version, as I said before.

I'm afraid the first record can't be received, because its leading [ will make parsing fail, but all subsequent records should be received correctly if ,\n is used as the delimiter.

I will fix this problem in the next version!

daipom avatar Oct 13 '21 06:10 daipom

@chenxianpao I'm sorry for taking so long. I have released fluent-plugin-unix-client v1.0.0.

  • https://rubygems.org/gems/fluent-plugin-unix-client

However, please note that the first data is lost in the current version, as I said before.

I'm afraid the first record can't be received, because its leading [ will make parsing fail, but all subsequent records should be received correctly if ,\n is used as the delimiter.

I will fix this problem in the next version!

I added a new option, format_json, and you can use it as follows.

<ROOT>
  <source>
    @type unix_client
    tag "debug.unix_client"
    path "/var/run/ceph/rgw-opslog.asok"
    <parse>
      @type "json"
    </parse>
-   delimiter ",\n"
+   delimiter "\n"
+   format_json true
  </source>
  <match debug.**>
    @type stdout
  </match>
</ROOT>

With this, you can receive both the first and the last record in the JSON list! I'm sorry for taking so long!
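For readers wondering how a \n delimiter can work on list-style input at all, here is one possible approach sketched in Python. This is a guess at the general idea and not the plugin's actual implementation, which should be checked in its repository; `normalize_fragment` and `parse_stream` are hypothetical names. The sketch strips the list punctuation (a leading [, a trailing , and a trailing ]) from each line before parsing, and it assumes every record is a JSON object rather than an array.

```python
import json


def normalize_fragment(fragment):
    """Strip JSON-list punctuation around one object (illustrative only)."""
    text = fragment.strip().rstrip(",").rstrip()
    if text.startswith("["):
        text = text[1:]  # opening bracket of the enclosing list
    if text.endswith("]"):
        text = text[:-1]  # closing bracket of the enclosing list
    return text.strip()


def parse_stream(stream, delimiter="\n"):
    """Split on the delimiter and parse every non-empty normalized piece."""
    records = []
    for fragment in stream.split(delimiter):
        text = normalize_fragment(fragment)
        if text:  # lines such as "]," normalize to an empty string
            records.append(json.loads(text))
    return records
```

Under these assumptions, both layouts discussed earlier in the thread (one big list, or several lists in a row) yield every record, including the first and the last.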

daipom avatar Aug 22 '22 17:08 daipom