EventProcessorClient with azure-messaging-eventhubs-checkpointstore-blob >= 1.13.0 does not run
The sample code below runs fine with azure-messaging-eventhubs-checkpointstore-blob 1.12.2 or earlier, but with 1.13.0 or 1.14.0 it fails with the following errors:
2022-07-12 15:15:41,532 [reactor-http-kqueue-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
com.azure.storage.blob.models.BlobStorageException: Status code 412, "<?xml version="1.0" encoding="utf-8"?><Error><Code>ConditionNotMet</Code><Message>The condition specified using HTTP conditional header(s) is not met.
RequestId:eb78e682-901e-008a-3a23-96f346000000
Time:2022-07-12T19:15:41.7583851Z</Message></Error>"
at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:627)
at com.azure.core.implementation.http.rest.ResponseExceptionConstructorCache.invoke(ResponseExceptionConstructorCache.java:56)
at com.azure.core.implementation.http.rest.RestProxyBase.instantiateUnexpectedException(RestProxyBase.java:378)
at com.azure.core.implementation.http.rest.AsyncRestProxy.lambda$ensureExpectedStatus$1(AsyncRestProxy.java:116)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:703)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1373)
at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1247)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1287)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:383)
at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:211)
at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:289)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
2022-07-12 15:15:41,537 [reactor-http-kqueue-4] WARN com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - {"az.sdk.message":"Failed to claim ownership.","exception":"Status code 412, \"<?xml version=\"1.0\" encoding=\"utf-8\"?><Error><Code>ConditionNotMet</Code><Message>The condition specified using HTTP conditional header(s) is not met.\nRequestId:4b11df8e-d01e-000d-6b23-966023000000\nTime:2022-07-12T19:15:41.7531291Z</Message></Error>\"","partitionId":"4"}
2022-07-12 15:15:41,538 [reactor-http-kqueue-4] WARN com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Error while listing checkpoints
Sample code:
public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
    System.out.printf("Processing event from partition %s with sequence number %d %n",
        eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber());
    // Print the body with println: passing it to printf as the format string fails if the payload contains '%'.
    System.out.println(eventContext.getEventData().getBodyAsString());
    // Checkpoint on every 10th sequence number.
    if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {
        eventContext.updateCheckpoint();
    }
};
public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
    System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
        errorContext.getPartitionContext().getPartitionId(),
        errorContext.getThrowable());
};
public static void main(String[] args) throws InterruptedException {
    // Log at WARN and above to the console (log4j 1.x).
    Logger log = Logger.getRootLogger();
    log.setLevel(Level.WARN);
    PatternLayout layout = new PatternLayout("%d{ISO8601} [%t] %-5p %c %x - %m%n");
    log.addAppender(new ConsoleAppender(layout));
    // Blob container that backs the checkpoint store.
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
        .connectionString(STORAGE_CONNECTION_STRING)
        .containerName(storageContainerName)
        // .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS))
        .buildAsyncClient();
    EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
        .connectionString(EH_CONNECTION_STRING)
        .consumerGroup(consumerGroupName)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
    EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    // Starts the event processor
    eventProcessorClient.start();
    log.warn("eventProcessorClient started, waiting for events. " + eventProcessorClient.getIdentifier());
    Thread.sleep(1000 * 60 * 3); // run for 3 min.
    eventProcessorClient.stop();
}
with:
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.12.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.30.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.18.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId>
<version>2.0.53.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>2.0.53.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
I ran into the same issue. Rolling back to the previous version resolves it so far.
Thanks for reporting this... we've also noticed it and are investigating.
We are also facing this issue. It started appearing recently with the newer versions, from what we've seen.
Bumped into the same issue. Please resolve ASAP. 👍
Getting the same issue.
To update this thread, the 412 error is normal as part of load balancing. Before the July release, we logged these and continued onwards. In the July release, we logged these errors in addition to calling the user's error handler. To align with other languages' client library behaviour, and because there is no actionable item for the user to take, we're thinking about reverting to the original behaviour.
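To illustrate why a 412 can be a routine outcome of load balancing rather than a failure, here is a minimal in-memory sketch of a conditional ownership claim. It is not the SDK's code: OwnershipStore, claim and the ETag handling are hypothetical names that only model the general idea of a conditional write, where the processor that loses a race for a partition is rejected with a "condition not met" status.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class OwnershipClaimSketch {

    /** One ownership record: who holds the partition and the version tag of that record. */
    static final class Ownership {
        final String ownerId;
        final String eTag;

        Ownership(String ownerId, String eTag) {
            this.ownerId = ownerId;
            this.eTag = eTag;
        }
    }

    /** Hypothetical in-memory stand-in for a store that supports conditional writes. */
    static final class OwnershipStore {
        private final Map<String, Ownership> records = new ConcurrentHashMap<>();

        /**
         * Tries to claim a partition. expectedETag is the tag the caller last saw,
         * or null if the caller believes the partition is unowned.
         * Returns 201 on success and 412 if the record changed in the meantime.
         */
        int claim(String partitionId, String ownerId, String expectedETag) {
            Ownership next = new Ownership(ownerId, UUID.randomUUID().toString());
            if (expectedETag == null) {
                // "Create only if absent": fails when another processor got there first.
                return records.putIfAbsent(partitionId, next) == null ? 201 : 412;
            }
            Ownership current = records.get(partitionId);
            if (current == null || !current.eTag.equals(expectedETag)) {
                return 412; // the caller's view of the record is stale
            }
            // Atomic swap: succeeds only if the record is still the one we just read.
            return records.replace(partitionId, current, next) ? 201 : 412;
        }

        String ownerOf(String partitionId) {
            Ownership current = records.get(partitionId);
            return current == null ? null : current.ownerId;
        }
    }

    public static void main(String[] args) {
        OwnershipStore store = new OwnershipStore();
        // Both processors believe partition 4 is unowned and try to claim it.
        int first = store.claim("4", "processor-A", null);
        int second = store.claim("4", "processor-B", null);
        System.out.println("processor-A claim: " + first);   // 201
        System.out.println("processor-B claim: " + second);  // 412
        System.out.println("owner of partition 4: " + store.ownerOf("4")); // processor-A
    }
}
```

In this model the 412 for processor-B carries no action for the caller: the partition is already owned, and the loser can simply try again on a later load-balancing cycle.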
Hello @conniey, what solution are you proposing for this? We are using azure-messaging-eventhubs:5.13.0, azure-messaging-eventhubs-checkpointstore-blob:1.15.0, azure-storage-blob:12.19.0.
We are still seeing the same issue, and in my case it is not even reading any messages from Event Hubs. Could you please assist us? Thanks in advance.
@conniey, we are also seeing the same issue, and it is happening more frequently. If the 412 error is a normal part of load balancing, why is it logged as an error?
@conniey I still see the issue. The only way to get it working is to roll back to the previous version. Your comment says "we're thinking about reverting to the original behaviour." Could you please tell me what you are going to revert and when it will happen? Thanks.
Tagging @joshfree for attention and visibility, thanks.
@conniey
I verified with azure-messaging-eventhubs-checkpointstore-blob v1.15.1, and an exception is still thrown and passed to the user's error handler. If the 412 error is part of normal load balancing, it is not an error and should not be passed to the user's error handler. In any case, there is nothing a user can do about it.
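Until the behaviour changes, one possible stopgap is to filter these errors inside the application's own error handler. The sketch below is a self-contained illustration and not part of the SDK: ignoringStatus, statusOf and StatusException are hypothetical names, and StatusException only stands in for a storage exception that carries an HTTP status code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;

public class BenignErrorFilterSketch {

    static final int PRECONDITION_FAILED = 412;

    /** Hypothetical exception carrying an HTTP status code. */
    static final class StatusException extends RuntimeException {
        final int status;

        StatusException(int status, String message) {
            super(message);
            this.status = status;
        }
    }

    /** Extracts the status code from a throwable, or -1 if it has none. */
    static int statusOf(Throwable t) {
        return t instanceof StatusException ? ((StatusException) t).status : -1;
    }

    /**
     * Wraps a handler so that errors whose cause chain contains the given
     * status code are dropped, and everything else is forwarded unchanged.
     */
    static Consumer<Throwable> ignoringStatus(int status,
                                              ToIntFunction<Throwable> statusOf,
                                              Consumer<Throwable> delegate) {
        return error -> {
            for (Throwable t = error; t != null; t = t.getCause()) {
                if (statusOf.applyAsInt(t) == status) {
                    return; // treated as benign: do not forward
                }
            }
            delegate.accept(error);
        };
    }

    public static void main(String[] args) {
        List<Throwable> forwarded = new ArrayList<>();
        Consumer<Throwable> handler =
            ignoringStatus(PRECONDITION_FAILED, BenignErrorFilterSketch::statusOf, forwarded::add);

        handler.accept(new StatusException(412, "condition not met"));
        handler.accept(new RuntimeException("wrapped", new StatusException(412, "condition not met")));
        handler.accept(new StatusException(500, "server error"));

        System.out.println("forwarded errors: " + forwarded.size()); // 1
    }
}
```

With the sample code from this issue, the wrapped handler would be called with errorContext.getThrowable(), and the status lookup would check for BlobStorageException and call getStatusCode() on it. This only affects what the application's handler sees; it does not change what the SDK itself logs.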
Greetings, I can concur with zeljko that the message still shows up.
{"az.sdk.message":"Failed to claim ownership.","exception":"Status code 412, \"?<?xml version=\"1.0\" encoding=\"utf-8\"?><Error><Code>ConditionNotMet</Code><Message>The condition specified using HTTP conditional header(s) is not met.\nRequestId:c79c60bd-301e-004c-0a24-ea02b1000000\nTime:2022-10-27T16:53:22.8486534Z</Message></Error>\"","partitionId":"1"}
com.azure.storage.blob.models.BlobStorageException: Status code 412, "?<?xml version="1.0" encoding="utf-8"?><Error><Code>ConditionNotMet</Code><Message>The condition specified using HTTP conditional header(s) is not met.
RequestId:c79c60bd-301e-004c-0a24-ea02b1000000
Time:2022-10-27T16:53:22.8486534Z</Message></Error>"
If this isn't an error, it would be great if someone could change it from error to warning.
Is there any ETA for this to happen?
Thank you for your time. Have a nice day.
The follow-up 412 error issue is tracked in #31672. We will work on a fix for it.