cloud-ml-examples
Dask on Dataproc GCP
Will contribute notebooks showing the workflow for running Dask on GCP Dataproc.
Sounds great, thanks @anudeepbablu!
https://github.com/NVIDIA/nvidia-gcp-samples/blob/master/bigquery-samples/dask-bigquery-connector/bigquery_dataproc_dask_xgboost.ipynb
https://cloud.google.com/blog/products/data-analytics/improve-data-science-experience-using-scalable-python-data-processing
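For anyone skimming, here is a minimal sketch of the distributed GPU XGBoost training pattern the linked notebook builds up to. The DataFrame `ddf`, the label column, and the parameters below are illustrative assumptions, not copied from the notebook:

```python
# Hypothetical sketch of distributed GPU XGBoost training on a Dask cluster.
# Assumes `client` is a connected dask.distributed.Client and `ddf` is a
# Dask (or dask_cudf) DataFrame already loaded on the workers.
import xgboost as xgb
from dask.distributed import Client

def train_gpu_xgboost(client: Client, ddf, label_col="ArrDelayBinary"):
    # Split features and label.
    y = ddf[label_col]
    X = ddf.drop(columns=[label_col])

    # DaskDMatrix keeps the data distributed across the workers.
    dtrain = xgb.dask.DaskDMatrix(client, X, y)

    params = {
        "objective": "binary:logistic",
        "tree_method": "gpu_hist",  # train on the GPUs of the Dask workers
        "max_depth": 8,
    }
    output = xgb.dask.train(client, params, dtrain, num_boost_round=100)
    return output["booster"]  # output also carries the evaluation history
```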
Seeing this error:
```python
cluster = YarnCluster(worker_class="dask_cuda.CUDAWorker",
                      worker_gpus=1, worker_vcores=4, worker_memory='24GB',
                      worker_env={"CONDA_PREFIX": "/opt/conda/default/"})
cluster.scale(4)
client = Client(cluster)
client
```
```
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In [3], line 1
----> 1 client = Client(cluster)
2 client
File /opt/conda/miniconda3/lib/python3.8/site-packages/distributed/client.py:834, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
831 elif isinstance(getattr(address, "scheduler_address", None), str):
832 # It's a LocalCluster or LocalCluster-compatible object
833 self.cluster = address
--> 834 status = getattr(self.cluster, "status")
835 if status and status in [Status.closed, Status.closing]:
836 raise RuntimeError(
837 f"Trying to connect to an already closed or closing Cluster {self.cluster}."
838 )
AttributeError: 'YarnCluster' object has no attribute 'status'
```
Looks related to https://github.com/dask/dask-yarn/issues/158 which there is a workaround for.
Once you have things working could you write this up as a documentation page in rapidsai/deployment?
> Once you have things working could you write this up as a documentation page in rapidsai/deployment?
I have opened PR https://github.com/rapidsai/deployment/pull/99 to update the Dataproc instructions
Update: PR on hold awaiting resolution of this issue -- the Google team needs to upgrade the Dask-RAPIDS installation to v22.12.
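For reference, a minimal check of which RAPIDS/Dask versions a Dataproc node actually ships (nothing Dataproc-specific, just version attributes):

```python
# Confirm the cluster image provides the expected RAPIDS release (e.g. 22.12)
# before re-running the notebooks.
import cudf
import dask
import dask_cuda
import distributed

print("cudf       :", cudf.__version__)
print("dask       :", dask.__version__)
print("distributed:", distributed.__version__)
print("dask_cuda  :", dask_cuda.__version__)
```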
@jacobtomlinson Blocked by this error: it fails to load the parquet dataset, perhaps related to https://github.com/apache/arrow/issues/31812
OUTPUT
```
 > File already exists. Ready to load at /rapids_hpo/data/airlines.parquet
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[7], line 1
----> 1 df = prepare_dataset(use_full_dataset=True)

Cell In[6], line 34, in prepare_dataset(use_full_dataset)
     28     print(f" > Download complete {file_name}")
     30 input_cols = ["Year", "Month", "DayofMonth", "DayofWeek", "CRSDepTime", "CRSArrTime",
     31               "UniqueCarrier", "FlightNum", "ActualElapsedTime", "Origin", "Dest",
     32               "Distance", "Diverted"]
---> 34 dataset = cudf.read_parquet(parquet_name)
     36 # encode categoricals as numeric
     37 for col in dataset.select_dtypes(["object"]).columns:

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/contextlib.py:79, in ContextDecorator.__call__.<locals>.inner(*args, **kwds)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/cudf/io/parquet.py:420, in read_parquet(filepath_or_buffer, engine, columns, filters, row_groups, strings_to_categorical, use_pandas_metadata, use_python_file_object, categorical_partitions, open_file_options, *args, **kwargs)
    413 partition_categories = {}
    414 if fs and paths:
    415     (
    416         paths,
    417         row_groups,
    418         partition_keys,
    419         partition_categories,
--> 420     ) = _process_dataset(
    421         paths,
    422         fs,
    423         filters=filters,
    424         row_groups=row_groups,
    425         categorical_partitions=categorical_partitions,
    426     )
    427 elif filters is not None:
    428     raise ValueError("cudf cannot apply filters to open file objects.")

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/contextlib.py:79, in ContextDecorator.__call__.<locals>.inner(*args, **kwds)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/cudf/io/parquet.py:243, in _process_dataset(paths, fs, filters, row_groups, categorical_partitions)
    238     filters = pq._filters_to_expression(filters)
    240 # Initialize ds.FilesystemDataset
    241 # TODO: Remove the if len(paths) workaround after following bug is fixed:
    242 # https://issues.apache.org/jira/browse/ARROW-16438
--> 243 dataset = ds.dataset(
    244     source=paths[0] if len(paths) == 1 else paths,
    245     filesystem=fs,
    246     format="parquet",
    247     partitioning="hive",
    248 )
    250 file_list = dataset.files
    251 if len(file_list) == 0:

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/dataset.py:749, in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes)
    738 kwargs = dict(
    739     schema=schema,
    740     filesystem=filesystem,
    (...)
    745     selector_ignore_prefixes=ignore_prefixes
    746 )
    748 if _is_path_like(source):
--> 749     return _filesystem_dataset(source, **kwargs)
    750 elif isinstance(source, (tuple, list)):
    751     if all(_is_path_like(elem) for elem in source):

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/dataset.py:451, in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
    443 options = FileSystemFactoryOptions(
    444     partitioning=partitioning,
    445     partition_base_dir=partition_base_dir,
    446     exclude_invalid_files=exclude_invalid_files,
    447     selector_ignore_prefixes=selector_ignore_prefixes
    448 )
    449 factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)
--> 451 return factory.finish(schema)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/_dataset.pyx:1885, in pyarrow._dataset.DatasetFactory.finish()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/error.pxi:144, in pyarrow.lib.pyarrow_internal_check_status()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/error.pxi:100, in pyarrow.lib.check_status()

ArrowInvalid: Error creating dataset. Could not read schema from '/rapids_hpo/data/airlines.parquet': Could not open Parquet input source '/rapids_hpo/data/airlines.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?
```
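For anyone hitting the same "Parquet magic bytes not found" error, a quick sanity check that the file on disk really is valid Parquet (the path is the one from the traceback; this is just a sketch, not part of the notebook):

```python
# Sanity-check the file that cudf.read_parquet refuses to load.
# A valid Parquet file starts and ends with the 4-byte magic "PAR1".
import pyarrow.parquet as pq

path = "/rapids_hpo/data/airlines.parquet"

with open(path, "rb") as f:
    head = f.read(4)
    f.seek(-4, 2)
    tail = f.read(4)
print("magic bytes:", head, tail)  # both should be b"PAR1"

# read_metadata raises ArrowInvalid for corrupted or non-Parquet files.
print(pq.read_metadata(path))
```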
Looks like there is something wrong with the dataset. Could you share a link to the full notebook?
I am testing this notebook in a GCP Dataproc cluster using the airline parquet dataset located in my GCS bucket (I think you need GCP access for this).
In that case my guess from the error would be that either the dataset in GCS is corrupted, or the client/workers can't access the file correctly.
It would help if you could share a complete example of what you ran to get the error. Happy to sync up if that's easier.
Side note: as you're using the HPO_Demo.ipynb notebook for testing, and are therefore the most familiar with it, I've assigned https://github.com/rapidsai/cloud-ml-examples/issues/208 to you.
I loaded the data locally and it works, so this seems to be an issue on the Dataproc side. I was considering loading the parquet dataset into a BigQuery table, as this notebook example does. But I think we should still find time to sync.
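For the BigQuery route, loading the parquet file from GCS into a table would look roughly like this (a sketch; the project, bucket, dataset, and table names below are placeholders):

```python
# Hypothetical sketch: load a parquet file from GCS into a BigQuery table
# so the notebook can read it back with dask-bigquery.
from google.cloud import bigquery

client = bigquery.Client()

table_id = "my-project.my_dataset.airline_small"              # placeholder
gcs_uri = "gs://my-bucket/rapids_hpo/airline_small.parquet"   # placeholder

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
load_job = client.load_table_from_uri(gcs_uri, table_id, job_config=job_config)
load_job.result()  # wait for the load job to complete
print(client.get_table(table_id).num_rows, "rows loaded")
```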
OUTPUT
```
(dataproc) skirui@skirui-HP-Z8-G4-Workstation:~/Desktop$ parquet-tools inspect airline_small.parquet
############ file meta data ############
created_by:
num_columns: 14
num_rows: 200000
num_row_groups: 1
format_version: 1.0
serialized_size: 3102
############ Columns ############
ArrDelayBinary
Year
Month
DayofMonth
DayofWeek
CRSDepTime
CRSArrTime
UniqueCarrier
FlightNum
ActualElapsedTime
Origin
Dest
Distance
Diverted
############ Column(ArrDelayBinary) ############
name: ArrDelayBinary
path: ArrDelayBinary
max_definition_level: 0
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 2%)
############ Column(Year) ############
name: Year
path: Year
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
############ Column(Month) ############
name: Month
path: Month
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
############ Column(DayofMonth) ############
name: DayofMonth
path: DayofMonth
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
############ Column(DayofWeek) ############
name: DayofWeek
path: DayofWeek
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
############ Column(CRSDepTime) ############
name: CRSDepTime
path: CRSDepTime
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 36%)
############ Column(CRSArrTime) ############
name: CRSArrTime
path: CRSArrTime
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 7%)
############ Column(UniqueCarrier) ############
name: UniqueCarrier
path: UniqueCarrier
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 9%)
############ Column(FlightNum) ############
name: FlightNum
path: FlightNum
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 1%)
############ Column(ActualElapsedTime) ############
name: ActualElapsedTime
path: ActualElapsedTime
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
############ Column(Origin) ############
name: Origin
path: Origin
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 18%)
############ Column(Dest) ############
name: Dest
path: Dest
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 16%)
############ Column(Distance) ############
name: Distance
path: Distance
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 1%)
############ Column(Diverted) ############
name: Diverted
path: Diverted
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
```
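The same metadata can also be checked from Python inside the notebook, where parquet-tools may not be installed (a small sketch):

```python
# Inspect the parquet metadata with pyarrow instead of parquet-tools.
import pyarrow.parquet as pq

pf = pq.ParquetFile("airline_small.parquet")
print(pf.metadata)      # num_columns, num_rows, num_row_groups, format_version
print(pf.schema_arrow)  # column names and types
```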
Update:
For this issue, I will be testing the bigquery_dataproc_dask_xgboost.ipynb notebook, which shows how to use Dask to process a dataset from BigQuery on a Dataproc cluster via the Dask-BigQuery connector.
hpo_demo.ipynb will instead be tested on an EC2 instance, as tracked by PR #243.
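The read side of that notebook boils down to something like the following (a sketch; the project/dataset/table IDs are placeholders, and the dask_cudf conversion call may differ between RAPIDS versions):

```python
# Read a BigQuery table into a Dask DataFrame and move it to the GPU workers.
import dask_bigquery
import dask_cudf

ddf = dask_bigquery.read_gbq(
    project_id="my-project",   # placeholder
    dataset_id="my_dataset",   # placeholder
    table_id="my_table",       # placeholder
)

# Convert the pandas-backed partitions to cuDF on the GPU workers.
gdf = dask_cudf.from_dask_dataframe(ddf)
print(gdf.head())
```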
@jacobtomlinson I am seeing this error when starting a YarnCluster:
```python
from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(worker_class="dask_cuda.CUDAWorker",
                      worker_gpus=1, worker_vcores=4, worker_memory='24GB',
                      worker_env={"CONDA_PREFIX": "/opt/conda/default/"})
cluster.scale(4)
```
```
23/02/22 05:17:09 INFO client.RMProxy: Connecting to ResourceManager at test-dask-rapids-2212-m/10.138.0.4:8032
23/02/22 05:17:10 INFO client.AHSProxy: Connecting to Application History server at test-dask-rapids-2212-m/10.138.0.4:10200
23/02/22 05:17:10 INFO skein.Driver: Driver started, listening on 40903
23/02/22 05:17:10 INFO conf.Configuration: found resource resource-types.xml at file:/etc/hadoop/conf.empty/resource-types.xml
23/02/22 05:17:10 INFO resource.ResourceUtils: Adding resource type - name = yarn.io/gpu, units = , type = COUNTABLE
23/02/22 05:17:10 INFO skein.Driver: Uploading application resources to hdfs://test-dask-rapids-2212-m/user/root/.skein/application_1677037421032_0005
23/02/22 05:17:11 INFO skein.Driver: Submitting application...
23/02/22 05:17:11 INFO impl.YarnClientImpl: Submitted application application_1677037421032_0005
/opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/dask_yarn/core.py:687: RuntimeWarning: coroutine 'rpc.close_rpc' was never awaited
self.scheduler_comm.close_rpc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
```
```python
client = Client(cluster)
client
```
```
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[8], line 1
----> 1 client = Client(cluster)
2 client
File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/distributed/client.py:884, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
881 elif isinstance(getattr(address, "scheduler_address", None), str):
882 # It's a LocalCluster or LocalCluster-compatible object
883 self.cluster = address
--> 884 status = self.cluster.status
885 if status in (Status.closed, Status.closing):
886 raise RuntimeError(
887 f"Trying to connect to an already closed or closing Cluster {self.cluster}."
888 )
AttributeError: 'YarnCluster' object has no attribute 'status'
```
Looks related to this issue https://github.com/dask/dask-yarn/issues/155
As a workaround, can you try `client = Client(cluster.scheduler_address)`?
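In full, with the same cluster arguments as above, the workaround looks something like this (the `wait_for_workers` call is optional and just an assumption to make the example self-checking):

```python
# Workaround for the missing YarnCluster.status attribute: connect the Client
# to the scheduler address rather than to the cluster object itself.
from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(
    worker_class="dask_cuda.CUDAWorker",
    worker_gpus=1,
    worker_vcores=4,
    worker_memory="24GB",
    worker_env={"CONDA_PREFIX": "/opt/conda/default/"},
)
cluster.scale(4)

client = Client(cluster.scheduler_address)  # instead of Client(cluster)
client.wait_for_workers(1)                  # block until at least one worker is up
print(client)
```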
@jacobtomlinson Blocked by this when attempting to read from a BigQuery table. I have tried following the instructions to enable authentication with a service account, but with no success -- we might need to pair on this?
```json
{
  "error": {
    "code": 401,
    "message": "Request is missing required authentication credential. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.",
    "errors": [
      {
        "message": "Login Required.",
        "domain": "global",
        "reason": "required",
        "location": "Authorization",
        "locationType": "header"
      }
    ],
    "status": "UNAUTHENTICATED",
    "details": [
      {
        "@type": "type.googleapis.com/google.rpc.ErrorInfo",
        "reason": "CREDENTIALS_MISSING",
        "domain": "googleapis.com",
        "metadata": {
          "method": "google.cloud.bigquery.v2.TableService.GetTable",
          "service": "bigquery.googleapis.com"
        }
      }
    ]
  }
}
```
TRACEBACK
```
Forbidden Traceback (most recent call last)
Cell In[5], line 7
4 n_workers = len(workers)
5 print('Number of GPU workers: ', n_workers)
----> 7 ddf = dask_bigquery.read_gbq(
8 project_id="k80-exploration",
9 dataset_id="spark_rapids",
10 table_id="nyc_taxi_0",
11 )
13 ddf.head()
File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/dask_bigquery/core.py:123, in read_gbq(project_id, dataset_id, table_id, row_filter, columns, max_stream_count, read_kwargs)
121 read_kwargs = read_kwargs or {}
122 with bigquery_clients(project_id) as (bq_client, bqs_client):
--> 123 table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
124 if table_ref.table_type == "VIEW":
125 raise TypeError("Table type VIEW not supported")
File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/cloud/bigquery/client.py:1011, in Client.get_table(self, table, retry, timeout)
1009 path = table_ref.path
1010 span_attributes = {"path": path}
-> 1011 api_response = self._call_api(
1012 retry,
1013 span_name="BigQuery.getTable",
1014 span_attributes=span_attributes,
1015 method="GET",
1016 path=path,
1017 timeout=timeout,
1018 )
1019 return Table.from_api_repr(api_response)
File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/cloud/bigquery/client.py:759, in Client._call_api(self, retry, span_name, span_attributes, job_ref, headers, **kwargs)
755 if span_name is not None:
756 with create_span(
757 name=span_name, attributes=span_attributes, client=self, job_ref=job_ref
758 ):
--> 759 return call()
761 return call()
File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/api_core/retry.py:349, in Retry.__call__.<locals>.retry_wrapped_func(*args, **kwargs)
345 target = functools.partial(func, *args, **kwargs)
346 sleep_generator = exponential_sleep_generator(
347 self._initial, self._maximum, multiplier=self._multiplier
348 )
--> 349 return retry_target(
350 target,
351 self._predicate,
352 sleep_generator,
353 self._timeout,
354 on_error=on_error,
355 )
File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/api_core/retry.py:191, in retry_target(target, predicate, sleep_generator, timeout, on_error, **kwargs)
189 for sleep in sleep_generator:
190 try:
--> 191 return target()
193 # pylint: disable=broad-except
194 # This function explicitly must deal with broad exceptions.
195 except Exception as exc:
File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/cloud/_http/__init__.py:494, in JSONConnection.api_request(self, method, path, query_params, data, content_type, headers, api_base_url, api_version, expect_json, _target_object, timeout, extra_api_info)
482 response = self._make_request(
483 method=method,
484 url=url,
(...)
490 extra_api_info=extra_api_info,
491 )
493 if not 200 <= response.status_code < 300:
--> 494 raise exceptions.from_http_response(response)
496 if expect_json and response.content:
497 return response.json()
Forbidden: 403 GET https://bigquery.googleapis.com/bigquery/v2/projects/k80-exploration/datasets/spark_rapids/tables/nyc_taxi_0?prettyPrint=false: Access Denied: Table k80-exploration:spark_rapids.nyc_taxi_0: Permission bigquery.tables.get denied on table k80-exploration:spark_rapids.nyc_taxi_0 (or it may not exist).
```
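On the authentication side, a minimal check that credentials are actually being picked up on the node running the notebook (a sketch, assuming a service-account key file; the path is a placeholder):

```python
# Verify which credentials the Google clients will use. With a service-account
# key, GOOGLE_APPLICATION_CREDENTIALS must point at the JSON key file on every
# node that talks to BigQuery (the notebook node and the Dask workers).
import os
import google.auth

os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/path/to/service-account-key.json",  # placeholder
)

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
print("project:", project)
print("service account:", getattr(credentials, "service_account_email", "n/a"))
```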
The error message also says `or it may not exist`, so I had a poke through the `k80-exploration` project in the Google Cloud Console and couldn't find a dataset called `spark_rapids` or anything related to `nyc_taxi` data. Perhaps this dataset has been deleted?
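A quick way to confirm whether the table still exists, run with credentials that can see the project (a sketch, not from the notebook):

```python
# List the datasets visible to the current credentials, to confirm whether
# spark_rapids.nyc_taxi_0 still exists in the k80-exploration project.
from google.cloud import bigquery

client = bigquery.Client(project="k80-exploration")

for dataset in client.list_datasets():
    print("dataset:", dataset.dataset_id)

# If the dataset is present, list its tables too:
# for table in client.list_tables("spark_rapids"):
#     print("table:", table.table_id)
```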
> `Client(cluster.scheduler_address)`

Are there any plans for a version release that doesn't require a workaround like this?
In my case, the workaround did not help: using `Client(cluster.scheduler_address)` instead of `Client(cluster)` results in the cluster never connecting.
```
python==3.9.16
dask==2023.9.2
dask-yarn==0.8.1  # (also tried 0.9)
distributed==2023.9.2
```
Closing in favour of https://github.com/GoogleCloudDataproc/initialization-actions/issues/1137