
Possible Memory Issue with ACKS=0

Open iklotzko opened this issue 3 years ago • 9 comments

We use kafkajs to produce messages at high throughput, > 4,000 messages per second. For one of our workflows we use acks=0, and we notice that memory builds up quite high because the producer send request resolves immediately. However, the message is still placed on a queue in kafkajs (?) with a socket request (?) until it is actually sent. There should be a delay, however small, until the message is sent, removed from the queue, and unreferenced by kafkajs so it becomes eligible for GC. But we don't see that delay, because send resolves immediately and asynchronously, before the message is actually sent.

We use worker threads in a pool to do the producing to Kafka. We check for any backpressure on the main thread from our workers in order to pause our intake until the workers can finish producing their kafka messages and return to get more.

Since the workers return immediately from a send with acks=0, the backpressure is never detected and memory rises until the application exits with heap-space errors. When we set acks=1 this is no longer an issue, of course, as the full round trip is executed and our application works as expected.

Is there any callback, instrumentation event, or other mechanism we can use with acks=0 in our producer to detect when the message is actually sent by the socketRequest (?) Is there a way to not have the socket expect a response, but also not resolve until the message has actually been sent by the socket and becomes eligible for GC?

Would that info be in the instrumentation event, so we could just wait on it in a listener? That could probably be done, though it would likely be a smallish kludge on our end; we would understand if it were the only way.

Thanks for a great product! We can't wait for v16.0.0 to come out. You guys do such an amazing job.

iklotzko avatar Jan 30 '22 17:01 iklotzko

I do think using an instrumentation event might work for you as a way to check if you need to apply backpressure. Specifically, I suspect you might be able to use the REQUEST_QUEUE_SIZE event. That'll essentially tell you how many requests are currently either in flight or waiting to be sent. You can see an example of how it's used here: https://github.com/tulios/kafkajs/blob/fbf4a178fd062c346be94a8104b9f9f28356df5f/src/producer/index.spec.js#L275-L326

It's a little cumbersome because each broker connection has an independent request queue, so I suppose you need to keep a global total, something along the lines of:

const globalRequestQueueDepth = (client: Producer | Consumer | Admin) => {
  const queues = new Map();

  client.on(client.events.REQUEST_QUEUE_SIZE, ({ payload }) => {
    const { broker, queueSize } = payload;
    queues.set(broker, queueSize);
  });

  return () => {
    let sum = 0;
    for (const queueSize of queues.values()) {
      sum += queueSize;
    }
    return sum;
  };
};

const producerQueueDepth = globalRequestQueueDepth(producer)

producerQueueDepth()

Nevon avatar Jan 31 '22 07:01 Nevon

Thanks Nevon, we'll take a look at that. I had always read REQUEST_QUEUE_SIZE in the documentation as a change to how many in-flight requests you could have, but looking at socketRequest I can see I was incorrect, and I like your idea. Thank you for your help!

iklotzko avatar Jan 31 '22 14:01 iklotzko

I'm going to close this issue, but feel free to re-open if needed.

Nevon avatar Feb 08 '22 07:02 Nevon

@Nevon @iklotzko

Dear guys, I've run into a similar problem.

Env: EC2 (client): 4 vCPU / 8 GB, Node.js v14.17.6, kafkajs 1.15.0

If I set acks=1, the next request is blocked until the Kafka leader responds, which lowers the QPS of my core business when the Kafka server load is high or a network problem happens. So I set acks=0 on the client so that Kafka server problems don't affect my core business.

When I add a 2-second delay to the network between the client and the Kafka server and send messages at 20 QPS (1 MB each), the EC2 instance's memory increases rapidly, resulting in out-of-memory.

How can I solve the OOM problem?

baolinCloud avatar May 23 '22 13:05 baolinCloud

Coming back to this with fresh eyes, maybe this makes sense:

a way to not have the socket expect a response but also to not resolve until the message has been actually sent by the socket and that message becomes eligible for gc?

I don't know that we can change the semantics of acks=0, since probably there are applications depending on it behaving the way it does currently, but maybe there could be a way to opt-in to the behavior where the producer resolves when the request has been sent, so as to not lose the backpressure.

Nevon avatar May 23 '22 14:05 Nevon

Let's say you had a method that sends a Kafka message asynchronously and receives a callback on a successful (or unsuccessful) ack. The messages in the queue would still pile up, and probably cause more memory issues, unless backpressure was monitored and controlled via some kind of sent-vs-acked ratio.

We eventually switched to acks=-1 along with RD Kafka, which sends at a very reasonable rate but can easily use acks=0 or 1 as well. To detect and manage backpressure we use the async library and set up a queue. RD Kafka is a gigantic pain in the tuckus: it doesn't work in worker threads, it's native, and it has a lot of configuration details, but ultimately it provides extraordinary control over our producing (and consuming, for that matter). Using the async package's queue data structure lets us make each produce call concurrently (asynchronously) but still wait on the promises; the queue blocks automatically if it grows beyond a certain size. It's a wonderful package.
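For readers who want the shape of this pattern without pulling in a dependency, here is a minimal stand-in for the bounded queue described above (the real setup uses the async package's queue; the concurrency limit and names here are illustrative):

```javascript
// Run send tasks with bounded concurrency: once `concurrency` tasks are in
// flight, further pushes wait, so producing naturally backs off.
function createBoundedQueue(concurrency) {
  let active = 0;
  const waiting = [];

  const next = () => {
    if (active >= concurrency || waiting.length === 0) return;
    active += 1;
    const { task, resolve, reject } = waiting.shift();
    task()
      .then(resolve, reject)
      .finally(() => {
        active -= 1;
        next(); // start the next waiting task, if any
      });
  };

  return {
    // `task` is a function returning a promise, e.g. () => producer.send(...)
    push(task) {
      return new Promise((resolve, reject) => {
        waiting.push({ task, resolve, reject });
        next();
      });
    },
  };
}
```

Awaiting the promise returned by `push` is what propagates the backpressure up to whatever is feeding the producer.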

Also, RD Kafka provides for a very robust roll-your-own commit methodology.

However, for most applications I recommend KafkaJS as it is well abstracted and provides simplicity to a very complex system. It also works quite well and has very good documentation. But if speed is your game, RD Kafka is the way to go.

On Mon, May 23, 2022 at 10:27 AM Tommy Brunn @.***> wrote:

Reopened #1278 https://github.com/tulios/kafkajs/issues/1278.


iklotzko avatar May 23 '22 14:05 iklotzko

Thanks for your reply.

I think I should try:

  1. set acks=1 to enable backpressure
  2. separate the async Kafka-send logic from the core business
  3. lower the default timeout to 3 seconds to fail fast
  4. create a global variable PENDING_LEN to record the number of pending messages. PENDING_LEN is incremented before sending to Kafka and decremented when a send completes. If PENDING_LEN > MAX_PENDING_LEN, the message is dropped. MAX_PENDING_LEN should be chosen carefully.
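Step 4 above could be sketched roughly like this (PENDING_LEN and MAX_PENDING_LEN come from the plan; the drop policy and the limit value are illustrative):

```javascript
// Bounded pending-message counter: refuse new sends once the window is full.
const MAX_PENDING_LEN = 1000;
let pendingLen = 0;

// `sendFn` would be something like () => producer.send(...). Returns false
// (message dropped) when the pending window is full; otherwise tracks the
// message until the send promise settles.
function trySend(sendFn, message) {
  if (pendingLen >= MAX_PENDING_LEN) {
    return false; // drop instead of buffering further
  }
  pendingLen += 1;
  const settle = () => { pendingLen -= 1; };
  sendFn(message).then(settle, settle); // decrement on success or failure
  return true;
}
```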

baolinCloud avatar May 24 '22 14:05 baolinCloud

Why time out or drop messages, rather than pause the intake of data going to your producer (when pending_len > max_pending_len)? The driver of our producer messages is a consumer of another topic and its subsequent transformations. Whenever our pending_cnt > high_water_cnt we issue a pause on our consumer; when our pending_cnt <= low_water_cnt we resume. For other applications our producer is fed from files being read in; in those cases we issue a pause to the input stream.
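The high/low watermark scheme described above could be sketched like this (pause/resume stand in for consumer.pause()/consumer.resume() or pausing an input stream; the names and watermark values are illustrative):

```javascript
// Track pending sends and toggle the upstream source at the watermarks:
// pause when pending exceeds highWater, resume once it falls to lowWater.
function createBackpressure({ highWater, lowWater, pause, resume }) {
  let pendingCnt = 0;
  let paused = false;

  return {
    onSendStart() {
      pendingCnt += 1;
      if (!paused && pendingCnt > highWater) {
        paused = true;
        pause(); // e.g. consumer.pause([{ topic }])
      }
    },
    onSendDone() {
      pendingCnt -= 1;
      if (paused && pendingCnt <= lowWater) {
        paused = false;
        resume(); // e.g. consumer.resume([{ topic }])
      }
    },
    pendingCount: () => pendingCnt,
  };
}
```

The gap between the two watermarks keeps the source from flapping between paused and resumed on every send.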

I think separating out the async sending is a great idea; it's always good to have shorter-running tasks executing on your event queue. After your business logic is done, it's not as important for the sending to be separate as it is to encapsulate it into an async task put on the event queue, but perhaps that is what you are saying.

Also, KafkaJS plays very nicely in worker threads. If you are running on a system with a sufficient core count, consider spawning worker threads (or a cluster, if you don't like threads) to send your messages and take advantage of parallel processing. Just make sure each worker sends back a message saying it has completed its task, so you can manage backpressure using the pending count.

Are you batching your messages into an array before sending? The performance/throughput advantage of doing that cannot be overstated. Ira
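As a sketch of the batching idea: a helper that groups individual messages into fixed-size arrays, so each producer.send() carries many messages instead of one (the batch size and topic name are illustrative):

```javascript
// Split a flat list of messages into batches of at most `batchSize`.
function chunk(messages, batchSize) {
  const batches = [];
  for (let i = 0; i < messages.length; i += batchSize) {
    batches.push(messages.slice(i, i + batchSize));
  }
  return batches;
}

// With kafkajs, each batch then goes out in a single request, e.g.:
//   for (const batch of chunk(pending, 500)) {
//     await producer.send({ topic: 'my-topic', messages: batch });
//   }
```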

iklotzko avatar May 24 '22 14:05 iklotzko

Also, @Nevon had a very nice idea of using an instrumentation event to track outstanding requests with acks=0 and prevent pileup in the socket. It does not look very difficult to implement based on his excellent suggestion. However, acks=0 almost guarantees that you will lose some data, which in many scenarios is not an issue, but in others can be catastrophic.

iklotzko avatar May 24 '22 14:05 iklotzko