pulsar-java-spring-boot-starter
pulsar-java-spring-boot-starter copied to clipboard
[BUG] - Exception passed to PulsarErrorHandler is always of type InvocationTargetException
Describe the bug When an exception is thrown in a Consumer, I expect to retrieve that exception in the FailedMessage passed to the PulsarErrorHandler. Instead, an exception of type InvocationTargetException is always passed. This is a problem when logging that exception cause with some tools it makes it hard to distinguish the different errors, for example in rollbar I see a list of errors that seems all the same:

And I have to click on each one and check the call stack to find out which is the "real" error.
To Reproduce Steps to reproduce the behaviour:
- Setup a PulsarErrorHandler
- Throw an exception
- Observe that is always passed an InvocationTargetException in the failed message
Expected behaviour The real exception cause should be passed in the failed message
Additional context
The issue is in the try catch of the ConsumerAggregator.subscribe()
method:
private Consumer<?> subscribe(String generatedConsumerName, ConsumerHolder holder) {
try {
final String consumerName = stringValueResolver.resolveStringValue(holder.getAnnotation().consumerName());
final String subscriptionName = stringValueResolver.resolveStringValue(holder.getAnnotation().subscriptionName());
final String topicName = stringValueResolver.resolveStringValue(holder.getAnnotation().topic());
final String namespace = stringValueResolver.resolveStringValue(holder.getAnnotation().namespace());
final SubscriptionType subscriptionType = urlBuildService.getSubscriptionType(holder);
final ConsumerBuilder<?> consumerBuilder = pulsarClient
.newConsumer(SchemaUtils.getSchema(holder.getAnnotation().serialization(),
holder.getAnnotation().clazz()))
.consumerName(urlBuildService.buildPulsarConsumerName(consumerName, generatedConsumerName))
.subscriptionName(urlBuildService.buildPulsarSubscriptionName(subscriptionName, generatedConsumerName))
.topic(urlBuildService.buildTopicUrl(topicName, namespace))
.subscriptionType(subscriptionType)
.subscriptionInitialPosition(holder.getAnnotation().initialPosition())
.messageListener((consumer, msg) -> {
try {
final Method method = holder.getHandler();
method.setAccessible(true);
if (holder.isWrapped()) {
method.invoke(holder.getBean(), wrapMessage(msg));
} else {
method.invoke(holder.getBean(), msg.getValue());
}
consumer.acknowledge(msg);
} catch (Exception e) { // --> It will be always of type InvocationTargetException!!
consumer.negativeAcknowledge(msg);
sink.tryEmitNext(new FailedMessage(e, consumer, msg));
}
});
Since the try {} is on the reflection method "invoke()" the resulting exception will be always an InvocationTargetException, while the "real" exception should be in the e.cause property.
A solution can be to build the FailedMessage passing cause:
sink.tryEmitNext(new FailedMessage(e.cause, consumer, msg));
As a workaround we are logging the exception cause in the PulsarErrorHandler instead of the exception itself, but I think that the right behaviour should to pass the real cause in the FailedMessage. The fix could be a breaking change if others are already logging the exception cause as a workaround like we are doing
Thanks, @shodo, this is a great point!
Actually, there is a use case where the exception is not the type of InvocationTarget
- for example, when you set a wrong type for your incoming parameter, the cause will be null
then. However, I improved this and added another getter called consumerException
that will return a cause, and that will handle this use case + it won't be breaking change. This will be merged with linked PR.
Thanks a lot!
Hi @majusko!!
I've seen there is a conflict in your PR between the test:
testBasicDeadLetterRetryPolicy
and
testMessageError2Handling
The test of the retry policy enqueues three exceptions to the consumerAggregator.sink that are not handled by any consumerAggregator.onError handler.
So when you register the onError handler in the messageError2 test, you won't receive the expected NullPointerException but you receive the previous exception in the sink:
final Disposable disposable = consumerAggregator.onError(($) -> {
//All this assertions are failing and the receivedError guard won't be set to True!!
Assertions.assertEquals($.getConsumer().getTopic(), urlBuildService.buildTopicUrl("topic-for-error-2"));
Assertions.assertEquals($.getMessage().getValue(), messageToSend);
Assertions.assertNotNull($.getException());
Assertions.assertNotNull($.getConsumerException());
receivedError.set(true);
});
I've solved setting an empty handler also in the policy test, so that exceptions are dequeued from the sink:
@Test
void testBasicDeadLetterRetryPolicy() throws PulsarClientException {
producer.send("topic-retry", new MyMsg(VALIDATION_STRING));
final Disposable disposable = consumerAggregator.onError(($) -> {}); //just to dequeue exceptions from the sink
await().untilTrue(testConsumers.mockRetryCountListenerReceived);
Assertions.assertEquals(3, testConsumers.failTwiceRetryCount.get());
await().untilTrue(testConsumerInterceptor.onAckTimeoutSendReceived);
disposable.dispose();
}