# Error while publishing directories on Azure
Bug report
### Expected behavior and actual behavior
On the Microsoft Azure platform, when a process outputs a directory containing files and subfolders, Nextflow fails to copy it to the `publishDir`. The files are uploaded from the worker node to the process working directory, but are not copied to the `publishDir`. Copying single files to the `publishDir` works without problems.
### Steps to reproduce the problem
```groovy
nextflow.enable.dsl=2

process sayHello {
    publishDir "${params.outdir}", mode: "copy"

    input:
    val x

    script:
    """
    echo '$x world!'
    mkdir -p hello-results
    cd hello-results
    echo "$x" > "${x}.txt"
    """

    output:
    path "hello-results" , emit: results
}

workflow {
    sayHello("hello")
}
```
```
nextflow run main.nf -c azure.config -w az://myblobstorage/work --outdir az://myblobstorage/results
```
### Program output

```
com.azure.storage.blob.models.BlobStorageException: Status code 400, "<?xml version="1.0" encoding="utf-8"?><Error><Code>InvalidInput</Code><Message>One of the request inputs is not valid.
RequestId:719fe262-f01e-010b-0cce-074b26000000
Time:2022-01-12T16:10:20.9587019Z</Message></Error>"
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at com.azure.core.http.rest.RestProxy.instantiateUnexpectedException(RestProxy.java:334)
at com.azure.core.http.rest.RestProxy.lambda$ensureExpectedStatus$5(RestProxy.java:375)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:320)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:337)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onSubscribe(MonoCacheTime.java:276)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:191)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:178)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:96)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:121)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:374)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:373)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:429)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:655)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1526)
at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1287)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1324)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Mono.block(Mono.java:1680)
at com.azure.core.util.polling.PollerFlux.lambda$new$1(PollerFlux.java:95)
at com.azure.core.util.polling.DefaultSyncPoller.<init>(DefaultSyncPoller.java:71)
at com.azure.core.util.polling.PollerFlux.getSyncPoller(PollerFlux.java:202)
at com.azure.storage.blob.specialized.BlobClientBase.beginCopy(BlobClientBase.java:382)
at com.azure.storage.blob.specialized.BlobClientBase.beginCopy(BlobClientBase.java:357)
at com.azure.storage.blob.specialized.BlobClientBase.beginCopy(BlobClientBase.java:318)
at nextflow.cloud.azure.nio.AzFileSystem.copy(AzFileSystem.groovy:412)
at nextflow.cloud.azure.nio.AzFileSystemProvider.copy(AzFileSystemProvider.groovy:431)
at nextflow.file.FileHelper.copyPath(FileHelper.groovy:934)
at nextflow.processor.PublishDir.processFileImpl(PublishDir.groovy:420)
at nextflow.processor.PublishDir.processFile(PublishDir.groovy:333)
at nextflow.processor.PublishDir.safeProcessFile(PublishDir.groovy:319)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:107)
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:323)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1268)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1035)
at org.codehaus.groovy.runtime.InvokerHelper.invokePogoMethod(InvokerHelper.java:1029)
at org.codehaus.groovy.runtime.InvokerHelper.invokeMethod(InvokerHelper.java:1012)
at org.codehaus.groovy.runtime.InvokerHelper.invokeMethodSafe(InvokerHelper.java:101)
at nextflow.processor.PublishDir$_apply1_closure1.doCall(PublishDir.groovy:292)
at nextflow.processor.PublishDir$_apply1_closure1.call(PublishDir.groovy)
at groovy.lang.Closure.run(Closure.java:493)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 common frames omitted
```
### Environment
* Nextflow version: 21.12.1-edge build 5653
* Java version: Groovy 3.0.9 on OpenJDK 64-Bit Server VM 11.0.13+8-Ubuntu-0ubuntu1.20.04
* Operating system: Linux 5.11.0-1021-azure
* Bash version: 5.0.17
This error is very likely due to a permission issue in the target blob storage rather than to Nextflow itself.
I just ran exactly the same code without any problem, and the `hello-results` dir is correctly published to the blob I indicated as `outdir`.
![results](https://user-images.githubusercontent.com/2822686/149189136-dafe0826-5110-418c-aaf8-8ac9b463bbc2.png)
I think I found out more about the source of this error. When looking at the Azure storage logs I can see a CopyBlob operation with a directory source instead of a file. CopyBlob does not support recursively copying directories, so it returns the 400 error.
Somewhere in the publish process Nextflow does not recognise the path as a directory. Instead of looping over the directory contents and publishing each file, it tries to copy the directory as if it were a single blob.
The weird thing is that I cannot reproduce this behavior in a newly created Azure environment.
I have added two print statements in FileHelper.copyPath to distinguish between the file and directory branches:
https://github.com/nextflow-io/nextflow/blob/807a816a5319d06002f13c64ec332aa10cf7ee33/modules/nf-commons/src/main/nextflow/file/FileHelper.groovy#L913-L942
In my "fresh" environment, where everything works fine, I get:

```
Jan-14 13:43:49.512 [FileTransfer-1] INFO nextflow.file.FileHelper - Copy source is directory: /test/work/d3/c741c704d6a51dff74945bb77ba73d/hello-results
```

In my original environment I get this:

```
Jan-14 13:05:28.036 [FileTransfer-1] INFO nextflow.file.FileHelper - Copy source is file: /scrnapipeline/nextflow/work/9c/269f96f0c7e5fe81dc9549eaab1b46/hello-results
```
For some reason the Files.isDirectory call returns the wrong result.
Really hard to tell why this happens. I assume the two environments are the same, as well as the code that you run. I can't reproduce it, so I'm afraid I can't investigate further.
The directory concept does not exist in an Azure blob container, therefore it's simulated by creating a marker file `.azure_blob_dir`:
https://github.com/nextflow-io/nextflow/blob/635dc2db19bcdd3d0f6d40f8344415acf01e9916/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystem.groovy#L307-L315
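A minimal sketch of this kind of directory emulation over a flat blob namespace (the class and method names here are illustrative, not the actual AzFileSystem API; only the marker name comes from the code linked above):

```python
DIR_MARKER = ".azure_blob_dir"  # marker file name used by AzFileSystem

class FlatBlobStore:
    """Toy flat key/value namespace standing in for a blob container."""

    def __init__(self):
        self.blobs = {}

    def mkdir(self, path):
        # A "directory" is just a zero-byte marker blob under the key
        self.blobs[f"{path.rstrip('/')}/{DIR_MARKER}"] = b""

    def put(self, path, data):
        self.blobs[path] = data

    def is_dir(self, path):
        # A path is a directory if any blob (including the marker)
        # lives under its prefix
        prefix = path.rstrip("/") + "/"
        return any(k.startswith(prefix) for k in self.blobs)
```

The important point for this issue is that any other tool writing a *different* directory encoding into the same container (see the `hdi_isfolder` metadata below) breaks this detection.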
I'm not 100% sure, but it could be a glitch in the directory detection here:
https://github.com/nextflow-io/nextflow/blob/635dc2db19bcdd3d0f6d40f8344415acf01e9916/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystem.groovy#L366-L389
The best way to fix this would be to replicate the problem here:
https://github.com/nextflow-io/nextflow/blob/635dc2db19bcdd3d0f6d40f8344415acf01e9916/plugins/nf-azure/src/test/nextflow/cloud/azure/nio/AzNioTest.groovy
The current code seems to expect that directories are not valid blob objects, and that AzFileAttributes must raise an exception for a path to be detected as a directory.
https://github.com/nextflow-io/nextflow/blob/635dc2db19bcdd3d0f6d40f8344415acf01e9916/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystem.groovy#L433-L444
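That exception-driven detection pattern, roughly as I read it (a sketch under that assumption, not the actual Groovy code): attributes are fetched for the path, and if the lookup fails because no blob object exists at that exact key, the path is assumed to be a directory:

```python
class BlobNotFound(Exception):
    pass

def get_attributes(blobs, path):
    # Hypothetical attribute lookup: only real blob objects have attributes
    if path not in blobs:
        raise BlobNotFound(path)
    return {"size": len(blobs[path])}

def is_directory(blobs, path):
    """Directory detection as the linked code appears to assume:
    a path is a directory exactly when attribute lookup raises,
    i.e. when no blob object exists at that key."""
    try:
        get_attributes(blobs, path)
        return False
    except BlobNotFound:
        return True
```

This reading fits the failure mode in this issue: if an environment materialises a directory as a real zero-byte blob (with `hdi_isfolder` metadata, as shown below), the lookup succeeds, no exception is raised, and the path is misclassified as a file.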
For some reason, some directories in my environment are encoded differently: a directory can also be an empty blob with the metadata key `"hdi_isfolder": "true"`:
```json
{
  "content": "",
  "deleted": false,
  "metadata": {
    "hdi_isfolder": "true"
  },
  "name": "work/e4/502f03597a0bfc86c9aede8d16e68e/hello-results",
  "properties": {
    "appendBlobCommittedBlockCount": null,
    "blobTier": "Hot",
    "blobTierChangeTime": null,
    "blobTierInferred": true,
    "blobType": "BlockBlob",
    "contentLength": 0,
    "contentRange": null,
    "contentSettings": {
      "cacheControl": null,
      "contentDisposition": null,
      "contentEncoding": null,
      "contentLanguage": null,
      "contentMd5": null,
      "contentType": null
    },
    "copy": {
      "completionTime": null,
      "id": null,
      "progress": null,
      "source": null,
      "status": null,
      "statusDescription": null
    },
    "creationTime": "2022-01-17T08:46:08+00:00",
    "deletedTime": null,
    "etag": "\"0x8D9D995CE6C61FE\"",
    "lastModified": "2022-01-17T08:46:08+00:00",
    "lease": {
      "duration": null,
      "state": "available",
      "status": "unlocked"
    },
    "pageBlobSequenceNumber": null,
    "pageRanges": null,
    "remainingRetentionDays": null,
    "serverEncrypted": true
  },
  "snapshot": null
}
```
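A classifier covering both directory encodings seen in this thread — trailing-slash blob names and zero-byte blobs carrying `hdi_isfolder` metadata — can be sketched as follows (an illustration of the detection logic, not the AzFileAttributes code itself). Note that the metadata value is the *string* `"true"`, so it has to be compared explicitly rather than used as a boolean:

```python
def is_directory_blob(name, size, metadata=None):
    """Classify a blob listing entry as a directory.

    Covers both encodings seen in this issue:
    - a blob name ending in '/' (classic marker convention)
    - a zero-byte blob with metadata "hdi_isfolder": "true",
      as written by hierarchical-namespace / Data Lake tooling
    """
    if name.endswith("/"):
        return True
    meta = metadata or {}
    # Compare against the string "true": treating any non-empty
    # metadata value as truthy would misclassify "false" as well.
    return size == 0 and meta.get("hdi_isfolder") == "true"
```

Running this against the listing entry above (`name` without a trailing slash, `contentLength` 0, `hdi_isfolder` set) classifies it as a directory, which is what the publish step would need.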
When I modify the AzFileAttributes(BlobClient) constructor as follows, the file/folder detection works in my environment:

```groovy
AzFileAttributes(BlobClient client) {
    final props = client.getProperties()
    objectId = "/${client.containerName}/${client.blobName}"
    creationTime = time(props.getCreationTime())
    updateTime = time(props.getLastModified())
    directory = client.blobName.endsWith('/')
    size = props.getBlobSize()
    // Check for empty blobs with the hdi_isfolder metadata key
    final meta = props.getMetadata()
    if (meta.containsKey("hdi_isfolder") && props.getBlobSize() == 0) {
        // the metadata value is the string "true"/"false",
        // so compare it rather than assigning it directly
        directory = meta.get("hdi_isfolder") == "true"
    }
}
```
I think that this issue can be closed based on the PR @fbdtemme submitted, right?
It is not; I think @manuelesimi accidentally linked this issue instead of #2563 in that merge request (#2576).
Adding this to the AzFileAttributes(BlobClient) constructor works in my Azure environment:

```groovy
// Check for empty blobs with the hdi_isfolder metadata key
final meta = props.getMetadata()
if (meta.containsKey("hdi_isfolder") && props.getBlobSize() == 0) {
    // compare against the string value rather than assigning it directly
    directory = meta.get("hdi_isfolder") == "true"
}
```
But I am not sure whether I should make a pull request, since I cannot reproduce the problem in a test environment.
Ah, I see - thanks for the update Florian. Let me take a look and circle back.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Whoops, forgot to update here. Lately I've been working extensively with the Azure Batch env and haven't seen this issue occur with Nextflow `v22.04` or `v22.06.1-edge`, both of which are working as expected.
I just wanted to note that I have the same problem publishing directories on Azure storage. Environment: Nextflow v23.04.0; Storage: Hierarchical Namespace (Data Lake Gen2).
I assume this will be fixed by https://github.com/nextflow-io/nextflow/pull/4046. I will come back and verify once I update to that code.