[Meta] End to End ACKs / Queueless Mode

Open andrewvc opened this issue 8 years ago • 25 comments

ROADMAP

  • [ ] Add tests to E2E/Other Pipeline code
  • [ ] Benchmark feature
  • [ ] Create blog post
  • [ ] Experimental feature flag in master / 6.x

One desirable property for Logstash would be the ability to operate in as stateless a mode as possible. One hindrance to this is the fact that with the memory queue, data is lost if the process crashes. With a persistent queue, data is lost if there is a permanent machine failure.

This is unavoidable with some input/output combos, such as raw TCP, where there is no system of acknowledgements. For others, however, like the beats (with client-side spooling) or kafka inputs, or the elasticsearch or kafka outputs, we can provide the same guarantees with no need to keep state on disk.

When using these technologies it would be nice to be able to have a third queueless option, that would ACK E2E, and be able to withstand a permanent Logstash machine failure. This would not be able to handle a permanent input client failure of course, since it depends on that for replay.

In this thread let's discuss approaches to implementing this feature.

Key questions:

  1. How does the user experience change? What do users need to know? Do they need to tune Logstash differently? What is the UX here?
  2. Which internals need to change?
  3. Do we need to make any changes to the plugin API or any plugins to support these changes?
  4. What do we need from an input source to make it queueless?

andrewvc avatar Oct 23 '17 20:10 andrewvc

I believe queueless will require a few major considerations:

An "obvious" PoC for me is that we give inputs an object satisfying the queue interface (so queue << event still works) but instead of queuing, it gives the event to a pipeline worker. Unfortunately, all (most?) inputs push one event at a time, and this means inputs will incur a full round-trip for every event.

The in-memory queue in Logstash was originally designed to allow inputs to push one event at a time without waiting for a pipeline worker to be ready. Queueless changes this model dramatically. My hunch is that queueless will either have serious performance problems due to one-at-a-time processing or we will have to make significant API changes[1] to the Inputs to enable this.

[1] the input api change can be backwards-compatible, but to avoid performance costs we'll need either batch processing or a futures model in order to reduce stalling.
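
To make the trade-off concrete, here is a minimal sketch of such a queue stand-in. It is not Logstash code: `SynchronousQueue`, `push_batch` and the `pipeline` callable are hypothetical names, and the pipeline is assumed to return only once filters and outputs have finished.

```ruby
# Hypothetical stand-in for the queue object handed to an input.
# It keeps the `queue << event` call shape, but runs the pipeline inline
# instead of buffering, so the call returns only after the output has run.
class SynchronousQueue
  # `pipeline` is any callable that takes an Array of events and returns
  # once filters and outputs are done with them (an assumption).
  def initialize(pipeline)
    @pipeline = pipeline
  end

  # One event at a time: one full round-trip per event.
  def <<(event)
    @pipeline.call([event])
    self
  end

  # Batch variant: one round-trip for the whole batch.
  def push_batch(events)
    @pipeline.call(events)
    self
  end
end

round_trips = 0
delivered = []
pipeline = lambda do |events|
  round_trips += 1
  delivered.concat(events)
end

queue = SynchronousQueue.new(pipeline)
queue << "event-1" << "event-2"
queue.push_batch(["event-3", "event-4", "event-5"])
puts "events delivered: #{delivered.size}, round trips: #{round_trips}"
```

The single-event path pays one round-trip per event, while the batch path pays one per batch, which is why a batch-aware input API matters for this mode.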

jordansissel avatar Oct 24 '17 17:10 jordansissel

For a prototype, the beats input is probably a good plugin to fork and modify for testing any given proof-of-concept.

jordansissel avatar Oct 24 '17 18:10 jordansissel

> My hunch is that queueless will either have serious performance problems due to one-at-a-time processing or we will have to make significant API changes[1] to the Inputs to enable this.

++ It should be simple and quick to test the performance of a naive synchronous approach.

Another UX issue is how to inform users that they'll lose in-flight data if they're using inputs that don't support acks. Maybe the inputs base class will need to carry some metadata about its support for acks.
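
A rough sketch of what that metadata could look like follows. Every name here (`SketchInput`, `supports_acks`, `ack_warnings`) is made up for illustration; the real Logstash input base class has no such declaration.

```ruby
# Hypothetical base class; not the real Logstash input base class.
class SketchInput
  # Class-level flag, false unless a subclass opts in.
  # Called with an argument it sets the flag; without one it reads it.
  def self.supports_acks(value = nil)
    @supports_acks = value unless value.nil?
    @supports_acks == true
  end
end

class SketchBeatsInput < SketchInput
  supports_acks true # the client can replay unacknowledged data
end

class SketchTcpInput < SketchInput
  # raw TCP has no acknowledgement mechanism, so nothing is declared
end

# Returns one warning per input that cannot take part in end-to-end ACKs.
def ack_warnings(input_classes)
  input_classes.reject(&:supports_acks).map do |klass|
    "#{klass.name} does not support ACKs; in-flight events may be lost"
  end
end

puts ack_warnings([SketchBeatsInput, SketchTcpInput])
```

With something like this, the pipeline could warn at startup (or refuse to start) when a queueless pipeline is configured with an input that has not opted in.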

jsvd avatar Nov 01 '17 11:11 jsvd

I've been running some forks locally that test a few simple e2e ack scenarios, initially using the beats input to send 100k apache log messages to elasticsearch via a fairly rudimentary grok filter - the same as is used in the benchmark cli.

  • For a baseline, running Logstash with default queue and batching settings
  • Naive queue implementation, where pushing a single event to a queue in the beats input will push that event through filters and the output directly, with the next event pushed when the previous event returns
  • Forking the beats input to allow it to send a batch of events to a queue, and sending the batch through filters and outputs directly, with the next batch pushed when the previous batch returns
  • Slightly amended version of above, where the batch from the input is sliced into pipeline.batch.size events. First version of this waits for the entire batch to be completed before pushing the next batch from the input.
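
The last variant can be sketched as follows. This is an illustration rather than the code from the forks: `process_in_slices` and the `pipeline` callable are hypothetical, and the pipeline is assumed to block until a slice has left the outputs.

```ruby
# Hypothetical helper: push an input batch through the pipeline in slices
# of at most `batch_size` events, waiting for each slice to finish.
# Returns the number of slices processed; the caller would push the next
# input batch (and ACK the current one) only after this returns.
def process_in_slices(events, batch_size, pipeline)
  slices = 0
  events.each_slice(batch_size) do |slice|
    pipeline.call(slice)
    slices += 1
  end
  slices
end

sizes = []
count = process_in_slices((1..300).to_a, 125, ->(slice) { sizes << slice.size })
puts "#{count} slices of sizes #{sizes.inspect}"
```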

Running this test yields the following results:

Baseline, 8 pipeline workers, batch size 125:

$ curl localhost:9600/_node/stats | jq '.pipelines.main.plugins[][] | select(.events.duration_in_millis) | { name: .name, cost: (.events.duration_in_millis / .events.out) }'
 ...
{
  "name": "geoip",
  "cost": 0.09426
}
{
  "name": "grok",
  "cost": 0.13936
}
{
  "name": "useragent",
  "cost": 0.16678
}
{
  "name": "date",
  "cost": 0.02685
}
{
  "name": "elasticsearch",
  "cost": 1.12212
}

Wall clock time: 25.34s

Baseline, single pipeline worker

{
  "name": "geoip",
  "cost": 0.02503
}
{
  "name": "grok",
  "cost": 0.05447
}
{
  "name": "useragent",
  "cost": 0.05579
}
{
  "name": "date",
  "cost": 0.00483
}
{
  "name": "elasticsearch",
  "cost": 0.19864
}

Wall clock: 37.40s

Single event, synchronously processed:

{
  "name": "geoip",
  "cost": 0.00270802146233159
}
{
  "name": "grok",
  "cost": 0.0018838410172741494
}
{
  "name": "useragent",
  "cost": 0.038871040990362135
}
{
  "name": "date",
  "cost": 0.00018502009991085396
}
{
  "name": "elasticsearch",
  "cost": 2.0171062369642736
}

Wall clock time: 4 minutes 53 seconds

Forked beats input, processing entire batch:

{
  "name": "geoip",
  "cost": 0.03744
}
{
  "name": "grok",
  "cost": 0.0442
}
{
  "name": "useragent",
  "cost": 0.04865
}
{
  "name": "date",
  "cost": 0.0089
}
{
  "name": "elasticsearch",
  "cost": 0.16166
}

Wall clock time: 34.88s

Forked beats input, slicing into pipeline.batch.size:

{
  "name": "geoip",
  "cost": 0.03323
}
{
  "name": "grok",
  "cost": 0.03942
}
{
  "name": "useragent",
  "cost": 0.04719
}
{
  "name": "date",
  "cost": 0.00232
}
{
  "name": "elasticsearch",
  "cost": 0.20307
}

Wall Clock: 39.936s
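
For easier comparison, the wall-clock times above can be turned into rough throughput figures. The sketch below assumes each run processed exactly 100,000 events (the description only says "100k"), so the output is approximate; `events_per_second` is just an illustrative helper.

```ruby
# Converts a run's wall-clock time into events per second.
def events_per_second(event_count, seconds)
  event_count / seconds.to_f
end

event_count = 100_000 # "100k" in the description; treated as exact here

# Wall-clock times reported above, in seconds (4 min 53 s = 293 s).
wall_clock_seconds = {
  "baseline, 8 workers"           => 25.34,
  "baseline, 1 worker"            => 37.40,
  "single event, synchronous"     => 293.0,
  "forked beats, whole batch"     => 34.88,
  "forked beats, sliced batches"  => 39.936
}

wall_clock_seconds.each do |name, seconds|
  rate = events_per_second(event_count, seconds)
  puts format("%-30s %8.1f events/s", name, rate)
end
```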

robbavey avatar Nov 07 '17 18:11 robbavey

@robbavey these are really promising results!

I think one big variable here is how does this perform for a non-local ES, where latency is higher.

The batching should amortize that cost some, but it'd be worth it to fire up some EC2 instances at some point to validate that. Or, to point your local box at Elastic Cloud.

How much flexibility do we have with ACKs with the beats protocol? Can we be eager with sending data (maybe using extra threads?) and conservative with ACKing to hide the increased latency with this approach?
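
One way to picture "eager sending, conservative ACKing" is a tracker that lets batches complete out of order but only reports a gap-free prefix as safe to acknowledge. This is a sketch under the assumption that the client accepts a cumulative ACK by sequence number; `AckWindow` is a hypothetical name, not part of the beats input.

```ruby
# Hypothetical tracker: batches may finish in any order, but the value
# reported for ACKing only advances over a gap-free prefix.
class AckWindow
  attr_reader :last_acked

  def initialize
    @lock = Mutex.new  # workers may report completion from several threads
    @last_acked = 0    # highest sequence number that is safe to ACK
    @completed = {}    # finished batches still waiting behind a gap
  end

  # Record that batch `seq` (1-based) has been written by the output.
  # Returns the highest sequence number that is now safe to ACK.
  def complete(seq)
    @lock.synchronize do
      @completed[seq] = true
      @last_acked += 1 while @completed.delete(@last_acked + 1)
      @last_acked
    end
  end
end

window = AckWindow.new
puts window.complete(2) # batch 2 finished first; batch 1 is still in flight
puts window.complete(1) # the gap is closed, so both can be ACKed
puts window.complete(3)
```

If Logstash dies while batch 1 is still in flight, nothing past the gap has been ACKed, so the client would replay from batch 1 and the output would see batch 2 twice. That is at-least-once delivery, not exactly-once.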

andrewvc avatar Nov 08 '17 14:11 andrewvc

@andrewvc That's a good point - I'll set up an Elastic Cloud instance and run some tests through that, and I might also add toxiproxy to the mix to inject some controlled network latency/slowness to observe effects there.

Your question at the end is a good one - and is the next area I was planning on investigating.

robbavey avatar Nov 08 '17 14:11 robbavey

Meeting notes summary

Next objective: implement PoC

PoC Goals

  • Just Beats Input to ES
  • New queue type

Stretch Goal

  • New capabilities API that lets Inputs/Outputs declare themselves safe for this new queue type
  • Queue timeouts

andrewvc avatar Nov 20 '17 18:11 andrewvc

I just noticed this ticket and I must say (as a user of Logstash) that this is probably the number 1 feature that is missing from Logstash, and I'll be happy to help get it released as fast as possible.

shaharmor avatar Feb 19 '18 12:02 shaharmor

@shaharmor thanks! It's in active development. One thing that would help us is if you could describe what your use case is and why this feature would be useful for you.

We'd also be glad to reach out once this is merged for beta testing :)

andrewvc avatar Feb 19 '18 17:02 andrewvc

We are using the Kafka input with Elasticsearch output.

In some cases, when errors occur and we have to restart Logstash, the events that Logstash is currently processing are lost for good.

shaharmor avatar Feb 20 '18 09:02 shaharmor

Thanks for the feedback @shaharmor . That's one of the use cases we're targeting.

I have one question, however: is Logstash exiting abnormally, or cleanly shutting down? LS shouldn't lose data when cleanly shutting down.

andrewvc avatar Feb 21 '18 01:02 andrewvc

What happens is that we run Logstash in an auto-scaling group, so when there are more events to process there are also more Logstash servers, and when there are fewer events the group shrinks until only 1 Logstash server remains.

It's then possible that in case of an output error (Elasticsearch is down, for example), the Logstash servers will go into retry mode, where they keep retrying to push the events to Elasticsearch, waiting for it to come back online. But while they are in this phase their CPU drops to 0% (they are basically idle), which means that the auto-scaling group will think there isn't much work to do, so it will scale the group down, terminating the servers one at a time.

It does give Logstash some time to clean up safely before actually terminating the server, but it's only a few seconds or up to 1 minute, and if the downtime is much longer than that, the events currently held in Logstash will be gone.

shaharmor avatar Feb 21 '18 08:02 shaharmor

Hey, has this seen any progress whatsoever? For us this is a must-have feature.

shaharmor avatar May 16 '18 07:05 shaharmor

@shaharmor that's a great point. I really appreciate you bringing it up.

We still have this on our roadmap, but have been sidetracked by other projects like opening x-pack (which is done!) and fixing some PQ bugs.

The queueless mode would fix the data loss in your ASG scenario, so that's good to hear!

andrewvc avatar May 16 '18 16:05 andrewvc

@andrewvc We're very interested in the E2E acks for some of our use-cases as well. Any updates or expected timeline?

dominicbecker avatar Dec 22 '18 00:12 dominicbecker

Hey guys, I would really appreciate an update on this. Without this patch, I don't have any path to build a lossless, autoscaling Logstash setup. If I scale down, persistent queues get orphaned, and without persistent queues any scale-down during a pipeline issue will result in message loss. Graceful shutdown doesn't help since most docker schedulers will only give a short time to exit.

ansoni avatar Oct 17 '19 21:10 ansoni

Talked with a customer at ElasticON San Jose today that would benefit from this based on their current architecture.

They were using docker with autoscaling such that Logstash would periodically grow and shrink node counts, and the shrinking is causing them data loss due to certain factors:

  • Sometimes Elasticsearch isn't responding quickly enough for Logstash to drain
  • PQ draining takes a while
  • And docker/autoscaling is killing the container after about 15-30 minutes of waiting for it to terminate gracefully.

Under these scenarios, making Logstash stateless-ish would help because a failed Logstash would, with a cooperating input plugin (kafka, in this case?), resume what was aborted during the autoscaling shrink activity.

jordansissel avatar Oct 17 '19 21:10 jordansissel

Is there any update on this?

shaharmor avatar Mar 12 '20 14:03 shaharmor

What is the status on this?

I am really interested in this stateless, queueless mode. But wouldn't it suffice for an MVP to make the current acknowledgement of close_batch and its contained events (or a metadata or id field) at the end of the worker loop somehow available to input plugins, so that they can listen for the event_ids they put into the events and acknowledge them? This would push the external queue and acknowledgement logic into the specific queue input plugins like sqs, redis (if stream support is added :)) and kafka.

I have a similar use case to @shaharmor and noticed the same problems @ansoni mentioned: either orphaning persistent queues (and losing events) or losing events with the in-memory queue, if either queue is not drained quickly enough or the output is unavailable during autoscaling.

rv2673 avatar May 11 '20 20:05 rv2673

I made an attempt at implementing end-to-end acknowledgement (a queueless type might actually be hard due to cross-pipeline communication) on a fork: https://github.com/rv2673/logstash/compare/master...acknowledgement_bus

  • I created a bus where, at the end of the worker loop, the events in the in-flight batch are acknowledged based on the presence of an acknowledgetoken in the event. That token (which contains a plugin-generated id) is/can be added on event creation.
  • Plugins can register themselves (which gives them a token generator). When they add a token to an event, they get notified when it reaches the end of a pipeline.
  • It should even work with cross-pipeline communication: the original input plugin is notified of the "clone" when an event crosses pipeline boundaries, so the plugin knows it might need to wait for another acknowledgement before the event is fully processed.
  • For plugins that clone an event within a pipeline (like the clone plugin) there is no need for notification, since the original event (the one we want acknowledged) always reaches the end of the pipeline (though it might be cancelled).
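
To illustrate the register/token/notify flow described in the list above, here is a much-simplified sketch. It is not the code from the fork: `AckBus`, `Token`, `register`, `acknowledge` and the `:ack_token` key are hypothetical names, events are plain hashes, and cross-pipeline handling is left out.

```ruby
# Hypothetical names throughout; this is not the API from the linked fork.
class AckBus
  Token = Struct.new(:plugin_id, :event_id)

  def initialize
    @listeners = {}
  end

  # An input registers a callback and receives a token generator.
  def register(plugin_id, &on_ack)
    @listeners[plugin_id] = on_ack
    ->(event_id) { Token.new(plugin_id, event_id) }
  end

  # Called at the end of the worker loop with the in-flight batch.
  # Events without a token are skipped.
  def acknowledge(batch)
    batch.each do |event|
      token = event[:ack_token]
      next unless token

      listener = @listeners[token.plugin_id]
      listener.call(token.event_id) if listener
    end
  end
end

bus = AckBus.new
acked = []
new_token = bus.register("kafka-1") { |event_id| acked << event_id }

batch = [
  { message: "a", ack_token: new_token.call("offset-41") },
  { message: "b" }, # created by an input that does not use the bus
  { message: "c", ack_token: new_token.call("offset-42") }
]
bus.acknowledge(batch)
puts acked.inspect
```

The input decides what an acknowledged id means (committing a Kafka offset, deleting an SQS message, and so on), which keeps the bus itself free of source-specific logic.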

What I can't figure out, however, is how to have a Java input plugin create an event with an acknowledgetoken, since it does not create events directly like Ruby plugins do. Does anybody have an idea?

rv2673 avatar Jun 26 '20 22:06 rv2673

> I made an attempt at making the end-to-end acknowledgement

@rv2673 This is amazing :tada:. Can you open up a work-in-progress pull-request with your diff? We would love to work with you to get the effort across the finish line.

yaauie avatar Jun 29 '20 16:06 yaauie

@yaauie Sorry, I seem to have missed the notification of your comment. I haven't been sitting still, and have figured out a way for both the Ruby and Java plugins to use it without having a big impact on the interface. The problem was that I viewed the event class as the main object instead of an interface around a map-based data structure. I have opened a WIP pull request.

rv2673 avatar Jul 06 '20 20:07 rv2673

@rv2673 great. We'll get someone from the team to review it and see where we can take it. It might take a couple weeks to find traction as those who have worked in this area have in-flight work targeting next week's 7.9 feature-freeze.

Note that the Ruby Execution Engine is going to be removed in a future major release, so if you hit blockages there (java_pipeline vs pipeline, et al.), focus on the Java Execution Engine (ruby-based plugins are supported on both engines).

yaauie avatar Jul 09 '20 05:07 yaauie

@yaauie Any updates on this? We are very interested in this. Will the PR created by @rv2673 be merged?

vmohanan1 avatar Nov 12 '21 14:11 vmohanan1

Any updates on this? Also, is there any clarification on why @rv2673's PR was not merged? Thanks

ossie-git avatar Nov 19 '25 12:11 ossie-git