
Future <Future pending> attached to a different loop when trying to access a storage account with service principal credentials

Open faurholdt opened this issue 3 years ago • 13 comments

I'm having problems loading a dataset from Azure when running Dask on a cluster and using service principal credentials as storage options.

Example:

import coiled
from dask.distributed import Client
import dask.dataframe as dd


coiled.create_software_environment(
    name="azure-pip",
    pip=["dask[complete]==2021.9.1","adlfs==2021.10.0","fsspec==2021.10.0", "pyarrow"]
)

cluster = coiled.Cluster(n_workers=5, software="azure-pip")
client = Client(cluster)

storage_options = {"account_name": "my-storage",
    "tenant_id":"my-tenant",
    "client_id":"my-client-id",
    "client_secret":"my-client-secret"}

df = dd.read_csv("abfs://taxidata/yellow_tripdata_2020-02.csv", storage_options=storage_options)

df.tip_amount.sum().compute()

This will throw the following error:

RuntimeError: Task <Task pending name='Task-12' coro=<_runner() running at /opt/conda/envs/coiled/lib/python3.9/site-packages/fsspec/asyn.py:25> cb=[_chain_future.<locals>._call_set_state() at /opt/conda/envs/coiled/lib/python3.9/asyncio/futures.py:391]> got Future <Future pending> attached to a different loop

Other things I've tried: creating an instance of the aio DefaultAzureCredential and passing it in my storage_options via the credential kwarg. That results in a cloudpickle error; it seems the credential class simply cannot be pickled.
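For what it's worth, that pickling failure is easy to demonstrate without Azure at all. The sketch below is my own construction (FakeAioCredential is a hypothetical stand-in, not an adlfs or azure-identity class): Dask has to serialize storage_options to ship them to workers, and any object holding live network/event-loop state, modeled here by a bare threading.Lock, fails at that step. Plain pickle is used for the demo, but cloudpickle hits the same wall.

```python
import pickle
import threading

class FakeAioCredential:
    """Hypothetical stand-in for an aio credential object: the real thing
    holds an open aiohttp session and event-loop handles; a bare
    threading.Lock reproduces the same unpicklable-state problem."""
    def __init__(self):
        self._lock = threading.Lock()

try:
    pickle.dumps(FakeAioCredential())
    outcome = "picklable"
except TypeError as exc:
    # e.g. "cannot pickle '_thread.lock' object"
    outcome = f"not picklable: {exc}"
print(outcome)
```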

Running it on my local computer rather than on a cluster works as expected.

I've also tried a lot of different versions of dask, distributed, adlfs, and so on, but the example always fails the same way.

Do you know how to get around this? Am I doing something wrong or is this a bug?

Environment:

  • Dask version: 2021.9.1
  • Python version: 3.9.0
  • Operating System: Ubuntu
  • Install method (conda, pip, source): pip

faurholdt avatar Oct 05 '21 11:10 faurholdt

Have you tried with the September release of adlfs and fsspec? I don’t think that’s it, but the October release of adlfs added an entrypoint that exploited a bug in fsspec, and that addition is not present in releases prior to October.

Currently, adlfs isn’t unit tested against coiled, but our team uses it daily in production with distributed and dask.

It’s interesting that you can’t pass the credential directly. I’ve done that before without issue as well…

hayesgb avatar Oct 06 '21 02:10 hayesgb

@isidentical — the stacktrace in the above issue suggests it may be related to the callback logic. Have you seen anything like this?

hayesgb avatar Oct 06 '21 02:10 hayesgb

the stacktrace in the above issue suggests it may be related to the callback logic.

I don't think the 'callback' in the future is related to our fsspec.callbacks; it's just a done-callback on the native asyncio.Future object.

Have you seen anything like this?

I have, but those were generally missing asyncio.set_event_loop() calls on the fs level, which I believe was fixed by https://github.com/dask/adlfs/pull/229.
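For readers hitting this later, the shape of that per-thread fix can be sketched in plain asyncio. This is an assumed illustration of the set_event_loop pattern, not the actual code from that PR:

```python
import asyncio
import threading

results = []

def worker():
    # A freshly spawned (non-main) thread has no event loop registered,
    # so asyncio.get_event_loop() raises RuntimeError there; registering
    # a fresh loop per thread is the general shape of the fix.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    results.append(loop.run_until_complete(asyncio.sleep(0, result="ok")))
    loop.close()

t = threading.Thread(target=worker)
t.start()
t.join()
print(results)  # ['ok']
```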

isidentical avatar Oct 06 '21 07:10 isidentical

Running it on my local computer and not on a cluster.

Are you using the same versions of everything (adlfs, fsspec) when running on the local computer?

isidentical avatar Oct 06 '21 07:10 isidentical

Yes, with the exact same env I am able to execute the above example from local.

So, this part works as expected if I don't create a cluster:

import dask.dataframe as dd

storage_options = {"account_name": "my-storage",
    "tenant_id":"my-tenant",
    "client_id":"my-client-id",
    "client_secret":"my-client-secret"}

df = dd.read_csv("abfs://taxidata/yellow_tripdata_2020-02.csv", storage_options=storage_options)

df.tip_amount.sum().compute()

However, I don't think the problem is related to Coiled. If I create a Client with a LocalCluster I get the same error. Example:


import dask.dataframe as dd
from dask.distributed import Client

client = Client()

storage_options = {"account_name": "my-storage",
    "tenant_id":"my-tenant",
    "client_id":"my-client-id",
    "client_secret":"my-client-secret"}

df = dd.read_csv("abfs://taxidata/yellow_tripdata_2020-02.csv", storage_options=storage_options)

df.tip_amount.sum().compute()

will return the same "Future <Future pending> attached to a different loop" error.

faurholdt avatar Oct 10 '21 08:10 faurholdt

I don't use Dask, but if you can provide a short reproducer using the adlfs interface, I'd be happy to try to test it locally.

isidentical avatar Oct 11 '21 14:10 isidentical

Sorry for the late response, I've been trying a couple of things.

I seem to have no problem creating a filestorage object and walking/listing etc through my datalake.

So I am not certain I can reproduce the issue without Dask. As I said, the issue seems to arise whenever you create a distributed Client; I assume it has to do with the thread pool that gets created.
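That thread pool suspicion can be exercised without Dask or Azure at all. The following minimal sketch (pure stdlib, my own construction, not adlfs code) creates a future bound to the main thread's loop and then awaits it from a pool thread running its own loop, which reproduces the same class of error:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Future created on, and permanently bound to, the main thread's loop.
main_loop = asyncio.new_event_loop()
shared = main_loop.create_future()

def touch_from_pool_thread():
    # Each pool thread spins up its own loop; awaiting the main
    # thread's future from here fails with the familiar message.
    loop = asyncio.new_event_loop()

    async def consume():
        await shared

    try:
        loop.run_until_complete(consume())
        return "no error"
    except RuntimeError as exc:
        return str(exc)  # "... got Future ... attached to a different loop"
    finally:
        loop.close()

with ThreadPoolExecutor(max_workers=1) as pool:
    message = pool.submit(touch_from_pool_thread).result()
print(message)
main_loop.close()
```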

faurholdt avatar Oct 17 '21 06:10 faurholdt

I have the same problem ("got Future <Future pending> attached to a different loop") when using Windows credentials with storage_options={'account_name': account_name, 'anon': False}. I am on dask.__version__ == '2022.7.0'.

This exact code fails:

import dask  # needed for the dask.__version__ check below
import dask.dataframe as ddf
from dask.distributed import Client

print(f'{dask.__version__ =}')
client = Client()  # start distributed scheduler locally.

glob_path = 'raw/2020_*.csv'
STORAGE_OPTIONS_FAILS = {'account_name': account_name, 'anon': False}

ddf_data = ddf.read_csv(
    f'az://{glob_path}',
    storage_options=STORAGE_OPTIONS_FAILS,
    sep=';' ... )
ddf_data.describe().compute()

I can only avoid the "got Future <Future pending> attached to a different loop" error if I specify the account_key:

STORAGE_OPTIONS_WORKS = {'account_name': account_name, 'account_key': account_key}
ddf_data = ddf.read_csv(
    f'az://{glob_path}',
    storage_options=STORAGE_OPTIONS_WORKS,
    sep=';' ...)
ddf_data.describe().compute()

I assume it is the DefaultAzureCredential, or the SharedTokenCacheCredential that I use, that is evaluated in a "different loop", but I have no clue. Do any of you maintainers (@hayesgb) have an idea?

PeterFogh avatar Nov 25 '22 15:11 PeterFogh

After updating my adlfs version, I no longer have the "got Future attached to a different loop" problem. Old versions:

  - python=3.9.13=h6244533_2
  - adlfs=2022.10.0=pyhd8ed1ab_0
  - azure-common=1.1.27=pyhd3eb1b0_0
  - azure-core=1.26.0=pyhd8ed1ab_0
  - azure-datalake-store=0.0.51=pyh9f0ad1d_0
  - azure-identity=1.11.0=pyhd8ed1ab_0
  - azure-keyvault-secrets=4.6.0=pyhd8ed1ab_0
  - azure-storage-blob=12.13.1=pyhd8ed1ab_0
  - dask=2022.7.0=py39haa95532_0
  - dask-core=2022.7.0=py39haa95532_0

New versions:

  - python=3.10.8=h966fe2a_1
  - adlfs=2022.11.2=pyhd8ed1ab_0 
  - azure-common=1.1.27=pyhd3eb1b0_0
  - azure-core=1.26.1=pyhd8ed1ab_0
  - azure-datalake-store=0.0.51=pyh9f0ad1d_0
  - azure-identity=1.12.0=pyhd8ed1ab_0
  - azure-keyvault-secrets=4.6.0=pyhd8ed1ab_0
  - azure-storage-blob=12.14.1=pyhd8ed1ab_0
  - dask=2022.7.0=py310haa95532_0
  - dask-core=2022.7.0=py310haa95532_0

@faurholdt, can you try out the new package versions and see if you still have the problem?

PeterFogh avatar Nov 29 '22 10:11 PeterFogh

@PeterFogh Due to job changes, I do not have access to my dask code anymore, so I am not able to re-run. I am okay with closing the issue if a solution has been found.

faurholdt avatar Nov 29 '22 10:11 faurholdt

Okay @faurholdt, thanks for the quick reply :)

PeterFogh avatar Nov 29 '22 10:11 PeterFogh

I cannot close the issue. @faurholdt, please close it if you can; otherwise I'm sure @hayesgb can? :)

PeterFogh avatar Nov 29 '22 10:11 PeterFogh

I had a similar problem: I use dask.compute() to process dask.delayed functions, which in turn use fsspec and adlfs to read files on Azure. Authentication failed with the same exception and error message, "Future <Future pending> attached to a different loop". The error also occurs with a more basic ThreadPoolExecutor implementation instead of dask.compute().

I solved the issue (after hours of frustration) by performing a fs.ls('some-container') at the beginning, in the main thread. That way the authentication works fine, and the parallelized file reading then works too. Here is how I initialize the fsspec filesystem in the main thread:

# The imports below are my assumption; they were not shown in the
# original snippet. ACCOUNT and SRC_ROOT are placeholders for your values.
import logging

import fsspec
from azure.identity.aio import DefaultAzureCredential
from fsspec import AbstractFileSystem

log = logging.getLogger(__name__)

fs: AbstractFileSystem = fsspec.filesystem(
    "abfs",
    account_name=ACCOUNT,
    credential=DefaultAzureCredential(),
)

log.info("Acquiring credentials IN MAIN THREAD!!!")
fs.ls(path=SRC_ROOT)

jeromerg avatar Aug 15 '23 10:08 jeromerg