
TCP/UDP inputs should allow more concurrency under load

Open faec opened this issue 9 months ago • 9 comments

The TCP/UDP inputs run a server that reads and publishes events from hosts that connect. While multiple hosts can connect at once and run in parallel, there are two factors that limit the throughput from a connected host:

  • The work of publishing events is all done sequentially in the same goroutine as the network reader. This means that integrations and configurations that define processors are pausing their network connections to run them, even if the overall system load is low.
  • Regardless of the number of connections, all publish calls happen through the same pipeline client. This means that there is only one set of processors for all incoming client data, and those processors can only handle one event at a time.

These factors mean that the input itself can be the bottleneck for high-throughput clients, causing dropped events even when the overall CPU load is moderate and the queue and output are keeping up fine.

The inputs should instead perform the work of publishing on a different goroutine than reading network data, and should use a pool of clients to allow processors to handle multiple events in parallel.
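
A minimal sketch of that shape, with hypothetical Event, Client, and Pipeline types standing in for the beats publisher pipeline API (this illustrates the proposal, not the actual input code):

// Sketch: the network reader only parses and enqueues events, while a
// pool of workers, each holding its own pipeline client (and therefore
// its own processor chain), does the publishing in parallel.
package sketch

import "sync"

type Event struct{ Message string }

// Client stands in for one pipeline client; Publish runs the
// configured processors and hands the event to the queue/output.
type Client interface {
	Publish(Event)
	Close() error
}

// Pipeline stands in for the publisher pipeline that hands out clients.
type Pipeline interface {
	ConnectClient() (Client, error)
}

// startWorkers connects numWorkers clients and runs one publishing
// goroutine per client, so slow processors no longer stall the
// goroutine that reads from the network.
func startWorkers(p Pipeline, numWorkers int, events <-chan Event) (*sync.WaitGroup, error) {
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		client, err := p.ConnectClient()
		if err != nil {
			return nil, err
		}
		wg.Add(1)
		go func(c Client) {
			defer wg.Done()
			defer c.Close()
			for ev := range events {
				c.Publish(ev)
			}
		}(client)
	}
	return &wg, nil
}

The per-connection reader goroutines would then only parse framing and send onto the events channel, never touching processors.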

faec avatar Apr 03 '25 20:04 faec

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

elasticmachine avatar Apr 03 '25 20:04 elasticmachine

Pinging @elastic/sec-deployment-and-devices (Team:Security-Deployment and Devices)

elasticmachine avatar Apr 04 '25 17:04 elasticmachine

@cmacknz - any update on this? This is an issue for a large customer.

defutek-tj avatar Jun 03 '25 17:06 defutek-tj

Sorry, we haven't been able to prioritize this yet.

cmacknz avatar Jun 03 '25 21:06 cmacknz

Edited the issue to reflect that it applies to the UDP input too (this comes up e.g. with the panw integration which can involve very high event volume).

faec avatar Jun 04 '25 22:06 faec

+1. I've had to hack around UDP listener limitations even where output latency is tiny. Logstash gives options which improve this, but aside from tuning rx_rmem_max, there's little that can currently be done on the Filebeat side...

adcleared avatar Jun 17 '25 10:06 adcleared

@faec @cmacknz are there any workarounds that would avoid the event drops?

I do think we need to address the lack of concurrency as described by @faec, but my immediate short-term concern is around dropping events. I fear that splitting the ingress goroutine from the publishing one, whilst promising, will let us handle higher throughput but we may still hit a limit.

nimarezainia avatar Jun 19 '25 01:06 nimarezainia

The way the input is implemented now cannot maximize use of the CPU of the machine it runs on.

The only mitigation I could think of would be to run multiple agents or beats behind a load balancer to horizontally scale out. That would be the infrastructure version of what the code should be doing. I expect horizontal scaling will always achieve higher throughput even after the optimization we need to do because it parallelizes the output in addition to the input pipeline.

cmacknz avatar Jun 19 '25 14:06 cmacknz

Load balancer and/or a message queue have been recommended. Clearly there's going to be an ingress rate that an atomic unit (in this case Filebeat) just won't be able to handle, no matter how we change the architecture of the product (unless we insert a message bus inside the beat itself).

If this is happening over TCP, could we not tweak the window available to throttle the ingress traffic? If this is possible, then we can figure out what our supported max throughput is (we can improve on that by making some architecture changes as suggested here) and ensure that nothing is dropped if the rate goes above this point.

If the user wants to avoid drops, then they should move away from using UDP.
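
For the TCP case, backpressure can come from the transport itself: if the goroutine reading the connection blocks on a bounded channel once the publishing workers fall behind, the socket's receive buffer fills up and TCP flow control shrinks the window advertised to the sender, throttling it instead of dropping events. A minimal sketch of that idea follows (hypothetical types, not the actual input code); UDP has no such feedback, so once the socket buffer is full the kernel simply drops datagrams.

// Sketch: backpressure via a bounded channel between the reader and
// the publishing workers. A blocked send here stops further reads, the
// kernel receive buffer fills, and the remote sender is paced by TCP
// flow control instead of events being dropped.
package backpressure

import (
	"bufio"
	"net"
)

type Event struct{ Message string }

func readLines(conn net.Conn, events chan<- Event) {
	defer conn.Close()
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		// Blocks when the channel is full, i.e. when workers are busy.
		events <- Event{Message: scanner.Text()}
	}
}

// Usage: events := make(chan Event, 1024); the buffer size bounds how
// far the reader can run ahead of the publishing workers.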

nimarezainia avatar Jun 20 '25 00:06 nimarezainia

After running many benchmarks and reviewing the code from my PR adding concurrency (specifically the version from 243c4fe60), I made some interesting discoveries.

All of these analyses were made using the TCP input.

In the latest released version of Filebeat, the bottleneck is either the output or the input itself (reading from the network); this is also true for the concurrent version. When using the discard output, it is clear that the input itself is the bottleneck.

This becomes clear when looking at the goroutine blocking profile and the goroutine analysis from the trace: the publishing goroutines spend most of their time waiting for events to be read from the network.

[Images: goroutine blocking profile and goroutine trace analysis]

Once a really slow processor is added, the gains from running the processors (which are part of the publish call) in different goroutines and scaling them become clearly noticeable.

To simulate this scenario I used the script processor calculating Fibonacci recursively:

      - script:
          lang: javascript
          source: >
            function fib(n) {
              if (n < 2) {
                return n;
              }
              else {
                return fib(n - 1) + fib(n - 2);
              }
            }
            function process(event) {
              event.Put("test", fib(16))
            }

Those are the goroutine blocking profile and the goroutine analysis from the trace with the slow script processor:

[Images: goroutine blocking profile and goroutine trace analysis with the slow script processor]

With the slow script processor, we can see a clear gain in performance when using multiple goroutines to run the publish call.

Benchmark results

Suite name:

  • Single goroutine is the latest released version; it does not support any concurrency in the input.
  • Concurrent version is my PR's version.

Benchmark name: auto-generated by benchbuilder; the only variation is the number of workers. 0 means the latest release (v9.1.0), which ignores the workers setting.

All versions/benchmarks were run with the same configuration, the only variation is the number of workers.

Configurations used

filebeat.yml with syslog processor

filebeat:
    inputs:
        - data_stream:
            dataset: tcp.generic
          host: localhost:4044
          id: tcp-1-workers
          index: logs-tcp.generic-default
          number_of_workers: 1 # That's the only effective change among benchmarks/versions
          processors:
            - add_fields:
                fields:
                    input_id: tcp-1-workers-preset-throughput
                target: '@metadata'
            - add_fields:
                fields:
                    dataset: tcp.generic
                    namespace: default
                    type: logs
                target: data_stream
            - add_fields:
                fields:
                    dataset: tcp.generic
                target: event
            - add_fields:
                fields:
                    stream_id: tcp-tcp.generic
                target: '@metadata'
            - add_fields:
                fields:
                    id: b51bc1c7-2d5c-4526-b9bf-616de0dc5a63
                    snapshot: false
                    version: 9.2.0
                target: elastic_agent
            - add_fields:
                fields:
                    id: b51bc1c7-2d5c-4526-b9bf-616de0dc5a63
                target: agent
            - syslog:
                field: message
                tag: syslog
          type: tcp
http.enabled: true
http.host: localhost
http.pprof.enabled: true
logging.info: info
logging.to_files: false
logging.to_stderr: true
name: 1-workers-preset-throughput
output:
    discard:
        enabled: true
filebeat.yml with script processor

filebeat:
    inputs:
        - data_stream:
            dataset: tcp.generic
          host: localhost:4044
          id: tcp-1-workers
          index: logs-tcp.generic-default
          number_of_workers: 1 # That's the only effective change among benchmarks/versions
          processors:
            - add_fields:
                fields:
                    input_id: tcp-1-workers-preset-throughput
                target: '@metadata'
            - add_fields:
                fields:
                    dataset: tcp.generic
                    namespace: default
                    type: logs
                target: data_stream
            - add_fields:
                fields:
                    dataset: tcp.generic
                target: event
            - add_fields:
                fields:
                    stream_id: tcp-tcp.generic
                target: '@metadata'
            - add_fields:
                fields:
                    id: b51bc1c7-2d5c-4526-b9bf-616de0dc5a63
                    snapshot: false
                    version: 9.2.0
                target: elastic_agent
            - add_fields:
                fields:
                    id: b51bc1c7-2d5c-4526-b9bf-616de0dc5a63
                target: agent
            - script:
                lang: javascript
                source: |
                    function fib(n) {
                      if (n < 2) {
                        return n;
                      }
                      else {
                        return fib(n - 1) + fib(n - 2);
                      }
                    } function process(event) {
                      event.Put("test", fib(16))
                    }
          type: tcp
http.enabled: true
http.host: localhost
http.pprof.enabled: true
logging.info: info
logging.to_files: false
logging.to_stderr: true
name: 1-workers-preset-throughput
output:
    discard:
        enabled: true
+--------------------------------------+--------------------+------------------------------+---------+-----------------+-------------+
| RACE ID                              | SUITE NAME         | BENCHMARK NAME               | VERSION | DURATION (SEC)  | EPS         |
+--------------------------------------+--------------------+------------------------------+---------+-----------------+-------------+
| d08273a1-e4de-42a8-9717-b055e25c3372 | Single goroutine   | 0-workers-throughput         | 9.1.0   | 3m8.261661079s  | 2656.000000 |
| ebb5e053-4ba0-4afd-8cda-4291c3c4aace | Concurrent version | 1-workers-preset-throughput  | 9.2.0   | 3m11.264531666s | 2614.000000 |
| baaf7f03-237d-4101-b395-4eb2f6aaa884 | Concurrent version | 4-workers-preset-throughput  | 9.2.0   | 1m14.192704498s | 6739.000000 |
| 6b9db306-284c-49c3-a040-af8c0b2e3377 | Concurrent version | 8-workers-preset-throughput  | 9.2.0   | 56.044595021s   | 8921.000000 |
| c2f98bb5-d480-4f3b-8913-6b42104cf76e | Concurrent version | 16-workers-preset-throughput | 9.2.0   | 59.140696792s   | 8454.000000 |
+--------------------------------------+--------------------+------------------------------+---------+-----------------+-------------+

The time benchbuilder took to write the events to the network connection for each run is:

[d08273a1::filebeat::0-workers-throughput]         - 2025/07/29 11:28:19 Streamer completed sending events 500000 took 3m7.460732778s with avg speed 2667.225251
[ebb5e053::filebeat::1-workers-preset-throughput]  - 2025/07/29 11:31:53 Streamer completed sending events 500000 took 3m10.59331749s with avg speed 2623.386835
[baaf7f03::filebeat::4-workers-preset-throughput]  - 2025/07/29 11:33:30 Streamer completed sending events 500000 took 1m13.845074415s with avg speed 6770.932306
[6b9db306::filebeat::8-workers-preset-throughput]  - 2025/07/29 11:34:44 Streamer completed sending events 500000 took 51.243569948s with avg speed 9757.321758
[c2f98bb5::filebeat::16-workers-preset-throughput] - 2025/07/29 11:36:09 Streamer completed sending events 500000 took 56.685431643s with avg speed 8820.608497

The dataset was generated using flog in RFC 5424 format. Each entry is about 2.5 KB; the test file contains 500,000 entries, about 12 GB of data.
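
For reference, here is a hypothetical Go sketch of the kind of streamer that produces the timing lines above: it replays the flog-generated file over TCP and reports average EPS. This is only an illustration, not the actual benchbuilder code; the file name is assumed.

// Replays a newline-delimited test file over TCP and reports avg EPS.
package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"os"
	"time"
)

func main() {
	f, err := os.Open("events.log") // assumed name for the flog-generated file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	conn, err := net.Dial("tcp", "localhost:4044") // matches the input's host setting
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	w := bufio.NewWriter(conn)
	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) // allow long log lines

	start := time.Now()
	sent := 0
	for scanner.Scan() {
		if _, err := w.WriteString(scanner.Text()); err != nil {
			log.Fatal(err)
		}
		if err := w.WriteByte('\n'); err != nil {
			log.Fatal(err)
		}
		sent++
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	if err := w.Flush(); err != nil {
		log.Fatal(err)
	}
	elapsed := time.Since(start)
	fmt.Printf("sent %d events in %s (avg %.0f EPS)\n", sent, elapsed, float64(sent)/elapsed.Seconds())
}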

belimawr avatar Jul 29 '25 16:07 belimawr

thanks @belimawr

What does "0 worker" mean in the Single goroutine example?
Is it possible to use the "1-workers-preset-throughput" configuration with the latest release (Single goroutine) so that we have a like-for-like comparison against your changes?

nimarezainia avatar Jul 29 '25 23:07 nimarezainia

The "0 worker" is actually the latest release, it ignores the worker setting. I ended using "0-worker" to keep the naming pattern of the files generated by benchbuilder.

The latest release was tested with the same settings as the PR version. They all used the same settings, the only variation was the number of workers.

belimawr avatar Jul 30 '25 12:07 belimawr

Sorry if my post wasn't very clear 🙈. I was under the weather yesterday; I wanted to share the results, but it seems it wasn't as clear as it should have been 😅.

I'll clarify it and add some other interesting facts.

belimawr avatar Jul 30 '25 12:07 belimawr

More as a curiosity, here is the goroutine analysis without the syslog processor. We can see a reduction of about 0.6s in the goroutine execution time.

[Image: goroutine trace analysis without the syslog processor]

belimawr avatar Jul 30 '25 14:07 belimawr

As a last curiosity, I was looking at the metrics from the script processor on some of my benchmark runs. There is some variance between the metrics for the concurrent version and the latest release, and the number of samples is also different, probably due to the way they're collected.

I didn't dig in or experiment to understand the difference in performance and sampling of the script processor, but it might explain why the performance gain with the concurrent version is not as high as we might have expected.

Filebeat v9.1.0 (latest release) Data samples: 500 000

[Image: script processor process_time histogram, Filebeat v9.1.0]
raw data

        "histogram": {
          "process_time": {
            "count": 500000,
            "max": 1619457,
            "mean": 370724.67724609375,
            "median": 357279.5,
            "min": 287678,
            "p75": 406715.5,
            "p95": 440704.05,
            "p99": 496952.79,
            "p999": 935834.1730000019,
            "stddev": 57819.725833032506
          }
        }

PR Version with 8 workers Data samples: 62 332

[Image: script processor process_time histogram, PR version with 8 workers]
raw data

{
  "histogram": {
    "process_time": {
      "count": 62332,
      "max": 5458225,
      "mean": 903002.5698242188,
      "median": 704923.5,
      "min": 458856,
      "p75": 1176233.5,
      "p95": 1544709.8499999999,
      "p99": 2606650.8899999997,
      "p999": 4213725.623000011,
      "stddev": 434795.71438284294
    }
  }
}

belimawr avatar Jul 30 '25 15:07 belimawr

The latest release was tested with the same settings as the PR version. They all used the same settings, the only variation was the number of workers.

@belimawr so it sounds like Single goroutine:0-workers-throughput has the same settings as Concurrent version:1-workers-preset-throughput, is that correct? Even the worker count is the same?

There doesn't seem to be an EPS improvement at all when comparing the current version (Single goroutine) to the PR version (Concurrent version). That's the bit I'm trying to understand.

nimarezainia avatar Jul 31 '25 02:07 nimarezainia

@belimawr so it sounds like Single goroutine:0-workers-throughput has the same settings as Concurrent version:1-workers-preset-throughput, is that correct? Even the worker count is the same?

Yes, but the code is different. Concurrent version:1-workers-preset-throughput has the input ingestion (reading from the network) running on a different goroutine than the publishing call. The goal of having it is to know the performance with the default settings and to compare it with the currently released code.

Comparing Single goroutine:0-workers-throughput and Concurrent version:1-workers-preset-throughput shows us that with the default settings ("1 worker"), the performance is effectively the same as the last released version, so users upgrading won't see a performance regression if they keep the default settings.

belimawr avatar Aug 04 '25 12:08 belimawr

Comparing Single goroutine:0-workers-throughput and Concurrent version:1-workers-preset-throughput shows us that with the default settings ("1 worker"), the performance is effectively the same as the last released version, so users upgrading won't see a performance regression if they keep the default settings.

The goal is to improve performance, which it sounds like can only happen when the number of workers is greater than 1. As you mentioned, with workers set to 1 the new code doesn't show a performance degradation. So we probably need to see what Single goroutine:X-workers-throughput looks like where X is 4, 8 or 16, and have that result compared to your changes.

nimarezainia avatar Aug 05 '25 02:08 nimarezainia

The goal is to improve performance, which it sounds like can only happen when the number of workers is greater than 1.

Yes, that is true. Another caveat is that for performance to improve, we also need "slow enough" processors; otherwise we're bound by how fast the input can read events from the network.

The premise of this issue is: Processors defined in the input configuration are slow enough that they hinder the throughput.

The TCP & UDP inputs publish metrics, and my PR decouples the metrics for receiving and processing events, making it easier to reason about whether running the processors might be the bottleneck. So if processing_time grows larger than arrival_period, increasing concurrency will help (provided there are no other bottlenecks).

So we probably need to see what Single goroutine:X-workers-throughput looks like where X is 4, 8 or 16 and have that result compared to your changes.

That's not possible. The Single goroutine version is the currently released version (well, any version before my PR gets merged/released), and it does not support any concurrency in the input. So no matter the value of X in Single goroutine:X-workers-throughput, the results will be exactly the same as Single goroutine:0-workers-throughput, because the input workers configuration is simply ignored by versions that do not support concurrency.

belimawr avatar Aug 05 '25 13:08 belimawr

@belimawr Could you please provide some user-facing documentation on how a user can take advantage of the changes you have made here in order to increase Filebeat throughput? Is there a config that needs to be applied? It would be good to have this documented in our public docs so that folks are aware and can take advantage of your enhancement. Thanks.

nimarezainia avatar Aug 11 '25 01:08 nimarezainia

We have the new setting documented in the input docs and a mention in the changelog. I'll update them to be more explicit that increasing the number of workers increases performance.

belimawr avatar Aug 11 '25 14:08 belimawr

@nimarezainia what do you think of https://github.com/elastic/beats/pull/45891?

You can see the rendered docs:

  • https://docs-v3-preview.elastic.dev/elastic/beats/pull/45891/reference/filebeat/filebeat-input-tcp
  • https://docs-v3-preview.elastic.dev/elastic/beats/pull/45891/reference/filebeat/filebeat-input-udp

belimawr avatar Aug 11 '25 15:08 belimawr