gcsfs
credentials error with distributed
I am trying to use gcsfs via distributed in pangeo-data/pangeo#150. I have uncovered what seems like a serialization bug.
This works from my notebook (the token appears to be cached):
fs = gcsfs.GCSFileSystem(project='pangeo-181919')
fs.buckets
It returns the four buckets: ['pangeo', 'pangeo-data', 'pangeo-data-private', 'zarr_store_test'].
Now I create a distributed cluster and client and use it to run the same command:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=False)
client = Client(cluster)
client.run(lambda : fs.buckets)
I get the following error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-25-3de328a517c7> in <module>()
----> 1 client.run(lambda : fs.buckets)
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in run(self, function, *args, **kwargs)
1906 '192.168.0.101:9000': 'running}
1907 """
-> 1908 return self.sync(self._run, function, *args, **kwargs)
1909
1910 @gen.coroutine
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
601 return future
602 else:
--> 603 return sync(self.loop, func, *args, **kwargs)
604
605 def __repr__(self):
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
251 e.wait(10)
252 if error[0]:
--> 253 six.reraise(*error[0])
254 else:
255 return result[0]
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py in f()
235 yield gen.moment
236 thread_state.asynchronous = True
--> 237 result[0] = yield make_coro()
238 except Exception as exc:
239 logger.exception(exc)
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py in run(self)
1067 exc_info = None
1068 else:
-> 1069 yielded = self.gen.send(value)
1070
1071 if stack_context._state.contexts is not orig_stack_contexts:
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in _run(self, function, *args, **kwargs)
1860 results[key] = resp['result']
1861 elif resp['status'] == 'error':
-> 1862 six.reraise(*clean_exception(**resp))
1863 raise gen.Return(results)
1864
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
690 value = tp()
691 if value.__traceback__ is not tb:
--> 692 raise value.with_traceback(tb)
693 raise value
694 finally:
<ipython-input-25-3de328a517c7> in <lambda>()
----> 1 client.run(lambda : fs.buckets)
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in buckets()
449 def buckets(self):
450 """Return list of available project buckets."""
--> 451 return [b["name"] for b in self._list_buckets()["items"]]
452
453 @classmethod
<decorator-gen-128> in _list_buckets()
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
49 logger.log(logging.DEBUG - 1, tb_io.getvalue())
50
---> 51 return f(self, *args, **kwargs)
52
53
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _list_buckets()
568 items = []
569 page = self._call(
--> 570 'get', 'b/', project=self.project
571 )
572
<decorator-gen-123> in _call()
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
49 logger.log(logging.DEBUG - 1, tb_io.getvalue())
50
---> 51 return f(self, *args, **kwargs)
52
53
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _call()
429 try:
430 time.sleep(2**retry - 1)
--> 431 r = meth(self.base + path, params=kwargs, json=json)
432 validate_response(r, path)
433 break
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/requests/sessions.py in get()
519
520 kwargs.setdefault('allow_redirects', True)
--> 521 return self.request('GET', url, **kwargs)
522
523 def options(self, url, **kwargs):
/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/auth/transport/requests.py in request()
195 request_headers = headers.copy() if headers is not None else {}
196
--> 197 self.credentials.before_request(
198 self._auth_request, method, url, request_headers)
199
AttributeError: 'AuthorizedSession' object has no attribute 'credentials'
How is the authorization happening in this case?
I originally called fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='browser'). From then on, I didn't need to authenticate anymore.
I am also trying to use a service-account JSON token, but that doesn't work (see #89). These two issues are coupled.
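For reference, a minimal sketch of the two auth modes in question; it assumes token= accepts either the string 'browser' or a path to a service-account key file, and the project name and key path are placeholders:

import gcsfs

# Interactive browser flow; the resulting token is cached locally.
fs_browser = gcsfs.GCSFileSystem(project='my-project', token='browser')

# Service-account credentials from a JSON key file (hypothetical path);
# this is the mode that #89 is about.
fs_service = gcsfs.GCSFileSystem(project='my-project',
                                 token='/path/to/service-account-key.json')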
OK, makes sense. Yes, the browser method makes a token which is cached in the file ~/.gcs_tokens; you will need to distribute this file to all nodes to use this method.
There are no nodes. It's a local, threaded cluster.
Can you explicitly try token='cache'?
It works with token='cache'.
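For completeness, a minimal sketch of the working combination, based on the snippets earlier in this thread:

import gcsfs
from dask.distributed import Client, LocalCluster

# token='cache' reuses the token that the earlier token='browser' call stored in ~/.gcs_tokens.
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='cache')

cluster = LocalCluster(processes=False)
client = Client(cluster)

# Runs on every worker; with processes=False the workers share the client's process.
print(client.run(lambda: fs.buckets))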
Excellent! So it seems that in the default case (token=None), google_default must be returning some form of non-working credentials. I'm not sure what we can do to check that the credentials are valid, as there could well be valid credentials that do not have bucket_list privilege. Maybe try to list some known public bucket?
Isn't that what the check=True option is for in gcsfs.mapping.GCSMap?
That check has higher demands: it lists the buckets to ensure the given bucket exists and tries to write to it too.
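Roughly what that looks like in use, as a hedged sketch (bucket and project names are placeholders, and the GCSMap signature is assumed to be GCSMap(root, gcs=None, check=False, create=False)):

import gcsfs
from gcsfs.mapping import GCSMap

fs = gcsfs.GCSFileSystem(project='my-project', token='cache')
# check=True verifies the bucket exists and is writable when the mapping is built,
# so it needs both list and write permissions.
store = GCSMap('my-bucket/zarr-store', gcs=fs, check=True)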
For example, gcs.ls('genomics-public-data/references') should always work, as this is in Google's set of public reference datasets; this could be a "check" option on the GCSFileSystem instance, done for each auth type. Thoughts?
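A hypothetical helper along the lines being proposed (not an existing gcsfs option, just a sketch):

def credentials_look_valid(gcs):
    """Rough sanity check: try to list a known public bucket.

    Passing only shows the credentials can make an authenticated request at all;
    it says nothing about access to any private bucket.
    """
    try:
        gcs.ls('genomics-public-data/references')
        return True
    except Exception:
        return False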
@martindurant Can you provide some references on the best way to distribute .gcs_tokens to worker nodes?
Is there something like https://github.com/dask/s3fs/pull/28 for gcsfs?
The general assumption was that each worker would perform its own authentication, so you would need one of: the google-default login correctly set up (via the CLI or google config files in special locations), a metadata service all workers can reach, a local token file on each worker, or the special gcs cache file on each worker. The token= parameter must be in the storage options passed to the backend and must match the type of auth desired.
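As an illustration of passing the token choice through storage options (bucket and project names are placeholders), the usual dask pattern looks something like:

import dask.dataframe as dd

# storage_options is forwarded to gcsfs.GCSFileSystem on each worker.
df = dd.read_csv('gcs://my-bucket/data/*.csv',
                 storage_options={'project': 'my-project', 'token': 'cache'})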
As you can see in https://github.com/dask/gcsfs/issues/91, having many workers attempt to authenticate at the same time sometimes appears to cause problems. In such a case, you may wish to distribute the token directly, assuming the network is secure. This is not documented, but https://github.com/dask/gcsfs/issues/91#issuecomment-377163476 should work.
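One possible way to push the cached token file out to workers from the client, shown purely as an illustrative sketch (it is not necessarily what the linked comment does, and the scheduler address is a placeholder):

import os
from dask.distributed import Client

client = Client('scheduler-address:8786')  # placeholder scheduler address

with open(os.path.expanduser('~/.gcs_tokens'), 'rb') as f:
    token_bytes = f.read()

def write_token(data=token_bytes):
    import os
    # Recreate the cache file on the worker so token='cache' can find it.
    path = os.path.expanduser('~/.gcs_tokens')
    with open(path, 'wb') as out:
        out.write(data)
    return path

client.run(write_token)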
@martindurant Thank you! If/when I get this working, would you accept a PR with documentation?
Of course!
@martindurant Per-worker browser-based authentication is a tough route, especially when used in connection with dask-kubernetes or a pangeo-style setup. I've been experimenting with JSON-based authentication using some combination of client.upload and mapping subprocess calls to mount the fuse directories manually before starting an analysis, but every time a worker dies and is rebooted automatically by the kubernetes engine I get a giant stream of FileNotFound errors. Any suggestions on doing this at worker setup time?
If distributed + gcsfs were able to recognize the setup of the fuse system as a required worker dependency, that would do the trick. Is there some hook we could customize to do this manually?
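One hook that may fit, assuming your distributed version provides client.register_worker_callbacks, is a setup callback that runs on every current and future worker; the gcsfuse call, bucket name, mount point and scheduler address below are all placeholders:

import subprocess
from dask.distributed import Client

client = Client('scheduler-address:8786')  # placeholder scheduler address

def mount_gcs_fuse():
    # Placeholder bucket name and mount point; gcsfuse must be installed in the
    # worker image and credentials must already be available on the worker.
    subprocess.run(['mkdir', '-p', '/gcs/my-bucket'], check=True)
    subprocess.run(['gcsfuse', 'my-bucket', '/gcs/my-bucket'], check=True)

# Runs on all current workers and on any worker that joins later,
# so a pod that kubernetes reboots re-mounts automatically.
client.register_worker_callbacks(mount_gcs_fuse)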
Once you have done browser authentication, you will have a ~/.gcs_tokens file, which you distribute to workers, and then use the token='cache' method. The automatic way to ensure the file appears in the right place for every worker pod will depend on your kubernetes configuration.
ok thanks