
KPL back off is kind of bush league

Open rdifalco opened this issue 8 years ago • 4 comments

The KPL authors suggest writing something like this when records are being produced at a higher rate than the KPL can submit them to kinesis.

while (producer.getOutstandingRecordsCount() > MAX_BUFFERED) {
    Thread.sleep(1);
}

Using a busy loop to keep the KPL queue from getting too big seems like a novice approach. Since the Java code already has a callback and a ConcurrentHashMap, it could let us specify a maximum and offer a "blocking" version of the addUserRecord call. I am doing something like this to work around the issue:

  // Fair semaphore bounds the number of records in flight at MAX_BUFFERED_SIZE.
  private static final Semaphore backPressure = new Semaphore(MAX_BUFFERED_SIZE, true);

  private static final FutureCallback<UserRecordResult> CALLBACK = new FutureCallback<UserRecordResult>() {
    @Override
    public void onSuccess(UserRecordResult result) {
      backPressure.release();
    }

    @Override
    public void onFailure(Throwable t) {
      backPressure.release();
      . . .
    }
  };

  public void submit(String stream, String partitionKey, ByteBuffer data) throws InterruptedException {
    backPressure.acquire(); // blocks the caller once MAX_BUFFERED_SIZE records are outstanding
    try {
      ListenableFuture<UserRecordResult> future = producer.addUserRecord(stream, partitionKey, data);
      Futures.addCallback(future, CALLBACK);
    } catch (RuntimeException e) {
      backPressure.release(); // don't leak a permit if addUserRecord throws
      throw e;
    }
  }
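For what it's worth, the pattern can be demonstrated without the KPL at all. The sketch below is a minimal, self-contained illustration of the same idea: a plain thread pool stands in for the asynchronous producer, and the `MAX_BUFFERED` value, `submit` method, and counters are all invented for the demo.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Demo of semaphore-based backpressure around an async producer.
// The ExecutorService stands in for the KPL daemon.
public class SemaphoreBackpressureDemo {
    static final int MAX_BUFFERED = 4; // cap on in-flight records
    static final Semaphore backPressure = new Semaphore(MAX_BUFFERED, true);
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxObservedInFlight = new AtomicInteger();

    static void submit(ExecutorService producer, int record) throws InterruptedException {
        backPressure.acquire(); // blocks once MAX_BUFFERED records are outstanding
        try {
            producer.submit(() -> {
                int now = inFlight.incrementAndGet();
                maxObservedInFlight.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // simulate network latency
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                inFlight.decrementAndGet();
                backPressure.release(); // completion callback frees the permit
            });
        } catch (RuntimeException e) {
            backPressure.release(); // never leak a permit on submit failure
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService producer = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 50; i++) {
            submit(producer, i); // would overrun the buffer without the semaphore
        }
        producer.shutdown();
        producer.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("max in flight: " + maxObservedInFlight.get());
    }
}
```

Even though 8 worker threads are available, the semaphore keeps the observed in-flight count at or below 4, which is exactly the guarantee the busy loop was trying to provide.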

It seems kind of lame that I have to do this with a semaphore when the KPL Java code could do it more efficiently as a built-in feature.

rdifalco avatar Apr 04 '16 21:04 rdifalco

Worse still, the KPL's internal retry logic is quite aggressive (see #12 and #14, for example). So if you don't implement user-level throttling, things get linearly worse in the KPL daemon.

Pushing the backoff logic onto consumers means everyone is forced to solve the problem themselves, and most users are going to do it badly.

samcday avatar Jun 07 '16 21:06 samcday

Thanks for reporting this, we're looking at improving the experience using the KPL. I agree that producer threads shouldn't need to call sleep, but instead should be parked via a wait call. Adding simpler support for max inflight is something we will definitely look at providing.

Can others who are impacted by this please leave a comment or reaction to help us prioritize these changes?

pfifer avatar Feb 15 '17 20:02 pfifer

We've similarly implemented our own backoff, but in a different way: we skip Kinesis altogether, log an error, and write the record to a local file that gets uploaded separately (which spawned https://github.com/awslabs/amazon-kinesis-producer/issues/76).

Would certainly welcome the inclusion of abstractions and choices around backoff, but I'm wary of the above being the default or the only choice. It would introduce blocking into an otherwise nonblocking client, which would likely cause a lot of pain in the Reactive/Scala world. Using something like the above is likely to put a bunch of threads/tasks in waiting/blocked states if records are not being sent fast enough. That may be desirable in some scenarios but not in others, depending on how critical the data is and the system architecture at hand. It would also prevent real backpressure strategies from being implemented, like slowing down the rate at which messages arrive.

So I'd suggest an abstract BackpressureStrategy, with a concrete semaphore-based waiting strategy that can be optionally chosen.
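One hypothetical shape for such an abstraction is sketched below. The `BackpressureStrategy` interface and both implementations are invented here to illustrate the suggestion; nothing like them exists in the KPL today.

```java
import java.util.concurrent.Semaphore;

// Hypothetical pluggable backpressure abstraction; not actual KPL API.
interface BackpressureStrategy {
    // Returns true when the caller may hand the record to the producer.
    boolean tryAcquire() throws InterruptedException;

    // Called from the success/failure callback once a record completes.
    void release();
}

// Blocking strategy: callers park until a permit frees up.
class BlockingSemaphoreStrategy implements BackpressureStrategy {
    private final Semaphore permits;

    BlockingSemaphoreStrategy(int maxInFlight) {
        permits = new Semaphore(maxInFlight, true);
    }

    public boolean tryAcquire() throws InterruptedException {
        permits.acquire(); // parks the caller instead of busy-looping
        return true;
    }

    public void release() {
        permits.release();
    }
}

// Non-blocking strategy: callers are told to shed the record instead of
// waiting, suiting reactive pipelines that must never block a thread.
class DroppingStrategy implements BackpressureStrategy {
    private final Semaphore permits;

    DroppingStrategy(int maxInFlight) {
        permits = new Semaphore(maxInFlight);
    }

    public boolean tryAcquire() {
        return permits.tryAcquire(); // false means "drop or divert this record"
    }

    public void release() {
        permits.release();
    }
}
```

A blocking caller would loop on `tryAcquire()` and always proceed, while a reactive caller could use `DroppingStrategy` to divert overflow records to a local file, as described above.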

kwinter avatar Mar 12 '18 22:03 kwinter

What's the recommended strategy right now?

buremba avatar Apr 13 '20 11:04 buremba