Proposal: make memberlist TCP Transport non-blocking
This proposal is based on a theory I have, but I haven't proved yet that it can significantly improve message propagation.
dskit's memberlist client propagates changes (e.g. hash ring changes) using broadcast messages: https://github.com/grafana/dskit/blob/ead3f9308bb7b413ce997182dd4d7c6e038bc68f/kv/memberlist/memberlist_client.go#L1042-L1058
Messages are pushed to a queue, together with messages received from other nodes that are enqueued for re-broadcasting; a single goroutine then periodically pulls messages from the queue and sends them to some other (random) nodes: https://github.com/grafana/memberlist/blob/09ffed8adbbed3ee3ace7de977c5e1cade90da87/state.go#L604-L626
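For reference, the queue/drain pattern looks roughly like this, a minimal sketch using the TransmitLimitedQueue API from hashicorp/memberlist (grafana's fork keeps the same interface); rawBroadcast is a made-up Broadcast implementation:

```go
package main

import "github.com/hashicorp/memberlist"

// rawBroadcast is a hypothetical minimal implementation of the
// memberlist.Broadcast interface wrapping an encoded message.
type rawBroadcast struct{ msg []byte }

func (b rawBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b rawBroadcast) Message() []byte                       { return b.msg }
func (b rawBroadcast) Finished()                             {}

func main() {
	q := &memberlist.TransmitLimitedQueue{
		NumNodes:       func() int { return 3 }, // cluster size drives the retransmit limit
		RetransmitMult: 4,
	}

	// Producer side: local changes and messages received from other
	// nodes are enqueued for (re-)broadcasting.
	q.QueueBroadcast(rawBroadcast{msg: []byte("ring update")})

	// Consumer side: the single gossip goroutine drains messages,
	// up to a byte budget (UDPBufferSize, 10MB in dskit).
	msgs := q.GetBroadcasts(0, 10*1024*1024)
	_ = msgs
}
```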
The gossip() function, responsible for picking up messages from the queue and sending them to some random nodes, is called at a regular interval, configured by -memberlist.gossip-interval (200ms by default):
https://github.com/grafana/memberlist/blob/09ffed8adbbed3ee3ace7de977c5e1cade90da87/state.go#L131-L135
On each gossip interval, the gossip() function picks up messages up to a total of UDPBufferSize bytes, which is hardcoded to 10MB in dskit (because dskit uses a custom transport based on TCP):
https://github.com/grafana/dskit/blob/ead3f9308bb7b413ce997182dd4d7c6e038bc68f/kv/memberlist/memberlist_client.go#L420-L422
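To make the timing concrete, the scheduling is conceptually equivalent to this sketch (randomPeers and the hardcoded byte budget are stand-ins for memberlist internals):

```go
package main

import (
	"time"

	"github.com/hashicorp/memberlist"
)

// gossipLoop is a simplified sketch of memberlist's scheduler: one
// goroutine drains the queue every tick and sends synchronously.
func gossipLoop(interval time.Duration, q *memberlist.TransmitLimitedQueue,
	t memberlist.Transport, randomPeers func() []string) {
	ticker := time.NewTicker(interval) // -memberlist.gossip-interval, 200ms by default
	defer ticker.Stop()
	for range ticker.C {
		msgs := q.GetBroadcasts(0, 10*1024*1024) // up to UDPBufferSize bytes
		for _, addr := range randomPeers() {
			for _, msg := range msgs {
				// With dskit's TCP transport this call blocks, so one
				// slow peer delays the whole round and the next tick.
				_, _ = t.WriteTo(msg, addr)
			}
		}
	}
}
```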
We assume that we broadcast new messages and re-broadcast received messages every -memberlist.gossip-interval=200ms, but that's not guaranteed: the gossip() function may take significantly longer, because dskit's custom TCP transport blocks on each call to rawSendMsgPacket(). In particular, the call to rawSendMsgPacket() blocks for the whole time it takes the TCP transport to create a new TCP connection, write the data and close it:
https://github.com/grafana/dskit/blob/ead3f9308bb7b413ce997182dd4d7c6e038bc68f/kv/memberlist/tcp_transport.go#L438-L439
For example, if a node can't be reached and the connection attempt times out, gossiping is blocked for 5 seconds (when running with the default config -memberlist.packet-dial-timeout=5s).
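A stripped-down version of that per-packet path, assuming a plain TCP dial (the real transport also writes framing and supports TLS):

```go
package main

import (
	"net"
	"time"
)

// writePacket shows the rough shape of the blocking per-packet send
// in the TCP transport (simplified sketch, not the actual dskit code).
func writePacket(addr string, payload []byte, dialTimeout time.Duration) error {
	// Dialing blocks for up to -memberlist.packet-dial-timeout (5s by
	// default), so one unreachable node stalls gossiping by that long.
	conn, err := net.DialTimeout("tcp", addr, dialTimeout)
	if err != nil {
		return err
	}
	defer conn.Close()

	_, err = conn.Write(payload)
	return err
}
```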
Proposal
The gossip() function in the memberlist library assumes a UDP transport, where sending a packet is "fire and forget" and essentially never blocks for long. On the other hand, dskit's TCP transport blocks even though it swallows any error:
https://github.com/grafana/dskit/blob/ead3f9308bb7b413ce997182dd4d7c6e038bc68f/kv/memberlist/tcp_transport.go#L430-L432
In practice, we currently get the worst of both worlds: with TCP we could detect more delivery errors at the cost of blocking the call, but we never return the error.
I propose to make the TCP transport's WriteTo() non-blocking. This preserves the current "fire and forget" behaviour without blocking the gossiping of messages to other nodes when a node is unresponsive.
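A minimal sketch of what that could look like; TCPTransport, writeTo() and the go-kit logger field are stand-ins for the real dskit types, not the actual change:

```go
// Sketch only: move the blocking send off the caller's goroutine.
func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) {
	// Copy the buffer in case the caller reuses it after we return.
	msg := make([]byte, len(b))
	copy(msg, b)

	go func() {
		// Same "fire and forget" semantics as today: the error is
		// logged and swallowed, but no longer blocks the gossip loop.
		if err := t.writeTo(msg, addr); err != nil {
			level.Warn(t.logger).Log("msg", "WriteTo failed", "addr", addr, "err", err)
		}
	}()

	// Report success immediately, as a UDP transport effectively does.
	return time.Now(), nil
}
```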
Looking at the timer_memberlist_gossip metric
The duration of gossip() execution is tracked by the timer_memberlist_gossip summary metric. It's configured with a max age of 10s, while in our infrastructure we use a longer scrape interval, so we're missing some data points, but it gives good insights anyway:
[Screenshot: timer_memberlist_gossip during a Mimir cluster rolling update]
At 11:22 UTC we started a Mimir cluster rolling update. As you can see, the gossip() duration spikes (both the max and the average, computed on the 99th percentile over the 10s observation window).
On Slack, @dimitarvdimitrov pointed out that we still need to wait until messages are successfully sent on shutdown, when we send the "dead node" (leave) message. He's right. Since we're in control of the TCP transport, I think this is something we can easily do.
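One way to support that with an async transport is to track in-flight sends; here inFlight is a hypothetical sync.WaitGroup field added to the transport:

```go
// Sketch: track async sends so Shutdown can flush them.
func (t *TCPTransport) sendAsync(msg []byte, addr string) {
	t.inFlight.Add(1)
	go func() {
		defer t.inFlight.Done()
		if err := t.writeTo(msg, addr); err != nil {
			level.Warn(t.logger).Log("msg", "WriteTo failed", "addr", addr, "err", err)
		}
	}()
}

func (t *TCPTransport) Shutdown() error {
	// Wait for in-flight sends, including the "dead node" (leave)
	// message, before tearing the transport down.
	t.inFlight.Wait()
	return nil
}
```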
What happens when the TCP send buffer fills up?
I don't know if using non-blocking writes will help - I believe we create and close a connection for every WriteTo. I don't know off the top of my head what happens to the bytes in the buffer, whether closing the socket blocks or the bytes are discarded.
> What happens when the TCP send buffer fills up?
In our current implementation we open a new TCP connection for each memberlist "packet" or "stream". But gossiping works in a single goroutine (as this issue points out), so if the remote side doesn't read the data quickly, gossiping gets stuck.
However, the remote side creates a listener in a goroutine and reads the entire "packet" fully before passing it to memberlist.
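For completeness, the receiving side conceptually works like this sketch (the packets channel is a simplification of how dskit hands data to memberlist):

```go
package main

import (
	"io"
	"net"
)

// acceptLoop sketches the receiving side: each connection gets its own
// goroutine, and a "packet" is read in full before being handed over,
// so one slow sender doesn't block other senders.
func acceptLoop(ln net.Listener, packets chan<- []byte) {
	for {
		conn, err := ln.Accept()
		if err != nil {
			return // listener closed
		}
		go func(c net.Conn) {
			defer c.Close()
			buf, err := io.ReadAll(c) // read the entire packet first
			if err == nil {
				packets <- buf
			}
		}(conn)
	}
}
```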
> I don't know if using non-blocking writes will help - I believe we create and close a connection for every WriteTo
Exactly, and that's a blocking operation. The whole gossiping from a node blocks on each such operation.
Ah I see - the connect/disconnect is async too.
It would be more efficient to use UDP when possible; opening a new connection for every packet is quite wasteful.
If we go this route, we would have to limit the number of concurrent operations, otherwise we risk holding a large number of TCP connections open. If unbounded, a system with e.g. N memberlist members would be able to open N TCP connections.
Looking at it from a different angle: If we're comfortable opening N connections, then why not hold the connections open to save the connect/disconnect time.
> If we go this route, we would have to limit the number of concurrent operations, otherwise we could risk holding a large number of TCP connections open.
Sure. It should be limited, and block once over the limit.
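A buffered channel can serve as that limiter; a sketch, with maxConcurrentWrites as a hypothetical new option:

```go
package main

// limitedSender sketches the bound: a buffered-channel semaphore caps
// the number of concurrent async sends, and acquiring a slot blocks
// once the limit is reached.
type limitedSender struct {
	sem   chan struct{}
	write func(msg []byte, addr string) error
}

func newLimitedSender(maxConcurrentWrites int, write func([]byte, string) error) *limitedSender {
	return &limitedSender{
		sem:   make(chan struct{}, maxConcurrentWrites),
		write: write,
	}
}

func (s *limitedSender) send(msg []byte, addr string) {
	s.sem <- struct{}{} // blocks while maxConcurrentWrites sends are in flight
	go func() {
		defer func() { <-s.sem }()
		_ = s.write(msg, addr) // errors still swallowed, as today
	}()
}
```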
> Looking at it from a different angle: If we're comfortable opening N connections, then why not hold the connections open to save the connect/disconnect time.
💯 agree, and I would like to discuss it in a separate issue. I think that's another problem we have to address, but IMO we may suffer from a blocking implementation anyway, which is why I think making the transport's WriteTo() async is desirable regardless.
> Looking at it from a different angle: If we're comfortable opening N connections, then why not hold the connections open to save the connect/disconnect time.
Opened a dedicated issue: https://github.com/grafana/dskit/issues/193
A PoC of this proposal was tried by @dimitarvdimitrov in https://github.com/grafana/mimir/issues/1597 and reported to significantly reduce propagation times during rollouts.