Ray version
What Ray version are you using in the code? I notice that it is Ray 3.0.0; however, the latest stable release is 2.9.1.
While running the code, I get the following error. Have you ever seen it before?
(PredictionWorker pid=22743) [2024-01-25 12:10:29,553] [INFO] [utils.py:838:see_memory_usage] CPU Virtual Memory: used = 22.86 GB, percent = 3.1%
2024-01-25 12:10:32,206 WARNING actor_pool_map_operator.py:267 -- To ensure full parallelization across an actor pool of size 1, the specified batch size should be at most 0. Your configured batch size for this operator was 8.
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /mnt/deepspeed_inference_actors.py:176 in
│ │
│ 173 ) │
│ 174 │
│ 175 # %% │
│ ❱ 176 print(pred.to_pandas()) │
│ 177 │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/dataset.py:3705 in to_pandas │
│ │
│ 3702 │ │ │ A Pandas DataFrame created from this dataset, containing a limited │
│ 3703 │ │ │ number of records. │
│ 3704 │ │ """ │
│ ❱ 3705 │ │ count = self.count() │
│ 3706 │ │ if count > limit: │
│ 3707 │ │ │ raise ValueError( │
│ 3708 │ │ │ │ f"the dataset has more than the given limit of {limit} " │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/dataset.py:2262 in count │
│ │
│ 2259 │ │ │
│ 2260 │ │ return sum( │
│ 2261 │ │ │ ray.get( │
│ ❱ 2262 │ │ │ │ [get_num_rows.remote(block) for block in self.get_internal_block_refs()] │
│ 2263 │ │ │ ) │
│ 2264 │ │ ) │
│ 2265 │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/dataset.py:4235 in │
│ get_internal_block_refs │
│ │
│ 4232 │ │ Returns: │
│ 4233 │ │ │ A list of references to this dataset's blocks. │
│ 4234 │ │ """ │
│ ❱ 4235 │ │ blocks = self._plan.execute().get_blocks() │
│ 4236 │ │ self._synchronize_progress_bar() │
│ 4237 │ │ return blocks │
│ 4238 │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/plan.py:591 in │
│ execute │
│ │
│ 588 │ │ │ │ │ ) │
│ 589 │ │ │ │ else: │
│ 590 │ │ │ │ │ executor = BulkExecutor(copy.deepcopy(context.execution_options)) │
│ ❱ 591 │ │ │ │ blocks = execute_to_legacy_block_list( │
│ 592 │ │ │ │ │ executor, │
│ 593 │ │ │ │ │ self, │
│ 594 │ │ │ │ │ allow_clear_input_blocks=allow_clear_input_blocks, │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_comp │
│ at.py:116 in execute_to_legacy_block_list │
│ │
│ 113 │ │ preserve_order, │
│ 114 │ ) │
│ 115 │ bundles = executor.execute(dag, initial_stats=stats) │
│ ❱ 116 │ block_list = _bundles_to_block_list(bundles) │
│ 117 │ # Set the stats UUID after execution finishes. │
│ 118 │ _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid) │
│ 119 │ return block_list │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_comp │
│ at.py:349 in _bundles_to_block_list │
│ │
│ 346 def _bundles_to_block_list(bundles: Iterator[RefBundle]) -> BlockList: │
│ 347 │ blocks, metadata = [], [] │
│ 348 │ owns_blocks = True │
│ ❱ 349 │ for ref_bundle in bundles: │
│ 350 │ │ if not ref_bundle.owns_blocks: │
│ 351 │ │ │ owns_blocks = False │
│ 352 │ │ for block, meta in ref_bundle.blocks: │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces. │
│ py:548 in next │
│ │
│ 545 │ │ return next(self._it) │
│ 546 │ │
│ 547 │ def next(self) -> RefBundle: │
│ ❱ 548 │ │ return self.get_next() │
│ 549 │
│ 550 │
│ 551 class Executor: │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_e │
│ xecutor.py:129 in get_next │
│ │
│ 126 │ │ │ │ │ │ else: │
│ 127 │ │ │ │ │ │ │ raise StopIteration │
│ 128 │ │ │ │ │ elif isinstance(item, Exception): │
│ ❱ 129 │ │ │ │ │ │ raise item │
│ 130 │ │ │ │ │ else: │
│ 131 │ │ │ │ │ │ # Otherwise return a concrete RefBundle. │
│ 132 │ │ │ │ │ │ if self._outer._global_info: │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_e │
│ xecutor.py:187 in run │
│ │
│ 184 │ │ """ │
│ 185 │ │ try: │
│ 186 │ │ │ # Run scheduling loop until complete. │
│ ❱ 187 │ │ │ while self._scheduling_loop_step(self._topology) and not self._shutdown: │
│ 188 │ │ │ │ pass │
│ 189 │ │ except Exception as e: │
│ 190 │ │ │ # Propagate it to the result iterator. │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_e │
│ xecutor.py:256 in _scheduling_loop_step │
│ │
│ 253 │ │ │ │ break │
│ 254 │ │ │ if DEBUG_TRACE_SCHEDULING: │
│ 255 │ │ │ │ _debug_dump_topology(topology) │
│ ❱ 256 │ │ │ topology[op].dispatch_next_task() │
│ 257 │ │ │ cur_usage = TopologyResourceUsage.of(topology) │
│ 258 │ │ │ op = select_operator_to_run( │
│ 259 │ │ │ │ topology, │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_e │
│ xecutor_state.py:197 in dispatch_next_task │
│ │
│ 194 │ │ """Move a bundle from the operator inqueue to the operator itself.""" │
│ 195 │ │ for i, inqueue in enumerate(self.inqueues): │
│ 196 │ │ │ if inqueue: │
│ ❱ 197 │ │ │ │ self.op.add_input(inqueue.popleft(), input_index=i) │
│ 198 │ │ │ │ return │
│ 199 │ │ assert False, "Nothing to dispatch" │
│ 200 │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/operators/m │
│ ap_operator.py:193 in add_input │
│ │
│ 190 │ │ │ # If the bundler has a full bundle, add it to the operator's task submission │
│ 191 │ │ │ # queue. │
│ 192 │ │ │ bundle = self._block_ref_bundler.get_next_bundle() │
│ ❱ 193 │ │ │ self._add_bundled_input(bundle) │
│ 194 │ │
│ 195 │ def _get_runtime_ray_remote_args( │
│ 196 │ │ self, input_bundle: Optional[RefBundle] = None │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/operators/a │
│ ctor_pool_map_operator.py:147 in _add_bundled_input │
│ │
│ 144 │ def _add_bundled_input(self, bundle: RefBundle): │
│ 145 │ │ self._bundle_queue.append(bundle) │
│ 146 │ │ # Try to dispatch all bundles in the queue, including this new bundle. │
│ ❱ 147 │ │ self._dispatch_tasks() │
│ 148 │ │
│ 149 │ def _dispatch_tasks(self): │
│ 150 │ │ """Try to dispatch tasks from the bundle buffer to the actor pool. │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/operators/a │
│ ctor_pool_map_operator.py:160 in _dispatch_tasks │
│ │
│ 157 │ │ while self._bundle_queue: │
│ 158 │ │ │ # Pick an actor from the pool. │
│ 159 │ │ │ if self._actor_locality_enabled: │
│ ❱ 160 │ │ │ │ actor = self._actor_pool.pick_actor(self._bundle_queue[0]) │
│ 161 │ │ │ else: │
│ 162 │ │ │ │ actor = self._actor_pool.pick_actor() │
│ 163 │ │ │ if actor is None: │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/operators/a │
│ ctor_pool_map_operator.py:596 in pick_actor │
│ │
│ 593 │ │ │ return None │
│ 594 │ │ │
│ 595 │ │ if locality_hint: │
│ ❱ 596 │ │ │ preferred_loc = self._get_location(locality_hint) │
│ 597 │ │ else: │
│ 598 │ │ │ preferred_loc = None │
│ 599 │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/operators/a │
│ ctor_pool_map_operator.py:760 in _get_location │
│ │
│ 757 │ │ Returns: │
│ 758 │ │ │ A node id associated with the bundle, or None if unknown. │
│ 759 │ │ """ │
│ ❱ 760 │ │ return bundle.get_cached_location() │
│ 761 │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces. │
│ py:105 in get_cached_location │
│ │
│ 102 │ │ │ ref = self.blocks[0][0] │
│ 103 │ │ │ # This call is pretty fast for owned objects (~5k/s), so we don't need to │
│ 104 │ │ │ # batch it for now. │
│ ❱ 105 │ │ │ locs = ray.experimental.get_object_locations([ref]) │
│ 106 │ │ │ nodes = locs[ref]["node_ids"] │
│ 107 │ │ │ if nodes: │
│ 108 │ │ │ │ self._cached_location = nodes[0] │
│ │
│ /opt/conda/envs/domino-ray/lib/python3.10/site-packages/ray/experimental/locations.py:38 in │
│ get_object_locations │
│ │
│ 35 │ """ │
│ 36 │ if not ray.is_initialized(): │
│ 37 │ │ raise RuntimeError("Ray hasn't been initialized.") │
│ ❱ 38 │ return ray._private.worker.global_worker.core_worker.get_object_locations( │
│ 39 │ │ obj_refs, timeout_ms │
│ 40 │ ) │
│ 41 │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'Worker' object has no attribute 'core_worker'
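For reference, here is a minimal sketch of how the Ray wheel actually installed in the domino-ray environment can be checked against the version the example targets (generic diagnostic code, not part of the failing script; the __commit__ attribute is assumed to be present and is read defensively):

import ray

# Report the installed Ray version so it can be compared against the
# version the example code was written for (a nightly 3.0.0.dev wheel
# versus a stable 2.x release).
print("ray version:", ray.__version__)

# The build commit can help distinguish two wheels that report the
# same version string (attribute assumed; read with a fallback).
print("ray commit: ", getattr(ray, "__commit__", "unknown"))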
Ray 3.0.0 was the nightly build at the time. This repository is no longer maintained!
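For anyone who lands here later: since the examples were written against a nightly wheel that reported itself as 3.0.0, pinning a stable release (for example pip install "ray[data]==2.9.1", the version mentioned above) is a reasonable first step, but the code may still need changes wherever the nightly API diverged from the stable 2.x API.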