KAFKA-16448: Catch and handle processing exceptions
This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR actually catches processing exceptions.
Jira: https://issues.apache.org/jira/browse/KAFKA-16448.
Contributors
@Dabz @sebastienviale @loicgreffier
Depends On
https://github.com/apache/kafka/pull/16092 https://github.com/apache/kafka/pull/16090
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Thanks for the PR, some minor comments around code.
Thanks for the review @apoorvmittal10.
As mentioned, this PR is part of a bigger PR that is being split up (#15973).
It should be rebased (to ease the final review) and merged after the following PRs have been merged:
- KAFKA-16448: Add ProcessingExceptionHandler interface and implementations #16090 reviewed by @cadonna
- KAFKA-16448: Add ProcessingExceptionHandler in Streams configuration #16092, not reviewed, but maybe not necessary as this PR can bring the changes as well (let me know)
Sounds good!
Thanks for the PR, @loicgreffier!
Here is my feedback!
@cadonna The last PR update addressed most of the points. I left some conversations open, waiting for your feedback.
@cadonna Updated! I think only this conversation is still open: https://github.com/apache/kafka/pull/16093#discussion_r1648627868
I restarted the build since it ran into a timeout.
The failures of org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFails* seem related to this PR.
The e.getMessage() does not contain "Injected test exception" anymore.
It is now getCause().getMessage() that contains "Injected test exception"
Fix is pushed
@cadonna This is because now the processing exception is caught and wrapped into a StreamsException (when handler set to FAIL):
    throw new StreamsException("Processing exception handler is set to fail upon" +
        " a processing error. If you would rather have the streaming pipeline" +
        " continue after a processing error, please set the " +
        PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
        e);
Thus, in the uncaught exception handler:
- The exception message is the message of the StreamsException: "Processing exception handler is set to fail upon..."
- The cause message is the actual processing exception message: "Injected test exception"
Maybe additional assertions should be added in these tests to validate the exception and the cause
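To make the message-versus-cause difference concrete, here is a minimal, self-contained sketch. The nested StreamsException is only a stand-in for the real class so the snippet runs without Kafka on the classpath. The shortened message text and the wrap and causeContains helpers are hypothetical names for illustration, not code from this PR.

```java
public class WrappedExceptionDemo {

    // Stand-in for org.apache.kafka.streams.errors.StreamsException (assumption:
    // only the message/cause constructor matters for this illustration).
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Mimics the FAIL branch: the processing exception is wrapped, not rethrown as-is.
    static RuntimeException wrap(final Exception processingException) {
        return new StreamsException(
            "Processing exception handler is set to fail upon a processing error.",
            processingException);
    }

    // The kind of check a test or uncaught exception handler needs once the
    // original message has moved to the cause.
    static boolean causeContains(final Throwable error, final String text) {
        final Throwable cause = error.getCause();
        return cause != null
            && cause.getMessage() != null
            && cause.getMessage().contains(text);
    }

    public static void main(final String[] args) {
        final RuntimeException error = wrap(new RuntimeException("Injected test exception"));
        // The top-level message no longer mentions the injected exception...
        System.out.println(error.getMessage().contains("Injected test exception")); // false
        // ...but the cause still does.
        System.out.println(causeContains(error, "Injected test exception")); // true
    }
}
```

An assertion on both the top-level message and the cause message, along these lines, would pin down the behavior in the EOS tests.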
I did not realize this! Thank you for raising this! Your code breaks uncaught exception handlers that are set through the old, deprecated uncaught exception handler API. I have to ask how we handle backward compatibility of deprecated APIs. I would say we need to be backward compatible.
@cadonna
Thanks for the remarks, I did not realize that either!
I see two solutions for backward compatibility:
1/ log an error and throw the original exception:
    private final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
    } catch (final Exception e) {
        final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
            internalProcessorContext.topic(),
            internalProcessorContext.partition(),
            internalProcessorContext.offset(),
            internalProcessorContext.headers(),
            internalProcessorContext.recordContext().rawRecord().key(),
            internalProcessorContext.recordContext().rawRecord().value(),
            internalProcessorContext.currentNode().name(),
            internalProcessorContext.taskId());
        final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
            .handle(errorHandlerContext, record, e);
        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            log.error("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
            throw new StreamsException(e);
        } else {
            droppedRecordsSensor.record();
        }
    }
2/ append the original error message to the new exception:
    } catch (final Exception e) {
        final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
            internalProcessorContext.topic(),
            internalProcessorContext.partition(),
            internalProcessorContext.offset(),
            internalProcessorContext.headers(),
            internalProcessorContext.recordContext().rawRecord().key(),
            internalProcessorContext.recordContext().rawRecord().value(),
            internalProcessorContext.currentNode().name(),
            internalProcessorContext.taskId());
        final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
            .handle(errorHandlerContext, record, e);
        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            throw new StreamsException(String.format("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately. %s", e.getMessage()),
                e);
        } else {
            droppedRecordsSensor.record();
        }
    }
Or maybe you are thinking of another solution?
Is only the message the issue or also the type of the exception? What exception is passed to the old uncaught exception handler in the eos tests without this PR? Is it a StreamsException or the original exception?
It seems to be a StreamsException:
https://github.com/apache/kafka/blob/8b6013f851fec537401f29769be5608c2d246747/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L754
Could you nevertheless verify it?
Actually it is a StreamsException, but from line 752:
https://github.com/apache/kafka/blob/8b6013f851fec537401f29769be5608c2d246747/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L751-L755
I meant without the processing exception handler code in this PR. Then it should be line 754, right?
I executed the test from trunk in debug mode and I confirm it is line 752.
Alright, so only the message is an issue regarding backward compatibility.
Regarding this comment https://github.com/apache/kafka/pull/16093#issuecomment-2194117929, what do you prefer?
@cadonna @sebastienviale
Before this PR, I think processing exceptions are caught and converted to StreamsException here:
https://github.com/apache/kafka/blob/3ebad6349de7d121a31f9d47c5ede7d6bbfac4d1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L805
Thus the uncaught exception handler is receiving a StreamsException with the following message:
Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
at com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:168)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
➡️ If we want to respect the backward compatibility, we might need to throw this from the ProcessorNode#process:
    if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
        throw new StreamsException(
            String.format(
                "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
                internalProcessorContext.taskId(),
                internalProcessorContext.currentNode().name(),
                internalProcessorContext.topic(),
                internalProcessorContext.partition(),
                internalProcessorContext.offset(),
                getStacktraceString(e)
            ),
            e
        );
    } else {
        droppedRecordsSensor.record();
    }
Note that this is not 100% backward compatible as the message received by the uncaught exception handler before the PR is:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
And after:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-MAPVALUES-0000000003, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
The processor name reported for the exception is not the same (it is more accurate after the PR). This might be acceptable, though. @cadonna?
@loicgreffier @cadonna
Seems OK to me, but in this case we lose the following information:
Processing exception handler is set to fail upon a processing error. If you would rather have the streaming pipeline continue after a processing error, please set the PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG appropriately
What do you think about the following proposal.
When the processing exception handler replies with FAIL, wrap the exception into an internal exception (to be created) named FailedProcessingException or similar. This internal exception is ignored by upstream processing exception handlers. When it reaches the catch blocks in StreamTask, the wrapped processing exception is unwrapped, wrapped into a StreamsException, and re-thrown.
The message
Processing exception handler is set to fail upon a processing error. If you would rather have the streaming pipeline continue after a processing error, please set the PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG appropriately
is logged on error level where the processing exception handler's return value is checked (as in Sébastien's option 1 above) and contains the processing exception.
Since StreamsException is a public exception, users might think that it is a good idea to throw a StreamsException in their user code. Using a StreamsException would circumvent the processing exception handler because for a processor node such an exception comes from a downstream processor node and it is just re-thrown. We cannot completely avoid this with an internal exception, but an internal exception makes it clear that it should not be used in user code.
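The proposal above can be sketched in plain Java as follows. This is only an illustration under simplifying assumptions: the nested exception classes are stand-ins for the real ones, processNode and processTask are hypothetical helpers that reduce ProcessorNode#process and StreamTask#process to their try/catch structure, and the handler is assumed to always reply FAIL.

```java
import java.util.ArrayList;
import java.util.List;

public class FailedProcessingSketch {

    // Stand-in for the public org.apache.kafka.streams.errors.StreamsException.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Internal marker meaning "a handler already replied FAIL for this error".
    static class FailedProcessingException extends RuntimeException {
        FailedProcessingException(final Throwable cause) {
            super(cause);
        }
    }

    // Names of the nodes whose handler was invoked, recorded for the demo only.
    static final List<String> HANDLED_BY = new ArrayList<>();

    // Simplified processor node: run the body and consult the handler on failure.
    static void processNode(final String name, final Runnable body) {
        try {
            body.run();
        } catch (final FailedProcessingException e) {
            throw e; // already handled downstream, so pass it through untouched
        } catch (final RuntimeException e) {
            HANDLED_BY.add(name); // handler invoked here; assumed to reply FAIL
            throw new FailedProcessingException(e);
        }
    }

    // Simplified task: unwrap the marker so callers still see
    // StreamsException -> original processing exception.
    static void processTask(final Runnable topology) {
        try {
            topology.run();
        } catch (final RuntimeException e) {
            final Throwable cause = e instanceof FailedProcessingException ? e.getCause() : e;
            throw new StreamsException("Exception caught in process. cause=" + cause, cause);
        }
    }

    // Runs a two-node topology (source -> mapValues) whose last node throws.
    static StreamsException runDemo() {
        HANDLED_BY.clear();
        try {
            processTask(() -> processNode("source", () -> processNode("mapValues", () -> {
                throw new IllegalStateException("Something bad happened...");
            })));
        } catch (final StreamsException e) {
            return e;
        }
        throw new AssertionError("expected a StreamsException");
    }

    public static void main(final String[] args) {
        final StreamsException error = runDemo();
        System.out.println(HANDLED_BY);       // [mapValues]
        System.out.println(error.getCause()); // java.lang.IllegalStateException: Something bad happened...
    }
}
```

In this sketch the handler runs exactly once, at the node where the exception originated, and the uncaught exception handler still receives a StreamsException whose direct cause is the original exception.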
@sebastienviale @cadonna Completely agree. FailedProcessingException is in line with my previous answer and deals with this scenario I was not totally comfortable with:
The drawback is that exceptions of type StreamsException won't be handled by the processing exception handler. Thus, if a user manually throws a StreamsException in one of their processor nodes, it won't be handled.
@cadonna PR updated. FailedProcessingException has been introduced.
Note that:
➡️ 1. By default, FailedProcessingException appears in the logs:
2024-06-28T23:03:34.862+02:00 ERROR 31724 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [streams-map] Encountered the following exception during processing and sent shutdown request for the entire application.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=org.apache.kafka.streams.processor.internals.FailedProcessingException: java.lang.RuntimeException: Something bad happened...
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: java.lang.RuntimeException: Something bad happened...
at com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
... 26 more
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:804) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
Caused by: org.apache.kafka.streams.processor.internals.FailedProcessingException: java.lang.RuntimeException: Something bad happened...
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
... 6 common frames omitted
Caused by: java.lang.RuntimeException: Something bad happened...
at com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32) ~[classes/:na]
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
... 26 common frames omitted
In case of a FailedProcessingException, the cause is used to create the StreamsException, so the stack trace stays the same before and after the PR:
    final Throwable processingException = e instanceof FailedProcessingException ? e.getCause() : e;
    final StreamsException error = new StreamsException(
        String.format(
            "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
            id(),
            processorContext.currentNode().name(),
            record.topic(),
            record.partition(),
            record.offset(),
            getStacktraceString(processingException)
        ),
        processingException
    );
➡️ 2. The processing exception handler logs:
2024-06-28T23:03:34.743+02:00 WARN 31724 --- [-StreamThread-1] s.e.LogAndFailProcessingExceptionHandler : Exception caught during message processing, processor node: KSTREAM-MAPVALUES-0000000003, taskId: 0_0, source topic: PERSON_TOPIC, source partition: 0, source offset: 0
While the TaskExecutor logs:
2024-06-28T23:58:14.337+02:00 ERROR 28932 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [streams-map] Encountered the following exception during processing and sent shutdown request for the entire application.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
The two logs report the same exception, but in different processors.
Maybe this could be improved (or not) in another PR. The FailedProcessingException could be used to pass the precise processor node name where the exception occurred to the StreamTask#process.
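As a rough illustration of that idea (not something implemented in this PR), the internal exception could carry the node name as a field. The failedProcessorNodeName field, its accessor, and the describe helper below are hypothetical names, and the message format is shortened.

```java
public class NodeNameCarrierSketch {

    // Hypothetical variant of FailedProcessingException that remembers
    // the name of the processor node where the exception occurred.
    static class FailedProcessingException extends RuntimeException {
        private final String failedProcessorNodeName;

        FailedProcessingException(final String failedProcessorNodeName, final Throwable cause) {
            super(cause);
            this.failedProcessorNodeName = failedProcessorNodeName;
        }

        String failedProcessorNodeName() {
            return failedProcessorNodeName;
        }
    }

    // Task-level message: prefer the carried node name and fall back to the
    // node the task currently points at (the pre-existing behavior).
    static String describe(final RuntimeException e, final String currentNodeName) {
        final String processor = e instanceof FailedProcessingException
            ? ((FailedProcessingException) e).failedProcessorNodeName()
            : currentNodeName;
        return String.format("Exception caught in process. processor=%s", processor);
    }

    public static void main(final String[] args) {
        final RuntimeException failure = new FailedProcessingException(
            "KSTREAM-MAPVALUES-0000000003",
            new RuntimeException("Something bad happened..."));
        System.out.println(describe(failure, "KSTREAM-SOURCE-0000000000"));
    }
}
```

With such a field, both the handler log and the task-level message could name the same processor.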
I agree, this is pre-existing behavior. You do not need to fix it in this PR. Could you please create a ticket for it?
There is also a formatting violation in the builds.
[2024-06-28T23:07:39.905Z] > The following files had format violations:
[2024-06-28T23:07:39.905Z] src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
[2024-06-28T23:07:39.905Z] @@ -33,8 +33,9 @@
[2024-06-28T23:07:39.905Z] import·org.apache.kafka.streams.processor.api.ProcessorSupplier;
[2024-06-28T23:07:39.905Z] import·org.apache.kafka.streams.processor.api.Record;
[2024-06-28T23:07:39.905Z] import·org.apache.kafka.test.MockProcessorSupplier;
[2024-06-28T23:07:39.905Z] +
[2024-06-28T23:07:39.905Z] +import·org.junit.jupiter.api.Tag;
[2024-06-28T23:07:39.905Z] import·org.junit.jupiter.api.Test;
[2024-06-28T23:07:39.905Z] -import·org.junit.jupiter.api.Tag;
[2024-06-28T23:07:39.905Z] import·org.junit.jupiter.api.Timeout;
[2024-06-28T23:07:39.905Z]
[2024-06-28T23:07:39.905Z] import·java.time.Duration;
[2024-06-28T23:07:39.905Z] Run './gradlew :streams:spotlessApply' to fix these violations.
[2024-06-28T23:07:39.905Z]
Some test failures seem related.
Sorry, the following comment is not correct: https://github.com/apache/kafka/pull/16093#discussion_r1664053104
With my proposal we would get a processing exception handler per processor node. Let me re-iterate on this.
I think it is fine to set the processing exception handler in the init() call.
Thanks for the updates @loicgreffier , @sebastienviale , and @Dabz!
I just have one comment on my own proposal 🙂
For the rest LGTM!
Could you write a message to the mailing thread "[DISCUSS] Apache Kafka 3.9.0 release" asking if KIP-1033 could be added to that release? That is the last PR for KIP-1033, right?
@cadonna
Regarding KIP-1033, there are 3 PRs remaining:
- https://github.com/apache/kafka/pull/16300: brings processing exception handling on punctuate
- https://github.com/apache/kafka/pull/16432: brings ErrorHandlerContext to the deserialization exception handler
- https://github.com/apache/kafka/pull/16433: brings ErrorHandlerContext to the production exception handler
All PRs would need to be rebased after this one has been merged to ease the review.