Dask on Dataproc GCP

anudeepbablu opened this issue 3 years ago

Will contribute notebooks to show the workflow for running Dask on GCP Dataproc.

anudeepbablu avatar Apr 13 '21 04:04 anudeepbablu

Sounds great, thanks @anudeepbablu!

zronaghi avatar Apr 13 '21 17:04 zronaghi

https://github.com/NVIDIA/nvidia-gcp-samples/blob/master/bigquery-samples/dask-bigquery-connector/bigquery_dataproc_dask_xgboost.ipynb

quasiben avatar Feb 28 '22 15:02 quasiben

https://cloud.google.com/blog/products/data-analytics/improve-data-science-experience-using-scalable-python-data-processing

jacobtomlinson avatar Sep 28 '22 15:09 jacobtomlinson

Seeing this error:

cluster = YarnCluster(worker_class="dask_cuda.CUDAWorker", 
    worker_gpus=1, worker_vcores=4, worker_memory='24GB', 
    worker_env={"CONDA_PREFIX":"/opt/conda/default/"})
cluster.scale(4)

client = Client(cluster)
client

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In [3], line 1
----> 1 client = Client(cluster)
      2 client

File /opt/conda/miniconda3/lib/python3.8/site-packages/distributed/client.py:834, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    831 elif isinstance(getattr(address, "scheduler_address", None), str):
    832     # It's a LocalCluster or LocalCluster-compatible object
    833     self.cluster = address
--> 834     status = getattr(self.cluster, "status")
    835     if status and status in [Status.closed, Status.closing]:
    836         raise RuntimeError(
    837             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    838         )

AttributeError: 'YarnCluster' object has no attribute 'status'

skirui-source avatar Dec 08 '22 04:12 skirui-source

Looks related to https://github.com/dask/dask-yarn/issues/158, for which there is a workaround.

jacobtomlinson avatar Dec 08 '22 14:12 jacobtomlinson

Once you have things working could you write this up as a documentation page in rapidsai/deployment?

jacobtomlinson avatar Dec 09 '22 15:12 jacobtomlinson

Once you have things working could you write this up as a documentation page in rapidsai/deployment?

I have opened PR https://github.com/rapidsai/deployment/pull/99 to update the Dataproc instructions

skirui-source avatar Jan 11 '23 18:01 skirui-source

Update: the PR is on hold until this issue is resolved -- the Google team needs to upgrade the Dask/RAPIDS installation to v22.12.

skirui-source avatar Jan 13 '23 19:01 skirui-source

@jacobtomlinson Blocked by this error: loading the parquet dataset fails, perhaps related to https://github.com/apache/arrow/issues/31812

OUTPUT

 > File already exists. Ready to load at /rapids_hpo/data/airlines.parquet
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[7], line 1
----> 1 df = prepare_dataset(use_full_dataset=True)

Cell In[6], line 34, in prepare_dataset(use_full_dataset)
     28 print(f" > Download complete {file_name}")
     30 input_cols = ["Year", "Month", "DayofMonth", "DayofWeek", "CRSDepTime", "CRSArrTime",
     31               "UniqueCarrier", "FlightNum", "ActualElapsedTime", "Origin", "Dest",
     32               "Distance", "Diverted"]
---> 34 dataset = cudf.read_parquet(parquet_name)
     36 # encode categoricals as numeric
     37 for col in dataset.select_dtypes(["object"]).columns:

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/contextlib.py:79, in ContextDecorator.__call__.<locals>.inner(*args, **kwds)
     76 @wraps(func)
     77 def inner(*args, **kwds):
     78     with self._recreate_cm():
---> 79         return func(*args, **kwds)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/cudf/io/parquet.py:420, in read_parquet(filepath_or_buffer, engine, columns, filters, row_groups, strings_to_categorical, use_pandas_metadata, use_python_file_object, categorical_partitions, open_file_options, *args, **kwargs)
    413 partition_categories = {}
    414 if fs and paths:
    415     (
    416         paths,
    417         row_groups,
    418         partition_keys,
    419         partition_categories,
--> 420     ) = _process_dataset(
    421         paths,
    422         fs,
    423         filters=filters,
    424         row_groups=row_groups,
    425         categorical_partitions=categorical_partitions,
    426     )
    427 elif filters is not None:
    428     raise ValueError("cudf cannot apply filters to open file objects.")

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/contextlib.py:79, in ContextDecorator.__call__.<locals>.inner(*args, **kwds)
     76 @wraps(func)
     77 def inner(*args, **kwds):
     78     with self._recreate_cm():
---> 79         return func(*args, **kwds)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/cudf/io/parquet.py:243, in _process_dataset(paths, fs, filters, row_groups, categorical_partitions)
    238     filters = pq._filters_to_expression(filters)
    240 # Initialize ds.FilesystemDataset
    241 # TODO: Remove the if len(paths) workaround after following bug is fixed:
    242 # https://issues.apache.org/jira/browse/ARROW-16438
--> 243 dataset = ds.dataset(
    244     source=paths[0] if len(paths) == 1 else paths,
    245     filesystem=fs,
    246     format="parquet",
    247     partitioning="hive",
    248 )
    250 file_list = dataset.files
    251 if len(file_list) == 0:

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/dataset.py:749, in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes)
    738 kwargs = dict(
    739     schema=schema,
    740     filesystem=filesystem,
   (...)
    745     selector_ignore_prefixes=ignore_prefixes
    746 )
    748 if _is_path_like(source):
--> 749     return _filesystem_dataset(source, **kwargs)
    750 elif isinstance(source, (tuple, list)):
    751     if all(_is_path_like(elem) for elem in source):

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/dataset.py:451, in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
    443 options = FileSystemFactoryOptions(
    444     partitioning=partitioning,
    445     partition_base_dir=partition_base_dir,
    446     exclude_invalid_files=exclude_invalid_files,
    447     selector_ignore_prefixes=selector_ignore_prefixes
    448 )
    449 factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)
--> 451 return factory.finish(schema)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/_dataset.pyx:1885, in pyarrow._dataset.DatasetFactory.finish()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/error.pxi:144, in pyarrow.lib.pyarrow_internal_check_status()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/error.pxi:100, in pyarrow.lib.check_status()

ArrowInvalid: Error creating dataset. Could not read schema from '/rapids_hpo/data/airlines.parquet': Could not open Parquet input source '/rapids_hpo/data/airlines.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?

skirui-source avatar Feb 02 '23 02:02 skirui-source

Looks like there is something wrong with the dataset. Could you share a link to the full notebook?

jacobtomlinson avatar Feb 03 '23 15:02 jacobtomlinson

I am testing this notebook in a GCP Dataproc cluster using the airline parquet dataset located in my GCS bucket (I think you need GCP access for this).

skirui-source avatar Feb 06 '23 21:02 skirui-source

In that case my guess from the error would be that either the dataset in GCS is corrupted, or the client/workers can't access the file correctly.
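
Something like the sketch below (the GCS path is a placeholder, not taken from your setup) should help tell those two cases apart:

import pyarrow.parquet as pq
import gcsfs

gcs_path = "gs://my-bucket/airline_small.parquet"  # placeholder; use the notebook's real path
local_path = "/rapids_hpo/data/airlines.parquet"

# 1. Is the object in GCS itself a valid parquet file?
fs = gcsfs.GCSFileSystem()
with fs.open(gcs_path, "rb") as f:
    print(pq.ParquetFile(f).metadata)

# 2. Did the download onto the Dataproc node produce a real parquet file,
#    or something else (e.g. an HTML error page)? Valid files start and
#    end with the magic bytes b"PAR1".
with open(local_path, "rb") as f:
    print("header:", f.read(4))
    f.seek(-4, 2)
    print("footer:", f.read(4))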

It would help if you could share a complete example of what you ran to get the error. Happy to sync up if that's easier.

Side-note: as you're using the HPO_Demo.ipynb notebook for testing and are therefore the most familiar with it, I've assigned https://github.com/rapidsai/cloud-ml-examples/issues/208 to you.

jacobtomlinson avatar Feb 07 '23 11:02 jacobtomlinson

I loaded the data locally and it works, so this seems to be an issue on the Dataproc side. I was considering loading the parquet dataset into a BigQuery table, as this notebook example does (a rough sketch of that follows the output below). But I think we should still find time to sync.

OUTPUT
(dataproc) skirui@skirui-HP-Z8-G4-Workstation:~/Desktop$ parquet-tools inspect airline_small.parquet

############ file meta data ############
created_by:
num_columns: 14
num_rows: 200000
num_row_groups: 1
format_version: 1.0
serialized_size: 3102


############ Columns ############
ArrDelayBinary
Year
Month
DayofMonth
DayofWeek
CRSDepTime
CRSArrTime
UniqueCarrier
FlightNum
ActualElapsedTime
Origin
Dest
Distance
Diverted

############ Column(ArrDelayBinary) ############
name: ArrDelayBinary
path: ArrDelayBinary
max_definition_level: 0
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 2%)

############ Column(Year) ############
name: Year
path: Year
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(Month) ############
name: Month
path: Month
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(DayofMonth) ############
name: DayofMonth
path: DayofMonth
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(DayofWeek) ############
name: DayofWeek
path: DayofWeek
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(CRSDepTime) ############
name: CRSDepTime
path: CRSDepTime
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 36%)

############ Column(CRSArrTime) ############
name: CRSArrTime
path: CRSArrTime
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 7%)

############ Column(UniqueCarrier) ############
name: UniqueCarrier
path: UniqueCarrier
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 9%)

############ Column(FlightNum) ############
name: FlightNum
path: FlightNum
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 1%)

############ Column(ActualElapsedTime) ############
name: ActualElapsedTime
path: ActualElapsedTime
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(Origin) ############
name: Origin
path: Origin
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 18%)

############ Column(Dest) ############
name: Dest
path: Dest
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 16%)

############ Column(Distance) ############
name: Distance
path: Distance
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 1%)

############ Column(Diverted) ############
name: Diverted
path: Diverted
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
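
For reference, the sketch mentioned above for loading the parquet file from GCS into a BigQuery table; project, dataset, and bucket names are placeholders:

from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)

load_job = client.load_table_from_uri(
    "gs://my-bucket/airline_small.parquet",    # placeholder GCS path
    "my-project.my_dataset.airline_small",     # placeholder destination table
    job_config=job_config,
)
load_job.result()  # wait for the load job to finish
print(client.get_table("my-project.my_dataset.airline_small").num_rows)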

skirui-source avatar Feb 13 '23 21:02 skirui-source

Update:

For this issue, I will be testing bigquery_dataproc_dask_xgboost.ipynb, which shows how to use Dask on a Dataproc cluster to process a dataset from BigQuery via the Dask-BigQuery connector.
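
For context, the connector boils down to a single read_gbq call that returns a Dask DataFrame; the project, dataset, and table names below are placeholders, not the ones used in the notebook:

import dask_bigquery

ddf = dask_bigquery.read_gbq(
    project_id="my-project",    # placeholder values; the notebook
    dataset_id="my_dataset",    # supplies its own project/dataset/table
    table_id="my_table",
)
print(ddf.head())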

hpo_demo.ipynb, meanwhile, will be tested on an EC2 instance, as tracked by PR #243.

skirui-source avatar Feb 16 '23 02:02 skirui-source

@jacobtomlinson I am seeing this error when starting a YarnCluster:

from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(worker_class="dask_cuda.CUDAWorker", 
    worker_gpus=1, worker_vcores=4, worker_memory='24GB', 
    worker_env={"CONDA_PREFIX":"/opt/conda/default/"})
cluster.scale(4)
23/02/22 05:17:09 INFO client.RMProxy: Connecting to ResourceManager at test-dask-rapids-2212-m/10.138.0.4:8032
23/02/22 05:17:10 INFO client.AHSProxy: Connecting to Application History server at test-dask-rapids-2212-m/10.138.0.4:10200
23/02/22 05:17:10 INFO skein.Driver: Driver started, listening on 40903
23/02/22 05:17:10 INFO conf.Configuration: found resource resource-types.xml at file:/etc/hadoop/conf.empty/resource-types.xml
23/02/22 05:17:10 INFO resource.ResourceUtils: Adding resource type - name = yarn.io/gpu, units = , type = COUNTABLE
23/02/22 05:17:10 INFO skein.Driver: Uploading application resources to hdfs://test-dask-rapids-2212-m/user/root/.skein/application_1677037421032_0005
23/02/22 05:17:11 INFO skein.Driver: Submitting application...
23/02/22 05:17:11 INFO impl.YarnClientImpl: Submitted application application_1677037421032_0005
/opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/dask_yarn/core.py:687: RuntimeWarning: coroutine 'rpc.close_rpc' was never awaited
  self.scheduler_comm.close_rpc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
client = Client(cluster)
client
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[8], line 1
----> 1 client = Client(cluster)
      2 client

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/distributed/client.py:884, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    881 elif isinstance(getattr(address, "scheduler_address", None), str):
    882     # It's a LocalCluster or LocalCluster-compatible object
    883     self.cluster = address
--> 884     status = self.cluster.status
    885     if status in (Status.closed, Status.closing):
    886         raise RuntimeError(
    887             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    888         )

AttributeError: 'YarnCluster' object has no attribute 'status'

skirui-source avatar Feb 22 '23 05:02 skirui-source

Looks related to this issue https://github.com/dask/dask-yarn/issues/155

jacobtomlinson avatar Feb 22 '23 14:02 jacobtomlinson

As a workaround, can you try client = Client(cluster.scheduler_address)?
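
For reference, a minimal sketch of that workaround, reusing the same cluster arguments you posted above (adjust them for your environment as needed):

from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(worker_class="dask_cuda.CUDAWorker",
                      worker_gpus=1, worker_vcores=4, worker_memory="24GB",
                      worker_env={"CONDA_PREFIX": "/opt/conda/default/"})
cluster.scale(4)

# Connect by address rather than passing the cluster object, sidestepping
# the missing YarnCluster.status attribute that newer distributed expects.
client = Client(cluster.scheduler_address)
print(client)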

jacobtomlinson avatar Feb 22 '23 15:02 jacobtomlinson

@jacobtomlinson Blocked by this when attempting to read from a BigQuery table. I have tried following the instructions to enable authentication with a service account but with no success; we might need to pair on this?

{
  "error": {
    "code": 401,
    "message": "Request is missing required authentication credential. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.",
    "errors": [
      {
        "message": "Login Required.",
        "domain": "global",
        "reason": "required",
        "location": "Authorization",
        "locationType": "header"
      }
    ],
    "status": "UNAUTHENTICATED",
    "details": [
      {
        "@type": "type.googleapis.com/google.rpc.ErrorInfo",
        "reason": "CREDENTIALS_MISSING",
        "domain": "googleapis.com",
        "metadata": {
          "method": "google.cloud.bigquery.v2.TableService.GetTable",
          "service": "bigquery.googleapis.com"
        }
      }
    ]
  }
}
TRACEBACK
Forbidden                                 Traceback (most recent call last)
Cell In[5], line 7
      4 n_workers = len(workers)
      5 print('Number of GPU workers: ', n_workers)
----> 7 ddf = dask_bigquery.read_gbq(
      8     project_id="k80-exploration",
      9     dataset_id="spark_rapids",
     10     table_id="nyc_taxi_0",
     11 )
     13 ddf.head()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/dask_bigquery/core.py:123, in read_gbq(project_id, dataset_id, table_id, row_filter, columns, max_stream_count, read_kwargs)
    121 read_kwargs = read_kwargs or {}
    122 with bigquery_clients(project_id) as (bq_client, bqs_client):
--> 123     table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
    124     if table_ref.table_type == "VIEW":
    125         raise TypeError("Table type VIEW not supported")

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/cloud/bigquery/client.py:1011, in Client.get_table(self, table, retry, timeout)
   1009 path = table_ref.path
   1010 span_attributes = {"path": path}
-> 1011 api_response = self._call_api(
   1012     retry,
   1013     span_name="BigQuery.getTable",
   1014     span_attributes=span_attributes,
   1015     method="GET",
   1016     path=path,
   1017     timeout=timeout,
   1018 )
   1019 return Table.from_api_repr(api_response)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/cloud/bigquery/client.py:759, in Client._call_api(self, retry, span_name, span_attributes, job_ref, headers, **kwargs)
    755 if span_name is not None:
    756     with create_span(
    757         name=span_name, attributes=span_attributes, client=self, job_ref=job_ref
    758     ):
--> 759         return call()
    761 return call()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/api_core/retry.py:349, in Retry.__call__.<locals>.retry_wrapped_func(*args, **kwargs)
    345 target = functools.partial(func, *args, **kwargs)
    346 sleep_generator = exponential_sleep_generator(
    347     self._initial, self._maximum, multiplier=self._multiplier
    348 )
--> 349 return retry_target(
    350     target,
    351     self._predicate,
    352     sleep_generator,
    353     self._timeout,
    354     on_error=on_error,
    355 )

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/api_core/retry.py:191, in retry_target(target, predicate, sleep_generator, timeout, on_error, **kwargs)
    189 for sleep in sleep_generator:
    190     try:
--> 191         return target()
    193     # pylint: disable=broad-except
    194     # This function explicitly must deal with broad exceptions.
    195     except Exception as exc:

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/cloud/_http/__init__.py:494, in JSONConnection.api_request(self, method, path, query_params, data, content_type, headers, api_base_url, api_version, expect_json, _target_object, timeout, extra_api_info)
    482 response = self._make_request(
    483     method=method,
    484     url=url,
   (...)
    490     extra_api_info=extra_api_info,
    491 )
    493 if not 200 <= response.status_code < 300:
--> 494     raise exceptions.from_http_response(response)
    496 if expect_json and response.content:
    497     return response.json()

Forbidden: 403 GET https://bigquery.googleapis.com/bigquery/v2/projects/k80-exploration/datasets/spark_rapids/tables/nyc_taxi_0?prettyPrint=false: Access Denied: Table k80-exploration:spark_rapids.nyc_taxi_0: Permission bigquery.tables.get denied on table k80-exploration:spark_rapids.nyc_taxi_0 (or it may not exist).
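
For reference, one way to wire service-account credentials through to the client and workers looks roughly like the sketch below. The key-file path is a placeholder; the key would also need the appropriate BigQuery IAM roles and to be readable on every worker node:

import os

KEY_PATH = "/path/to/service-account-key.json"  # placeholder, not the real path

# Point Application Default Credentials at the key in the notebook process...
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_PATH

# ...and on every worker, since dask_bigquery reads execute on the workers too
# (`client` here is the Dask client created earlier in the notebook).
def _set_adc(path=KEY_PATH):
    import os
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path

client.run(_set_adc)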

skirui-source avatar Mar 06 '23 20:03 skirui-source

The error message also says "or it may not exist", so I had a poke through the k80-exploration project in the Google Cloud Console and couldn't find a dataset called spark_rapids or anything related to nyc_taxi data. Perhaps this dataset has been deleted?

jacobtomlinson avatar Mar 07 '23 11:03 jacobtomlinson

Client(cluster.scheduler_address)

Are there any plans for a version release that doesn't require a workaround like this? In my case, the workaround did not help: using Client(cluster.scheduler_address) instead of Client(cluster) results in the cluster never connecting.

python==3.9.16
dask==2023.9.2
dask-yarn==0.8.1  # (also tried 0.9)
distributed==2023.9.2

ebaker-gh avatar Jan 16 '24 20:01 ebaker-gh

Closing in favour of https://github.com/GoogleCloudDataproc/initialization-actions/issues/1137

jacobtomlinson avatar Feb 21 '24 16:02 jacobtomlinson