[BUG] java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-1, when using uploadWithResponse method of BlobClient

Open RichaNegi opened this issue 1 year ago • 3 comments

Describe the bug We upload a file to Azure using uploadWithResponse of BlobClient, then download it as a stream and upload the same file to a new location using the same uploadWithResponse method. During the second upload, which uses the file stream obtained from Azure, the call to uploadWithResponse fails with the error: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-1

This error occurs for files > 2MB.

Exception or Stack Trace "Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-1","\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)","\tat reactor.core.publisher.Mono.block(Mono.java:1712)","\tat com.azure.storage.blob.specialized.BlobInputStream.dispatchRead(BlobInputStream.java:79)","\tat com.azure.storage.common.StorageInputStream.readInternal(StorageInputStream.java:344)","\tat com.azure.storage.common.StorageInputStream.read(StorageInputStream.java:317)","\tat com.azure.core.util.FluxUtil.lambda$toFluxByteBuffer$6(FluxUtil.java:298)","\tat reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:271)","\tat reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:213)","\tat reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.request(FluxFilterFuseable.java:411)","\tat reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)","\tat reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerComplete(FluxConcatMapNoPrefetch.java:274)","\tat reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:887)","\tat reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:480)","\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)","\tat reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338)","\tat reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.request(FluxSwitchOnFirst.java:704)","\tat reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:471)","\tat 
reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.request(FluxMergeSequential.java:298)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:471)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.request(FluxMergeSequential.java:298)","\tat reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:430)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.innerComplete(FluxMergeSequential.java:335)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialInner.onComplete(FluxMergeSequential.java:591)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)","\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:147)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:246)","\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)","\tat reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)","\tat reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:159)","\tat reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)","\tat 
reactor.core.publisher.FluxCallable.subscribe(FluxCallable.java:40)","\tat reactor.core.publisher.Mono.subscribe(Mono.java:4495)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)","\tat reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)","\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)","\tat reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onNext(FluxDoOnEach.java:173)","\tat reactor.core.publisher.FluxDoOnEach$DoOnEachFuseableSubscriber.onNext(FluxDoOnEach.java:281)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)","\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)","\tat 
reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)","\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)","\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)","\tat reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2811)","\tat reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)","\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)","\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)","\tat reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)","\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)","\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2547)","\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2341)","\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2215)","\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)","\tat reactor.core.publisher.Mono.subscribe(Mono.java:4495)","\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)","\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)","\tat reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)","\tat reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)","\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)","\tat reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:211)","\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)","\tat 
reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)","\tat reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:118)","\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)","\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)","\tat reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415)","\tat reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446)","\tat reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500)","\tat reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:768)","\tat reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)","\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)","\tat io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)","\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)","\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)","\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)","\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)","\tat 
io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)","\tat io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1349)","\tat io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1389)","\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)","\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)","\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)","\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)","\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)","\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)","\tat io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)","\tat io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)","\tat io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)","\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)","\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)","\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)","\t... 
1 more","\tSuppressed: java.lang.Exception: #block terminated with an error","\t\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)","\t\tat reactor.core.publisher.Mono.block(Mono.java:1712)","\t\tat com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:156)","\t\tat com.azure.storage.blob.BlobClient.uploadWithResponse(BlobClient.java:337)","\t\tat

To Reproduce Steps to reproduce the behavior: Upload a file using the uploadWithResponse method of BlobClient, open an input stream on that blob from Azure, and try to upload the stream to a new location.

Code Snippet

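// Imports required by this snippet
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.blob.specialized.BlobClientBase;
import java.io.InputStream;

// Open an input stream on a specific version of the source blob
BlobClientBase versionBlobClient = blockBlobClient.getVersionClient(versionId);
InputStream inputStream = versionBlobClient.openInputStream();

// Upload the stream to the new location
BlobClient blobClient = client.getBlobContainerClient(bucketName).getBlobClient(fileName);

ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions().setBlockSizeLong(1024L)
				.setMaxSingleUploadSizeLong(2048L)
				.setMaxConcurrency(5);
BlobParallelUploadOptions parallelUploadOptions = new BlobParallelUploadOptions(inputStream)
				.setParallelTransferOptions(parallelTransferOptions);
try {
	blobClient.uploadWithResponse(parallelUploadOptions, null, null);
} catch (RuntimeException e) {
	// The exception from the stack trace above is thrown from this call
	throw e;
}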

Expected behavior No exception.

Setup (please complete the following information):

  • Library/Libraries: com.azure:azure-storage-blob:12.20.1
  • Java version: 17
  • App Server/Environment: Tomcat
  • Frameworks: Spring Boot

If you suspect a dependency version mismatch (e.g. you see NoClassDefFoundError, NoSuchMethodError or similar), please check out Troubleshoot dependency version conflict article first. If it doesn't provide solution for the problem, please provide:

  • verbose dependency tree (mvn dependency:tree -Dverbose)
  • exception message, full stack trace, and any available logs

Information Checklist Kindly make sure that you have added all the following information above and check off the required fields; otherwise we will treat the issue as an incomplete report

  • [x] Bug Description Added
  • [x] Repro Steps Added
  • [x] Setup information Added

RichaNegi avatar Feb 28 '24 10:02 RichaNegi

@alzimmermsft could you help Storage by taking a first look at this?

/cc @ibrahimrabab

joshfree avatar Feb 28 '24 15:02 joshfree

Thanks for reporting this @RichaNegi

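This issue occurs because the InputStream returned by the BlobClient makes blocking download calls to the service, but when that InputStream is passed to the upload method it is converted from InputStream to Flux<ByteBuffer>, due to the sync-over-async nature of the Storage SDKs. As the stack trace shows, the stream is then read on a Reactor Netty event-loop thread (reactor-http-epoll-1), where blocking calls are rejected.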

For now, I'll work with you on a workaround for this. Is there a reason that this code pattern is used rather than the Copy Blob REST API?
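Until a fixed version is available, one possible stopgap (an assumption on my part, not something confirmed in this thread) is to drain the downloaded stream to a local temporary file on the calling thread first, and then pass a plain file stream to the upload. Reading a local file does not call block(), so the event-loop thread should no longer hit the check. The sketch below uses only the JDK; `BlobSpool` and `spoolToTempFile` are hypothetical names, and the `ByteArrayInputStream` stands in for `versionBlobClient.openInputStream()`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of the Azure SDK.
final class BlobSpool {
    private BlobSpool() {}

    /** Drains the source stream into a temp file on the calling thread and returns its path. */
    static Path spoolToTempFile(InputStream source) throws IOException {
        Path tempFile = Files.createTempFile("blob-spool-", ".tmp");
        try (InputStream in = source) {
            // createTempFile already created the file, so overwrite it.
            Files.copy(in, tempFile, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException | RuntimeException e) {
            // Do not leave a partial file behind if the download fails.
            Files.deleteIfExists(tempFile);
            throw e;
        }
        return tempFile;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for versionBlobClient.openInputStream().
        InputStream download = new ByteArrayInputStream(new byte[] {1, 2, 3, 4});
        Path spooled = spoolToTempFile(download);
        try (InputStream local = Files.newInputStream(spooled)) {
            // 'local' is what would be handed to new BlobParallelUploadOptions(local).
            System.out.println("spooled bytes: " + Files.size(spooled));
        } finally {
            Files.deleteIfExists(spooled);
        }
    }
}
```

The trade-off is extra local disk I/O and temporary storage proportional to the blob size, so this only makes sense as a short-term measure.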

alzimmermsft avatar Feb 28 '24 18:02 alzimmermsft

Hello @alzimmermsft, and thank you for your feedback and proposal. The rationale on our side for this use case is that our framework streams between various source and destination types, such as SFTP to SFTP or SFTP to blob store. In this ticket's situation it is blob to blob. We could potentially update our code to use "copy" instead of "uploadWithResponse". Can you please tell us whether, behind the scenes, it would bring some optimizations or any additional benefits, or would it only fix this issue?

bferrenq avatar Feb 29 '24 08:02 bferrenq

Hi @alzimmermsft where can we check the new version delivered from that change?

baiglin avatar Mar 05 '24 08:03 baiglin

@ibrahimrabab when the release is done this month, can you update this issue with the version containing the fix?

alzimmermsft avatar Mar 05 '24 13:03 alzimmermsft

Hello @alzimmermsft, thank you for the fix. Can you please tell us whether, behind the scenes, using "copy" instead of "uploadWithResponse" would bring some optimizations or any additional benefits?

bferrenq avatar Mar 11 '24 12:03 bferrenq

Hi @bferrenq,

Using [Copy Blob](https://learn.microsoft.com/rest/api/storageservices/copy-blob) or [Copy Blob from URL](https://learn.microsoft.com/rest/api/storageservices/copy-blob-from-url) has the benefit that you don't need to make download and upload requests, letting the service itself handle the copying. From the get-go, this can reduce the number of API calls you're making and removes your own network from needing to handle the downloading and uploading.

That said, there are more nuances to the copying operations that need to be accounted for, and I would recommend reading the REST API documentation on those APIs for a better understanding.
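To make the "service handles the copying" point concrete, here is a minimal sketch of what such a request looks like at the REST level, built with the JDK's `java.net.http` and never sent. It assumes both URLs already carry whatever authorization they need (for example a SAS token in the query string); the account, container, and blob names are placeholders, `CopyBlobRequestSketch` is a hypothetical name, and the `x-ms-version` value is just one example of a service version. The request has no body: the source is named in the `x-ms-copy-source` header, and `x-ms-requires-sync: true` selects the synchronous Copy Blob from URL variant.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch: builds, but does not send, a Copy Blob style request.
final class CopyBlobRequestSketch {
    // Example service version; an assumption, adjust to what your account supports.
    static final String SERVICE_VERSION = "2021-08-06";

    private CopyBlobRequestSketch() {}

    static HttpRequest buildCopyRequest(String destinationUrl, String sourceUrl, boolean synchronous) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(destinationUrl))
            // The copy is a PUT on the destination blob with an empty body.
            .PUT(HttpRequest.BodyPublishers.noBody())
            .header("x-ms-version", SERVICE_VERSION)
            // The service reads the data from this URL itself.
            .header("x-ms-copy-source", sourceUrl);
        if (synchronous) {
            // Present only for Copy Blob from URL (synchronous copy).
            builder.header("x-ms-requires-sync", "true");
        }
        return builder.build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildCopyRequest(
            "https://exampleaccount.blob.core.windows.net/destination/file.bin", // placeholder
            "https://exampleaccount.blob.core.windows.net/source/file.bin",      // placeholder
            true);
        System.out.println(request.method() + " " + request.uri());
        request.headers().map().forEach((name, values) -> System.out.println(name + ": " + values));
    }
}
```

In application code you would normally go through the SDK's copy methods rather than hand-built requests; the point here is only that no blob data passes through the client.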

alzimmermsft avatar Mar 11 '24 12:03 alzimmermsft

Hi @alzimmermsft "removes your own network from needing to handle the downloading and uploading." would indeed be amazing, hence my clarification request. I will read your links. Thank you!

bferrenq avatar Mar 11 '24 13:03 bferrenq

Hi @alzimmermsft , Thanks for the fix. Do you have any updates on when we can expect the new version?

RichaNegi avatar Mar 20 '24 06:03 RichaNegi

Hello @ibrahimrabab, do you know whether the fix was released and, if so, in which version? Thanks

baiglin avatar Apr 04 '24 13:04 baiglin

@baiglin @RichaNegi Hi there, we are planning our GA release for sometime in the next week or so. I will update on this thread once the release has been rolled out. Thanks!

ibrahimrabab avatar Apr 11 '24 22:04 ibrahimrabab

Hello @ibrahimrabab, I took the latest GA release, imo bom 1.2.23, which is linked to azure-storage-blob 12.25.4, and that version does NOT contain the fix. Is this a mistake? Can we safely override azure-storage-blob and azure-storage-common to use a more recent version containing the fix? Thanks for your support, Benoit

bferrenq avatar May 27 '24 12:05 bferrenq

This issue also exists with the StorageImplUtils.blockWithOptionalTimeout() method, which is currently breaking things for me.

h4xmd avatar May 31 '24 13:05 h4xmd

Hello, One more hint to reproduce the issue. In our situation, this is happening only while doing a copy of large files (>300 MiB).

bferrenq avatar May 31 '24 14:05 bferrenq