pulsar-java-spring-boot-starter icon indicating copy to clipboard operation
pulsar-java-spring-boot-starter copied to clipboard

[BUG] - Exception passed to PulsarErrorHandler is always of type InvocationTargetException

Open shodo opened this issue 2 years ago • 4 comments

Describe the bug When an exception is thrown in a Consumer, I expect to retrieve that exception in the FailedMessage passed to the PulsarErrorHandler. Instead, an exception of type InvocationTargetException is always passed. This is a problem when logging that exception cause with some tools it makes it hard to distinguish the different errors, for example in rollbar I see a list of errors that seems all the same:

Screenshot 2022-06-10 at 10 16 30

And I have to click on each one and check the call stack to find out which is the "real" error.

To Reproduce Steps to reproduce the behaviour:

  1. Setup a PulsarErrorHandler
  2. Throw an exception
  3. Observe that is always passed an InvocationTargetException in the failed message

Expected behaviour The real exception cause should be passed in the failed message

Additional context The issue is in the try catch of the ConsumerAggregator.subscribe() method:

private Consumer<?> subscribe(String generatedConsumerName, ConsumerHolder holder) {
        try {
            final String consumerName = stringValueResolver.resolveStringValue(holder.getAnnotation().consumerName());
            final String subscriptionName = stringValueResolver.resolveStringValue(holder.getAnnotation().subscriptionName());
            final String topicName = stringValueResolver.resolveStringValue(holder.getAnnotation().topic());
            final String namespace = stringValueResolver.resolveStringValue(holder.getAnnotation().namespace());
            final SubscriptionType subscriptionType = urlBuildService.getSubscriptionType(holder);
            final ConsumerBuilder<?> consumerBuilder = pulsarClient
                .newConsumer(SchemaUtils.getSchema(holder.getAnnotation().serialization(),
                    holder.getAnnotation().clazz()))
                .consumerName(urlBuildService.buildPulsarConsumerName(consumerName, generatedConsumerName))
                .subscriptionName(urlBuildService.buildPulsarSubscriptionName(subscriptionName, generatedConsumerName))
                .topic(urlBuildService.buildTopicUrl(topicName, namespace))
                .subscriptionType(subscriptionType)
                .subscriptionInitialPosition(holder.getAnnotation().initialPosition())
                .messageListener((consumer, msg) -> {
                    try {
                        final Method method = holder.getHandler();
                        method.setAccessible(true);

                        if (holder.isWrapped()) {
                            method.invoke(holder.getBean(), wrapMessage(msg));
                        } else {
                            method.invoke(holder.getBean(), msg.getValue());
                        }

                        consumer.acknowledge(msg);
                    } catch (Exception e) { // --> It will be always of type InvocationTargetException!!
                        consumer.negativeAcknowledge(msg);
                        sink.tryEmitNext(new FailedMessage(e, consumer, msg));
                    }
                });

Since the try {} is on the reflection method "invoke()" the resulting exception will be always an InvocationTargetException, while the "real" exception should be in the e.cause property.

A solution can be to build the FailedMessage passing cause:

sink.tryEmitNext(new FailedMessage(e.cause, consumer, msg));

As a workaround we are logging the exception cause in the PulsarErrorHandler instead of the exception itself, but I think that the right behaviour should to pass the real cause in the FailedMessage. The fix could be a breaking change if others are already logging the exception cause as a workaround like we are doing

shodo avatar Jun 10 '22 08:06 shodo

Thanks, @shodo, this is a great point!

majusko avatar Jun 17 '22 09:06 majusko

Actually, there is a use case where the exception is not the type of InvocationTarget - for example, when you set a wrong type for your incoming parameter, the cause will be null then. However, I improved this and added another getter called consumerException that will return a cause, and that will handle this use case + it won't be breaking change. This will be merged with linked PR.

majusko avatar Sep 03 '22 22:09 majusko

Thanks a lot!

shodo avatar Sep 24 '22 10:09 shodo

Hi @majusko!! I've seen there is a conflict in your PR between the test: testBasicDeadLetterRetryPolicy and testMessageError2Handling

The test of the retry policy enqueues three exceptions to the consumerAggregator.sink that are not handled by any consumerAggregator.onError handler.
So when you register the onError handler in the messageError2 test, you won't receive the expected NullPointerException but you receive the previous exception in the sink:

  final Disposable disposable = consumerAggregator.onError(($) -> {
  //All this assertions are failing and the receivedError guard won't be set to True!!
            Assertions.assertEquals($.getConsumer().getTopic(), urlBuildService.buildTopicUrl("topic-for-error-2"));
            Assertions.assertEquals($.getMessage().getValue(), messageToSend);
            Assertions.assertNotNull($.getException());
            Assertions.assertNotNull($.getConsumerException());

            receivedError.set(true);
        });

I've solved setting an empty handler also in the policy test, so that exceptions are dequeued from the sink:

 @Test
    void testBasicDeadLetterRetryPolicy() throws PulsarClientException {

        producer.send("topic-retry", new MyMsg(VALIDATION_STRING));

        final Disposable disposable = consumerAggregator.onError(($) -> {}); //just to dequeue exceptions from the sink

        await().untilTrue(testConsumers.mockRetryCountListenerReceived);

        Assertions.assertEquals(3, testConsumers.failTwiceRetryCount.get());

        await().untilTrue(testConsumerInterceptor.onAckTimeoutSendReceived);

        disposable.dispose();
    }

shodo avatar Sep 24 '22 12:09 shodo