Ray Client is not supported with Ray Data -> /tutorials/beginner/execution_engine.html only works with a local cluster
When running the "ray" example as provided here, the example fails if a remote cluster is used.
This Ray issue seems related: https://github.com/ray-project/ray/issues/41333
Repro steps:
- set up a Ray cluster that is not on localhost, then run:
import os
import pandas as pd
from fugue import transform

os.environ["RAY_ADDRESS"] = "ray://<ray-cluster>:10001"

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})

# schema: *, col3:int
def add_cols(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3=df["col1"] + df["col2"])

ray_df = transform(df, add_cols, engine="ray")
It would be nice if this were documented somewhere (or is there a fix for this?).
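For completeness, one way to sidestep the Ray Client protocol entirely is to submit the script through the Ray Jobs CLI, so the driver itself runs on the cluster. This is only a sketch, not verified against this setup; <head-node> is a placeholder and repro.py stands for the script above:

```shell
# Run the driver on the cluster instead of connecting via ray://...:10001;
# the Ray Jobs server listens on port 8265 by default.
ray job submit --address http://<head-node>:8265 --working-dir . -- python repro.py
```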
Error:
_0 _State.RUNNING -> _State.FAILED Global node is not initialized.
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[4], line 3
1 ray.init(ignore_reinit_error=True)
----> 3 ray_df = transform(df, add_cols, engine="ray")
4 ray_df.show(5)
File /opt/conda/lib/python3.11/site-packages/fugue/workflow/api.py:174, in transform(df, using, schema, params, partition, callback, ignore_errors, persist, as_local, save_path, checkpoint, engine, engine_conf, as_fugue)
171 else:
172 tdf.save(save_path, fmt="parquet")
--> 174 dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
175 if checkpoint:
176 result = dag.yields["result"].result # type:ignore
File /opt/conda/lib/python3.11/site-packages/fugue/workflow/workflow.py:1604, in FugueWorkflow.run(self, engine, conf, **kwargs)
1602 if ctb is None: # pragma: no cover
1603 raise
-> 1604 raise ex.with_traceback(ctb)
1605 self._computed = True
1606 return FugueWorkflowResult(self.yields)
File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:240, in RayExecutionEngine.to_df(self, df, schema)
239 def to_df(self, df: Any, schema: Any = None) -> DataFrame:
--> 240 return self._to_ray_df(df, schema=schema)
File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:329, in RayExecutionEngine._to_ray_df(self, df, schema)
327 def _to_ray_df(self, df: Any, schema: Any = None) -> RayDataFrame:
328 # TODO: remove this in phase 2
--> 329 res = self._to_auto_df(df, schema)
330 if not isinstance(res, RayDataFrame):
331 return RayDataFrame(res)
File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:342, in RayExecutionEngine._to_auto_df(self, df, schema)
337 assert_or_throw(
338 schema is None,
339 ValueError("schema must be None when df is a DataFrame"),
340 )
341 return df
--> 342 return RayDataFrame(df, schema)
File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:105, in RayDataFrame.__init__(self, df, schema, internal_schema)
103 else:
104 raise ValueError(f"{df} is incompatible with RayDataFrame")
--> 105 rdf, schema = self._apply_schema(rdf, schema, internal_schema)
106 super().__init__(schema)
107 self._native = rdf
File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:238, in RayDataFrame._apply_schema(self, rdf, schema, internal_schema)
236 if internal_schema:
237 return rdf, schema
--> 238 fmt, rdf = get_dataset_format(rdf)
239 if fmt is None: # empty
240 schema = _input_schema(schema).assert_not_empty()
File /opt/conda/lib/python3.11/site-packages/fugue_ray/_utils/dataframe.py:32, in get_dataset_format(df)
30 def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
31 df = materialize(df)
---> 32 if df.count() == 0:
33 return None, df
34 if ray.__version__ < "2.5.0": # pragma: no cover
File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:2598, in Dataset.count(self)
2595 return 0
2597 # For parquet, we can return the count directly from metadata.
-> 2598 meta_count = self._meta_count()
2599 if meta_count is not None:
2600 return meta_count
File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:5108, in Dataset._meta_count(self)
5107 def _meta_count(self) -> Optional[int]:
-> 5108 return self._plan.meta_count()
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:496, in ExecutionPlan.meta_count(self)
491 return None
492 elif self._in_blocks is not None and self._snapshot_blocks is None:
493 # If the plan only has input blocks, we execute it, so snapshot has output.
494 # This applies to newly created dataset. For example, initial dataset
495 # from read, and output datasets of Dataset.split().
--> 496 self.execute()
497 # Snapshot is now guaranteed to be the final block or None.
498 return self._get_num_rows_from_blocks_metadata(self._snapshot_blocks)
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:628, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
621 metrics_tag = create_dataset_tag(
622 self._dataset_name, self._dataset_uuid
623 )
624 executor = StreamingExecutor(
625 copy.deepcopy(context.execution_options),
626 metrics_tag,
627 )
--> 628 blocks = execute_to_legacy_block_list(
629 executor,
630 self,
631 allow_clear_input_blocks=allow_clear_input_blocks,
632 dataset_uuid=self._dataset_uuid,
633 preserve_order=preserve_order,
634 )
635 stats = executor.get_stats()
636 stats_summary_string = stats.to_summary().to_string(
637 include_parent=False
638 )
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:125, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
107 """Execute a plan with the new executor and translate it into a legacy block list.
108
109 Args:
(...)
117 The output as a legacy block list.
118 """
119 dag, stats = _get_execution_dag(
120 executor,
121 plan,
122 allow_clear_input_blocks,
123 preserve_order,
124 )
--> 125 bundles = executor.execute(dag, initial_stats=stats)
126 block_list = _bundles_to_block_list(bundles)
127 # Set the stats UUID after execution finishes.
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py:132, in StreamingExecutor.execute(self, dag, initial_stats)
129 self._global_info = ProgressBar("Running", dag.num_outputs_total())
131 self._output_node: OpState = self._topology[dag]
--> 132 StatsManager.register_dataset_to_stats_actor(
133 self._dataset_tag,
134 self._get_operator_tags(),
135 )
136 self.start()
137 self._execution_started = True
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:531, in _StatsManager.register_dataset_to_stats_actor(self, dataset_tag, operator_tags)
530 def register_dataset_to_stats_actor(self, dataset_tag, operator_tags):
--> 531 self._stats_actor().register_dataset.remote(dataset_tag, operator_tags)
File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:414, in _StatsManager._stats_actor(self, create_if_not_exists)
412 def _stats_actor(self, create_if_not_exists=True) -> _StatsActor:
413 if ray._private.worker._global_node is None:
--> 414 raise RuntimeError("Global node is not initialized.")
415 current_cluster_id = ray._private.worker._global_node.cluster_id
416 if (
417 self._stats_actor_handle is None
418 or self._stats_actor_cluster_id != current_cluster_id
419 ):
RuntimeError: Global node is not initialized.
Hi @andnig ,
Sorry for the late response; I just got back from holidays. We test this on Anyscale and it is supposed to work on a remote cluster. Can you give me your versions of Ray and Fugue?
Hey, no problem. I worked around the issue, so it was ultimately not a blocker for me.
fugue: 0.8.6, ray: 2.9.0
I think the issue might be that Ray Data does not support the Ray Client. Ray Data seems to be used in the DataFrame example. (See here for more details: https://github.com/ray-project/ray/issues/41333#issuecomment-1828715864)
As noted, this is separate from #41333, because that is only a workaround to address the fact that Ray Data does not support using an already connected client without wrapping your actor in a task. This issue is asking for Ray Client support, not for a workaround in its absence. Under the hood, Ray Data just uses cached RemoteFunction objects (i.e., what the @ray.remote decorator creates); the only real difference I see is that it invokes remote directly instead of using decorators. Is there no way to configure Ray Data to connect to an existing cluster's head node without scheduling it as a task through the client (essentially wrapping it in another RemoteFunction)?