amazon-kinesis-producer
KPL back off is kind of bush league
The KPL authors suggest writing something like this when records are being produced at a higher rate than the KPL can submit them to Kinesis:
while (producer.getOutstandingRecordsCount() > MAX_BUFFERED) {
    Thread.sleep(1);
}
Using a busy loop to keep the KPL queue from getting too big seems like a novice approach. Since the Java code already has a callback and a ConcurrentHashMap, it should allow us to specify a maximum and offer a "blocking" version of the addUserRecord call. I am doing something like this to work around the issue:
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;

import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

// One permit per in-flight record; fair, so waiting submitters are served in order.
private static final Semaphore backPressure = new Semaphore(MAX_BUFFERED_SIZE, true);

private static final FutureCallback<UserRecordResult> CALLBACK = new FutureCallback<UserRecordResult>() {
    @Override
    public void onSuccess(UserRecordResult result) {
        backPressure.release();
    }

    @Override
    public void onFailure(Throwable t) {
        backPressure.release();
        // . . .
    }
};

public void submit(String stream, String partitionKey, ByteBuffer data) throws InterruptedException {
    // Blocks the caller once MAX_BUFFERED_SIZE records are outstanding.
    backPressure.acquire();
    try {
        ListenableFuture<UserRecordResult> future = producer.addUserRecord(stream, partitionKey, data);
        Futures.addCallback(future, CALLBACK);
    } catch (RuntimeException e) {
        // addUserRecord never accepted the record, so the callback will not release this permit.
        backPressure.release();
        throw e;
    }
}
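For illustration, a caller then just invokes submit and is parked by the semaphore instead of polling; the surrounding loop and Event type here are hypothetical:

// Hypothetical caller: blocks inside submit() once MAX_BUFFERED_SIZE records
// are in flight, instead of busy-waiting on getOutstandingRecordsCount().
for (Event event : events) {
    submit("my-stream", event.getPartitionKey(), event.toByteBuffer());
}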
It seems kind of lame that I have to do this with a semaphore when the KPL Java code could do it more efficiently and built in.
Worse still, the KPL's internal retry logic is quite aggressive (see #12 and #14, for example), so if you don't do this user-level throttling yourself, things get linearly worse in the KPL daemon.
Forcing the backoff logic onto consumers means everyone has to solve the problem themselves, and most users are going to do it badly.
Thanks for reporting this; we're looking at improving the experience of using the KPL. I agree that producer threads shouldn't need to call sleep, but should instead be parked via a wait call. Adding simpler support for a maximum number of in-flight records is something we will definitely look at providing.
Can others who are impacted by this please leave a comment or reaction to assist in prioritizing these changes.
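As a rough sketch of what parking instead of sleeping could look like if it were built in, the completion callback could signal a condition that waiting producer threads are parked on. The maxOutstanding limit, awaitCapacity, and onRecordComplete names here are hypothetical, not part of the KPL API:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import com.amazonaws.services.kinesis.producer.KinesisProducer;

private final ReentrantLock lock = new ReentrantLock();
private final Condition slotFreed = lock.newCondition();

// Park the producer thread until the outstanding count drops below the limit,
// instead of spinning on Thread.sleep(1).
void awaitCapacity(KinesisProducer producer, int maxOutstanding) throws InterruptedException {
    lock.lock();
    try {
        while (producer.getOutstandingRecordsCount() > maxOutstanding) {
            slotFreed.await();
        }
    } finally {
        lock.unlock();
    }
}

// Would be called from each record's completion callback (success or failure).
void onRecordComplete() {
    lock.lock();
    try {
        slotFreed.signal();
    } finally {
        lock.unlock();
    }
}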
We've similarly implemented our own backoff, but in a different way: we skip Kinesis altogether, log an error, and put the record into a local file that gets uploaded separately (which spawned https://github.com/awslabs/amazon-kinesis-producer/issues/76).
Would certainly welcome the inclusion of abstractions and choices around backoff, but I'm wary of the above being the default or the only choice. It would introduce blocking into an otherwise non-blocking client, which would likely cause a lot of pain in the Reactive/Scala world. Using something like the above is likely to put a bunch of threads/tasks into waiting/blocked states if records are not being sent fast enough, which may be desirable in some scenarios but not in others (depending on how critical the data is and the system architecture at hand). It would also prevent real backpressure strategies from being implemented, like slowing down the rate at which messages are coming in.
So I would suggest an abstract BackpressureStrategy, plus a concrete semaphore-based waiting strategy that can be opted into (see the sketch below).
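For illustration, the abstraction might look roughly like the following; the BackpressureStrategy interface and BoundedBlockingStrategy class are hypothetical, not anything the KPL currently provides:

import java.util.concurrent.Semaphore;

// The producer would consult the strategy before accepting a record
// and notify it from each record's completion callback.
public interface BackpressureStrategy {
    void beforeSubmit() throws InterruptedException;
    void afterComplete();
}

// Blocking, semaphore-based strategy: submitters wait for a free slot.
// A non-blocking alternative could use tryAcquire() and drop or divert
// the record instead of parking the caller.
public final class BoundedBlockingStrategy implements BackpressureStrategy {
    private final Semaphore permits;

    public BoundedBlockingStrategy(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    @Override
    public void beforeSubmit() throws InterruptedException {
        permits.acquire();
    }

    @Override
    public void afterComplete() {
        permits.release();
    }
}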
What's the recommended strategy right now?