[BUG] async not working?
What happened:
As a warmup, we've been trying an async fs.exists() against abfs, and we're unsure whether async is unsupported, whether this is a version issue, or whether it's a bug on our end.
What you expected to happen:
Async existence check returns True / False, not an exception
Minimal Complete Verifiable Example:
import fsspec

fs = fsspec.filesystem('abfs', asynchronous=True, account_name='...', account_key='...')
# session = await fs.set_session()  # raises: no such method defined
await fs.exists('abfs://container/folder/file')  # raises: Loop is not running
Anything else we need to know?:
Environment:
- Dask version: 2021.04.0
- fsspec: 2021.06.01
- adlfs: v0.7.3
- Python version: 3.7
- Operating System: Ubuntu
- Install method (conda, pip, source): conda (rapids 0.19) + updates
At this point in time, the asynchronous parameter introduced by fsspec does not enable adlfs to be called asynchronously (i.e., with the await keyword). Instead, operations within the package are executed asynchronously, so that traditional sync calls (e.g., fs.exists()) trigger async execution under the hood.
Adding direct async calls under the hood is on the roadmap.
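In other words, the supported pattern today is a plain synchronous call, which runs on fsspec's internal event loop. A minimal sketch (the credentials and path are placeholders):

import fsspec

# Sync-style usage: no asynchronous=True, no await.
# fs.exists() blocks the caller while adlfs runs the operation
# asynchronously on fsspec's internal loop.
fs = fsspec.filesystem('abfs', account_name='...', account_key='...')
ok = fs.exists('abfs://container/folder/file')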
Hmm, this makes me unsure about safety in a concurrent environment. Both in general, and especially for asynchronous=True, how should we think about it?
- Are there any filesystem / fsspec safety or consistency issues when concurrent dask tasks do remote reads through a cache?
- If we mix concurrent fsspec IO dask tasks with regular dask compute tasks, is it safe to interleave them, and how?
- ... or is it all unsafe, so fsspec needs to be a blocking call on a single main process, and we flip asynchronous=True only to allow some of the internal async?
For example, would something like this make sense for remote cached reads, to free up a dask worker for compute while fsspec does the IO?
import fsspec
from multiprocessing import Process, Queue
from dask.distributed import secede, rejoin

def fn_actual(queue):
    fs = fsspec.filesystem('abfs', asynchronous=True, account_name='...', account_key='...')
    # ... remote read cached call ...  # <-- consistency safe?
    ok = fs.exists('abfs://container/folder/file')
    queue.put(ok)  # assert ok

def fn_task_entry():
    secede()  # free the dask worker for compute tasks while IO proceeds
    queue = Queue()
    p = Process(target=fn_actual, args=(queue,))
    p.start()
    out = queue.get()
    p.join()
    rejoin()
    return out

# multiple clients call this concurrently:
exists = await dask_client.gather([dask_client.submit(fn_task_entry)])
Another thought is to separate IO workers from compute workers via worker resource labels, where each core counts as 1 IO resource + 1 compute resource, and keep fsspec calls blocking; see the sketch below. (That assumes concurrent use across workers is safe... or maybe we have one dedicated fsspec worker.)
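Roughly, the resource-label idea would look like this (a hypothetical sketch; the scheduler address, resource names, and credentials are placeholders, and workers would need to be started with something like dask-worker scheduler:8786 --resources "IO=1,COMPUTE=1"):

import fsspec
from dask.distributed import Client

client = Client('scheduler:8786')  # placeholder address

def read_exists(path):
    # blocking fsspec call; runs only on workers advertising an IO resource
    fs = fsspec.filesystem('abfs', account_name='...', account_key='...')
    return fs.exists(path)

future = client.submit(read_exists, 'abfs://container/folder/file',
                       resources={'IO': 1})
print(future.result())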
I found one chunk in the docs about this: "You should not mix block- and file-caches in the same directory" and "Only “simplecache” is guaranteed thread/process-safe." (https://filesystem-spec.readthedocs.io/en/latest/features.html#caching-files-locally)
Any guidance here? Sounds like we can do this async off the main thread, but cannot interleave filecache writes, so we should enforce a global fsspec writelock on those tasks, along the lines of the sketch below.
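Concretely, something like this (a hypothetical sketch; the lock name, cache directory, and credentials are placeholders, and it assumes a dask client is reachable from the worker):

import fsspec
from dask.distributed import Lock

def cached_read(path):
    # Cluster-wide named lock so concurrent tasks can't race on the
    # filecache metadata; this serializes all filecache-backed reads.
    with Lock('fsspec-filecache'):
        with fsspec.open(
            f'filecache::{path}',
            filecache={'cache_storage': '/tmp/fsspec-cache'},
            abfs={'account_name': '...', 'account_key': '...'},
        ) as f:
            return f.read()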
Regarding caching: the default caching for AzureBlobFile is "bytes", but that can be set as a parameter.
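For example (a minimal sketch; the path and credentials are placeholders):

import fsspec

fs = fsspec.filesystem('abfs', account_name='...', account_key='...')
# Override the default 'bytes' cache for this file handle.
with fs.open('abfs://container/folder/file', cache_type='readahead') as f:
    header = f.read(1024)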
Implementing a global lock in fsspec would need to involve @martindurant.
The idea of separating IO and compute workers is interesting. I'd like to see what the impact on execution time is, since IO already occurs off the main thread in the current implementation.
If you examine fsspec's AsyncFileSystem, you'll see that setting asynchronous=False sets AsyncFileSystem._loop = None. I'm assuming in async operations that the loop does not need to be passed in explicitly, but I'd like to get @martindurant 's input on this.
@lmeyerov , have you tried (note the underscore)
await fs._exists('abfs://container/folder/file')
?
This is how it is spelled in the other async implementations. In async mode, you must not call any of the methods that call sync().
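Putting that together, a minimal sketch of the async pattern (credentials are placeholders):

import asyncio
import fsspec

async def main():
    # Instantiate inside the running loop with asynchronous=True and use
    # only the underscore-prefixed coroutines.
    fs = fsspec.filesystem('abfs', asynchronous=True,
                           account_name='...', account_key='...')
    return await fs._exists('abfs://container/folder/file')

ok = asyncio.run(main())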
Sounds like we can do this async off the main thread, but cannot interleave filecache writes, so we should enforce a global fsspec writelock on those tasks
Simpler than implementing and exposing a lock would be to make the filecache threadsafe; it's the updating of the metadata file that can cause (occasional) races, if someone wants to have a go. (It is also possible for two threads to download the same file simultaneously, which is not critical, but would be wasteful.) The same is true for blockcache, although it should be checked whether the same file can be opened from multiple threads.
I'm assuming in async operations that the loop does not need to be passed in explicitly, but I'd like to get @martindurant 's input on this.
Correct: if you have asynchronous=True, you should be using await and coroutines only, and the loop is the current running loop. By default, fsspec has its own loop in a thread. The parameter is there for two cases:
- the user wants another thread/loop to run fsspec for sync calls
- you will use async mode, but the instance was created outside of the loop (not recommended!)
Yes, we have talked about the possibility of treating IO tasks differently in dask, but there is no concrete work yet. In some cases, IO can use significant CPU (e.g., HTTP automatic compression/encryption); what really sets IO apart is high latency. It may be reasonable for a dask thread to secede during a blocking fsspec call.
In async mode, you must not call any of the methods that call sync().
FYI - At the moment, the constructor doesn't play particularly nice here, as it sets up an exit handler that calls sync:
weakref.finalize(self, sync, self.loop, close_service_client, self)
on line: https://github.com/fsspec/adlfs/blob/092685f102c5cd215550d10e8347e5bce0e2b93d/adlfs/spec.py#L319
So it's possible to use async mode and call the _ prefixed async functions directly, but it does generate some undesirable noise on garbage collection.
the constructor doesn't play particularly nice here, as it sets up an exit handler that calls sync
s3fs and gcsfs have complex cleanup methods to deal with whether it gets called from async, and whether the loop is still running. The simplest approach is to not register a cleanup when in async mode and require calling code to do the cleanup itself.
The simplest approach is to not register a cleanup when in async mode and require calling code to do the cleanup itself.
Agreed - the noise on gc is the only problem I'm having using the adlfs package in async mode - everything else appears to work fine.
Hi all, I've been dealing with the same thing lately. What I've ended up doing is simply silencing the RuntimeError raised in the finalizer and calling the cleanup the finalizer is supposed to do myself.
I've also noticed that if I finish my work, discard the fs object, and then try to connect again, it fails, because fsspec reuses the previously-used instance. This was failing my tests.
Calling fs.do_connect() eventually fixed this issue for me.
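Roughly (a sketch; credentials are placeholders):

import fsspec

# fsspec caches filesystem instances, so this can hand back the
# previously-used object even after I discarded my reference to it.
fs = fsspec.filesystem('abfs', asynchronous=True,
                       account_name='...', account_key='...')
fs.do_connect()  # re-establish the underlying service client before reuse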
We've been trying to use adlfs asynchronously (using the underscore-prefixed coroutines) and, as I've said previously, apart from the annoying RuntimeErrors raised in the weakref finalizers, everything goes well. But currently we are unable to combine this approach with the filecache implementation (because filecache uses the non-underscore-prefixed methods and raises RuntimeErrors again). Is there any way we could make use of async in adlfs while still being able to use the filecache filesystem?
I agree, it would be nice to have an async version of SimpleCache (and perhaps the others, although they are far less used). For the one class, it should not be too hard to write. It would require explicit instantiation, though, since there is no good way right now to decide on async or otherwise when automatically constructing filesystem instances from chained URLs.
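For reference, chained-URL construction looks like this (a sketch; the path and credentials are placeholders); both filesystems in the chain are instantiated automatically and synchronously, which is why there is currently no place to ask for an async cache:

import fsspec

with fsspec.open(
    'simplecache::abfs://container/folder/file',
    abfs={'account_name': '...', 'account_key': '...'},
) as f:
    data = f.read()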