Add ZMQ input plugin
Added a plugin which can receive messages sent over ZMQ channels. For each ZMQ input plugin there must be an "Endpoint" configured, which contains the name of the ZMQ socket that will be accessed to receive the messages (for example: "ipc:///var/run/zmq-channel-1"). The ZMQ socket must be of style PULL/PUSH: the ZMQ input plugin will PULL messages from the socket, and the producer of the information must PUSH them to the socket. Note that the producer must do the "bind" to the ZMQ socket, as the ZMQ plugin will do the "connect". There is also an optional config variable "hwm" which can be configured for the plugin. It sets both the receive and send high-water marks for the ZMQ socket to the given value. The value is in messages, and defaults to unlimited.
As normal with internal MSGPACK messages, there is an outer envelope array of two elements, with the first element being the timestamp and the second being a map of keys and their values. The ZMQ message received must contain three parts. The first part is encoded as a string named "topic" in the MAP part of the MSGPACK. The second part is encoded as a string named "key" in the MAP part. The third part is encoded as binary data named "payload" in the MAP part.
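To make the mapping above concrete, here is a minimal Python sketch of how a three-part message could be turned into the record structure. It is an illustration only, not the plugin's C implementation: the function name `frames_to_record` is hypothetical, UTF-8 decoding of "topic" and "key" is an assumption, and the final MSGPACK serialization step is left out so the sketch needs only the standard library.

```python
import time


def frames_to_record(frames, timestamp=None):
    """Map a three-part ZMQ message onto the [timestamp, map] envelope.

    Hypothetical helper for illustration; the plugin does this in C.
    """
    if len(frames) != 3:
        raise ValueError(f"expected 3 message parts, got {len(frames)}")
    topic, key, payload = frames
    if timestamp is None:
        timestamp = time.time()
    return [
        timestamp,
        {
            # Assumption: "topic" and "key" are UTF-8 text.
            "topic": topic.decode("utf-8"),
            "key": key.decode("utf-8"),
            # "payload" stays as raw bytes (binary data).
            "payload": bytes(payload),
        },
    ]


record = frames_to_record([b"topic", b"key", b"payload"], timestamp=1606894429.0)
print(record)
```

A message with any other number of parts is rejected here with a `ValueError`; how the plugin itself handles such messages is not stated in this description.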
Note: this plugin can currently only be compiled for Linux. It can be enabled on the cmake line with "-DFLB_IN_ZMQ=Yes".
Note that to compile this plugin, the libczmq-dev package (and its dependencies) needs to be installed.
Signed-off-by: Gavin Shearer [email protected]
Thanks for your contribution.
Based on community needs, we will evaluate the inclusion of this plugin.
note: this plugin depends on external system library libczmq
assigned @fujimotos for review
Note: I have just started to take a look at this patch.
I confirmed that this plugin works as described.
Attached is the test script I have used, and its execution result.
Test Client In Python
import time
import zmq
import msgpack
ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)
sock.bind("tcp://127.0.0.1:5555")
while True:
    sock.send_multipart([b"topic", b"key", b"payload"], track=False)
    time.sleep(1)
Fluent Bit log
$ fluent-bit -i zmq -p endpoint=tcp://127.0.0.1:5555 -o stdout
...
[0] zmq.0: [1606894429.236495720, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[0] zmq.0: [1606894430.237640321, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[1] zmq.0: [1606894431.238933348, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[2] zmq.0: [1606894432.239989316, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[3] zmq.0: [1606894433.241117389, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[4] zmq.0: [1606894434.242347335, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
That said, there are a few things I'm not sure about in this patch:
- Is the message format correct?
  - I see Fluentd's ZMQ plugin allows users to send an event in a single frame.
  - That seems to be more intuitive to me than sending one in 3 frames (as the current patch does).
- Shouldn't "topic" be emitted as Fluent Bit's tag (rather than a field in a record)? In other words, shouldn't the record format be:
  tag.from.zeromq: [1606894434.242347335, {"key"=>"key", "payload"=>"payload"}]
  instead of the following?
  zmq.0: [1606894434.242347335, {"topic"=>"tag.from.zeromq", "key"=>"key", "payload"=>"payload"}]
But I'm no expert on ZMQ. If there is a good rationale for this design, I'm not against merging this patch.
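For comparison, the alternative raised in this review (using the first message part as the tag instead of a record field) could look roughly like the sketch below. It is not part of the patch; the name `frames_to_tagged_event` is hypothetical, and UTF-8 decoding of the topic and key is an assumption.

```python
def frames_to_tagged_event(frames, timestamp):
    """Return (tag, record), using the first message part as the tag.

    Hypothetical helper illustrating the review suggestion only.
    """
    if len(frames) != 3:
        raise ValueError(f"expected 3 message parts, got {len(frames)}")
    topic, key, payload = frames
    tag = topic.decode("utf-8")  # assumption: topic is UTF-8 text
    record = [
        timestamp,
        {"key": key.decode("utf-8"), "payload": bytes(payload)},
    ]
    return tag, record


tag, record = frames_to_tagged_event(
    [b"tag.from.zeromq", b"key", b"payload"], 1606894434.0
)
print(tag, record)
```

With this shape, routing could match on the topic directly, at the cost of the record no longer carrying the topic as a field.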
@gavin-shr Are you still interested in landing this patch? If so, please rebase onto git master and resolve the conflicts.
Thanks!