KAFKA-16448: Catch and handle processing exceptions

Open loicgreffier opened this issue 1 year ago • 25 comments

This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.

This PR actually catches processing exceptions.

Jira: https://issues.apache.org/jira/browse/KAFKA-16448.

Contributors

@Dabz @sebastienviale @loicgreffier

Depends On

https://github.com/apache/kafka/pull/16092 https://github.com/apache/kafka/pull/16090

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

loicgreffier avatar May 27 '24 15:05 loicgreffier

Thanks for the PR, some minor comments around code.

Thanks for the review @apoorvmittal10.

As mentioned, this PR is part of a bigger PR that is being split up (#15973).

It should be rebased (to ease the final review) and merged after the following PRs have been merged:

  • #16090 reviewed by @cadonna
  • #16092, not reviewed, but maybe not necessary as this PR can bring the changes as well (let me know)

loicgreffier avatar May 28 '24 17:05 loicgreffier

Sounds good!

apoorvmittal10 avatar May 28 '24 19:05 apoorvmittal10

Thanks for the PR, @loicgreffier!

Here is my feedback!

@cadonna The last PR update addressed most of the points. I left some conversations open, waiting for your feedback.

loicgreffier avatar Jun 22 '24 14:06 loicgreffier

@cadonna Updated! I think only this conversation is still open: https://github.com/apache/kafka/pull/16093#discussion_r1648627868

loicgreffier avatar Jun 24 '24 20:06 loicgreffier

I restarted the build since it ran into a timeout.

cadonna avatar Jun 25 '24 08:06 cadonna

The failures of org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFails* seem related to this PR.

cadonna avatar Jun 25 '24 14:06 cadonna

The e.getMessage() does not contain "Injected test exception" anymore.

It is now getCause().getMessage() that contains "Injected test exception"

Fix is pushed

sebastienviale avatar Jun 26 '24 07:06 sebastienviale

@cadonna This is because the processing exception is now caught and wrapped into a StreamsException (when the handler is set to FAIL):

throw new StreamsException("Processing exception handler is set to fail upon" +
                    " a processing error. If you would rather have the streaming pipeline" +
                    " continue after a processing error, please set the " +
                    PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
                    e);

Thus, in the uncaught exception handler:

  • The exception message is the message of the StreamsException: "Processing exception handler is set to fail upon..."
  • The cause message is the actual processing exception message: "Injected test exception"

Maybe additional assertions should be added in these tests to validate the exception and the cause
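
To make the message/cause split concrete, here is a minimal, self-contained sketch. `WrappingException` and `WrappedMessageDemo` are hypothetical stand-ins (so the snippet runs without Kafka on the classpath) and not the actual Kafka Streams code; only the wrapping pattern is the same as in the snippet above.

```java
public class WrappedMessageDemo {

    // Hypothetical stand-in for org.apache.kafka.streams.errors.StreamsException.
    static class WrappingException extends RuntimeException {
        WrappingException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Mimics what happens when the handler answers FAIL: the original
    // processing exception becomes the cause of a new exception.
    static RuntimeException wrap(final Exception processingException) {
        return new WrappingException(
            "Processing exception handler is set to fail upon a processing error.",
            processingException);
    }

    public static void main(final String[] args) {
        final RuntimeException e = wrap(new RuntimeException("Injected test exception"));
        // The outer message is the wrapper's message ...
        System.out.println("message: " + e.getMessage());
        // ... and the original text is only reachable through the cause.
        System.out.println("cause:   " + e.getCause().getMessage());
    }
}
```

A test that asserts on `e.getMessage()` therefore has to switch to `e.getCause().getMessage()`, or assert on both.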

loicgreffier avatar Jun 26 '24 17:06 loicgreffier

I did not realize this! Thank you for raising it! Your code breaks applications that still use the old, deprecated uncaught exception handler. I have to ask how we handle backward compatibility of deprecated APIs. I would say we need to be backward compatible.

cadonna avatar Jun 27 '24 07:06 cadonna

@cadonna

Thanks for the remarks, I did not realize that either!

I see two solutions for backward compatibility:

1/ Log an error and rethrow the original exception, wrapped in a StreamsException without a custom message:

    private final Logger log = LoggerFactory.getLogger(ProcessorNode.class);

    } catch (final Exception e) {
        final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
            internalProcessorContext.topic(),
            internalProcessorContext.partition(),
            internalProcessorContext.offset(),
            internalProcessorContext.headers(),
            internalProcessorContext.recordContext().rawRecord().key(),
            internalProcessorContext.recordContext().rawRecord().value(),
            internalProcessorContext.currentNode().name(),
            internalProcessorContext.taskId());

        final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
            .handle(errorHandlerContext, record, e);

        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            log.error("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
            throw new StreamsException(e);
        } else {
            droppedRecordsSensor.record();
        }
    }

2/ Append the original error message to the new exception:

    } catch (final Exception e) {
        final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
            internalProcessorContext.topic(),
            internalProcessorContext.partition(),
            internalProcessorContext.offset(),
            internalProcessorContext.headers(),
            internalProcessorContext.recordContext().rawRecord().key(),
            internalProcessorContext.recordContext().rawRecord().value(),
            internalProcessorContext.currentNode().name(),
            internalProcessorContext.taskId());

        final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
            .handle(errorHandlerContext, record, e);

        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            throw new StreamsException(String.format("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately. %s", e.getMessage()),
                e);
        } else {
            droppedRecordsSensor.record();
        }
    }

Or maybe you have another solution in mind?

sebastienviale avatar Jun 27 '24 08:06 sebastienviale

Is only the message the issue or also the type of the exception? What exception is passed to the old uncaught exception handler in the eos tests without this PR? Is it a StreamsException or the original exception?

cadonna avatar Jun 27 '24 08:06 cadonna

It seems to be a StreamsException:

https://github.com/apache/kafka/blob/8b6013f851fec537401f29769be5608c2d246747/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L754

Could you nevertheless verify it?

cadonna avatar Jun 27 '24 08:06 cadonna

Actually, it is a StreamsException, but it is thrown from line 752:

https://github.com/apache/kafka/blob/8b6013f851fec537401f29769be5608c2d246747/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L751-L755

sebastienviale avatar Jun 27 '24 09:06 sebastienviale

I meant without the processing exception handler code in this PR. Then it should be line 754, right?

cadonna avatar Jun 27 '24 09:06 cadonna

I ran the test from trunk in debug mode, and I can confirm it is line 752.

sebastienviale avatar Jun 27 '24 09:06 sebastienviale

Alright, so only the message is an issue regarding backward compatibility.

cadonna avatar Jun 27 '24 11:06 cadonna

Regarding this comment https://github.com/apache/kafka/pull/16093#issuecomment-2194117929, which option do you prefer?

sebastienviale avatar Jun 27 '24 11:06 sebastienviale

@cadonna @sebastienviale

Before this PR, I think processing exceptions are caught and converted to StreamsException here: https://github.com/apache/kafka/blob/3ebad6349de7d121a31f9d47c5ede7d6bbfac4d1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L805

Thus the uncaught exception handler is receiving a StreamsException with the following message:

Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
	at com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32)
	at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:168)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)

➡️ If we want to preserve backward compatibility, we might need to throw this from ProcessorNode#process:

 if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
        throw new StreamsException(
            String.format(
                "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
                internalProcessorContext.taskId(),
                internalProcessorContext.currentNode().name(),
                internalProcessorContext.topic(),
                internalProcessorContext.partition(),
                internalProcessorContext.offset(),
                getStacktraceString(e)
            ),
            e
        );
    } else {
        droppedRecordsSensor.record();
    }

Note that this is not 100% backward compatible as the message received by the uncaught exception handler before the PR is:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...

And after:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-MAPVALUES-0000000003, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...

The name of the processor where the exception is caught is not the same (it is more accurate after the PR). This might be acceptable, though. @cadonna?

loicgreffier avatar Jun 27 '24 17:06 loicgreffier

@loicgreffier @cadonna

Seems OK to me, but in this case we lose the following information:

Processing exception handler is set to fail upon a processing error. If you would rather have the streaming pipeline continue after a processing error, please set the PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG appropriately

sebastienviale avatar Jun 28 '24 08:06 sebastienviale

What do you think about the following proposal?

When the processing exception handler replies with FAIL, wrap the exception into an internal exception (to be created) named FailedProcessingException or similar. This internal exception is ignored by upstream processing exception handlers. When it reaches the catch blocks in StreamTask, the wrapped processing exception is unwrapped, wrapped into a StreamsException, and re-thrown. The message

Processing exception handler is set to fail upon a processing error. If you would rather have the streaming pipeline continue after a processing error, please set the PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG appropriately

is logged at error level where the processing exception handler's return value is checked (as in Sébastien's option 1 above) and contains the processing exception.

Since StreamsException is a public exception, users might think that it is a good idea to throw a StreamsException in their user code. Using a StreamsException would circumvent the processing exception handler because for a processor node such an exception comes from a downstream processor node and it is just re-thrown. We cannot completely avoid this with an internal exception, but an internal exception makes it clear that it should not be used in user code.
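
The flow proposed above can be sketched roughly as follows. This is only an illustration under assumptions: every name in it (`FailedProcessingFlowDemo`, `TaskLevelException`, `processNode`, `processTask`, `handlerSaysFail`) is a hypothetical stand-in, the nested `FailedProcessingException` is a local class rather than the one to be created in Kafka Streams, and the handler is hard-coded to always answer FAIL.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FailedProcessingFlowDemo {

    // Hypothetical stand-in for the proposed internal exception.
    static class FailedProcessingException extends RuntimeException {
        FailedProcessingException(final Throwable cause) {
            super(cause);
        }
    }

    // Hypothetical stand-in for the public StreamsException.
    static class TaskLevelException extends RuntimeException {
        TaskLevelException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static final AtomicInteger HANDLER_CALLS = new AtomicInteger();

    // Stand-in for a processing exception handler that always answers FAIL.
    static boolean handlerSaysFail(final Exception e) {
        HANDLER_CALLS.incrementAndGet();
        return true;
    }

    // One processor node: the handler is consulted only for exceptions that
    // were not already handled by a downstream node.
    static void processNode(final Runnable body) {
        try {
            body.run();
        } catch (final FailedProcessingException e) {
            throw e; // already handled downstream, just propagate
        } catch (final Exception e) {
            if (handlerSaysFail(e)) {
                throw new FailedProcessingException(e);
            }
            // otherwise the record would be dropped and processing continues
        }
    }

    // Task level: unwrap the internal exception and wrap the original
    // processing exception into the public exception type.
    static void processTask(final Runnable topology) {
        try {
            topology.run();
        } catch (final RuntimeException e) {
            final Throwable processingException =
                e instanceof FailedProcessingException ? e.getCause() : e;
            throw new TaskLevelException("Exception caught in process.", processingException);
        }
    }

    // Runs an upstream node that forwards to a failing downstream node.
    static TaskLevelException runDemo() {
        HANDLER_CALLS.set(0);
        try {
            processTask(() -> processNode(() -> processNode(() -> {
                throw new RuntimeException("Something bad happened...");
            })));
        } catch (final TaskLevelException e) {
            return e;
        }
        throw new IllegalStateException("expected the task to fail");
    }

    public static void main(final String[] args) {
        final TaskLevelException e = runDemo();
        System.out.println("cause: " + e.getCause().getMessage());
        System.out.println("handler calls: " + HANDLER_CALLS.get());
    }
}
```

In this sketch the handler is invoked once, at the node where the exception originates, and the exception that reaches the task level has the original processing exception as its direct cause.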

cadonna avatar Jun 28 '24 13:06 cadonna

@sebastienviale @cadonna Completely agree. FailedProcessingException is in line with my previous answer and deals with this scenario, which I was not totally comfortable with:

The drawback is exceptions of type StreamsException won't be handled by the processing exception handler. Thus, if a user manually throws a StreamsException in one of its processor nodes, it won't be handled.
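
A small sketch of that drawback, assuming a node that simply re-throws the public exception type because it takes it to come from a downstream node. `BypassDemo`, `PublicStreamsException`, and `handlerCallsFor` are hypothetical names used only for this illustration.

```java
public class BypassDemo {

    // Hypothetical stand-in for the public StreamsException.
    static class PublicStreamsException extends RuntimeException {
        PublicStreamsException(final String message) {
            super(message);
        }
    }

    static int handlerCalls = 0;

    // A node that treats the public exception type as "already handled".
    static void processNode(final Runnable userCode) {
        try {
            userCode.run();
        } catch (final PublicStreamsException e) {
            throw e; // assumed to come from downstream: handler is skipped
        } catch (final Exception e) {
            handlerCalls++; // handler is consulted; here the record is dropped
        }
    }

    // Returns how often the handler was consulted for the given user code.
    static int handlerCallsFor(final Runnable userCode) {
        handlerCalls = 0;
        try {
            processNode(userCode);
        } catch (final PublicStreamsException ignored) {
            // the exception escaped without passing through the handler
        }
        return handlerCalls;
    }

    public static void main(final String[] args) {
        System.out.println(handlerCallsFor(() -> {
            throw new RuntimeException("regular failure");
        }));
        System.out.println(handlerCallsFor(() -> {
            throw new PublicStreamsException("thrown by user code");
        }));
    }
}
```

Using an internal exception type for the "already handled" marker narrows this gap, because user code has no reason to throw it.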

loicgreffier avatar Jun 28 '24 17:06 loicgreffier

@cadonna PR updated. FailedProcessingException has been introduced.

Note that:

➡️ 1. By default, FailedProcessingException appears in the logs:

2024-06-28T23:03:34.862+02:00 ERROR 31724 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [streams-map] Encountered the following exception during processing and sent shutdown request for the entire application.

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=org.apache.kafka.streams.processor.internals.FailedProcessingException: java.lang.RuntimeException: Something bad happened...
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
	at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
	at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
	at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
	at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
	at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: java.lang.RuntimeException: Something bad happened...
	at com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32)
	at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
	... 26 more

	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:804) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
Caused by: org.apache.kafka.streams.processor.internals.FailedProcessingException: java.lang.RuntimeException: Something bad happened...
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	... 6 common frames omitted
Caused by: java.lang.RuntimeException: Something bad happened...
	at com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32) ~[classes/:na]
	at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	... 26 common frames omitted

In case of FailedProcessingException, the cause is used to create the StreamsException so the stack trace stays the same before and after the PR:

Throwable processingException = e instanceof FailedProcessingException ? e.getCause() : e;

final StreamsException error = new StreamsException(
    String.format(
        "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
        id(),
        processorContext.currentNode().name(),
        record.topic(),
        record.partition(),
        record.offset(),
        getStacktraceString(processingException)
    ),
    processingException
);

➡️ 2. The processing exception handler logs:

2024-06-28T23:03:34.743+02:00  WARN 31724 --- [-StreamThread-1] s.e.LogAndFailProcessingExceptionHandler : Exception caught during message processing, processor node: KSTREAM-MAPVALUES-0000000003, taskId: 0_0, source topic: PERSON_TOPIC, source partition: 0, source offset: 0

While the TaskExecutor logs:

2024-06-28T23:58:14.337+02:00 ERROR 28932 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [streams-map] Encountered the following exception during processing and sent shutdown request for the entire application.

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...

The two logs report the exception as having occurred in different processors.

Maybe this could be improved (or not) in another PR. The FailedProcessingException could be used to pass the precise processor node name where the exception occurred to the StreamTask#process.
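
One possible shape for that idea, as a rough sketch only: the `failedProcessorNodeName` field, its accessor, and the `describe` helper are hypothetical and not part of this PR, and the nested `FailedProcessingException` is a local stand-in for the internal class.

```java
public class NodeNameCarrierDemo {

    // Local stand-in for the internal exception, extended with a
    // hypothetical field that records where the failure really happened.
    static class FailedProcessingException extends RuntimeException {
        private final String failedProcessorNodeName;

        FailedProcessingException(final String failedProcessorNodeName, final Throwable cause) {
            super(cause);
            this.failedProcessorNodeName = failedProcessorNodeName;
        }

        String failedProcessorNodeName() {
            return failedProcessorNodeName;
        }
    }

    // Task-level message building: prefer the node name carried by the
    // exception, fall back to the node the task currently points at.
    static String describe(final RuntimeException e, final String currentNodeName) {
        final String processor = e instanceof FailedProcessingException
            ? ((FailedProcessingException) e).failedProcessorNodeName()
            : currentNodeName;
        return String.format("Exception caught in process. processor=%s", processor);
    }

    public static void main(final String[] args) {
        final RuntimeException failure = new FailedProcessingException(
            "KSTREAM-MAPVALUES-0000000003",
            new RuntimeException("Something bad happened..."));
        System.out.println(describe(failure, "KSTREAM-SOURCE-0000000000"));
    }
}
```

With something along these lines, the task-level message and the handler log would name the same processor node.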

loicgreffier avatar Jun 28 '24 22:06 loicgreffier

I agree, this is pre-existing behavior. You do not need to fix it in this PR. Could you please create a ticket for it?

cadonna avatar Jul 01 '24 14:07 cadonna

There is also a formatting violation in the builds.

[2024-06-28T23:07:39.905Z] > The following files had format violations:
[2024-06-28T23:07:39.905Z]       src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
[2024-06-28T23:07:39.905Z]           @@ -33,8 +33,9 @@
[2024-06-28T23:07:39.905Z]            import·org.apache.kafka.streams.processor.api.ProcessorSupplier;
[2024-06-28T23:07:39.905Z]            import·org.apache.kafka.streams.processor.api.Record;
[2024-06-28T23:07:39.905Z]            import·org.apache.kafka.test.MockProcessorSupplier;
[2024-06-28T23:07:39.905Z]           +
[2024-06-28T23:07:39.905Z]           +import·org.junit.jupiter.api.Tag;
[2024-06-28T23:07:39.905Z]            import·org.junit.jupiter.api.Test;
[2024-06-28T23:07:39.905Z]           -import·org.junit.jupiter.api.Tag;
[2024-06-28T23:07:39.905Z]            import·org.junit.jupiter.api.Timeout;
[2024-06-28T23:07:39.905Z]            
[2024-06-28T23:07:39.905Z]            import·java.time.Duration;
[2024-06-28T23:07:39.905Z]   Run './gradlew :streams:spotlessApply' to fix these violations.
[2024-06-28T23:07:39.905Z] 

Some test failures seem related.

cadonna avatar Jul 01 '24 14:07 cadonna

Sorry, the following comment is not correct: https://github.com/apache/kafka/pull/16093#discussion_r1664053104

With my proposal we would get a processing exception handler per processor node. Let me re-iterate on this.

cadonna avatar Jul 03 '24 11:07 cadonna

I think it is fine to set the processing exception handler in the init() call.

cadonna avatar Jul 03 '24 12:07 cadonna

Thanks for the updates @loicgreffier , @sebastienviale , and @Dabz!

I just have one comment on my own proposal 🙂

For the rest LGTM!

Could you write a message to the mailing thread [DISCUSS] Apache Kafka 3.9.0 release asking if KIP-1033 could be added to that release? That is the last PR for KIP-1033, right?

@cadonna

Regarding KIP-1033, there are 3 PRs remaining:

  • https://github.com/apache/kafka/pull/16300: brings processing exception handling on punctuate
  • https://github.com/apache/kafka/pull/16432: brings ErrorHandlerContext to the deserialization exception handler
  • https://github.com/apache/kafka/pull/16433: brings ErrorHandlerContext to the production exception handler

All PRs would need to be rebased after this one has been merged to ease the review.

loicgreffier avatar Jul 19 '24 08:07 loicgreffier