franz-go

kgo.LiveProduceConnection option for low-latency settings

Open helgeholm opened this issue 1 year ago • 6 comments

We are trying to use franz-go in a low-latency setting.

With a normal, steady flow of produced messages, we're seeing latencies of 5ms through the system, which is nice. However, there are two cases where messages have 40-120ms latency because they need to wait for a TCP connect, an SSL handshake, and auth:

  1. The first message sent triggers broker.loadConnection() and creates the broker.cxnProduce connection on the brokers before sending.
  2. There is a lull in messages for longer than cfg.connIdleTimeout. This causes the broker.cxnProduce to be reaped and scenario 1 reapplies on the next message.

A config option like kgo.LiveProduceConnection that creates broker.cxnProduce on init and shields it from the connection reaper would solve our problem.

helgeholm avatar Aug 15 '24 20:08 helgeholm

Would you prefer shielding it from the reaper, or a function that ensures a producer connection is open to a specific broker (or to all brokers, perhaps with node ID -1)? The function could return only once auth is complete, and could also return an error listing any brokers for which a connection could not be opened.

twmb avatar Aug 21 '24 18:08 twmb

Thanks for following up!

Ensuring connection to all brokers would be necessary for our use case, yes. The ideal is to send an "apiRequest" or other heartbeat-type message to each broker on startup and at a regular interval.

We've done a lot more testing on our end, and found that the brokers also have connection reapers defaulting to 10 minutes. While these can be disabled, that relies on undocumented behavior, and some Kafka providers (most importantly ours) don't allow it.

Our topics have 12 partitions on a cluster of 3 brokers.

Currently we have it working well by producing dummy messages every 9 minutes, sending one to an arbitrary partition on each of the brokers. This maintains the connection indefinitely and we are getting low latency behavior. It would, however, be cleaner if we could maintain a connection without piling unnecessary messages onto the topics.
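
For illustration, the "one partition per broker" selection could look roughly like the sketch below. It assumes a partition-to-leader map is already available (for example from a metadata response); `heartbeatTargets` is a hypothetical helper, and how each record is then pinned to its partition is left out.

```go
package main

// heartbeatTargets picks one partition per leader broker so that a dummy
// record sent to each chosen partition touches every broker's produce
// connection. leaders maps partition -> leader broker ID; the result maps
// broker ID -> chosen partition. The lowest partition number led by each
// broker is chosen so the result is deterministic.
func heartbeatTargets(leaders map[int32]int32) map[int32]int32 {
	targets := make(map[int32]int32)
	for partition, broker := range leaders {
		if cur, ok := targets[broker]; !ok || partition < cur {
			targets[broker] = partition
		}
	}
	return targets
}
```

With 12 partitions spread over 3 brokers this yields three targets, so each 9-minute round sends three dummy records. The targets would need recomputing whenever partition leadership changes.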

helgeholm avatar Aug 22 '24 08:08 helgeholm

Have you tried adjusting ConnIdleTimeout? Is that an option?

twmb avatar Aug 26 '24 17:08 twmb

Some of our producers' traffic patterns have days and hours between messages, so even the max value of 15 minutes is insufficient.

If kgo's configuration tolerance were extended to high values or even 0=disabled, we could use it like we currently use librdkafka. That is, heartbeat messages every 30 minutes to stay within the server-side connection reaper's max tolerance of 60 minutes.

But our hope is to be able to maintain live connections even without generating messages on a heartbeat topic. :)

Allowing us to send ApiRequest messages to individual brokers in the cluster would be an alternative that gives us the ability to solve our problem.

helgeholm avatar Aug 27 '24 08:08 helgeholm

PTAL at the function introduced in this PR: https://github.com/twmb/franz-go/pull/839

I don't think I'm going to squeeze this into the next release (tonight), so this may wait a month unfortunately, but lmk if that would solve what you're looking for.

twmb avatar Oct 15 '24 00:10 twmb

Thanks! The documented behavior makes sense. Specified or discovered brokers will definitely cover it.

If I understand the code path, it will A) initiate a connection to each broker if no connection exists, B) if a connection does exist, reset any client side timeout on it but otherwise make no communication with the broker. Is this correct? Or will it perform some sort of network "ping" in the B case?

If that understanding is correct, this will remove most cases of latency spikes as long as we keep calling EnsureProduceConnectionIsOpen often enough to immediately recover from server side or network middleware connection reaping, which I think we can keep down to around once every 30 minutes per broker. Definitely an improvement for us.

If I'm incorrect and B also performs a "ping" (e.g. querying "rd_kafka_metadata") or any other network communication, we will also avoid server-side connection reaping, and also only need to call EnsureProduceConnectionIsOpen every few minutes. If that is happening, or can be added, it would mitigate all reconnect latency spikes that are possible client-side.

helgeholm avatar Oct 15 '24 12:10 helgeholm

Three proposals for potential alternatives:

  1. Keep the existing API, but change the logic to actually send an ApiVersions request on the opened connection. This will force it to be live on both the client and the broker (and likely simplifies the logic).
  2. Change the name of the API to something like DeepPing -- and do the same logic on the produce connection. This function will operate similarly to Ping, but extends it by ensuring the SASL flow is completed and pinging multiple brokers at once. I would also always force broker discovery (internally issue a metadata request if no brokers have been discovered yet).
  3. Idea (2), but with an extension where you can specify which connection "type" to request against -- i.e., TypeMetadata, TypeProduce, TypeConsume, TypeGroup, TypeAdmin.

I most prefer (2), closely followed by (1). If going for idea (1), I'd like to maybe change the logic to issue a metadata request if either (a) no brokers have been discovered, or (b) any broker directly specified is unknown.
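
The (a)/(b) rule above can be written down as a small predicate. This is only a sketch of the decision as described here; `needsBrokerMetadata` is a hypothetical name, and treating negative IDs as an "all brokers" sentinel is an assumption based on the node ID -1 idea earlier in the thread.

```go
package main

// needsBrokerMetadata reports whether a metadata request should be issued
// before opening connections: either (a) no brokers have been discovered,
// or (b) a directly specified broker ID is unknown.
func needsBrokerMetadata(discovered map[int32]bool, requested []int32) bool {
	if len(discovered) == 0 {
		return true // (a) nothing discovered yet
	}
	for _, id := range requested {
		if id < 0 {
			continue // assumption: negative IDs mean "all brokers", nothing to look up
		}
		if !discovered[id] {
			return true // (b) a specified broker is unknown
		}
	}
	return false
}
```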

twmb avatar Jan 22 '25 01:01 twmb

Barring feedback, I'll implement option 2 for the next minor release. DeepPing will use a produce connection unless the client is not a producer.

twmb avatar Feb 04 '25 22:02 twmb

I've touched up #839. After looking deeper at Ping, Ping does deeply ping a broker. Given that, there was little reason to introduce DeepPing.

I've changed the implementation you originally reviewed to go one step further: EnsureProduceConnectionIsOpen issues an ApiVersions request down the produce connection. This resolves the problem of the broker potentially reaping the unused connection.

I also changed EnsureProduceConnectionIsOpen to issue a brokers-only metadata request if no broker has been discovered yet and you are requesting all brokers to have an open produce connection. This doesn't really help if brokers haven't been discovered, because the client will need to load metadata again for the topic once you finally produce, but it makes it easier to test that the function works.

This will be in 1.19, which I aim to release by the end of the month. I'm going to close this ahead of time with the PR.

twmb avatar Apr 16 '25 22:04 twmb

This is released.

twmb avatar May 08 '25 17:05 twmb