deap_ray icon indicating copy to clipboard operation
deap_ray copied to clipboard

[actor_pool] Using ActorPool.map_unordered with DEAP causes "A worker died or was killed while executing task"

Open ghost opened this issue 4 years ago • 2 comments

It's mentioned in `worker.get` that this method will issue a warning if it's running inside an async context. The problem I encounter is likely caused by this.

import ray
import time
from ray.util import ActorPool
from deap import algorithms, base, creator, gp, tools
......

@ray.remote(num_cpus=1)
class Ray_Deap_Map():
    """Ray actor that evaluates one batch of DEAP individuals.

    Each actor re-runs the DEAP ``creator``/``pset`` setup callables so the
    module-level state DEAP depends on exists inside the worker process.
    """

    def __init__(self, creator_setup=None, pset_creator=None):
        # Non-trivial startup delay (see issue 946) so the scheduler spreads
        # actors across the cluster instead of packing a single node.
        time.sleep(0.01)

        # Rebuild DEAP's global scope inside this worker.
        # GA runs only need creator_setup; GP runs need both.
        self.creator_setup = creator_setup
        self.pset_creator = pset_creator
        for setup in (creator_setup, pset_creator):
            if setup is not None:
                setup()

    def ray_remote_eval_batch(self, f, zipped_input):
        """Evaluate every item in the batch, tagging each result with the
        batch id so the caller can restore the original ordering."""
        items, batch_id = zipped_input
        tagged = []
        for item in items:
            tagged.append((f(item), batch_id))
        return tagged

class Ray_Deap_Map_Manager():
    """Distributes a DEAP ``map`` over a pool of Ray actors.

    Splits the population into contiguous batches, ships each batch to an
    actor via ``ActorPool.map_unordered``, and restores the original
    ordering of the fitness results before returning them.
    """

    def __init__(self, creator_setup=None, pset_creator=None):
        # One worker per CPU Ray knows about; adjust via ray.init or the
        # cluster launch configuration.
        self.n_workers = int(ray.cluster_resources()['CPU'])

        # Setup callables are forwarded to each actor so it can rebuild the
        # DEAP globals (e.g. the gp toolbox/pset) in its own process.
        self.creator_setup = creator_setup
        self.pset_creator = pset_creator

    def map(self, func, iterable):
        """Apply ``func`` to every element of ``iterable``, preserving order."""
        if self.n_workers == 1:
            # Single worker: an eager local map is enough (useful for
            # testing/timing the code without the actor machinery).
            return [func(item) for item in iterable]

        # Never spin up more actors than there are items to evaluate.
        n_workers = min(self.n_workers, len(iterable))

        # Slice the population into contiguous batches, roughly one per actor.
        n_per_batch = int(len(iterable) / n_workers) + 1
        batches = [iterable[start:start + n_per_batch]
                   for start in range(0, len(iterable), n_per_batch)]
        id_for_reorder = range(len(batches))

        eval_pool = ActorPool(
            [Ray_Deap_Map.remote(self.creator_setup, self.pset_creator)
             for _ in range(n_workers)])

        # map_unordered yields batches as they finish; each element is the
        # list of (fitness, batch_id) tuples produced by one actor.
        unordered = list(eval_pool.map_unordered(
            lambda actor, payload: actor.ray_remote_eval_batch.remote(func, payload),
            zip(batches, id_for_reorder)))

        # Restore submission order via the batch id carried by every tuple,
        # then flatten the batches into a single list of fitnesses.
        ordered = sorted(unordered, key=lambda batch: batch[0][1])
        return [fitness for batch in ordered for fitness, _ in batch]

# Registered as ``map`` in the DEAP toolbox.
# GA runs only need creator_setup; GP runs need pset_creator as well.
def ray_deap_map(func, pop, creator_setup=None, pset_creator=None):
    """Map ``func`` over ``pop`` using Ray, preserving population order.

    The manager decides whether batching across remote actors is needed
    and creates the actors that do the work.
    """
    manager = Ray_Deap_Map_Manager(creator_setup, pset_creator)
    return manager.map(func, pop)

