
Ray Client is not supported with Ray Data -> /tutorials/beginner/execution_engine.html only works with a local cluster

Open · andnig opened this issue 8 months ago • 3 comments

When running the "ray" example as provided here, it fails if a remote cluster is used.

This Ray issue seems related: https://github.com/ray-project/ray/issues/41333
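For context on the failure mode: with a Ray Client connection (a `ray://` address), the driver process does not start a local Ray node, so `ray._private.worker._global_node` appears to stay `None`; Ray Data's stats manager checks exactly this before creating its stats actor (see the bottom of the traceback below) and raises. A minimal sketch of the condition, with the cluster address as a placeholder:

import ray

# Ray Client mode: the driver talks to the remote cluster over gRPC
# and no local Ray node is started in this process.
ray.init(address="ray://<ray-cluster>:10001")

# This is the driver-side state checked in ray/data/_internal/stats.py;
# under Ray Client it is None, which triggers
# "RuntimeError: Global node is not initialized." on any Ray Data run.
print(ray._private.worker._global_node)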

Repro steps:

  1. Set up a Ray cluster that is not on localhost.
  2. Run the following code against it:

import os

import pandas as pd
import ray
from fugue import transform

os.environ["RAY_ADDRESS"] = "ray://<ray-cluster>:10001"
ray.init(ignore_reinit_error=True)

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})

# schema: *, col3:int
def add_cols(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3=df["col1"] + df["col2"])

ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)

It would be nice if this were documented somewhere (or is there a fix for this?). A possible workaround is sketched below the traceback.

Error:

_0 _State.RUNNING -> _State.FAILED  Global node is not initialized.
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[4], line 3
      1 ray.init(ignore_reinit_error=True)
----> 3 ray_df = transform(df, add_cols, engine="ray")
      4 ray_df.show(5)

File /opt/conda/lib/python3.11/site-packages/fugue/workflow/api.py:174, in transform(df, using, schema, params, partition, callback, ignore_errors, persist, as_local, save_path, checkpoint, engine, engine_conf, as_fugue)
    171     else:
    172         tdf.save(save_path, fmt="parquet")
--> 174 dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
    175 if checkpoint:
    176     result = dag.yields["result"].result  # type:ignore

File /opt/conda/lib/python3.11/site-packages/fugue/workflow/workflow.py:1604, in FugueWorkflow.run(self, engine, conf, **kwargs)
   1602             if ctb is None:  # pragma: no cover
   1603                 raise
-> 1604             raise ex.with_traceback(ctb)
   1605         self._computed = True
   1606 return FugueWorkflowResult(self.yields)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:240, in RayExecutionEngine.to_df(self, df, schema)
    239 def to_df(self, df: Any, schema: Any = None) -> DataFrame:
--> 240     return self._to_ray_df(df, schema=schema)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:329, in RayExecutionEngine._to_ray_df(self, df, schema)
    327 def _to_ray_df(self, df: Any, schema: Any = None) -> RayDataFrame:
    328     # TODO: remove this in phase 2
--> 329     res = self._to_auto_df(df, schema)
    330     if not isinstance(res, RayDataFrame):
    331         return RayDataFrame(res)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:342, in RayExecutionEngine._to_auto_df(self, df, schema)
    337     assert_or_throw(
    338         schema is None,
    339         ValueError("schema must be None when df is a DataFrame"),
    340     )
    341     return df
--> 342 return RayDataFrame(df, schema)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:105, in RayDataFrame.__init__(self, df, schema, internal_schema)
    103 else:
    104     raise ValueError(f"{df} is incompatible with RayDataFrame")
--> 105 rdf, schema = self._apply_schema(rdf, schema, internal_schema)
    106 super().__init__(schema)
    107 self._native = rdf

File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:238, in RayDataFrame._apply_schema(self, rdf, schema, internal_schema)
    236 if internal_schema:
    237     return rdf, schema
--> 238 fmt, rdf = get_dataset_format(rdf)
    239 if fmt is None:  # empty
    240     schema = _input_schema(schema).assert_not_empty()

File /opt/conda/lib/python3.11/site-packages/fugue_ray/_utils/dataframe.py:32, in get_dataset_format(df)
     30 def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
     31     df = materialize(df)
---> 32     if df.count() == 0:
     33         return None, df
     34     if ray.__version__ < "2.5.0":  # pragma: no cover

File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:2598, in Dataset.count(self)
   2595     return 0
   2597 # For parquet, we can return the count directly from metadata.
-> 2598 meta_count = self._meta_count()
   2599 if meta_count is not None:
   2600     return meta_count

File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:5108, in Dataset._meta_count(self)
   5107 def _meta_count(self) -> Optional[int]:
-> 5108     return self._plan.meta_count()

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:496, in ExecutionPlan.meta_count(self)
    491     return None
    492 elif self._in_blocks is not None and self._snapshot_blocks is None:
    493     # If the plan only has input blocks, we execute it, so snapshot has output.
    494     # This applies to newly created dataset. For example, initial dataset
    495     # from read, and output datasets of Dataset.split().
--> 496     self.execute()
    497 # Snapshot is now guaranteed to be the final block or None.
    498 return self._get_num_rows_from_blocks_metadata(self._snapshot_blocks)

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:628, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
    621 metrics_tag = create_dataset_tag(
    622     self._dataset_name, self._dataset_uuid
    623 )
    624 executor = StreamingExecutor(
    625     copy.deepcopy(context.execution_options),
    626     metrics_tag,
    627 )
--> 628 blocks = execute_to_legacy_block_list(
    629     executor,
    630     self,
    631     allow_clear_input_blocks=allow_clear_input_blocks,
    632     dataset_uuid=self._dataset_uuid,
    633     preserve_order=preserve_order,
    634 )
    635 stats = executor.get_stats()
    636 stats_summary_string = stats.to_summary().to_string(
    637     include_parent=False
    638 )

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:125, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
    107 """Execute a plan with the new executor and translate it into a legacy block list.
    108 
    109 Args:
   (...)
    117     The output as a legacy block list.
    118 """
    119 dag, stats = _get_execution_dag(
    120     executor,
    121     plan,
    122     allow_clear_input_blocks,
    123     preserve_order,
    124 )
--> 125 bundles = executor.execute(dag, initial_stats=stats)
    126 block_list = _bundles_to_block_list(bundles)
    127 # Set the stats UUID after execution finishes.

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py:132, in StreamingExecutor.execute(self, dag, initial_stats)
    129     self._global_info = ProgressBar("Running", dag.num_outputs_total())
    131 self._output_node: OpState = self._topology[dag]
--> 132 StatsManager.register_dataset_to_stats_actor(
    133     self._dataset_tag,
    134     self._get_operator_tags(),
    135 )
    136 self.start()
    137 self._execution_started = True

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:531, in _StatsManager.register_dataset_to_stats_actor(self, dataset_tag, operator_tags)
    530 def register_dataset_to_stats_actor(self, dataset_tag, operator_tags):
--> 531     self._stats_actor().register_dataset.remote(dataset_tag, operator_tags)

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:414, in _StatsManager._stats_actor(self, create_if_not_exists)
    412 def _stats_actor(self, create_if_not_exists=True) -> _StatsActor:
    413     if ray._private.worker._global_node is None:
--> 414         raise RuntimeError("Global node is not initialized.")
    415     current_cluster_id = ray._private.worker._global_node.cluster_id
    416     if (
    417         self._stats_actor_handle is None
    418         or self._stats_actor_cluster_id != current_cluster_id
    419     ):

RuntimeError: Global node is not initialized.
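
As a possible workaround (my assumption, not an official fix): avoid Ray Client and run the driver on the cluster itself, e.g. on the head node or submitted with `ray job submit`, attaching with `address="auto"` so a local Ray node exists in the driver process:

import pandas as pd
import ray
from fugue import transform

# Run this script on the cluster (head node, or via `ray job submit`)
# instead of connecting from a remote machine over Ray Client.
# "auto" attaches to the already-running cluster, so the driver has a
# local Ray node and Ray Data's stats check passes.
ray.init(address="auto")

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})

# schema: *, col3:int
def add_cols(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3=df["col1"] + df["col2"])

ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)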

andnig · Dec 23 '23 11:12