fsspec fails with uvloop
It looks like fsspec fails when used with uvloop, which is sometimes used in higher-performance Dask workloads. Here is a reproducible (though not minimal) example using Dask:
from dask.distributed import Client
import dask.dataframe as dd
client = Client()
df = dd.read_parquet("s3://sra-pub-sars-cov2-metadata-us-east-1/v2/tax_analysis/", storage_options={"anon": True})
df.compute()
---------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-1-841b4d3cd8fb> in <module>
3
4 client = Client()
----> 5 df = dd.read_parquet("s3://sra-pub-sars-cov2-metadata-us-east-1/v2/tax_analysis/", storage_options={"anon": True})
6 df.compute()
~/workspace/dask/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, **kwargs)
305 index = [index]
306
--> 307 read_metadata_result = engine.read_metadata(
308 fs,
309 paths,
~/workspace/dask/dask/dataframe/io/parquet/arrow.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, read_from_paths, **kwargs)
505 split_row_groups,
506 gather_statistics,
--> 507 ) = cls._gather_metadata(
508 paths,
509 fs,
~/workspace/dask/dask/dataframe/io/parquet/arrow.py in _gather_metadata(cls, paths, fs, split_row_groups, gather_statistics, filters, index, dataset_kwargs)
1676
1677 # Step 1: Create a ParquetDataset object
-> 1678 dataset, base, fns = _get_dataset_object(paths, fs, filters, dataset_kwargs)
1679 if fns == [None]:
1680 # This is a single file. No danger in gathering statistics
~/workspace/dask/dask/dataframe/io/parquet/arrow.py in _get_dataset_object(paths, fs, filters, dataset_kwargs)
1636 # existence of _metadata. Listing may be much more
1637 # expensive in storage systems like S3.
-> 1638 allpaths = fs.glob(paths[0] + fs.sep + "*")
1639 allpaths, base, fns = _sort_and_analyze_paths(allpaths, fs)
1640 dataset = pq.ParquetDataset(paths[0], filesystem=fs, filters=filters, **kwargs)
~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/spec.py in glob(self, path, **kwargs)
523 depth = None if "**" in path else path[ind + 1 :].count("/") + 1
524
--> 525 allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)
526 # Escape characters special to python regex, leaving our supported
527 # special characters in place.
~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
119 def wrapper(*args, **kwargs):
120 self = obj or args[0]
--> 121 return maybe_sync(func, self, *args, **kwargs)
122
123 return wrapper
~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
98 if inspect.iscoroutinefunction(func):
99 # run the awaitable on the loop
--> 100 return sync(loop, func, *args, **kwargs)
101 else:
102 # just call the blocking function
~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, callback_timeout, *args, **kwargs)
69 if error[0]:
70 typ, exc, tb = error[0]
---> 71 raise exc.with_traceback(tb)
72 else:
73 return result[0]
~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in f()
53 if callback_timeout is not None:
54 future = asyncio.wait_for(future, callback_timeout)
---> 55 result[0] = await future
56 except Exception:
57 error[0] = sys.exc_info()
~/miniconda/envs/play/lib/python3.8/site-packages/s3fs/core.py in _find(self, path, maxdepth, withdirs, detail)
537 raise ValueError("Cannot traverse all of S3")
538 if maxdepth:
--> 539 return super().find(
540 bucket + "/" + key, maxdepth=maxdepth, withdirs=withdirs, detail=detail
541 )
~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/spec.py in find(self, path, maxdepth, withdirs, **kwargs)
432 out = dict()
433 detail = kwargs.pop("detail", False)
--> 434 for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
435 if withdirs:
436 files.update(dirs)
~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/spec.py in walk(self, path, maxdepth, **kwargs)
381 detail = kwargs.pop("detail", False)
382 try:
--> 383 listing = self.ls(path, detail=True, **kwargs)
384 except (FileNotFoundError, IOError):
385 return [], [], []
~/miniconda/envs/play/lib/python3.8/site-packages/s3fs/core.py in ls(self, path, detail, refresh, **kwargs)
991 """
992 path = self._strip_protocol(path).rstrip("/")
--> 993 files = maybe_sync(self._ls, self, path, refresh=refresh)
994 if not files:
995 files = maybe_sync(self._ls, self, self._parent(path), refresh=refresh)
~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
91 if inspect.iscoroutinefunction(func):
92 # run coroutine while pausing this one (because we are within async)
---> 93 return _run_until_done(func(*args, **kwargs))
94 else:
95 # make awaitable which then calls the blocking function
~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in _run_until_done(coro)
29 runner = loop.create_task(coro)
30 while not runner.done():
---> 31 loop._run_once()
32 asyncio.tasks._current_tasks[loop] = task
33 return runner.result()
AttributeError: 'Loop' object has no attribute '_run_once'
In [2]: debug
> /home/mrocklin/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py(31)_run_until_done()
29 runner = loop.create_task(coro)
30 while not runner.done():
---> 31 loop._run_once()
32 asyncio.tasks._current_tasks[loop] = task
33 return runner.result()
ipdb> pp loop
<uvloop.Loop running=True closed=False debug=False>
ipdb> pp loop
Please try on master and s3fs main - loop._run_once() was removed in the big refactor.
> Please try on master and s3fs main - loop._run_once() was removed in the big refactor.
@martindurant would it be possible to cut a new release that includes this refactor? It is hard for downstream packages to depend on GitHub revisions, since PyPI doesn't accept such references.
There was a release tagged yesterday (https://github.com/intake/filesystem_spec/releases/tag/0.9.0). Would you be able to retest and report back, @isidentical?
@jakirkham Looks like you probably meant to tag @mrocklin?
Batuhan asked for a release in the previous comment, which is why it was directed to him (a release has since been made).
Though happy to have more people testing/letting us know whether this is resolved or not :)
Sorry to revive this old thread, but fsspec does work with uvloop today (at least in my use case, with gcsfs).
Add this at the beginning of your app:
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
After that, fs.loop shows that it is a uvloop instance.
However, if you create the loop manually and pass loop=new_uv_loop, it hangs (the thread deadlocks).
> However, if you create the loop manually and pass loop=new_uv_loop
In this case, you should create the filesystem instance inside a coroutine, to ensure everything is set up correctly.