dask-cloudprovider
dask-cloudprovider copied to clipboard
Reading data from Azure Storage fails sporadically with an InternalError
Steps to reproduce: I created a Dask cluster inside an AzureML environment using the following code:
amlcluster = AzureMLCluster(ws,
vm_size="STANDARD_D1",
environment_definition=ws.environments['AzureML-Dask-CPU'],
initial_node_count=0,
scheduler_idle_timeout=10800,
vnet='vnet',
subnet='subnet',
vnet_resource_group='resourcegroup',
ct_name="biswasdask",
)
Next, open JupyterLab using the link returned by `amlcluster.jupyter_link`.
As I understand it, I am now on the scheduler node of the cluster.
I am trying to read a Parquet file from Azure Data Lake Storage Gen2 with the following code:
storage_options={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
ddf = dd.read_parquet('az://<my-container>/<mypath>/20201101.parquet', storage_options=storage_options)
It fails with the error: Server encountered an internal error. Please try again after some time.
If I run the same call a second time, it succeeds, so the failure is sporadic.
However, the same code works every time, without any failure, when I run it outside the scheduler node (i.e., on the VM from which I created the AzureMLCluster).
Here is the detailed stack trace:
---------------------------------------------------------------------------
StorageErrorException Traceback (most recent call last)
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/azure/storage/blob/aio/_list_blobs_helper.py in _get_next_cb(self, continuation_token)
74 cls=return_context_and_deserialized,
---> 75 use_location=self.location_mode)
76 except StorageErrorException as error:
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/azure/storage/blob/_generated/aio/operations_async/_container_operations_async.py in list_blob_hierarchy_segment(self, delimiter, prefix, marker, maxresults, include, timeout, request_id, cls, **kwargs)
1328 map_error(status_code=response.status_code, response=response, error_map=error_map)
-> 1329 raise models.StorageErrorException(response, self._deserialize)
1330
StorageErrorException: Operation returned an invalid status 'Server encountered an internal error. Please try again after some time.'
During handling of the above exception, another exception occurred:
HttpResponseError Traceback (most recent call last)
<ipython-input-3-9a4d92abd786> in <module>
1 storage_options={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
2
----> 3 ddf = dd.read_parquet('az://ads-hs/cortix_data_root/EcoEnergy/day_sum/yr=2020/mn=11/org_id=R02ERUS01/site_id=R02ERUS010001/20201101.parquet', storage_options=storage_options)
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, chunksize, **kwargs)
237 filters=filters,
238 split_row_groups=split_row_groups,
--> 239 **kwargs,
240 )
241
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, **kwargs)
206 # correspond to a row group (populated below).
207 parts, pf, gather_statistics, fast_metadata, base_path = _determine_pf_parts(
--> 208 fs, paths, gather_statistics, **kwargs
209 )
210
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
183 base = None
184 pf = ParquetFile(
--> 185 paths[0], open_with=fs.open, sep=fs.sep, **kwargs.get("file", {})
186 )
187
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep)
109 fn2 = join_path(fn, '_metadata')
110 self.fn = fn2
--> 111 with open_with(fn2, 'rb') as f:
112 self._parse_header(f, verify)
113 fn = fn2
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/fsspec/spec.py in open(self, path, mode, block_size, cache_options, **kwargs)
901 autocommit=ac,
902 cache_options=cache_options,
--> 903 **kwargs
904 )
905 if not ac:
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/adlfs/spec.py in _open(self, path, mode, block_size, autocommit, cache_options, cache_type, **kwargs)
1257 cache_options=cache_options,
1258 cache_type=cache_type,
-> 1259 **kwargs,
1260 )
1261
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/adlfs/spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
1348 if self.mode == "rb":
1349 if not hasattr(self, "details"):
-> 1350 self.details = self.fs.info(self.path)
1351 self.size = self.details["size"]
1352 self.cache = caches[cache_type](
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/adlfs/spec.py in info(self, path, refresh, **kwargs)
490 fetch_from_azure = (path and self._ls_from_cache(path) is None) or refresh
491 if fetch_from_azure:
--> 492 return maybe_sync(self._info, self, path)
493 return super().info(path)
494
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
98 if inspect.iscoroutinefunction(func):
99 # run the awaitable on the loop
--> 100 return sync(loop, func, *args, **kwargs)
101 else:
102 # just call the blocking function
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/fsspec/asyn.py in sync(loop, func, callback_timeout, *args, **kwargs)
69 if error[0]:
70 typ, exc, tb = error[0]
---> 71 raise exc.with_traceback(tb)
72 else:
73 return result[0]
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/fsspec/asyn.py in f()
53 if callback_timeout is not None:
54 future = asyncio.wait_for(future, callback_timeout)
---> 55 result[0] = await future
56 except Exception:
57 error[0] = sys.exc_info()
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/adlfs/spec.py in _info(self, path, **kwargs)
511 if out:
512 return out[0]
--> 513 out = await self._ls(path, **kwargs)
514 path = path.rstrip("/")
515 out1 = [o for o in out if o["name"].rstrip("/") == path]
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/adlfs/spec.py in _ls(self, path, invalidate_cache, delimiter, return_glob, **kwargs)
685 outblobs = []
686 try:
--> 687 async for next_blob in blobs:
688 if depth in [0, 1] and path == "":
689 outblobs.append(next_blob)
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/azure/core/async_paging.py in __anext__(self)
152 if self._page_iterator is None:
153 self._page_iterator = self.by_page()
--> 154 return await self.__anext__()
155 if self._page is None:
156 # Let it raise StopAsyncIteration
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/azure/core/async_paging.py in __anext__(self)
155 if self._page is None:
156 # Let it raise StopAsyncIteration
--> 157 self._page = await self._page_iterator.__anext__()
158 return await self.__anext__()
159 try:
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/azure/core/async_paging.py in __anext__(self)
97 raise StopAsyncIteration("End of paging")
98 try:
---> 99 self._response = await self._get_next(self.continuation_token)
100 except AzureError as error:
101 if not error.continuation_token:
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/azure/storage/blob/aio/_list_blobs_helper.py in _get_next_cb(self, continuation_token)
75 use_location=self.location_mode)
76 except StorageErrorException as error:
---> 77 process_storage_error(error)
78
79 async def _extract_data_cb(self, get_next_return):
/azureml-envs/azureml_d407e2694bdeecd1113b9f2a6efdddf7/lib/python3.6/site-packages/azure/storage/blob/_shared/response_handlers.py in process_storage_error(storage_error)
145 error.error_code = error_code
146 error.additional_info = additional_data
--> 147 raise error
148
149
HttpResponseError: Server encountered an internal error. Please try again after some time.
RequestId:342d95f9-a01e-0035-486f-bece1b000000
Time:2020-11-19T12:30:37.2957133Z
ErrorCode:InternalError
Error:None