def Tool(pset, max_c, ids):
    # Builds the DEAP toolbox. The snippet is truncated ("......") in the
    # original issue; only the registrations relevant to the bug are shown.
    toolbox = base.Toolbox()
    ......
    # Fitness evaluation closure: carries the toolbox plus backtest parameters.
    toolbox.register("evaluate", EvaluationBacktest, toolbox=toolbox, max_c=max_c, ids=ids)
    # Replace DEAP's builtin map with the Ray-backed one so fitness
    # evaluation is distributed across the actor pool.
    toolbox.register("map", ray_deap_map, creator_setup=Creator_Setup,
                     pset_creator=Pset_Creator)
    return toolbox

def EvaluationBacktest(individual, toolbox, max_c, ids):
    # Compile the GP expression tree into a callable strategy function.
    func = toolbox.compile(expr=individual)
    ......
    try:
        df = func(......)
        ......
    except:
        # NOTE(review): bare except maps *any* failure — including
        # KeyboardInterrupt/SystemExit — to a -100 penalty fitness;
        # `except Exception:` would be safer and easier to debug.
        result = -100
    return result,  # DEAP expects the fitness as a tuple


# Driver call inside the GA loop: dispatches fitness evaluation of the
# invalid individuals through the Ray-backed map registered above.
fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)

Here is the error message:

2021-06-10 21:12:18,290	WARNING worker.py:1115 -- The log monitor on node ALPHA failed with the following error:
OSError: [WinError 87] 参数错误。

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 364, in <module>
    log_monitor.run()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 285, in run
    self.open_closed_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 172, in open_closed_files
    self.close_all_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 107, in close_all_files
    os.kill(file_info.worker_pid, 0)
SystemError: <built-in function kill> returned a result with an error set

2021-06-10 21:12:18,295	WARNING worker.py:1115 -- A worker died or was killed while executing task ffffffffffffffff151a7c83ccb29b66d20c565901000000.
Traceback (most recent call last):

  File "D:\Factor\GA\GA_LongShortStrategy.py", line 569, in <module>
    pop = algorithms.eaMuPlusLambda(pop, toolbox, mu=mu, lambda_=lambda_,

  File "C:\Users\Administrator\anaconda3\lib\site-packages\deap\algorithms.py", line 320, in eaMuPlusLambda
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)

  File "D:\Factor\GA\RayMap.py", line 89, in ray_deap_map
    results = map_ray_manager.map(func, pop)

  File "D:\Factor\GA\RayMap.py", line 70, in map
    unordered_results = list(eval_pool.map_unordered(

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 91, in map_unordered
    yield self.get_next_unordered()

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 210, in get_next_unordered
    return ray.get(future)

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\worker.py", line 1483, in get
    raise value

RayActorError: The actor died unexpectedly before finishing this task.

Any help would be appreciated.

ghost avatar Jun 11 '21 11:06 ghost

I wonder if the 'ALPHA fail' error you are seeing is similar to this: https://github.com/ray-project/ray/issues/13511#issuecomment-773909246

Can you try to initialize ray like they suggest in that thread: ray.init(log_to_driver=False)

DMTSource avatar Jun 11 '21 13:06 DMTSource

We've set the log_to_driver to False.

num_cpus = min(cpu_count(), 64 - 3)
num_gpus = 0
log_to_driver = False
rayparam = [num_cpus, num_gpus, log_to_driver]
ray.init(num_cpus=rayparam[0], num_gpus=rayparam[1], log_to_driver=rayparam[2])

If we set the log_to_driver to True. Sometimes we get an error report, but we can get results.

2021-06-11 19:05:38,691 WARNING worker.py:1115 -- The log monitor on node ALPHA failed with the following error:
OSError: [WinError 87] 参数错误。

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 364, in <module>
    log_monitor.run()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 285, in run
    self.open_closed_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 172, in open_closed_files
    self.close_all_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 107, in close_all_files
    os.kill(file_info.worker_pid, 0)
SystemError: <class 'OSError'> returned a result with an error set

GA 耗时:03小时16分20秒  # GA running time: 03 hours 16 minutes 20 seconds

The code for printing the running time of the program is on the last line

yinlinzhan avatar Jun 11 '21 15:06 yinlinzhan