filter_lua: Add chunk mode for processing multiple records
This PR will introduce a chunk_mode for lua filter. It can be needed for use cases like parallelization (see lua lanes).
Please note that the lua functions will take only two arguments:
function process_records(tag, records)
if records and type(records) == "table" then
for i, record_row in ipairs(records) do
local timestamp = record_row.timestamp
local record = record_row.record
print("Timestamp entry:", timestamp.sec, timestamp.nsec)
print("Record entry:", record.message)
end
else
print("Error: Invalid 'records' table or nil")
end
return records
end
It's configuration looks like this:
[FILTER]
Name lua
Match my_logs
script lanes_example.lua
call process_records
chunk_mode On
time_as_table On
The returned table must be in the same format (table of timestamp and record pairs).
This mode currently only supports time_as_table by default and does always emit the returned records. There is no return code to be set.
A use case for this can be the parallel execution of lua filters by using the lua lanes library.
Please see example here (remember to install lua lanes first e.g. apt install luarocks && luarocks install lanes and check the path in the lanes_example.lua)
- fluent-bit.conf: https://gist.github.com/drbugfinder-work/63b6992247cfc3f865ee08f01c3d4b64
- lanes_example.lua: https://gist.github.com/drbugfinder-work/252bc103cd8efe6b0153242df8494978
Please see valgrind output:
- https://gist.github.com/drbugfinder-work/54eab5d992c902cc067a07e29d03839e
Documentation PR: https://github.com/fluent/fluent-bit-docs/pull/1310
Enter [N/A] in the box, if an item is not applicable to your change.
Testing Before we can approve your change; please submit the following in a comment:
- [x] Example configuration file for the change
- [x] Debug log output from testing the change
- [x] Attached Valgrind output that shows no leaks or memory corruption was found
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
- [N/A] Run local packaging test showing all targets (including any new ones) build.
- [N/A] Set
ok-package-testlabel to test for all targets (requires maintainer to do).
Documentation
- [x] Documentation required for this feature
Backporting
- [N/A] Backport to latest stable release.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
@tarruda would you take a look?
@drbugfinder-work Thanks for this PR.
I skimmed quickly through the PR and lua lanes documentation in order to understand the end goal, and I assume this is how everything will work:
- Chunked mode only difference is that it can pass multiple records at once to the lua callback.
- Lua lanes is a library that allows you to spread processing across multiple OS threads by spawning multiple Lua states (one for each thread), running the function in each thread and then returning the results to the main thread.
- Since the callback receives multiple records, your end goal is to split the records so that they would be processed in parallel.
(please let me know if I missed something):
With that in mind, I have some questions:
- How many records can fluent-bit pass to the Lua filter at once?
- Have you done any benchmarking with a big volume of records and compare with the traditional approach?
- Will Lua lanes create a new thread every time you ask it to run a function?
- How is the data shared between threads in lua lanes (is there serialization/deserialization to pass records across threads)?
The reason these questions are important is that this approach might have a lot of overhead and this adds quite a lot of complexity to the lua_filter implementation. Spawning threads is expensive, and only worth if you have a huge amount of data. If fluent-bit can never pass more than a few hundred records to the lua callback, I suspect splitting the workload across thread will make things even slower (this is why I suggest benchmarking).
Hi @tarruda,
you're absolutely right with that summary. The main goal is to parallelize record processing in Lua. However, there might be other use cases that could make use of processing more than one record at once within Lua (e.g. trend analysis).
I'm testing this in different environments, especially in K8s clusters with a couple hundreds instances of Fluent Bit. In high load scenarios, I can see chunks of up to around 300-500 records coming into the Lua filter per call. It fluctuates between 1 to 50 records per filter call under normal circumstances, depending on the log volume at the pipeline and Flush settings. However, only having one CPU core is no longer the limit, so now I can see up to 4 cores totally being utilized in our environment per Fluent Bit instance. Of course there is some overhead by using the Lua Lanes library and creating threads, etc. but it definitely increases our maximum throughput by roughly 200-300%, as seen in our benchmarks. The limiting factor is the main Fluent Bit pipeline again and not the Lua script anymore.
Please keep in mind, that this change to the plugin does not make a direct use of the Lua Lanes library - that is just one use case. The library has to be installed by the user, in case it should be used. The implementation on how to distribute the records within the Lua into different worker threads is up to the user. I've added a simple (non worker thread limited) example to the documentation to show how to create a thread for every incoming record (this would of course create a thread on the OS for every record - which might or might not be performing well). The user could use the Linda objects of the Lanes lib to share data between threads, if this is needed by the user.
I don't think that this adds much complexity to the lua filter plugin itself, as it just passes a table of records instead of a single record to Lua - but it gives you the option to make use of all the available data in that chunk.
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
Over the last months we noticed a memory leak (not detected by valgrind) by this change. Please do not merge until this is fixed.