azure-sdk-for-java

EventProcessorClient with azure-messaging-eventhubs-checkpointstore-blob >= 1.13.0 does not run

Open gsharon2022 opened this issue 3 years ago • 5 comments

The sample code (below) runs fine with azure-messaging-eventhubs-checkpointstore-blob 1.12.2 or earlier, but with 1.13.0 or 1.14.0 it fails with the following errors:

2022-07-12 15:15:41,532 [reactor-http-kqueue-2] ERROR reactor.core.publisher.Operators  - Operator called default onErrorDropped
com.azure.storage.blob.models.BlobStorageException: Status code 412, "<?xml version="1.0" encoding="utf-8"?><Error><Code>ConditionNotMet</Code><Message>The condition specified using HTTP conditional header(s) is not met.
RequestId:eb78e682-901e-008a-3a23-96f346000000
Time:2022-07-12T19:15:41.7583851Z</Message></Error>"
	at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:627)
	at com.azure.core.implementation.http.rest.ResponseExceptionConstructorCache.invoke(ResponseExceptionConstructorCache.java:56)
	at com.azure.core.implementation.http.rest.RestProxyBase.instantiateUnexpectedException(RestProxyBase.java:378)
	at com.azure.core.implementation.http.rest.AsyncRestProxy.lambda$ensureExpectedStatus$1(AsyncRestProxy.java:116)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:703)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1373)
	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1247)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1287)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
	at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:383)
	at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:211)
	at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:289)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
2022-07-12 15:15:41,537 [reactor-http-kqueue-4] WARN  com.azure.messaging.eventhubs.PartitionBasedLoadBalancer  - {"az.sdk.message":"Failed to claim ownership.","exception":"Status code 412, \"<?xml version=\"1.0\" encoding=\"utf-8\"?><Error><Code>ConditionNotMet</Code><Message>The condition specified using HTTP conditional header(s) is not met.\nRequestId:4b11df8e-d01e-000d-6b23-966023000000\nTime:2022-07-12T19:15:41.7531291Z</Message></Error>\"","partitionId":"4"}
2022-07-12 15:15:41,538 [reactor-http-kqueue-4] WARN  com.azure.messaging.eventhubs.PartitionBasedLoadBalancer  - Error while listing checkpoints

Sample code:

    // Imports the snippet below relies on (log4j 1.x for logging).
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.ErrorContext;
    import com.azure.messaging.eventhubs.models.EventContext;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;
    import java.util.function.Consumer;
    import org.apache.log4j.ConsoleAppender;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.log4j.PatternLayout;

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        System.out.printf("Processing event from partition %s with sequence number %d %n",
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber());
        // Use println here: passing the body to printf would treat it as a format string.
        System.out.println(eventContext.getEventData().getBodyAsString());
        if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };

    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
    };

    public static void main(String[] args) throws InterruptedException {
        Logger log = Logger.getRootLogger();
        log.setLevel(Level.WARN);
        PatternLayout layout = new PatternLayout("%d{ISO8601} [%t] %-5p %c %x - %m%n");
        log.addAppender(new ConsoleAppender(layout));

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(STORAGE_CONNECTION_STRING)
                .containerName(storageContainerName)
//                .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS))
                .buildAsyncClient();

        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                .connectionString(EH_CONNECTION_STRING)
                .consumerGroup(consumerGroupName)
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
        // Starts the event processor
        eventProcessorClient.start();
        log.warn("eventProcessorClient started, waiting for events. " + eventProcessorClient.getIdentifier());
        Thread.sleep(1000 * 60 * 3); // run for 3 min.
        eventProcessorClient.stop();
    }

with:

    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs</artifactId>
            <version>5.12.2</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
            <version>1.30.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core-http-netty</artifactId>
            <version>1.12.3</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core-amqp</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-storage-blob</artifactId>
            <version>12.18.0</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-tcnative</artifactId>
            <version>2.0.53.Final</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-tcnative-boringssl-static</artifactId>
            <version>2.0.53.Final</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
    </dependencies>

gsharon2022 avatar Jul 12 '22 19:07 gsharon2022

I ran into the same issue. Rolling back the version has solved it so far.

katagaStyle avatar Jul 21 '22 07:07 katagaStyle

Thanks for reporting this... we've also noticed it and are investigating.

conniey avatar Jul 26 '22 00:07 conniey

We are also facing this issue. From what we've seen, it started occurring recently with the newer versions.

shubhambhattar avatar Aug 01 '22 10:08 shubhambhattar

Bumped into the same issue. Please resolve asap. 👍

kul avatar Aug 01 '22 10:08 kul

Getting the same issue.

gauravdhim1990 avatar Aug 02 '22 06:08 gauravdhim1990

To update this thread, the 412 error is normal as part of load balancing. Before the July release, we logged these and continued onwards. In the July release, we logged these errors in addition to calling the user's error handler. To align with other languages' client library behaviour, and because there is no actionable item for the user to take, we're thinking about reverting to the original behaviour.
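
For readers wondering why a 412 can be expected here: the error text ("The condition specified using HTTP conditional header(s) is not met") indicates that ownership claims are conditional writes, so when two processors race for the same partition only one write can succeed. The sketch below models that idea with an in-memory map and a version counter standing in for the blob ETag. It is an illustration under those assumptions, not the SDK's implementation; `OwnershipClaimSketch`, `OwnershipStore`, and `tryClaim` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class OwnershipClaimSketch {

    /** Hypothetical ownership record: the current owner and the record's version (ETag). */
    public static final class Ownership {
        final String ownerId;
        final long etag;

        Ownership(String ownerId, long etag) {
            this.ownerId = ownerId;
            this.etag = etag;
        }
    }

    /** Hypothetical in-memory stand-in for the storage container holding ownership records. */
    public static final class OwnershipStore {
        private final Map<String, Ownership> records = new HashMap<>();

        /**
         * Conditional write. A null expectedEtag means "create only if absent";
         * otherwise the write succeeds only if expectedEtag matches the stored one.
         * Returns 201 (created), 200 (updated), or 412 (condition not met).
         */
        public synchronized int tryClaim(String partitionId, String ownerId, Long expectedEtag) {
            Ownership current = records.get(partitionId);
            boolean conditionMet = (current == null)
                    ? expectedEtag == null
                    : expectedEtag != null && expectedEtag == current.etag;
            if (!conditionMet) {
                return 412; // someone else changed the record first
            }
            long nextEtag = (current == null) ? 1L : current.etag + 1;
            records.put(partitionId, new Ownership(ownerId, nextEtag));
            return (current == null) ? 201 : 200;
        }

        public synchronized Long etagOf(String partitionId) {
            Ownership current = records.get(partitionId);
            return current == null ? null : current.etag;
        }

        public synchronized String ownerOf(String partitionId) {
            Ownership current = records.get(partitionId);
            return current == null ? null : current.ownerId;
        }
    }

    public static void main(String[] args) {
        OwnershipStore store = new OwnershipStore();
        store.tryClaim("4", "processor-A", null);

        // Both processors read the same ETag, then race to write.
        Long seen = store.etagOf("4");
        int first = store.tryClaim("4", "processor-A", seen);
        int second = store.tryClaim("4", "processor-B", seen);

        System.out.println("first=" + first + ", second=" + second
                + ", owner=" + store.ownerOf("4"));
        // prints "first=200, second=412, owner=processor-A"
    }
}
```

In this model the losing processor has nothing to fix: it simply does not own that partition in this round and can try again on the next load-balancing cycle.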

conniey avatar Aug 17 '22 17:08 conniey

Hello @conniey, what solution are you proposing for this? We are using azure-messaging-eventhubs 5.13.0, azure-messaging-eventhubs-checkpointstore-blob 1.15.0, and azure-storage-blob 12.19.0.

We are still seeing the same issue, and in my case it is not even reading any messages from the Event Hub. Could you please assist us? Thanks in advance.

lovababu avatar Sep 12 '22 10:09 lovababu

@conniey, we are also seeing the same issue, and it is happening more frequently. If the 412 error is a normal part of load balancing, why is it logged as an error?

tinolazreg avatar Sep 14 '22 11:09 tinolazreg

@conniey I still see the issue. The only way to get it working is to roll back to the previous version. Your comment says "we're thinking about reverting to the original behaviour." Could you please tell me what you are going to revert and when it will happen? Thanks.

kumaran-sowrirajan avatar Sep 14 '22 21:09 kumaran-sowrirajan

Tagging @joshfree for attention and visibility. Thanks.

lovababu avatar Sep 17 '22 06:09 lovababu

@conniey

I verified with azure-messaging-eventhubs-checkpointstore-blob v1.15.1, and an exception is still thrown and passed to the user's error handler. If the 412 error is part of normal load balancing, it is not an error and should not be passed to the user's error handler. In any case, there is nothing a user can do about it.
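
Until the behaviour changes, one possible stopgap is to filter these conflicts inside the error handler. The sketch below is only that: it matches on the message text seen in the logs in this thread ("Status code 412" and "ConditionNotMet"), which is an assumption about the message format and is brittle. `BenignConflictFilter` and `isOwnershipConflict` are hypothetical names, not part of the SDK.

```java
public class BenignConflictFilter {

    /**
     * Returns true if the error, or any of its causes, carries the
     * 412 / ConditionNotMet markers seen in the logs in this thread.
     * The depth limit guards against unusually long or cyclic cause chains.
     */
    public static boolean isOwnershipConflict(Throwable error) {
        Throwable current = error;
        for (int depth = 0; current != null && depth < 16; depth++) {
            String message = current.getMessage();
            if (message != null
                    && message.contains("Status code 412")
                    && message.contains("ConditionNotMet")) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable conflict = new RuntimeException("Failed to claim ownership.",
                new IllegalStateException(
                        "Status code 412, \"<Error><Code>ConditionNotMet</Code></Error>\""));
        Throwable other = new RuntimeException("Status code 500");

        System.out.println(isOwnershipConflict(conflict));
        System.out.println(isOwnershipConflict(other));
    }
}
```

In the sample's `ERROR_HANDLER`, this could be called as `isOwnershipConflict(errorContext.getThrowable())` to return early before printing. It only hides the noise in the handler; it does not change what the library itself logs.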

zeljko-mirovic avatar Sep 30 '22 12:09 zeljko-mirovic

Greetings, I can concur with zeljko that the message still shows up.

{"az.sdk.message":"Failed to claim ownership.","exception":"Status code 412, \"?<?xml version=\"1.0\" encoding=\"utf-8\"?><Error><Code>ConditionNotMet</Code><Message>The condition specified using HTTP conditional header(s) is not met.\nRequestId:c79c60bd-301e-004c-0a24-ea02b1000000\nTime:2022-10-27T16:53:22.8486534Z</Message></Error>\"","partitionId":"1"}
com.azure.storage.blob.models.BlobStorageException: Status code 412, "?<?xml version="1.0" encoding="utf-8"?><Error><Code>ConditionNotMet</Code><Message>The condition specified using HTTP conditional header(s) is not met.
RequestId:c79c60bd-301e-004c-0a24-ea02b1000000
Time:2022-10-27T16:53:22.8486534Z</Message></Error>"

If this isn't an error, it would be great if someone could change it from an error to a warning.

Is there any ETA for this to happen?

Thank you for your time. Have a nice day.

acarlstein avatar Oct 27 '22 16:10 acarlstein

The follow-up 412 error issue is tracked in #31672. We will work on a fix for it.

liukun-msft avatar Nov 03 '22 06:11 liukun-msft