
Consider using `memberlist` for distributor's HA-tracker

Open colega opened this issue 2 years ago • 18 comments

Is your feature request related to a problem? Please describe.

Currently the HA-tracker feature of the Distributor requires having an etcd cluster, which isn't a small requirement.

Describe the solution you'd like

I would like to use memberlist, if possible, for HA-tracker.

Describe alternatives you've considered

Disabling HA-tracker.

Additional context

This was discussed on the first Mimir Community Call, on Mar 31st 2022, and @pstibrany raised some concerns about memberlist propagation delay, so it's not only about switching the implementation, but actually about having confidence that memberlist is good enough. cc @pracucci & @bboreham who also participated in the conversation.

colega avatar Mar 31 '22 16:03 colega

I would like to better understand why very quick propagation is required for the HA tracker. Given that the HA tracker triggers a failover once we haven't received any sample from the elected replica for X seconds, even if the propagation of the new elected replica is "slow" (a few seconds) it shouldn't matter much.

pracucci avatar Apr 01 '22 08:04 pracucci

Maybe it'd be a good idea to use Kubernetes integrated leader-election?

I stumbled across this just now while trying to deploy Mimir with Tanka. I'm on k8s 1.23 already in my homelab, so I had to port the ETCD Operator CRDs to v1 etc. - not optimal :)

xvzf avatar Apr 02 '22 14:04 xvzf

There are two values stored in the KV store that need to be synchronized between distributors:

  • Current elected replica (replica field)
  • received_at

If distributors have different views of these values, it's easy to get into a situation where different distributors make different decisions.

For example, if one distributor doesn't see an up-to-date received_at, it may decide to switch to a different replica. It then writes the decision back to the KV store (with both received_at and replica updated), but in the meantime another distributor that still sees the previous replica and the up-to-date received_at will happily ingest data from the previous replica.

If this other distributor also updates received_at and writes it back to the KV store, we now have conflicting updates in the gossip network. How are we going to deal with them?

pstibrany avatar Apr 07 '22 12:04 pstibrany

For example, if one distributor doesn't see an up-to-date received_at, it may decide to switch to a different replica

With the default config, the received_at is updated every 15s while the failover timeout is 30s. So it means it has to take more than 15s to propagate the changes for this to become a problem, right? If my math is correct, then the question is whether with memberlist we can guarantee propagation within 15s in a large cluster.
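For illustration, here's a minimal sketch of the check each distributor performs, using the defaults discussed above (the flag names in the comment are assumptions from memory, not verified against the current code):

```go
package main

import (
	"fmt"
	"time"
)

// Defaults discussed above. The flag names are from memory
// (-distributor.ha-tracker.update-timeout and
// -distributor.ha-tracker.failover-timeout) and should be double-checked.
const (
	updateTimeout   = 15 * time.Second // how often received_at is refreshed
	failoverTimeout = 30 * time.Second // how stale received_at may get before failing over
)

// shouldFailover is a simplified view of the per-distributor check: if no
// sample from the elected replica has been seen for failoverTimeout, start
// accepting samples from another replica.
func shouldFailover(electedReceivedAt, now time.Time) bool {
	return now.Sub(electedReceivedAt) > failoverTimeout
}

func main() {
	// A distributor whose view of received_at is up to
	// failoverTimeout-updateTimeout (~15s) stale can still make the right
	// decision; beyond that it may fail over while others don't.
	staleView := time.Now().Add(-20 * time.Second)
	fmt.Println(shouldFailover(staleView, time.Now())) // false: 20s < 30s
}
```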

If this other distributor also updates received_at and writes it back to the KV store, we now have conflicting updates in the gossip network. How are we going to deal with them?

While I agree we need to handle conflicts (and we need to be prepared for them in edge cases), couldn't the conflict here be solved by picking the most recent received_at?

Even if we have split brain for a short period of time, what could really happen? We ingest samples from both replicas and some of them will fail because OOO or we could end up with data loss? Still thinking about the data loss...

pracucci avatar Apr 07 '22 15:04 pracucci

While I agree we need to handle conflicts (and we need to be prepared for them in edge cases), couldn't the conflict here be solved by picking the most recent received_at?

Yes. The most important thing is to get all nodes to agree on the current value; taking the replica with the most recent received_at will work as long as received_at is not the same. If it is the same, we can take the "lower" replica (i.e. "a" < "b").
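A minimal sketch of that resolution rule (type and field names are illustrative, not the actual ha_tracker types):

```go
package main

import "fmt"

// replicaDesc is a simplified stand-in for the HA-tracker KV entry.
type replicaDesc struct {
	Replica    string
	ReceivedAt int64 // unix millis
}

// merge picks the winning entry between two conflicting gossip updates:
// the most recent ReceivedAt wins; on a tie, the lexicographically lower
// replica name wins, so all nodes converge on the same value.
func merge(a, b replicaDesc) replicaDesc {
	if a.ReceivedAt != b.ReceivedAt {
		if a.ReceivedAt > b.ReceivedAt {
			return a
		}
		return b
	}
	if a.Replica < b.Replica {
		return a
	}
	return b
}

func main() {
	fmt.Println(merge(
		replicaDesc{Replica: "b", ReceivedAt: 100},
		replicaDesc{Replica: "a", ReceivedAt: 100},
	)) // ties resolve to replica "a"
}
```

Because the rule is deterministic and independent of the order in which updates are applied, every node converges to the same value regardless of how the conflicting updates arrive.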

pstibrany avatar Apr 22 '22 10:04 pstibrany

Even if we have split brain for a short period of time, what could really happen? We ingest samples from both replicas and some of them will fail because OOO or we could end up with data loss? Still thinking about the data loss...

I don't see how we would lose data (but me not seeing it doesn't mean anything). I think the problem is that we can ingest different subsets of series from conflicting requests.

For example, replicas A and B send requests at the same time, and both are ingested (due to propagation delay). Both requests have some histogram data.

Some series for the histogram (a few buckets) are accepted from replica A, and some other series for the same histogram (other buckets, sum, count) are accepted from replica B. If this histogram data wasn't scraped at exactly the same time, the end result may be that we store an inconsistent "snapshot" of the histogram. I wonder if this can lead to wrong results being returned by PromQL.

pstibrany avatar Apr 22 '22 10:04 pstibrany

So, it means it has to take more than 15s to propagate the changes, right?

There is some jitter in how each node reacts to the timestamps; this defaults to 5 seconds max, so "more than 10s" (the 15s update interval minus up to 5s of jitter) is probably more correct.

But I think your broad point is in the right direction: we expect propagation latencies to be a lot lower than this.

bboreham avatar Apr 25 '22 13:04 bboreham

We ingest samples from both replicas and some of them will fail because OOO

The worst-case scenario is that fast-moving counters can have a lower value for a higher timestamp. I.e. one Prometheus sends a sample (ts, val) of (1000, 5000) and another Prometheus sends (1001, 4999). This is a very unlikely scenario, but with fast-moving counters, and given that Prometheus assigns the timestamp, it could happen.

But I think this is a non-issue because Prometheus HA pairs don't scrape at the same time, but rather offset their scrapes based on the hostname and external labels: https://github.com/prometheus/prometheus/blob/fc3f1b8ded810a1263f72d37db9db4e8ce726120/scrape/manager.go#L260-L262

we could end up with data loss

We absolutely could, tbh. Let's say we have a scenario with P1 and P2, where P1 is the leader, and assume that the data volume is low. Let's say we have 5 distributors (d0-d4).

P1 is 10 minutes behind (for whatever reason) and P2 is up to date. d1 sees a sample and updates the received_at, but d4 doesn't see the update yet; maybe it hasn't received a sample for the last 30s, so it accepts a sample from P2. Now even if it later gets the info that P1 is still the leader, the damage is done and there will be a data loss of 10 minutes because P1 is behind.

In a SaaS environment there could be tenants scraping just one or two targets, or sending a low enough volume of metrics that it's possible only a subset of distributors gets samples every 30s. Whether we should support HA deduping for such a low volume is another question.

gouthamve avatar Apr 25 '22 14:04 gouthamve

We absolutely could, tbh. Let's say we have a scenario with P1 and P2, where P1 is the leader, and assume that the data volume is low. Let's say we have 5 distributors (d0-d4).

P1 is 10 minutes behind (for whatever reason) and P2 is up to date.

Whenever the leader is behind, switching to a new leader will cause data loss. Yes, it's more likely to happen with memberlist's higher latency when data volume is low, but it's also less likely that someone with a low data volume actually uses HA.

colega avatar Jun 02 '22 07:06 colega

I had two ideas:

  • I think we may be able to get reliable state replication with memberlist (<15 sec). For some perspective, HashiCorp managed to get full replication (they call it dissemination; see Table V) within 22s in a cluster of 120 nodes with a gossip interval of 1s. Unfortunately they don't give a formula. Mimir's default gossip interval is 200ms. I think we should experiment with this to get an idea of how fast we can replicate a value.
    • Maybe we should have a dedicated memberlist cluster for distributors to reduce the number of nodes and reduce dissemination period?
  • If we want a fast and consistent KV-store, I don't think memberlist will give us that (edit: well, Consul does that, but we're not Consul). But one of the distributors can act as a rudimentary one (e.g. GetReplica, ObserveReplicaReceivedAt). We can use the ring to choose a distributor as a leader (RF=0, find owner of token=1); see the sketch below. The GEM admin-api uses a similar approach. This still leaves the problem of replication speed and split brain when choosing a leader. But it works reliably in the average case when the cluster is healthy.
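As a rough illustration of the "find owner of token=1" idea, a library-agnostic sketch (this is not the actual dskit ring API; names and the token scheme are invented for illustration):

```go
package main

import (
	"fmt"
	"sort"
)

// Each distributor registers tokens on a ring; whoever owns the token
// immediately at/after the lookup key would act as the HA-tracker leader.
type member struct {
	name  string
	token uint32
}

func leaderFor(key uint32, ring []member) string {
	sort.Slice(ring, func(i, j int) bool { return ring[i].token < ring[j].token })
	for _, m := range ring {
		if m.token >= key {
			return m.name
		}
	}
	return ring[0].name // wrap around the ring
}

func main() {
	ring := []member{
		{"distributor-0", 1 << 30},
		{"distributor-1", 2 << 30},
		{"distributor-2", 3 << 30},
	}
	fmt.Println(leaderFor(1, ring)) // distributor-0 owns token 1 and acts as leader
}
```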

dimitarvdimitrov avatar Jul 15 '22 09:07 dimitarvdimitrov

I think we may be able to get reliable state replication with memberlist (<15 sec)

We should do a real measurement with our settings.

I think we could write a simple app to measure the dissemination period. For example, add a testing key, edit the value in 1 replica, then poll all other replicas to check what value they report and measure how long it takes for the change to propagate across replicas. Then repeat it continuously and track some metrics, with 1/10/100/1K/10K nodes.
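A rough sketch of what such a probe could look like (the kvClient interface below is a stand-in for the memberlist-backed KV client in dskit; the signatures and polling logic are assumptions for illustration):

```go
package probe

import (
	"context"
	"fmt"
	"time"
)

// kvClient is a stand-in for the memberlist-backed KV client (e.g. dskit's
// kv.Client); only the calls the probe needs are sketched here, and the
// signatures are assumptions, not the real interface.
type kvClient interface {
	CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error
	Get(ctx context.Context, key string) (interface{}, error)
}

// measureDissemination writes a timestamp under a test key via one replica,
// then polls every other replica until it reports the new value, returning
// how long full propagation took.
func measureDissemination(ctx context.Context, writer kvClient, readers []kvClient, key string) (time.Duration, error) {
	sent := time.Now()
	value := fmt.Sprintf("%d", sent.UnixMilli())

	// Overwrite whatever is stored with the new timestamp.
	if err := writer.CAS(ctx, key, func(interface{}) (interface{}, bool, error) {
		return value, false, nil
	}); err != nil {
		return 0, err
	}

	for _, r := range readers {
		for {
			got, err := r.Get(ctx, key)
			if err == nil && got == value {
				break // this replica has seen the update
			}
			select {
			case <-ctx.Done():
				return 0, ctx.Err()
			case <-time.After(100 * time.Millisecond):
			}
		}
	}
	return time.Since(sent), nil
}
```

Running it continuously and exporting the returned durations as a histogram would give the propagation-time metrics at each cluster size.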

Maybe we should have a dedicated memberlist cluster for distributors to reduce the number of nodes and reduce dissemination period?

I find it easier to reason about a single memberlist cluster composed of all Mimir replicas. To keep it simple (which is the goal of this issue), I would discourage having a distributors-only memberlist cluster.

But one of the distributors can act as a rudimentary one

We want high availability for the HA tracker. When a distributor is restarted or crashes, we don't want to lose the HA leader information.

pracucci avatar Jul 15 '22 12:07 pracucci

Probably I'm going to say something stupid, but...

Can we use ingesters somehow to store this data? Data in the ingesters is naturally replicated, and reads after writes are guaranteed. Reading 1 series per write should not be a big deal if we properly choose the matchers (use a label that has few values, pointing to a couple of series), and we can also cache that data for a while (there's no need to read on each write, really).

Can we maybe store a time series with the latest push from each cluster (that's 1 sample per push, so the overhead can be ignored) and somehow make a unanimous decision in the distributor based on that?

When the decision isn't clear (too long since the last push from the HA leader), we can use the ingesters as the decision mechanism (since they won't overwrite the samples unless OOO is enabled).

colega avatar Jul 15 '22 14:07 colega

Can we use ingesters somehow to store this data?

Dimitar mentioned it too in a private conversation. I think we're just taking for granted that memberlist wouldn't work, but before building a more complicated system I would like to see proof of why memberlist wouldn't work. So far I haven't seen it, and I think we haven't even pushed hard to get fast memberlist propagation (which would benefit the hash ring too).

The benefit of using memberlist is that distributors would just have to look up the replica in memory, with no blocking call to make if the information is missing or stale.
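To make the in-memory lookup point concrete, a minimal sketch of what the hot path could look like (type and method names are invented for illustration, not Mimir's actual implementation):

```go
package distributorsketch

import "sync"

// haCache sketches the in-memory view a distributor would keep when the
// HA-tracker state is replicated via memberlist: lookups never block on a
// remote KV store, they just read the last gossiped value.
type haCache struct {
	mtx     sync.RWMutex
	elected map[string]string // "tenant/cluster" -> elected replica
}

// isElected answers from local memory only; a missing entry means "unknown",
// and the caller decides whether to accept or reject the sample.
func (c *haCache) isElected(tenant, cluster, replica string) (elected bool, known bool) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	cur, ok := c.elected[tenant+"/"+cluster]
	return ok && cur == replica, ok
}

// apply is called from the gossip/KV watch path whenever an update arrives.
func (c *haCache) apply(tenant, cluster, replica string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if c.elected == nil {
		c.elected = map[string]string{}
	}
	c.elected[tenant+"/"+cluster] = replica
}
```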

pracucci avatar Jul 15 '22 15:07 pracucci

Sorry for the delayed update: I couldn't spend too much time on this so far, and it looks like I won't be able to spend much more time on it soon. This is a summary of my findings:

What I was trying to find out: at how many nodes do KV store updates take more than 15 seconds to propagate when using memberlist, and how can that be improved.

What I did: I ran a cluster of between 100 and 1000 memberlist nodes. I wrote a small binary that propagates timestamps, using the memberlist-backed KV store in dskit. In the setup only a single one of the N nodes was sending updates (the leader); the idea is that this approximates the worst-case scenario where we receive a single batch from a client and need to update the timestamp across all distributors (in practice we have to update the value in all memberlist members). I ran the memberlist nodes as pods on Kubernetes. Updates were sent every 15 seconds because that was the Prometheus scrape interval for my pods.

What I measured: basically two main things: the age of the value when it appears in a member (latency) and the age of the value it replaces (staleness). The reasoning for the second one is that an update can get lost while the next update arrives in time; measuring the staleness of the replaced value accounts for this. I have a dashboard that visualizes these metrics along with things like cluster size, memberlist message queue size, and compute resource utilization. Grafana employees can find the "Memberlist HA" dashboard; I've attached the dashboard JSON below.

Findings:

  • At rest, propagation time for all cluster sizes is very stable and well below 2 seconds. The problem is with rollouts.
  • During rollouts, even at 100 members, latency increased to over 20 seconds (even when I had multiple leaders by mistake). The first thing that seemed to help was reducing -memberlist.packet-write-timeout and -memberlist.packet-dial-timeout to a value around the gossip interval (200ms). See graph 1.
  • After Marco suggested https://github.com/grafana/dskit/issues/192, I tried a simple version of it and it made a significant improvement during rollouts. See graph 2. There are still some spikes to 19 seconds, which seem to happen when a node joins the cluster. I think the perceived latency happens because the joining node does a push/pull with a node that still hasn't received the last update, so the new node joins and records a stale update as the latest one.
  • Staleness and latency usually move together - when one worsens, the other worsens too.
  • CPU throttling and memory pressure start becoming a problem when scaling to 500 and 1000 nodes; using only 50Mi RAM and 50m CPU wasn't enough, and pods started getting severely throttled and OOM-killed. I had to scale vertically, which in turn caused delays in the rollouts due to cluster auto-scaling. I increased to 400Mi and 250m, but also saw throttling when trying to scale beyond 1000 nodes.
    • A symptom of this was that nodes were dropping incoming messages. Log lines contained handler queue full, dropping message (3) from=10.68.10.23:7946. The default queue size is 1024 (HandoffQueueDepth), which should be enough.

My open questions:

  • What happens when we increase the size of messages? So far I was sending a Unix millisecond timestamp as a string; that's 13 bytes and is unrealistic. How much slower will bigger updates propagate?
  • Does CPU and memory consumption scale linearly with cluster size?

This is the branch I used - dimitar/memberlist-ha - and I've also attached the two k8s deployments and a service that I used.

memberlist-ha-svc.yaml.txt memberlist-ha-deploy.yaml.txt memberlist-ha-leader-deploy.yaml.txt dashboard.json.txt

Graphs

Graph 1 - blue annotation shows when timeouts were reduced. There is smaller latency even when nodes go from 100 to 200. (screenshot attached)

Graph 2 - memberlist with async TCP transport sending. (screenshot attached)

dimitarvdimitrov avatar Aug 03 '22 15:08 dimitarvdimitrov

What happens when we increase the size of messages? So far I was sending a Unix millisecond timestamp as a string; that's 13 bytes and is unrealistic

For HA tracker purposes, we'll need to transfer updates for this data structure, plus the cluster ID:

https://github.com/grafana/mimir/blob/eed8ae3e6329dd6b8c2f3d402ef41eab971de9a0/pkg/distributor/ha_tracker.pb.go#L28-L37

Assuming an average of 20 bytes each for the cluster ID and the replica ID, the binary data we would have to transport may be on the order of 20 + 20 + 8 + 8 = 56 bytes.
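Spelled out as a rough calculation (the field breakdown is an assumption based on the linked message, not a measured size):

```go
package sizes

// Back-of-envelope for the gossiped payload per HA cluster. Field names
// approximate the ReplicaDesc message linked above (assumed to carry the
// replica name plus two int64 timestamps); protobuf framing overhead is
// ignored.
const (
	clusterIDBytes = 20 // assumed average length of the cluster ID (the key)
	replicaIDBytes = 20 // assumed average length of the replica name
	timestampBytes = 8  // one int64 timestamp field

	approxTotalBytes = clusterIDBytes + replicaIDBytes + 2*timestampBytes // = 56
)
```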

pracucci avatar Aug 03 '22 16:08 pracucci

I'd expect anything smaller than the MTU of ~1500B to take the same time to transmit, WDYT?

colega avatar Aug 04 '22 08:08 colega

Does CPU and memory consumption scale linearly with cluster size?

I would expect it to do so, right? Every extra client can potentially send you a gossip message with a constant probability.

colega avatar Aug 04 '22 08:08 colega

Does CPU and memory consumption scale linearly with cluster size? I would expect it to do so, right? Every extra client can potentially send you a gossip message with a constant probability.

This is true for the hash ring use case, but for the HA tracker use case I would expect message passing to be a factor of how many HA clusters the Mimir tenants are using, because I expect a data structure keyed by tenant and HA cluster, with each key's value propagated independently (like in the ring, where we propagate each instance independently).
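A tiny sketch of the keying scheme being described (the exact key format is an assumption for illustration, not necessarily what Mimir uses):

```go
package keys

import "fmt"

// haTrackerKey sketches the per-tenant, per-HA-cluster key layout described
// above: each (tenant, cluster) pair gets its own KV entry, so gossip traffic
// scales with the number of HA clusters the tenants are using rather than
// with the total number of ring members.
func haTrackerKey(tenant, cluster string) string {
	return fmt.Sprintf("%s/%s", tenant, cluster)
}
```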

pracucci avatar Aug 04 '22 09:08 pracucci