azure-sdk-for-java
azure-sdk-for-java copied to clipboard
[BUG] java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-1, when using uploadWithResponse method of BlobClient
Describe the bug We are uploading file to azure using uploadWithResponse of BlobClient, then we download the filestream and upload the same file to a new location using the same uploadWithResponse method. When calling the function uploadWithResponse of BlobClient during the second upload using the filestream(from azure), we get the error: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-1
This error occurs for files > 2MB.
Exception or Stack Trace "Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-1","\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)","\tat reactor.core.publisher.Mono.block(Mono.java:1712)","\tat com.azure.storage.blob.specialized.BlobInputStream.dispatchRead(BlobInputStream.java:79)","\tat com.azure.storage.common.StorageInputStream.readInternal(StorageInputStream.java:344)","\tat com.azure.storage.common.StorageInputStream.read(StorageInputStream.java:317)","\tat com.azure.core.util.FluxUtil.lambda$toFluxByteBuffer$6(FluxUtil.java:298)","\tat reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:271)","\tat reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:213)","\tat reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.request(FluxFilterFuseable.java:411)","\tat reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)","\tat reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerComplete(FluxConcatMapNoPrefetch.java:274)","\tat reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:887)","\tat reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:480)","\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)","\tat reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338)","\tat reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.request(FluxSwitchOnFirst.java:704)","\tat reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:471)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.request(FluxMergeSequential.java:298)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:471)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.request(FluxMergeSequential.java:298)","\tat reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:430)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.innerComplete(FluxMergeSequential.java:335)","\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialInner.onComplete(FluxMergeSequential.java:591)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)","\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:147)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:246)","\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)","\tat reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)","\tat reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:159)","\tat reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)","\tat reactor.core.publisher.FluxCallable.subscribe(FluxCallable.java:40)","\tat reactor.core.publisher.Mono.subscribe(Mono.java:4495)","\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)","\tat reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)","\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)","\tat reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onNext(FluxDoOnEach.java:173)","\tat reactor.core.publisher.FluxDoOnEach$DoOnEachFuseableSubscriber.onNext(FluxDoOnEach.java:281)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)","\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)","\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)","\tat reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)","\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)","\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)","\tat reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2811)","\tat reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)","\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)","\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)","\tat reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)","\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)","\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2547)","\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2341)","\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2215)","\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)","\tat reactor.core.publisher.Mono.subscribe(Mono.java:4495)","\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)","\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)","\tat reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)","\tat reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)","\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)","\tat reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:211)","\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)","\tat reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)","\tat reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:118)","\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)","\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)","\tat reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415)","\tat reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446)","\tat reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500)","\tat reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:768)","\tat reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)","\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)","\tat io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)","\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)","\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)","\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)","\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)","\tat io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)","\tat io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1349)","\tat io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1389)","\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)","\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)","\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)","\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)","\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)","\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)","\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)","\tat io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)","\tat io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)","\tat io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)","\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)","\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)","\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)","\t... 1 more","\tSuppressed: java.lang.Exception: #block terminated with an error","\t\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)","\t\tat reactor.core.publisher.Mono.block(Mono.java:1712)","\t\tat com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:156)","\t\tat com.azure.storage.blob.BlobClient.uploadWithResponse(BlobClient.java:337)","\t\tat
To Reproduce Steps to reproduce the behavior: Upload the file using uploadWithResponse method of BlobClient, download the filestream from azure and try to upload again to a new location.
Code Snippet
//To get input stream
BlobClientBase versionBlobClient = blockBlobClient.getVersionClient(versionId);
InputStream inputStream versionBlobClient.openInputStream();
//Upload
BlobClient blobClient = client.getBlobContainerClient(bucketName).getBlobClient(fileName);
ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions().setBlockSizeLong(1024L)
.setMaxSingleUploadSizeLong(2048L)
.setMaxConcurrency(5);
BlobParallelUploadOptions parallelUploadOptions = new BlobParallelUploadOptions(inputStream)
.setParallelTransferOptions(parallelTransferOptions);
try {
blobClient.uploadWithResponse(parallelUploadOptions, null, null);
}
Expected behavior No exception.
Screenshots If applicable, add screenshots to help explain your problem.
Setup (please complete the following information):
- Library/Libraries: com.azure:azure-storage-blob:12.20.1
- Java version: 17
- App Server/Environment: Tomcat
- Frameworks: Spring Boot
If you suspect a dependency version mismatch (e.g. you see NoClassDefFoundError, NoSuchMethodError or similar), please check out Troubleshoot dependency version conflict article first. If it doesn't provide solution for the problem, please provide:
- verbose dependency tree (
mvn dependency:tree -Dverbose) - exception message, full stack trace, and any available logs
Additional context Add any other context about the problem here.
Information Checklist Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report
- [x] Bug Description Added
- [x] Repro Steps Added
- [x] Setup information Added
@alzimmermsft could you help Storage by taking a first look at this?
/cc @ibrahimrabab
Thanks for reporting this @RichaNegi
This issue appears due to the InputStream returned by the BlobClient blocking downloads from the service but when the InputStream is passed to the upload method the stream is converted from InputStream to Flux<ByteBuffer> due to the sync-over-async nature of the Storage SDKs.
For now, I'll work with you on a workaround for this. Is there a reason that this code pattern is used rather than the Copy Blob REST API?
Hello @alzimmermsft , and thank you for your feedback and proposal. Rational on our side for such use case is our framework is doing streaming between various source type and destination, like sftp to sftp or sftp to blobstore. In this ticket situation it is blob to blob. We could potentially update our code to use "copy" instead of "uploadWithResponse", can you please tell us if behind the scene it would bring some optimizations or any additional benefits, or would it only fix this issue?
Hi @alzimmermsft where can we check the new version delivered from that change?
@ibrahimrabab when release is done this month can you update this issue with the version containing the fix.
Hello @alzimmermsft , Thank you for the fix. Can you please tell us if behind the scene using "copy" instead of "uploadWithResponse" would bring some optimizations or any additional benefits?
Hi @bferrenq,
Using [https://learn.microsoft.com/rest/api/storageservices/copy-blob](Copy Blob) or [https://learn.microsoft.com/rest/api/storageservices/copy-blob-from-url](Copy Blob from URL) have the benefits where you don't need to make download and upload requests, letting the service itself handle the copying. From the get-go, this can reduce the number of API calls you're making and removes your own network from needing to handle the downloading and uploading.
That said, there are more nuances to the copying operations that need to be accounted for, and I would recommend reading the REST API documentation on those APIs for a better understanding.
Hi @alzimmermsft "removes your own network from needing to handle the downloading and uploading." would be indeed amazing, hence my clarification request, will read your links. Thank you!
Hi @alzimmermsft , Thanks for the fix. Do you have any updates on when we can expect the new version?
Hello @ibrahimrabab, do you know if the fix was released and the number if it is the case ? Thanks
@baiglin @RichaNegi Hi there, we are planning our GA release for sometime in the next week or so. I will update on this thread once the release has been rolled out. Thanks!
Hello @ibrahimrabab , I took the latest GA release, imo bom 1.2.23, that is linked to azure-storage-blob 12.25.4 that does NOT contain fix. Is this a mistake, can we safely override azure-storage-blob and azure-storage-common to use more recent version containing the fix? Thx for your support, Benoit
This issue also exists with the StorageImplUtils.blockWithOptionalTimeout() method that's currently breaking things for me
Hello, One more hint to reproduce the issue. In our situation, this is happening only while doing a copy of large files (>300 MiB).