[Meta] End to End ACKs / Queueless Mode
ROADMAP
- [ ] Add tests to E2E/Other Pipeline code
- [ ] Benchmark feature
- [ ] Create blog post
- [ ] Experimental feature flag in master / 6.x
One desirable property for Logstash would be the ability to operate in as stateless a mode as possible. One hindrance to this is the fact that with the memory queue, data is lost if the process crashes. With a persistent queue, data is lost if there is a permanent machine failure.
This is unavoidable with some input/output combos, such as raw TCP, where there is no system of acknowledgements. For others, however, such as the beats (with client-side spooling) or kafka inputs and the elasticsearch or kafka outputs, we can provide the same guarantees with no need to keep state on disk.
When using these technologies it would be nice to be able to have a third queueless option, that would ACK E2E, and be able to withstand a permanent Logstash machine failure. This would not be able to handle a permanent input client failure of course, since it depends on that for replay.
In this thread let's discuss approaches to implementing this feature.
Key questions:
- How does the user experience change? What do users need to know? Do they need to tune Logstash differently? What is the UX here?
- Which internals need to change?
- Do we need to make any changes to the plugin API or any plugins to support these changes?
- What do we need from an input source to make it queueless?
I believe queueless will require a few major considerations:
An "obvious" PoC for me is that we give inputs an object satisfying the queue interface (so queue << event still works) but instead of queuing, it gives the event to a pipeline worker. Unfortunately, all (most?) inputs push one event at a time, and this means inputs will incur a full round-trip for every event.
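A minimal sketch of that naive PoC, assuming a worker is anything that responds to `call` with a list of events. `SynchronousQueue` and the lambda standing in for filters and outputs are hypothetical names for illustration, not existing Logstash classes:

```ruby
# Hypothetical sketch: SynchronousQueue is not an existing Logstash class.
class SynchronousQueue
  # worker: any object responding to #call(events); stands in for a pipeline worker
  def initialize(worker)
    @worker = worker
  end

  # Same shape as the existing queue interface (queue << event), but the call
  # only returns once the worker has finished with the event, so every push
  # costs the input a full round-trip.
  def <<(event)
    @worker.call([event])
    self
  end
end

processed = []
# Stand-in for filters + outputs: tag each event and record it.
worker = ->(events) { events.each { |e| processed << e.merge("seen" => true) } }

queue = SynchronousQueue.new(worker)
queue << { "message" => "a" } << { "message" => "b" }
```

Because `<<` blocks until the worker returns, an input that pushes one event at a time is throttled to one event per round-trip.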
The in-memory queue in Logstash was originally designed to allow inputs to push one event at a time without waiting for a pipeline worker to be ready. Queueless changes this model dramatically. My hunch is that queueless will either have serious performance problems due to one-at-a-time processing or we will have to make significant API changes[1] to the Inputs to enable this.
[1] the input api change can be backwards-compatible, but to avoid performance costs we'll need either batch processing or a futures model in order to reduce stalling.
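One way the backwards-compatible batch option could look, as a sketch only. `push_batch` is a hypothetical method name, not part of the current queue interface, and the `round_trips` counter exists purely to make the difference visible:

```ruby
# Hypothetical sketch: push_batch is an assumed method name, not a Logstash API.
class LegacyQueue
  attr_reader :round_trips, :events

  def initialize
    @round_trips = 0
    @events = []
  end

  # Existing one-at-a-time interface.
  def <<(event)
    @round_trips += 1
    @events << event
    self
  end
end

class BatchQueue < LegacyQueue
  # New optional entry point: one round-trip for the whole batch.
  def push_batch(batch)
    @round_trips += 1
    @events.concat(batch)
    self
  end
end

# Input-side helper: use push_batch when the queue offers it, otherwise fall
# back to pushing events one by one, so old queues keep working.
def push_all(queue, batch)
  if queue.respond_to?(:push_batch)
    queue.push_batch(batch)
  else
    batch.each { |event| queue << event }
  end
  queue
end

legacy = push_all(LegacyQueue.new, %w[a b c])
batched = push_all(BatchQueue.new, %w[a b c])
```

Inputs that never call the helper keep using `<<` unchanged, which is what makes the change backwards-compatible.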
For a prototype, forking the beats input is probably a good plugin to modify to test with any given proof-of-concept.
> My hunch is that queueless will either have serious performance problems due to one-at-a-time processing or we will have to make significant API changes[1] to the Inputs to enable this.
++ It should be simple and quick to test the performance of a naive synchronous approach.
Another UX issue is how to inform users that they will lose in-flight data if they use inputs that don't support acks. Maybe the inputs base class will need to carry some metadata about its support for acks.
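As a rough illustration of such metadata, assuming a class-level flag on the input base class. `BaseInput`, `supports_acks`, and `ack_warnings` are hypothetical names, not the real plugin API:

```ruby
# Hypothetical sketch: these names are illustrative, not the Logstash plugin API.
class BaseInput
  # Class-level declaration; inputs that say nothing are treated as not ack-capable.
  def self.supports_acks(value = nil)
    @supports_acks = value unless value.nil?
    @supports_acks || false
  end
end

class BeatsLikeInput < BaseInput
  supports_acks true
end

class TcpLikeInput < BaseInput
end

# Build the warnings a pipeline could show at startup in queueless mode.
def ack_warnings(input_classes)
  input_classes.reject(&:supports_acks).map do |klass|
    "#{klass.name} does not support acks; in-flight events may be lost on failure"
  end
end

warnings = ack_warnings([BeatsLikeInput, TcpLikeInput])
```

Defaulting to `false` means existing plugins are flagged until they explicitly opt in.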
I've been running some forks locally that test a few simple e2e ack scenarios, initially using the beats input to send 100k apache log messages to elasticsearch via a fairly rudimentary grok filter - the same as is used in the benchmark cli.
- For a baseline, running Logstash with default queue and batching settings
- Naive queue implementation, where pushing a single event to a queue in the beats input will push that input through filters and the output directly, with the next event pushed when the previous event returns
- Forking the beats input to allow it to send a batch of events to a queue, and sending the batch through filters and outputs directly, with the next batch pushed when the previous batch returns
- Slightly amended version of the above, where the batch from the input is sliced into `pipeline.batch.size` events. The first version of this waits for the entire batch to be completed before pushing the next batch from the input.
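The slicing variant in the last bullet can be sketched as follows. `process_in_slices` is a hypothetical helper, and the block stands in for running filters and outputs on one slice:

```ruby
# Hypothetical sketch of slicing an input batch into pipeline.batch.size chunks.
def process_in_slices(input_batch, batch_size)
  results = []
  input_batch.each_slice(batch_size) do |slice|
    # The block stands in for filters + outputs applied to one slice.
    results << yield(slice)
  end
  # Only after every slice has returned would the input push its next batch.
  results
end

# A 300-event input batch with a batch size of 125.
sizes = process_in_slices((1..300).to_a, 125) { |slice| slice.size }
# sizes is [125, 125, 50]
```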
Running this test yields the following results:
Baseline, 8 pipeline workers, batch size 125
$ curl localhost:9600/_node/stats | jq '.pipelines.main.plugins[][] | select(.events.duration_in_millis) | { name: .name, cost: (.events.duration_in_millis / .events.out) }'
...
{
"name": "geoip",
"cost": 0.09426
}
{
"name": "grok",
"cost": 0.13936
}
{
"name": "useragent",
"cost": 0.16678
}
{
"name": "date",
"cost": 0.02685
}
{
"name": "elasticsearch",
"cost": 1.12212
}
Wall clock time: 25.34s
Baseline, single pipeline worker
{
"name": "geoip",
"cost": 0.02503
}
{
"name": "grok",
"cost": 0.05447
}
{
"name": "useragent",
"cost": 0.05579
}
{
"name": "date",
"cost": 0.00483
}
{
"name": "elasticsearch",
"cost": 0.19864
}
Wall clock: 37.40s
Single event, synchronously processed:
{
"name": "geoip",
"cost": 0.00270802146233159
}
{
"name": "grok",
"cost": 0.0018838410172741494
}
{
"name": "useragent",
"cost": 0.038871040990362135
}
{
"name": "date",
"cost": 0.00018502009991085396
}
{
"name": "elasticsearch",
"cost": 2.0171062369642736
}
Wall clock time: 4 minutes 53 seconds
Forked beats input, processing entire batch:
{
"name": "geoip",
"cost": 0.03744
}
{
"name": "grok",
"cost": 0.0442
}
{
"name": "useragent",
"cost": 0.04865
}
{
"name": "date",
"cost": 0.0089
}
{
"name": "elasticsearch",
"cost": 0.16166
}
Wall clock time: 34.88s
Forked beats input, slicing into pipeline.batch.size:
{
"name": "geoip",
"cost": 0.03323
}
{
"name": "grok",
"cost": 0.03942
}
{
"name": "useragent",
"cost": 0.04719
}
{
"name": "date",
"cost": 0.00232
}
{
"name": "elasticsearch",
"cost": 0.20307
}
Wall Clock: 39.936s
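To compare the runs directly, the wall-clock times can be converted into throughput. This is a small sketch that assumes each run processed the full 100k events mentioned above; the labels are shorthand for the five scenarios:

```ruby
# Assumes every run processed all 100k events from the test description.
event_count = 100_000

wall_clock_seconds = {
  "baseline, 8 workers"       => 25.34,
  "baseline, 1 worker"        => 37.40,
  "single event, synchronous" => 4 * 60 + 53,
  "forked beats, whole batch" => 34.88,
  "forked beats, sliced"      => 39.936
}

# Events per second, rounded to the nearest whole event.
throughput = wall_clock_seconds.transform_values do |seconds|
  (event_count / seconds.to_f).round
end

throughput.each do |name, events_per_second|
  puts format("%-28s %6d events/s", name, events_per_second)
end
```

On these numbers the synchronous single-event run comes out roughly an order of magnitude below the 8-worker baseline, while both batched variants land in the same range as the single-worker baseline.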
@robbavey these are really promising results!
I think one big variable here is how does this perform for a non-local ES, where latency is higher.
The batching should amortize that cost some, but it'd be worth it to fire up some EC2 instances at some point to validate that. Or, to point your local box at Elastic Cloud.
How much flexibility do we have with ACKs with the beats protocol? Can we be eager with sending data (maybe using extra threads?) and conservative with ACKing to hide the increased latency with this approach?
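One possible shape for "eager sending, conservative ACKing", sketched under the assumption that batches carry increasing sequence numbers starting at 1 and that the client accepts a cumulative ACK. `AckTracker` is a hypothetical helper, not part of the beats input, and this version is not thread-safe:

```ruby
require "set"

# Hypothetical sketch: batches may finish out of order, but the ACK only
# advances over a contiguous run of completed sequence numbers.
class AckTracker
  attr_reader :acked_up_to

  def initialize
    @acked_up_to = 0      # highest sequence number that is safe to ACK
    @completed = Set.new  # finished batches still waiting on earlier ones
  end

  # Record that batch `seq` was fully written to the output. Returns the
  # sequence number that may now be ACKed to the client.
  def complete(seq)
    @completed << seq
    # Set#delete? returns nil when the element is absent, ending the loop.
    @acked_up_to += 1 while @completed.delete?(@acked_up_to + 1)
    @acked_up_to
  end
end

tracker = AckTracker.new
after_second = tracker.complete(2) # batch 1 still in flight, nothing to ACK
after_third  = tracker.complete(3)
after_first  = tracker.complete(1) # gap closed, everything up to 3 can be ACKed
```

If Logstash dies while batch 1 is still in flight, nothing beyond it has been ACKed, so the client replays from batch 1 and the output has to tolerate the duplicates.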
@andrewvc That's a good point - I'll set up an Elastic Cloud instance and run some tests through that, and I might also add toxiproxy to the mix to inject some controlled network latency/slowness to observe effects there.
Your question at the end is a good one - and is the next area I was planning on investigating.
Meeting notes summary
Next objective: implement PoC
PoC Goals
- Just Beats Input to ES
- New queue type
Stretch Goal
- New capabilities API that lets Inputs/Outputs declare themselves safe for this new queue type
- Queue timeouts
I just noticed this ticket and I must say (as a user of Logstash) that this is probably the number 1 feature that is missing from Logstash, and I'll be happy to help get it released as fast as possible.
@shaharmor thanks! It's in active development. One thing that would help us is if you could describe what your use case is and why this feature would be useful for you.
We'd also be glad to reach out once this is merged for beta testing :)
We are using the Kafka input with Elasticsearch output.
In some cases, when errors occur and we have to restart Logstash, the events that Logstash is currently processing are lost for good.
Thanks for the feedback @shaharmor . That's one of the use cases we're targeting.
I have one question, however: is Logstash exiting abnormally, or cleanly shutting down? LS shouldn't lose data when cleanly shutting down.
What happens is that we run Logstash in an auto-scaling group, so when there are more events to process there are also more Logstash servers, and when there are fewer events the group shrinks until only 1 Logstash server remains.
It's then possible that, in case of an output error (Elasticsearch is down, for example), the Logstash servers go into retry mode, where they keep retrying to push the events to Elasticsearch while waiting for it to come back online. But while they are in this phase their CPU drops to 0% (they are basically idle), which means that the auto-scaling group will think there isn't much work to do, so it will scale the group down, terminating the servers one at a time.
It does give Logstash some time to clean up safely before actually terminating the server, but it's only a few seconds or up to 1 minute, and if the downtime is much longer than that, the events currently held in Logstash will be gone.
Hey, has this seen any progress whatsoever? For us this is a must-have feature.
@shaharmor that's a great point. I really appreciate you bringing it up.
We still have this on our roadmap, but have been sidetracked by other projects like opening x-pack (which is done!) and fixing some PQ bugs.
The queueless mode would fix the data loss in your ASG scenario, so that's good to hear!
@andrewvc We're very interested in the E2E acks for some of our use-cases as well. Any updates or expected timeline?
Hey guys, I would really appreciate an update on this. Without this patch, I don't have any path to build a lossless, autoscaling Logstash setup. If I scale down, persistent queues get orphaned, and without persistent queues, any scale-down during a pipeline issue will result in message loss. Graceful shutdown doesn't help, since most docker schedulers will only give a short time to exit.
Talked with a customer at ElasticON San Jose today that would benefit from this based on their current architecture.
They were using docker with autoscaling such that Logstash would periodically grow and shrink node counts, and the shrinking is causing them data loss due to certain factors:
- Sometimes Elasticsearch isn't responding quickly enough for Logstash to drain
- PQ draining takes a while
- And docker/autoscaling is killing the container after about 15-30 minutes of waiting for it to terminate gracefully.
Under these scenarios, making Logstash stateless-ish would help because a replacement Logstash would, with a cooperating input plugin (kafka, in this case?), resume what was aborted during the autoscaling shrink activity.
Is there any update on this?
What is the status on this?
I am really interested in this stateless, queueless mode. But wouldn't it suffice for an MVP to make the current acknowledgement of close_batch and its contained events (or a metadata or id field) at the end of the worker loop available to input plugins, so that they can listen for the event IDs they put into the events and acknowledge them? This would push the external queue and acknowledgement logic into the specific queue input plugins such as sqs, redis (if stream support is added :)) and kafka.
I have a similar use case to @shaharmor and noticed the same problems @ansoni mentioned: either orphaning persistent queues (and losing events) or losing events with the in-memory queue, whenever the queue is not drained quickly enough or the output is unavailable during autoscaling.
I made an attempt at end-to-end acknowledgement (a queueless queue type might actually be hard due to cross-pipeline communication) on a fork: https://github.com/rv2673/logstash/compare/master...acknowledgement_bus
- I created a bus where, at the end of the worker loop, the events in the in-flight batch are acknowledged based on the presence of an acknowledgetoken in the event. That token (which contains a plugin-generated id) can be added on event creation.
- Plugins can register themselves (which gives them a token generator). When they add a token to an event, they get notified when it reaches the end of a pipeline.
- It should even work with cross-pipeline communication: the original input plugin is notified of the "clone" when it crosses pipeline boundaries, so the plugin knows it might need to wait for another acknowledgement before the event is fully processed.
- For plugins that clone an event within a pipeline (like the clone plugin) there is no need for notification, since the original event (the one we want acknowledged) always reaches the end of the pipeline (though it might be cancelled).
What I can't figure out, however, is how to have a Java input plugin create an event with an acknowledgetoken, since it does not create events directly like Ruby plugins do. Does anybody have an idea?
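The registration and acknowledgement flow described in the list above might look roughly like this. It is a simplified sketch, not the code on the fork: `AcknowledgementBus`, the `ack_token` field name, and the hash-based events are all assumptions made for illustration, and cross-pipeline clones are not modelled:

```ruby
# Hypothetical sketch of an acknowledgement bus; not the implementation on the fork.
class AcknowledgementBus
  def initialize
    @listeners = {}
  end

  # A plugin registers a callback and receives a token generator in return.
  def register(plugin_id, &on_ack)
    @listeners[plugin_id] = on_ack
    ->(event_id) { { plugin_id: plugin_id, event_id: event_id } }
  end

  # Called at the end of the worker loop with the in-flight batch.
  def acknowledge_batch(events)
    events.each do |event|
      token = event["ack_token"]
      next unless token # events without a token need no acknowledgement

      listener = @listeners[token[:plugin_id]]
      listener&.call(token[:event_id])
    end
  end
end

bus = AcknowledgementBus.new
acked = []
new_token = bus.register("kafka-1") { |event_id| acked << event_id }

batch = [
  { "message" => "a", "ack_token" => new_token.call("offset-41") },
  { "message" => "b" }, # created by a plugin that did not register
  { "message" => "c", "ack_token" => new_token.call("offset-42") }
]
bus.acknowledge_batch(batch)
# acked is ["offset-41", "offset-42"]
```

The input plugin decides what the id means (a Kafka offset here), so the bus itself never needs to know how a given source replays or commits.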
> I made an attempt at making the end-to-end acknowledgement
@rv2673 This is amazing :tada:. Can you open up a work-in-progress pull-request with your diff? We would love to work with you to get the effort across the finish line.
@yaauie Sorry, I seem to have missed the notification of your comment. I haven't been sitting still, and have figured out a way for both the Ruby and Java plugins to use it without a big impact on the interface. The problem was that I viewed the event class as the main object instead of as an interface around a map-based data structure. I have opened a WIP pull request.
@rv2673 great. We'll get someone from the team to review it and see where we can take it. It might take a couple weeks to find traction as those who have worked in this area have in-flight work targeting next week's 7.9 feature-freeze.
Note that the Ruby Execution Engine is going to be removed in a future major release, so if you hit blockages there (java_pipeline vs pipeline, et al.), focus on the Java Execution Engine (Ruby-based plugins are supported on both engines).
@yaauie Any updates on this? We are very interested in this. Will the PR created by @rv2673 be merged?
Any updates on this? Also, is there any clarification on why the PR from @rv2673 was not merged? Thanks