PipelineBus.sendEvents locks on the sender PipelineOutput impacting overall throughput
Currently (up to 8.14.0 at the moment of writing), the PipelineBus class has a lock on the sender output:
public void sendEvents(final PipelineOutput sender,
                       final Collection<JrubyEventExtLibrary.RubyEvent> events,
                       final boolean ensureDelivery) {
    if (events.isEmpty()) return; // This can happen on pipeline shutdown or in some other situations

    synchronized (sender) {
        // .....
        addressesToInputs.forEach((address, addressState) -> {
            // .....
            PipelineInput input = addressState.getInput(); // Save on calls to getInput since it's volatile
            // .....
            lastResponse = input.internalReceive(clones);
            // .....
        });
    }
}
internalReceive will call Queue.write that mainly does 3 steps:
public long write(Queueable element) throws IOException {
    //...
    byte[] data = element.serialize();
    lock.lock();
    this.headPage.write( .. );
    //...
}
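Because the synchronized block wraps the whole internalReceive call, the serialization that Queue.write performs before taking its own lock is also executed by only one thread at a time. So when multiple workers send to the same pipeline and serialization plus the page write take a long time, most threads spend their time waiting for the one thread that is writing its events. This can be seen with a simple pipelines.yml: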
- pipeline.id: source
  config.string: "input { java_generator {} } output { pipeline { send_to => [dest1] } }"
- pipeline.id: dest
  config.string: "input { pipeline { address => dest1 } } output { null {} }"
With queue.type: persisted set in logstash.yml, all but one of the workers of the upstream pipeline are blocked at any given time:
❯ curl -s -XGET 'localhost:9600/_node/hot_threads?human=true&threads=30&stacktrace_size=10' | grep "thread name.*source.*worker" -A 1 | grep -v "\-\-"
12.17 % of cpu usage, state: blocked, thread name: '[source]>worker2', thread id: 54
app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
12.12 % of cpu usage, state: blocked, thread name: '[source]>worker5', thread id: 60
app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.96 % of cpu usage, state: blocked, thread name: '[source]>worker0', thread id: 51
app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.93 % of cpu usage, state: blocked, thread name: '[source]>worker4', thread id: 58
app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.89 % of cpu usage, state: runnable, thread name: '[source]>worker3', thread id: 56
java.base@…/jdk.internal.misc.Unsafe.unpark(Native Method)
11.84 % of cpu usage, state: blocked, thread name: '[source]>worker7', thread id: 63
app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.82 % of cpu usage, state: blocked, thread name: '[source]>worker6', thread id: 62
app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.66 % of cpu usage, state: blocked, thread name: '[source]>worker9', thread id: 68
app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.57 % of cpu usage, state: blocked, thread name: '[source]>worker1', thread id: 52
app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.48 % of cpu usage, state: blocked, thread name: '[source]>worker8', thread id: 65
app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
The lock on the sender was introduced by https://github.com/elastic/logstash/pull/10872 to ensure proper ordering during pipeline shutdown. However, it should be possible to improve concurrency with a ReadWriteLock: event processing would take the read lock on the sender, so several workers can send at the same time, while every other operation would take the write lock.
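The read/write lock idea can be illustrated in isolation with a minimal sketch. Nothing here is the actual PipelineBus code: `SenderGuard`, `send`, `lifecycle` and `peakConcurrentSenders` are hypothetical names, and the sketch only assumes that event delivery can safely run under a shared lock while lifecycle operations need exclusive access. It measures how many workers are inside the delivery section at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteLockSketch {

    // Hypothetical guard around one sender; not the real PipelineBus API.
    static final class SenderGuard {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        // Event delivery path: many workers may hold the read lock at once.
        void send(Runnable deliver) {
            lock.readLock().lock();
            try {
                deliver.run();
            } finally {
                lock.readLock().unlock();
            }
        }

        // Assumed lifecycle path (register, unregister, shutdown): exclusive.
        void lifecycle(Runnable change) {
            lock.writeLock().lock();
            try {
                change.run();
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    // Runs `workers` threads through send() and returns the peak number of
    // threads that were inside the delivery section simultaneously.
    static int peakConcurrentSenders(int workers) {
        SenderGuard guard = new SenderGuard();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch allInside = new CountDownLatch(workers);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> guard.send(() -> {
                peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
                allInside.countDown();
                awaitQuietly(allInside); // stay inside until every worker has entered
                inside.decrementAndGet();
            }));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent senders: " + peakConcurrentSenders(4));
    }
}
```

With the read lock, all four workers are inside the delivery section together. If `send` used `synchronized` on a shared object instead, each worker would wait for the previous one to leave and the peak would be expected to stay at 1, which matches the blocked workers in the hot threads output. Whether the shutdown ordering guaranteed by the current lock is preserved depends on which operations take the write lock; the sketch does not settle that.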
The impact of this change between 7.2.0 and 7.2.1 is easy to observe. For the following pipelines.yml:
- pipeline.id: source
  config.string: "input { generator { count => 20000000 } } output { pipeline { send_to => destination } }"
- pipeline.id: destination
  queue.type: persisted
  config.string: "input { pipeline { address => destination } } output { null {} }"
we can observe the following numbers:
7.2.0 - 1m 46s
❯ curl -s localhost:9600/_node/stats/pipelines/destination | jq .pipelines.destination.events
{
"duration_in_millis": 771,
"filtered": 20000000,
"in": 20000000,
"out": 20000000,
"queue_push_duration_in_millis": 90072
}
/tmp/logstash-7.2.1
❯ curl -s localhost:9600/_node/stats/pipelines/source | jq .pipelines.source.events
{
"duration_in_millis": 866418,
"filtered": 20000000,
"in": 20000000,
"out": 20000000,
"queue_push_duration_in_millis": 7082
}
7.2.1 - 2m 38s
❯ curl -s localhost:9600/_node/stats/pipelines/destination | jq .pipelines.destination.events
{
"duration_in_millis": 565,
"filtered": 20000000,
"in": 20000000,
"out": 20000000,
"queue_push_duration_in_millis": 3720
}
/tmp/logstash-7.2.0
❯ curl -s localhost:9600/_node/stats/pipelines/source | jq .pipelines.source.events
{
"duration_in_millis": 1354787,
"filtered": 20000000,
"in": 20000000,
"out": 20000000,
"queue_push_duration_in_millis": 12592
}
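To put the two runs side by side, the wall-clock times can be converted into throughput. This is only a back-of-the-envelope sketch: it assumes the 1m 46s and 2m 38s figures are total run times for the 20,000,000 generated events, and `ThroughputComparison` and `eventsPerSecond` are illustrative names.

```java
class ThroughputComparison {

    // Events per second for a run that processed `events` in `seconds`.
    static double eventsPerSecond(long events, long seconds) {
        return (double) events / seconds;
    }

    public static void main(String[] args) {
        long events = 20_000_000L;   // generator count from the pipelines config
        long secs720 = 1 * 60 + 46;  // 7.2.0: 1m 46s = 106 s
        long secs721 = 2 * 60 + 38;  // 7.2.1: 2m 38s = 158 s

        double eps720 = eventsPerSecond(events, secs720);
        double eps721 = eventsPerSecond(events, secs721);

        System.out.printf("7.2.0: %.0f events/s%n", eps720);
        System.out.printf("7.2.1: %.0f events/s%n", eps721);
        System.out.printf("runtime increase: %.1f%%%n",
                100.0 * (secs721 - secs720) / secs720);
        System.out.printf("throughput drop: %.1f%%%n",
                100.0 * (eps720 - eps721) / eps720);
    }
}
```

Under that assumption the run goes from roughly 188,679 to roughly 126,582 events per second, which is about 49% more wall-clock time or about a 33% drop in throughput.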