
Fails with uvloop

Open mrocklin opened this issue 4 years ago • 7 comments

It looks like fsspec fails when used with uvloop, which is sometimes used for higher-performing Dask workloads. Here is a reproducible, though not minimal, example with Dask.

from dask.distributed import Client
import dask.dataframe as dd

client = Client()
df = dd.read_parquet("s3://sra-pub-sars-cov2-metadata-us-east-1/v2/tax_analysis/", storage_options={"anon": True})
df.compute()
---------------------------------------------------------------------
AttributeError                      Traceback (most recent call last)
<ipython-input-1-841b4d3cd8fb> in <module>
      3 
      4 client = Client()
----> 5 df = dd.read_parquet("s3://sra-pub-sars-cov2-metadata-us-east-1/v2/tax_analysis/", storage_options={"anon": True})
      6 df.compute()

~/workspace/dask/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, **kwargs)
    305         index = [index]
    306 
--> 307     read_metadata_result = engine.read_metadata(
    308         fs,
    309         paths,

~/workspace/dask/dask/dataframe/io/parquet/arrow.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, read_from_paths, **kwargs)
    505             split_row_groups,
    506             gather_statistics,
--> 507         ) = cls._gather_metadata(
    508             paths,
    509             fs,

~/workspace/dask/dask/dataframe/io/parquet/arrow.py in _gather_metadata(cls, paths, fs, split_row_groups, gather_statistics, filters, index, dataset_kwargs)
   1676 
   1677         # Step 1: Create a ParquetDataset object
-> 1678         dataset, base, fns = _get_dataset_object(paths, fs, filters, dataset_kwargs)
   1679         if fns == [None]:
   1680             # This is a single file. No danger in gathering statistics

~/workspace/dask/dask/dataframe/io/parquet/arrow.py in _get_dataset_object(paths, fs, filters, dataset_kwargs)
   1636         #       existence of _metadata.  Listing may be much more
   1637         #       expensive in storage systems like S3.
-> 1638         allpaths = fs.glob(paths[0] + fs.sep + "*")
   1639         allpaths, base, fns = _sort_and_analyze_paths(allpaths, fs)
   1640         dataset = pq.ParquetDataset(paths[0], filesystem=fs, filters=filters, **kwargs)

~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/spec.py in glob(self, path, **kwargs)
    523             depth = None if "**" in path else path[ind + 1 :].count("/") + 1
    524 
--> 525         allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)
    526         # Escape characters special to python regex, leaving our supported
    527         # special characters in place.

~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
    119     def wrapper(*args, **kwargs):
    120         self = obj or args[0]
--> 121         return maybe_sync(func, self, *args, **kwargs)
    122 
    123     return wrapper

~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
     98         if inspect.iscoroutinefunction(func):
     99             # run the awaitable on the loop
--> 100             return sync(loop, func, *args, **kwargs)
    101         else:
    102             # just call the blocking function

~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, callback_timeout, *args, **kwargs)
     69     if error[0]:
     70         typ, exc, tb = error[0]
---> 71         raise exc.with_traceback(tb)
     72     else:
     73         return result[0]

~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in f()
     53             if callback_timeout is not None:
     54                 future = asyncio.wait_for(future, callback_timeout)
---> 55             result[0] = await future
     56         except Exception:
     57             error[0] = sys.exc_info()

~/miniconda/envs/play/lib/python3.8/site-packages/s3fs/core.py in _find(self, path, maxdepth, withdirs, detail)
    537             raise ValueError("Cannot traverse all of S3")
    538         if maxdepth:
--> 539             return super().find(
    540                 bucket + "/" + key, maxdepth=maxdepth, withdirs=withdirs, detail=detail
    541             )

~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/spec.py in find(self, path, maxdepth, withdirs, **kwargs)
    432         out = dict()
    433         detail = kwargs.pop("detail", False)
--> 434         for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
    435             if withdirs:
    436                 files.update(dirs)

~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/spec.py in walk(self, path, maxdepth, **kwargs)
    381         detail = kwargs.pop("detail", False)
    382         try:
--> 383             listing = self.ls(path, detail=True, **kwargs)
    384         except (FileNotFoundError, IOError):
    385             return [], [], []

~/miniconda/envs/play/lib/python3.8/site-packages/s3fs/core.py in ls(self, path, detail, refresh, **kwargs)
    991         """
    992         path = self._strip_protocol(path).rstrip("/")
--> 993         files = maybe_sync(self._ls, self, path, refresh=refresh)
    994         if not files:
    995             files = maybe_sync(self._ls, self, self._parent(path), refresh=refresh)

~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
     91         if inspect.iscoroutinefunction(func):
     92             # run coroutine while pausing this one (because we are within async)
---> 93             return _run_until_done(func(*args, **kwargs))
     94         else:
     95             # make awaitable which then calls the blocking function

~/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py in _run_until_done(coro)
     29     runner = loop.create_task(coro)
     30     while not runner.done():
---> 31         loop._run_once()
     32     asyncio.tasks._current_tasks[loop] = task
     33     return runner.result()

AttributeError: 'Loop' object has no attribute '_run_once'

In [2]: debug
> /home/mrocklin/miniconda/envs/play/lib/python3.8/site-packages/fsspec/asyn.py(31)_run_until_done()
     29     runner = loop.create_task(coro)
     30     while not runner.done():
---> 31         loop._run_once()
     32     asyncio.tasks._current_tasks[loop] = task
     33     return runner.result()

ipdb> pp loop
<uvloop.Loop running=True closed=False debug=False>

mrocklin, Apr 01 '21 14:04

Please try on master and s3fs main - loop._run_once() was removed in the big refactor.

martindurant, Apr 01 '21 15:04
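
A note for readers hitting the same traceback: `_run_once` is a private method of asyncio's pure-Python `BaseEventLoop`, not part of the public `AbstractEventLoop` interface, so an alternative loop such as uvloop's is not obliged to provide it. The sketch below shows one portable way to call a coroutine from synchronous code using only public asyncio API. It is an illustration under stated assumptions, not fsspec's implementation; `start_background_loop`, `run_sync`, and `list_keys` are hypothetical names.

```python
import asyncio
import threading


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Start an event loop in a daemon thread and return it."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop


def run_sync(loop, coro, timeout=None):
    """Block the calling thread until `coro` finishes on `loop`."""
    # Public API only: works with any loop that implements the
    # AbstractEventLoop interface, with no reliance on `_run_once`.
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result(timeout)


async def list_keys(prefix):
    # Hypothetical stand-in for an async listing call.
    await asyncio.sleep(0)
    return [prefix + "/part.0.parquet"]


# The private hook is absent from the abstract interface.
private_hook_is_public = hasattr(asyncio.AbstractEventLoop, "_run_once")

loop = start_background_loop()
result = run_sync(loop, list_keys("bucket/data"))
loop.call_soon_threadsafe(loop.stop)  # ask the background loop to exit
```

The calling thread never drives the loop itself; it only waits on a `concurrent.futures.Future`, which is why this pattern does not depend on loop internals.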

Please try on master and s3fs main - loop._run_once() was removed in the big refactor.

@martindurant is it possible to have a new release that includes this refactor? It is hard for downstream projects to refer to revisions on GitHub, since PyPI doesn't accept those references.

isidentical, Apr 02 '21 10:04

There was a tag yesterday ( https://github.com/intake/filesystem_spec/releases/tag/0.9.0 ). Would you be able to retest and report back @isidentical?

jakirkham, Apr 06 '21 18:04

@jakirkham Looks like you probably meant to tag @mrocklin ?

efiop, Aug 03 '21 22:08

Batuhan asked for a release in the previous comment, which is why it was directed to him (there has since been a release).

Though happy to have more people testing/letting us know whether this is resolved or not :)

jakirkham, Aug 18 '21 04:08

Sorry to revive this old thread, but fsspec today does work with uvloop (at least in my use case, with gcsfs).

At the beginning of your app:

import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

Then fs.loop shows it's a uvloop instance.

However, if you create the loop manually and pass loop=new_uv_loop, the thread locks up.

tasansal, Jun 14 '24 16:06

However, if you create the loop manually and pass loop=new_uv_loop

In this case, you should create the FS instance inside a coroutine, to ensure everything is set up right.

martindurant, Jun 14 '24 19:06
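
The last suggestion (create the instance inside a coroutine when using a manually created loop) can be sketched roughly as follows. This is a stdlib-only illustration, not fsspec code: `FakeAsyncFS` and `list_inside_coroutine` are hypothetical names, and `asyncio.new_event_loop()` stands in for a manually created uvloop loop.

```python
import asyncio


class FakeAsyncFS:
    """Hypothetical stand-in for an async filesystem; not fsspec code."""

    def __init__(self):
        # get_running_loop() raises RuntimeError when no loop is running,
        # so this constructor only succeeds inside a coroutine.
        self.loop = asyncio.get_running_loop()

    async def _ls(self, path):
        await asyncio.sleep(0)  # pretend to do network I/O
        return [f"{path}/file-0"]


async def list_inside_coroutine(make_fs, path):
    # The instance is created while the loop is already running, so it
    # sees the same loop that will execute its coroutines.
    fs = make_fs()
    listing = await fs._ls(path)
    return fs.loop, listing


manual_loop = asyncio.new_event_loop()  # stand-in for a uvloop loop
try:
    bound_loop, listing = manual_loop.run_until_complete(
        list_inside_coroutine(FakeAsyncFS, "bucket/prefix")
    )
finally:
    manual_loop.close()
```

With fsspec itself, `make_fs` might be something like `lambda: fsspec.filesystem("gcs", asynchronous=True)`, assuming gcsfs is installed and credentials are available; that variant is untested here.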