
ERROR - Failed to communicate with scheduler during heartbeat. followed by TimeoutError: No valid workers found

Open u3Izx9ql7vW4 opened this issue 1 year ago • 2 comments

Hi,

I'm using DEHB to find parameters for several models running in a loop. During execution I get the following error, which effectively terminates the program:

TimeoutError: No valid workers found

Monitoring CPU usage, I see that utilization for the python process gradually climbs to 100%, even though each run only uses 2 workers, and the host computer has 8 cores.

The code instantiating the DEHB instance is this:

dehb = DEHB(
    f=self.objective,
    cs=self.conf_space,
    dimensions=len(self.conf_space.get_hyperparameters()),
    min_budget=2,
    max_budget=10,
    n_workers=2,
    output_path="./temp",
)
trajectory, runtime, history = dehb.run(
    total_cost=self.total_cost,
    verbose=False,
    save_intermediate=False,
    seed=self.seed,
    train_X=train_X,
    train_y=train_y,
    valid_X=valid_X,
    valid_y=valid_y,
    max_budget=dehb.max_budget,
    save_history=False,
)

The rest of the code is roughly taken from the random forest example found here: https://automl.github.io/DEHB/latest/examples/01.1_Optimizing_RandomForest_using_DEHB/

Here is the full error output leading up to the "No valid workers found" error:

2024-05-13 16:53:36,517 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/worker.py", line 1252, in heartbeat
    response = await retry_operation(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/utils_comm.py", line 455, in retry_operation
    return await retry(
           ^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/utils_comm.py", line 434, in retry
    return await coro()
           ^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/core.py", line 1394, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/core.py", line 1153, in send_recv
    response = await comm.read(deserializers=deserializers)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/comm/tcp.py", line 237, in read
    convert_stream_closed_error(self, e)
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:54877 remote=tcp://127.0.0.1:54831>: Stream is closed
2024-05-13 16:53:37,512 - distributed.nanny - WARNING - Restarting worker
2024-05-13 16:53:37,523 - distributed.nanny - WARNING - Restarting worker
2024-05-13 16:54:11,726 - distributed.core - ERROR - Exception while handling op scatter
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/core.py", line 969, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/distributed/scheduler.py", line 6022, in scatter
    raise TimeoutError("No valid workers found")
TimeoutError: No valid workers found

u3Izx9ql7vW4, May 14 '24 00:05

Hi,

Does n_workers=1 work fine?

Also, is it possible to reproduce your issue on the Random Forest example? Could you also share the versions of the distributed and dask packages you have installed?
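
One way to collect those versions is sketched below, using only the standard library. The helper name `package_versions` is made up for illustration, and the distribution names `dehb`, `dask`, and `distributed` are assumed to match what pip installed in your environment.

```python
from importlib import metadata


def package_versions(names):
    """Map each distribution name to its installed version, or None if absent.

    `package_versions` is a hypothetical helper, not part of DEHB or dask.
    """
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The distribution is not installed in this environment.
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Assumed distribution names; adjust if your environment uses different ones.
    for name, version in package_versions(["dehb", "dask", "distributed"]).items():
        print(f"{name}: {version or 'not installed'}")
```

Running this inside the same environment that produces the crash should report the versions that were actually in use.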

Thanks.

Neeratyoy, May 14 '24 09:05

Hey, I am able to run both the random forest example and a small toy example I implemented myself with 2 workers. I used a fresh Python 3.11 environment and installed dehb + scikit-learn (for the RF example). Judging from the error messages, I suspect something is wrong with dask or distributed. Could you give us a minimal example that crashes for you?

Here's my toy example for reference:

import time
import typing

import ConfigSpace
import numpy as np

from dehb import DEHB


def create_toy_searchspace():
    """Creates a toy searchspace with a single hyperparameter.

    Can be used in order to instantiate a DEHB instance for simple unittests not
    requiring a proper configuration space for optimization.

    Returns:
        ConfigurationSpace: Toy searchspace
    """
    cs = ConfigSpace.ConfigurationSpace()
    cs.add_hyperparameter(
        ConfigSpace.UniformFloatHyperparameter("x0", lower=3, upper=10, log=False))
    return cs

def create_toy_optimizer(configspace: ConfigSpace.ConfigurationSpace, min_fidelity: float,
                         max_fidelity: float, eta: int,
                         objective_function: typing.Callable, n_workers: int):
    """Creates a DEHB instance.

    Args:
        configspace (ConfigurationSpace): Searchspace to use
        min_fidelity (float): Minimum fidelity for DEHB
        max_fidelity (float): Maximum fidelity for DEHB
        eta (int): Eta parameter of DEHB
        objective_function (Callable): Function to optimize
        n_workers (int): Number of workers DEHB should use

    Returns:
        DEHB: Configured DEHB optimizer instance
    """
    dim = len(configspace.get_hyperparameters())
    return DEHB(f=objective_function, cs=configspace, dimensions=dim,
                min_fidelity=min_fidelity, output_path="./logs",
                max_fidelity=max_fidelity, eta=eta, n_workers=n_workers)

def objective_function(x: ConfigSpace.Configuration, fidelity: float=5, **kwargs):
    """Toy objective function.

    Args:
        x (ConfigSpace.Configuration): Configuration to evaluate
        fidelity (float): fidelity to evaluate x on

    Returns:
        dict: Result dictionary
    """
    time.sleep(2)
    y = np.random.uniform()
    cost = np.random.randint(1, 5)
    result = {
        "fitness": y,
        "cost": cost,
        "info": {
            "hello": "world",
        },
    }
    return result

if __name__ == "__main__":
    cs = create_toy_searchspace()
    opt = create_toy_optimizer(cs, min_fidelity=3, max_fidelity=27, eta=3, 
                               objective_function=objective_function, n_workers=2)

    traj, runtime, history = opt.run(total_cost=30, verbose=True)

Bronzila, May 14 '24 13:05