
If IRecordProcessor.processRecords() throws an exception, record processor should be destroyed, events should not be skipped.


Right now, if processRecords() throws an exception, the KCL simply discards all those records passed to it and continues blindly onwards, passing in new records. An exception indicates something bad has happened, and the record processor should not continue to be used. The worker should discard the processor, create a new one, and pass the same events to it.

llamahunter avatar Oct 28 '14 03:10 llamahunter

The KCL treats these as poison-pill records, since the application record processor has had an opportunity to look at the exception and apply the appropriate retry policy (including System.exit() if inducing a failover is beneficial).

gauravgh avatar Oct 29 '14 15:10 gauravgh

OK, then, in the event that my record processor knows these are not 'poisoned' records, but has gotten into some internally inconsistent state due to other bugs in the KCL (see issue #11), how do I request that my record processor be destroyed and recreated (see issue #12)?

llamahunter avatar Oct 29 '14 16:10 llamahunter

I don't think relying on the record processor to 'look at the exception and apply the appropriate retry policy' is adequate here. There are a wide range of scenarios in which unexpected application conditions can lead to unexpected errors, and the KCL should not act in a way that causes data loss in this situation IMO.

AWS Lambda does the correct thing here IMO:

Stream-based event sources – For stream-based event sources (Amazon Kinesis Streams and DynamoDB streams), AWS Lambda polls your stream and invokes your Lambda function. Therefore, if a Lambda function fails, AWS Lambda attempts to process the erring batch of records until the time the data expires, which can be up to seven days for Amazon Kinesis Streams. The exception is treated as blocking, and AWS Lambda will not read any new records from the stream until the failed batch of records either expires or is processed successfully. This ensures that AWS Lambda processes the stream events in order.

http://docs.aws.amazon.com/lambda/latest/dg/retries-on-errors.html

The KCL should only move on if it has successfully executed the processRecords method. If applications wish to treat errors as poison pills, they can do this by catching all exceptions and allowing processRecords to complete without error.

With the KCL dropping data when an exception is thrown by processRecords, I don't know how it's possible to build applications with any delivery guarantees. Kinesis offers at-least-once delivery, but it seems that no Java application built on Kinesis can offer the same.

joelittlejohn avatar Feb 13 '17 00:02 joelittlejohn
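
For illustration, the catch-all poison-pill approach described above might look roughly like this on the KCL v1 IRecordProcessor interface (handleRecord is a hypothetical application method):

```java
import java.util.List;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

public class PoisonPillTolerantProcessor implements IRecordProcessor {

    @Override
    public void initialize(String shardId) {
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record record : records) {
            try {
                handleRecord(record); // hypothetical application logic
            } catch (Exception e) {
                // Poison pill: log and skip, letting processRecords complete
                // without error so the KCL moves on deliberately.
            }
        }
        try {
            checkpointer.checkpoint();
        } catch (Exception e) {
            // A failed checkpoint is covered by the next successful one.
        }
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    }

    private void handleRecord(Record record) {
        // application-specific processing
    }
}
```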

Any plan to address this?

haivietduong avatar Apr 11 '17 23:04 haivietduong

I see the behavior is still the same as before. Why was it designed this way?

supernavy avatar Jun 28 '17 07:06 supernavy

Poor design decision. This issue makes the whole library useless for any application that must guarantee event delivery. Are there any plans to address this?

Mentis avatar Jul 04 '17 07:07 Mentis

"Useless" is a bit far-fetched; as gauravgh said, you can always have an outer catch-all exception handler in your processRecords and then exit on your own to destroy the failed processor.

(But I agree with the general sentiment: I would expect it to be the other way round. Let the processor exit on any exception, and if you want to swallow exceptions, implement this in processRecords.)

lightblu avatar Jul 05 '17 09:07 lightblu
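
As a sketch of the workaround gauravgh and lightblu describe, with the same caveats about forcing a whole-JVM restart that come up later in the thread (handleRecord is hypothetical; other IRecordProcessor methods are omitted):

```java
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    try {
        for (Record record : records) {
            handleRecord(record); // hypothetical application logic
        }
        checkpointer.checkpoint(); // only reached if the whole batch succeeded
    } catch (Throwable t) {
        // No checkpoint was taken, so a restarted worker re-reads this batch
        // from the last checkpoint. Crude, but it avoids silent data loss.
        System.exit(1);
    }
}
```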

@lightblu I don't think it's a bit far-fetched, unless I have misunderstood the KCL implementation. @Mentis said:

useless for any application that must guarantee event delivery

and I think he's right. If you have an absolute requirement to guarantee event delivery, you cannot use this library, since it will throw away data when an application fails to process a message. The catch-all that you describe doesn't mitigate this in any way. Applications have many unpredictable failure modes.

Sorry, I'm not trying to be awkward or rude here. I just want to verify my understanding and ensure that the scope of this issue is understood by me and others. This is a critical issue IMO. Java users need to be able to offer delivery guarantees just like Lambda users can.

I would never describe this library as useless in general. Most use-cases don't need such delivery guarantees. For those, retrying a few times inside processRecords in case of errors, then discarding the message, is likely good enough. For use-cases that need to guarantee at-least-once delivery, this library is useless.

joelittlejohn avatar Jul 05 '17 10:07 joelittlejohn

"The KCL should only move on if it has successfully executed the processRecords method" This is a streaming library that should not stop just because implementor of processRecords may not have handled exception that may happen to be related to formatting of one record. You have to ask yourself a question is it recoverable or non-recoverable exception? If it is non-recoverable perhaps you need to handle this case yourself (error stream would be nice) or you never move on your stream processing? If it is recoverable, like dependent service being down, perhaps retries with exponential backoff be appropriate? Either way, you as a developer have to indicate which is the case.

apiwoni avatar Jul 20 '17 19:07 apiwoni
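
A hedged sketch of the recoverable/non-recoverable split described above; isRecoverable, handleRecord, and sendToErrorStream are hypothetical application methods:

```java
// Retry recoverable failures with exponential backoff; route everything
// else (or exhausted retries) to an error stream instead of blocking.
private void processWithBackoff(Record record) throws InterruptedException {
    long backoffMillis = 100;
    for (int attempt = 1; attempt <= 5; attempt++) {
        try {
            handleRecord(record); // hypothetical application logic
            return;
        } catch (RuntimeException e) {
            if (!isRecoverable(e) || attempt == 5) {
                sendToErrorStream(record, e); // e.g. publish to an error stream/DLQ
                return;
            }
            Thread.sleep(backoffMillis);
            backoffMillis = Math.min(backoffMillis * 2, 10_000);
        }
    }
}
```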

@apiwoni Could you explain to me how one can implement an app with at-least-once delivery guaranteed, using the current KCL implementation?

joelittlejohn avatar Jul 20 '17 19:07 joelittlejohn

John, I think you are getting confused here about the concept of guaranteed delivery. Records are guaranteed to be delivered to the processor at least once. Yes or no? What you are talking about is application acknowledgement that processing has completed successfully, and the potential retry of delivery. This in itself has nothing to do with the concept of guaranteed delivery; it is, if you will, a concept of manual acknowledgement as opposed to auto-acknowledgement.

apiwoni avatar Jul 20 '17 19:07 apiwoni

@apiwoni I don't think I'm confused, but maybe we simply haven't understood each other yet.

What I'm talking about is attempting to implement an application on top of Kinesis that offers guaranteed delivery. I should be able to do this when I build upon a queue or stream that itself offers guaranteed delivery.

Right now, if I use the Java KCL I cannot offer this guarantee to consumers of my application, even though I am building on an underlying streaming technology (Kinesis) that should allow me to build an application with this kind of guarantee. The reason for this is that the KCL does not handle application failure modes in a way that will avoid message loss. The official Java client should (IMO) handle application failure modes in a way that avoids message loss.

I think the approach implemented by AWS Lambda is the right one.

joelittlejohn avatar Jul 20 '17 19:07 joelittlejohn

But would you really want to stop processing the stream when you have a non-recoverable error, like a formatting problem in one of your records? I wouldn't, unless there are ordering dependencies between records; but order is only guaranteed within a shard, and then you have to consider the implications of potential re-sharding on that order. If that's the case, I would probably look into traditional messaging solutions with manual acknowledgements and DLQs, or into Kafka, where you can commit offsets manually but you do not care about order as much.

apiwoni avatar Jul 20 '17 20:07 apiwoni

Ordering is a separate topic I think. Let's leave ordering aside for now and consider only the delivery guarantees of the applications we build.

If your application wants to ignore a record with a formatting error (as most would) and continue processing, you are free to catch exceptions within processRecords and allow the invocation of processRecords to complete without error.

The problem is not expected exceptions of this kind (these are easy to catch and ignore within processRecords), the problem is application states (and factors outside the control of your application) that lead to unexpected errors when processing a valid message. This kind of error should never lead to that valid message being dropped when it has not successfully been processed. Right now, the KCL treats success and failure the same.

joelittlejohn avatar Jul 20 '17 20:07 joelittlejohn

So you are talking about uncaught exceptions where acknowledgement is manual and not auto?

apiwoni avatar Jul 20 '17 20:07 apiwoni

The theory of recoverable vs. unrecoverable exceptions seems tempting, but it is not realistic in fast-evolving distributed systems. You can't assume the parties involved in an exception are still exactly the same the next time you try, so you can't assume the same call will fail for the same reason next time. Logically, with enough retries, every runtime exception is recoverable. If your system and the systems it depends on have an SLA on the time to recover from any runtime exception, simply retrying until success will make your system much simpler and won't incur any operational cost when the fault is in a dependent system.

supernavy avatar Jul 20 '17 22:07 supernavy

@supernavy so you agree then that:

  1. The only way to guarantee recovery from an unexpected error state is to allow indefinite retry
  2. The only way to allow indefinite retry (either within the current process, or beyond the life of the current process, in another process) is to not mark a message consumed unless it is successfully processed without error

Sorry if I am restating your comment, I just want to ensure that I understand you as I feel I could read parts of your reply in different ways.

joelittlejohn avatar Jul 20 '17 22:07 joelittlejohn
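
Taken together, those two points amount to Lambda-style blocking semantics. A rough sketch on the v1 interface (handleRecord and sleepQuietly are hypothetical; the shard simply falls behind until the record succeeds):

```java
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    for (Record record : records) {
        // Retry indefinitely: the shard makes no progress past this record
        // until it succeeds, mirroring the Lambda behaviour quoted earlier.
        while (true) {
            try {
                handleRecord(record); // hypothetical application logic
                break;
            } catch (Exception e) {
                sleepQuietly(1_000); // hypothetical sleep helper
            }
        }
    }
    try {
        checkpointer.checkpoint(); // only mark the batch consumed on success
    } catch (Exception e) {
        // A failed checkpoint is covered by the next successful one.
    }
}
```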

@joelittlejohn Yes, I agree with that. And it is becoming a more and more common case today. So, as an infrastructure-level library, the KCL should consider this requirement and add support for it.

supernavy avatar Jul 20 '17 22:07 supernavy

what is wrong with approach A?

You're suggesting using System.exit() as a means to retry? Shutting down the entire JVM, and relying on an external process to restart it in order to retry? That seems more a (bad) workaround than a solution.

adrian-baker avatar Mar 09 '18 02:03 adrian-baker

This is the opposite of how everything similar works in AWS. For example, in SQS, if you throw an exception the message goes back on the queue to be processed again. With the Lambda connector for Kinesis, if you throw an exception the stream stops processing. Yes, that is not ideal, but it is better than losing data. The way this works right now is dangerous, and if it is going to stay this way it needs to be more clearly communicated and stated up front.

krunde avatar Jun 15 '18 12:06 krunde

Hi guys, interesting discussion. We also have a requirement for zero data loss in our app, and I feel the KCL should handle this like AWS Lambda does. Are there any plans from AWS to complete this request?

akumariiit avatar Sep 18 '18 18:09 akumariiit

We are dealing with a similar problem, but it seems like the easiest way is to restart the RecordProcessor for the individual shard: let the Scheduler restart it, and the records will be retried there.

Right now the only way that seems possible is to throw an instance of Error from processRecords, which should propagate out of the lifecycle in the Scheduler.

Is this a valid approach?

autodidacticon avatar Oct 09 '18 20:10 autodidacticon

We are also dealing with this issue, and are using KCL v2. @autodidacticon did you have luck with the above approach?

kareanra avatar Oct 17 '18 14:10 kareanra

As long as you don't advance the checkpoint using processRecordsInput.checkpointer().checkpoint(), if the application starts over it will pick up the records since the last checkpoint. Doing a System.exit() will achieve behavior similar to a Lambda function: it will keep retrying on the same set of records, although it's not the best approach due to the sudden shutdown of the app and the warm-up period. Sending those records to a dead-letter queue and processing them separately could be a better approach for both the KCL and Lambda.

majidfn avatar Oct 25 '18 23:10 majidfn
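
A sketch of this pattern on the KCL v2 API (handleRecord and sendToDlq are hypothetical; only the checkpoint call is the actual KCL API, and the other ShardRecordProcessor lifecycle methods are omitted):

```java
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    for (KinesisClientRecord record : processRecordsInput.records()) {
        try {
            handleRecord(record); // hypothetical application logic
        } catch (Exception e) {
            sendToDlq(record, e); // hypothetical dead-letter queue publisher
        }
    }
    try {
        // If the app dies before this line, the whole batch is redelivered
        // from the last checkpoint on restart.
        processRecordsInput.checkpointer().checkpoint();
    } catch (Exception e) {
        // The next successful checkpoint covers this batch.
    }
}
```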

Does anyone know how to shut down a record processor in processRecords when an exception is thrown from the checkpointer? I currently have two workers reading the same shard; one throws errors when checkpointing in processRecords, since it does not own the lease to the shard. However, it appears I have not handled the exception properly but rather caught it and moved on, causing the duplicate worker to continue. Is there a way to tell the worker coordinator to shut down just that worker without killing the whole application with a System.exit()?

BobbyJohansen avatar Dec 12 '18 02:12 BobbyJohansen

@BobbyJohansen My assumption is that if you do not swallow that lease exception, the coordinator will balance it out automatically. A System.exit() is not a good option here. Imagine you have 12 shards and three worker instances: an instance could be responsible for four of those shards, and a System.exit() (because the worker has lost the lease on one shard) would interrupt processing of the other three shards it's responsible for too, which is not ideal.

majidfn avatar Dec 12 '18 02:12 majidfn

@majidfn Unfortunately the interface ShardRecordProcessor.processRecords does not allow me to throw a ShutdownException. Would you recommend throwing a RuntimeException?

BobbyJohansen avatar Dec 12 '18 04:12 BobbyJohansen

@BobbyJohansen IMHO I recommend not handling the exception raised from the checkpointer at all and also not throwing any other exceptions. Let that exception bubble up and processRecords will/should handle the coordination by itself. This is what I have experienced so far. I hope someone from the official KCL team can chime in on this.

majidfn avatar Dec 12 '18 16:12 majidfn

Unfortunately the interface ShardRecordProcessor does not declare any checked exceptions, so I cannot throw a ShutdownException from the overridden processRecords method :(. I can wrap it and throw an unchecked exception.

BobbyJohansen avatar Dec 13 '18 20:12 BobbyJohansen

Why not checkpoint on every record once you have processed it successfully? kinesisRecordPayLoadHolder.getiRecordProcessorCheckpointer().checkpoint(kinesisRecordPayLoadHolder.getRecord());

This is what Kafka recommends as well: commit your own checkpoint on every successfully processed record.

bmarwaha-godaddy avatar Oct 28 '19 03:10 bmarwaha-godaddy
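
For illustration, per-record checkpointing on the v1 interface looks roughly like this (handleRecord is hypothetical). Note that each checkpoint is a write to the KCL's DynamoDB lease table, so this trades throughput for granularity:

```java
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    for (Record record : records) {
        try {
            handleRecord(record); // hypothetical application logic
            checkpointer.checkpoint(record); // mark exactly this record consumed
        } catch (Exception e) {
            // Stop here: nothing past the last checkpoint is marked consumed,
            // so a restarted processor resumes at this record.
            return;
        }
    }
}
```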