nextflow
Support including existing files within watchPath?
New feature
There does not seem to be a reliable way to pick up both the existing files and any new files under a root path by combining the two channel factories, watchPath and fromPath. Calling one after the other creates a race between the two calls. If watchPath is called first, a file created between the calls may be emitted twice. If it is called second, files created between the calls may be dropped.
Usage scenario
This is useful when a separate pipeline is generating data that is supposed to be consumed by a Nextflow workflow. That separate pipeline could have been started some time ago, having already generated some files, so the Nextflow workflow needs to catch up and then stay current.
A workflow from epi2me-labs uses watchPath in a way that can drop files.
Suggest implementation
Adding an option to watchPath seems like the way to go, unless there is already an easy workaround.
Happy to work on this if people want.
Channel.fromPath is essentially a channel wrapper over the files() function. It might be easier to use files() directly and drop the duplicates.
@bentsherman I don't understand your comment. How would I use the files() function to synchronize its own call with the setup of the watchPath channel?
My current code looks something like this (assuming that asSynchronized() works as I expect it to):
workflow {
    String pattern = 'input/*'
    String stop_filename = 'STOP'
    // names already emitted by either channel; asSynchronized() wraps the set so add() is thread-safe
    Set<String> input_names = ([] as Set<String>).asSynchronized()

    // add() returns false for a name already seen, so whichever channel sees a file first emits it
    new_inputs = Channel
        .watchPath(pattern)
        .until { it.name == stop_filename }
        .filter { input_names.add(it.name) }
    new_inputs.view { "New ${it.name}" }

    existing_inputs = Channel
        .fromPath(pattern)
        .filter { input_names.add(it.name) }
    existing_inputs.view { "Existing ${it.name}" }

    inputs = existing_inputs.mix(new_inputs)
    inputs.view()
}
I was imagining that you would query the set of existing files and use that to filter incoming files from watchPath:
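def existing_inputs = files(pattern)
def new_inputs = Channel.watchPath(pattern)
    .filter { file -> file !in existing_inputs }
// fromList emits each file separately; Channel.of(list) would emit the whole list as a single item
def inputs = Channel.fromList(existing_inputs).mix(new_inputs)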
You should also be able to achieve your example with a concurrent set:
import java.util.concurrent.ConcurrentHashMap
def input_names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>())
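To see why a concurrent set is enough for the name-based workaround, here is a plain-Java illustration outside Nextflow. The class `ConcurrentDedup` and its input lists are hypothetical stand-ins for the fromPath and watchPath channels. `Set.add` returns `false` when the element is already present. A set backed by `ConcurrentHashMap` performs that check-and-insert atomically, so when two threads offer overlapping names, each name should pass the filter exactly once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class ConcurrentDedup {
    // Two threads (standing in for the listing and the watcher) offer names concurrently.
    // Returns every name that passed the "first time seen" filter.
    public static List<String> dedup(List<String> fromListing, List<String> fromWatcher) {
        // Same construction as the Groovy snippet, with both type arguments.
        Set<String> seen = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        List<String> emitted = new CopyOnWriteArrayList<>();

        Thread listing = new Thread(() -> {
            for (String name : fromListing) {
                if (seen.add(name)) emitted.add(name); // atomic; false means another thread got it first
            }
        });
        Thread watcher = new Thread(() -> {
            for (String name : fromWatcher) {
                if (seen.add(name)) emitted.add(name);
            }
        });
        listing.start();
        watcher.start();
        try {
            listing.join();
            watcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        List<String> out = dedup(List.of("a", "b", "foo"), List.of("foo", "c"));
        Collections.sort(out); // thread interleaving is nondeterministic, so sort before printing
        System.out.println(out);
    }
}
```

`main` prints `[a, b, c, foo]`. `foo` is offered by both producers but is emitted once.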
I think I understand what you mean about using files(), but there is some miscommunication: I have the opposite problem.
Consider the following scenarios:
# Scenario 1 - no issues
files created
start watching
list all files
files created
# Scenario 2 - no issues
files created
list all files
start watching
files created
# Scenario 3 - file 'foo' will be missed
files created
list all files
file 'foo' created
start watching
files created
# Scenario 4 - file 'foo' will be counted twice
files created
start watching
file 'foo' created
list all files
files created
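The four scenarios can be replayed with a small deterministic model. It is written in plain Java as an illustration only. The class `WatchListRace` and the step encoding are hypothetical, and the model ignores the latency of real file-system events. Listing emits every file present at that moment, while the watcher emits only files created after it starts. Without deduplication, the model reproduces the missed file in scenario 3 and the duplicate in scenario 4. Watching first and deduplicating by name emits each file once.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class WatchListRace {
    // Replays a timeline and counts how often each file is emitted.
    // Steps: "+name" = file created, "WATCH" = watcher started, "LIST" = directory listed.
    // If dedup is true, a shared set of seen names drops repeats (the name-based workaround).
    public static Map<String, Integer> replay(List<String> steps, boolean dedup) {
        Set<String> onDisk = new LinkedHashSet<>();
        Set<String> seen = new HashSet<>();
        Map<String, Integer> emitted = new TreeMap<>(); // sorted keys for stable printing
        boolean watching = false;
        for (String step : steps) {
            if (step.startsWith("+")) {
                String name = step.substring(1);
                onDisk.add(name);
                if (watching) emit(name, emitted, seen, dedup); // watcher only sees later creations
            } else if (step.equals("WATCH")) {
                watching = true;
            } else if (step.equals("LIST")) {
                for (String name : onDisk) emit(name, emitted, seen, dedup);
            }
        }
        // A file that was never emitted is recorded with count 0, i.e. it was missed.
        for (String name : onDisk) emitted.putIfAbsent(name, 0);
        return emitted;
    }

    private static void emit(String name, Map<String, Integer> emitted, Set<String> seen, boolean dedup) {
        if (dedup && !seen.add(name)) return; // already emitted once
        emitted.merge(name, 1, Integer::sum);
    }

    public static void main(String[] args) {
        // Scenario 3: list, 'foo' created, start watching
        System.out.println(replay(List.of("+a", "LIST", "+foo", "WATCH", "+b"), false)); // {a=1, b=1, foo=0}
        // Scenario 4: start watching, 'foo' created, list
        System.out.println(replay(List.of("+a", "WATCH", "+foo", "LIST", "+b"), false)); // {a=1, b=1, foo=2}
        // Scenario 4 ordering with dedup by name
        System.out.println(replay(List.of("+a", "WATCH", "+foo", "LIST", "+b"), true));  // {a=1, b=1, foo=1}
    }
}
```

Deduplication alone cannot fix scenario 3, because the watcher never saw `foo`. The watcher therefore has to be started before the listing.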
I am just looking for a way to list all current and future files matching the pattern without dropping a file or counting a file twice.
You might be able to modify watchPath to (1) start watching files, (2) query the existing files, and (3) emit the existing files that weren't already caught by the watcher.
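Here is a minimal sketch of that ordering using the JDK's `java.nio.file.WatchService`. It is an illustration, not Nextflow's actual watchPath implementation, and `CatchUpWatcher`, `run`, and its parameters are hypothetical. The sketch assumes a single directory with no glob matching or recursion. It registers the watcher before listing and keeps a set of emitted names, so a file seen by both the listing and the watcher is emitted once. On some platforms the JDK's WatchService is polling-based, so events may arrive with a delay.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CatchUpWatcher {
    // Emits each entry of `dir` once, whether it already existed or was created later.
    // Returns when an entry named `stopName` appears (not emitted, similar to `.until {}`)
    // or when no event arrives for `idleTimeoutMillis`.
    public static void run(Path dir, String stopName, long idleTimeoutMillis, Consumer<Path> sink)
            throws IOException, InterruptedException {
        Set<String> seen = new HashSet<>(); // single-threaded here, so a plain set suffices
        try (WatchService ws = dir.getFileSystem().newWatchService()) {
            // (1) Start watching BEFORE listing, so nothing created from now on can be missed.
            dir.register(ws, StandardWatchEventKinds.ENTRY_CREATE);
            // (2) List existing entries; (3) the seen-set drops anything the watcher reports again.
            if (scan(dir, stopName, seen, sink)) return;
            while (true) {
                WatchKey key = ws.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
                if (key == null) return; // idle timeout
                boolean stop = false;
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
                        // Events were dropped; rescanning is safe because repeats are filtered.
                        stop = scan(dir, stopName, seen, sink);
                    } else {
                        stop = offer(dir.resolve((Path) event.context()), stopName, seen, sink);
                    }
                    if (stop) break; // ignore anything after the stop file in this batch
                }
                if (stop || !key.reset()) return; // stop file seen, or directory no longer watchable
            }
        }
    }

    // Lists the directory and offers every entry; returns true if the stop file is present.
    private static boolean scan(Path dir, String stopName, Set<String> seen, Consumer<Path> sink)
            throws IOException {
        boolean stop = false;
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) stop |= offer(entry, stopName, seen, sink);
        }
        return stop;
    }

    // Emits `file` unless its name was already emitted; returns true if it is the stop file.
    private static boolean offer(Path file, String stopName, Set<String> seen, Consumer<Path> sink) {
        String name = file.getFileName().toString();
        if (name.equals(stopName)) return true;
        if (seen.add(name)) sink.accept(file);
        return false;
    }
}
```

Because the watcher is registered first, the two sources can only overlap (duplicates), not leave a gap (missed files), and the seen-set removes the overlap. The same set also makes rescanning after an `OVERFLOW` event harmless.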
But, taking a step back, the filesystem doesn't seem like a good fit for this kind of event streaming. watchPath is good for simple use cases, but eventually it becomes better to handle event dispatch at a higher level, e.g. deploy an external service that launches a Nextflow pipeline whenever an object is uploaded to an S3 bucket.
Of course, you might be more constrained in an HPC context.
I get your point that events, even file creation events, can be obtained from sources other than the file system. But right now there does not seem to be a mechanism to define channels from such event streams, such as pub/sub services on cloud providers. Moreover, as you said, it is not easy to set up something like this in a local HPC context.
Your example of launching a new Nextflow pipeline per event only applies when the pipeline takes a single file as input. In most use cases, each file is one element of a channel.
Short of a more formal way to consume events from the outside, I think modifying watchPath is a better solution here. Meanwhile, the workaround that I posted above seems to be working well.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Closing as not planned. The watchPath factory is a useful convenience for simple cases, but the best practice is to use an external event system that triggers a Nextflow run on a batch of events, rather than having Nextflow listen for events indefinitely.