
credentials error with distributed

Open · rabernat opened this issue 6 years ago • 18 comments

I am trying to use gcsfs via distributed in pangeo-data/pangeo#150. I have uncovered what seems like a serialization bug.

This works from my notebook (the token appears to be cached):

fs = gcsfs.GCSFileSystem(project='pangeo-181919')
fs.buckets

It returns the four buckets: ['pangeo', 'pangeo-data', 'pangeo-data-private', 'zarr_store_test'].

Now I create a distributed cluster and client and use it to run the same command:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=False)
client = Client(cluster)
client.run(lambda : fs.buckets)

I get the following error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-25-3de328a517c7> in <module>()
----> 1 client.run(lambda : fs.buckets)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in run(self, function, *args, **kwargs)
   1906          '192.168.0.101:9000': 'running}
   1907         """
-> 1908         return self.sync(self._run, function, *args, **kwargs)
   1909 
   1910     @gen.coroutine

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    601             return future
    602         else:
--> 603             return sync(self.loop, func, *args, **kwargs)
    604 
    605     def __repr__(self):

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    251             e.wait(10)
    252     if error[0]:
--> 253         six.reraise(*error[0])
    254     else:
    255         return result[0]

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py in f()
    235             yield gen.moment
    236             thread_state.asynchronous = True
--> 237             result[0] = yield make_coro()
    238         except Exception as exc:
    239             logger.exception(exc)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1067                             exc_info = None
   1068                     else:
-> 1069                         yielded = self.gen.send(value)
   1070 
   1071                     if stack_context._state.contexts is not orig_stack_contexts:

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in _run(self, function, *args, **kwargs)
   1860                 results[key] = resp['result']
   1861             elif resp['status'] == 'error':
-> 1862                 six.reraise(*clean_exception(**resp))
   1863         raise gen.Return(results)
   1864 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

<ipython-input-25-3de328a517c7> in <lambda>()
----> 1 client.run(lambda : fs.buckets)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in buckets()
    449     def buckets(self):
    450         """Return list of available project buckets."""
--> 451         return [b["name"] for b in self._list_buckets()["items"]]
    452 
    453     @classmethod

<decorator-gen-128> in _list_buckets()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _list_buckets()
    568         items = []
    569         page = self._call(
--> 570             'get', 'b/', project=self.project
    571         )
    572 

<decorator-gen-123> in _call()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _call()
    429             try:
    430                 time.sleep(2**retry - 1)
--> 431                 r = meth(self.base + path, params=kwargs, json=json)
    432                 validate_response(r, path)
    433                 break

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/requests/sessions.py in get()
    519 
    520         kwargs.setdefault('allow_redirects', True)
--> 521         return self.request('GET', url, **kwargs)
    522 
    523     def options(self, url, **kwargs):

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/auth/transport/requests.py in request()
    195         request_headers = headers.copy() if headers is not None else {}
    196 
--> 197         self.credentials.before_request(
    198             self._auth_request, method, url, request_headers)
    199 

AttributeError: 'AuthorizedSession' object has no attribute 'credentials'

rabernat avatar Mar 12 '18 02:03 rabernat

How is the authorization happening in this case?

martindurant avatar Mar 12 '18 13:03 martindurant

I originally called fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='browser'). From then on, I didn't need to authenticate any more.

I am also trying to use a service account json token, but that doesn't work (see #89). These two issues are coupled.

rabernat avatar Mar 12 '18 14:03 rabernat

OK, makes sense. Yes, the browser method creates a token which is cached in the file ~/.gcs_tokens; you will need to distribute this file to all nodes to use this method.

martindurant avatar Mar 12 '18 15:03 martindurant

There are no nodes. It's a local, threaded cluster.

rabernat avatar Mar 13 '18 13:03 rabernat

Can you explicitly try token='cache'?

martindurant avatar Mar 13 '18 13:03 martindurant

It works with token='cache'.

rabernat avatar Mar 13 '18 14:03 rabernat
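For reference, the working construction is presumably just the original one with the token source made explicit; a minimal sketch, reusing the project name from above:

import gcsfs

# token='cache' tells gcsfs to reuse the token that token='browser'
# previously saved to ~/.gcs_tokens, instead of attempting the
# google-default lookup that produced the AttributeError above
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='cache')
fs.buckets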

Excellent! So it seems that in the default case (token=None), google_default must be returning some form of non-working credentials. I'm not sure what we can do to check that the credentials are valid, as there could well be valid credentials that do not have bucket_list privilege. Maybe try to list some known public bucket?

martindurant avatar Mar 13 '18 14:03 martindurant

Isn't that what the check=True option is for in gcsfs.mapping.GCSMap?

rabernat avatar Mar 13 '18 15:03 rabernat

That check has higher demands: it lists the buckets to ensure the given bucket exists and tries to write to it too.

martindurant avatar Mar 13 '18 15:03 martindurant

For example, gcs.ls('genomics-public-data/references') should always work, as this is in Google's set of public reference datasets; this could be a "check" option on the GCSFileSystem instance, run for each auth type. Thoughts?

martindurant avatar Mar 13 '18 17:03 martindurant
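A minimal sketch of what such a check might look like, assuming that bucket remains publicly listable and treating any exception as a sign of unusable credentials (the helper name check_credentials is hypothetical, not part of gcsfs):

import gcsfs

def check_credentials(fs):
    # probe validity by listing a bucket from Google's public
    # reference datasets, which any working credentials can read
    try:
        fs.ls('genomics-public-data/references')
        return True
    except Exception:
        return False

fs = gcsfs.GCSFileSystem(project='pangeo-181919')
if not check_credentials(fs):
    raise RuntimeError('GCS credentials appear to be unusable')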

@martindurant Can you provide some references to the best way to distribute .gcs_tokens to worker nodes?

Is there something like this for gcsfs? https://github.com/dask/s3fs/pull/28

jgerardsimcock avatar Apr 10 '18 22:04 jgerardsimcock

The general assumption was that each worker would perform its own authentication, so you would need the google-default login correctly set up (via the CLI or Google config files in special locations), a metadata service all workers can reach, a local token file on each worker, or the special gcs cache file on each worker. The token= parameter must be included in the storage options passed to the backend and must match the type of auth desired.

As you can see in https://github.com/dask/gcsfs/issues/91, many workers attempting to authenticate at the same time can sometimes cause problems. In such a case, you may wish to distribute the token directly, assuming the network is secure. This is not documented, but https://github.com/dask/gcsfs/issues/91#issuecomment-377163476 should work.

martindurant avatar Apr 10 '18 22:04 martindurant
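For instance, with dask.dataframe the auth choice travels to the workers via storage_options, so each worker builds its own GCSFileSystem the same way; a sketch with a hypothetical bucket path:

import dask.dataframe as dd

# every worker constructs its own GCSFileSystem from these options,
# so token must name an auth method that works on the workers too
df = dd.read_csv('gcs://pangeo-data/some/path/*.csv',
                 storage_options={'token': 'cache'})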

@martindurant Thank you! If/when I get this working, would you accept a PR with documentation?

jgerardsimcock avatar Apr 10 '18 22:04 jgerardsimcock

Of course!

martindurant avatar Apr 10 '18 22:04 martindurant

@martindurant Per-worker browser-based authentication is a tough route, especially when used with dask-kubernetes or a pangeo-style setup. I've been experimenting with JSON-based authentication, using some combination of client.upload and mapped subprocess calls to mount the FUSE directories manually before starting an analysis, but every time a worker dies and is rebooted automatically by the Kubernetes engine, I get a giant stream of FileNotFound errors. Any suggestions on doing this at worker setup time?

delgadom avatar Apr 11 '18 15:04 delgadom

If distributed + gcsfs were able to recognize the setup of the FUSE system as a required worker dependency, that would do the trick. Is there some hook we could customize to do this manually?

delgadom avatar Apr 11 '18 15:04 delgadom

Once you have done browser authentication, you will have a ~/.gcs_tokens file; distribute this file to the workers and use the token='cache' method. The automatic way to ensure the file appears in the right place for every worker pod will depend on your Kubernetes configuration.

martindurant avatar Apr 11 '18 15:04 martindurant
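A rough sketch of pushing the cached token file to a running cluster, assuming the workers can write to their own home directories and that client.run reaches every worker; the scheduler address is a placeholder:

import os
from dask.distributed import Client

def install_token(data):
    # runs on each worker: recreate the token cache in the location
    # where GCSFileSystem(token='cache') will look for it
    path = os.path.expanduser('~/.gcs_tokens')
    with open(path, 'wb') as f:
        f.write(data)

client = Client('scheduler-address:8786')  # placeholder address

with open(os.path.expanduser('~/.gcs_tokens'), 'rb') as f:
    token_bytes = f.read()

client.run(install_token, token_bytes)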

OK, thanks.

delgadom avatar Apr 13 '18 16:04 delgadom