"OpenBLAS warning: precompiled NUM_THREADS exceeded" and "resource_tracker: There appear to be ## leaked semaphore objects to clean up at shutdown"
After one or two iterations, the program terminates with the following warnings:
OpenBLAS warning: precompiled NUM_THREADS exceeded, adding auxiliary array for thread metadata.
To avoid this warning, please rebuild your copy of OpenBLAS with a larger NUM_THREADS setting
or set the environment variable OPENBLAS_NUM_THREADS to 64 or lower

double free or corruption (out)

/home/kemove/.pyenv/versions/3.12.2/lib/python3.12/multiprocessing/resource_tracker.py:254: UserWarning: resource_tracker: There appear to be 96 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
I understand that I could set OPENBLAS_NUM_THREADS=1 before the program starts to sidestep the issue, as some Google results suggest, but that also slows down the entire process.
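For what it's worth, a middle ground I'm considering (based on the warning text itself, which asks for 64 or lower) is capping the pool at a modest value rather than 1, or scoping the limit to just the BLAS-heavy calls with threadpoolctl. A rough sketch of what I mean; the limit of 8 is an arbitrary guess, not a tested value:

import os

# OpenBLAS reads the variable when the library is loaded, so this must run
# before numpy is first imported anywhere in the process.
os.environ["OPENBLAS_NUM_THREADS"] = "8"  # arbitrary value <= 64

import numpy as np  # noqa: E402

# Alternatively, limit only the BLAS-heavy section at runtime:
from threadpoolctl import threadpool_limits

with threadpool_limits(limits=8, user_api="blas"):
    ...  # BLAS-heavy work here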
Here's my code:
# stdlib imports used by this snippet; client (a dask.distributed.Client), df,
# logger, args, and the helper functions are defined elsewhere in the module.
import random
import time

@scheduler.custom(n_jobs=n_jobs)
def objective(params_batch):
    jobs = []
    t1 = time.time()
    nworker = num_workers(False)
    client.set_metadata(["workload_info", "total"], len(params_batch))
    client.set_metadata(["workload_info", "workers"], nworker)
    client.set_metadata(["workload_info", "finished"], 0)
    for i, params in enumerate(params_batch):
        new_df = df
        hpid, _ = get_hpid(params)
        # tasks that can train on CPU get a higher scheduling priority
        priority = 1
        if model:
            priority = (
                priority + 1 if model.trainable_on_cpu(**params) else priority
            )
        if "topk_covar" in params:
            if "covar_dist" in params:
                new_df = select_randk_covars(
                    df, ranked_features, params["covar_dist"], params["topk_covar"]
                )
            else:
                new_df = select_topk_features(
                    df, ranked_features, params["topk_covar"]
                )
        future = client.submit(
            validate_hyperparams,
            args,
            new_df,
            covar_set_id,
            hps_id,
            params,
            resources={"POWER": power_demand(args, params)},
            retries=1,
            locks=locks,
            key=f"{validate_hyperparams.__name__}-{hpid}",
            priority=priority,
        )
        future.add_done_callback(hps_task_callback)
        jobs.append(future)
        # stagger the first wave of submissions so workers ramp up gradually
        if i < nworker:
            interval = random.randint(1000, 5000) / 1000.0
            time.sleep(interval)
    results = client.gather(jobs, errors="skip")
    elapsed = round(time.time() - t1, 3)
    # logger.info("gathered results type %s, len: %s", type(results), len(results))
    # logger.info("gathered results: %s", results)
    results = [(p, l) for p, l in results if p is not None and l is not None]
    if len(results) == 0:
        logger.warning(
            "Results not available at this iteration. Elapsed: %s", elapsed
        )
        return [], []
    params, loss = zip(*results)
    params = list(params)
    loss = list(loss)
    logger.info("Elapsed: %s, Successful results: %s", elapsed, len(results))
    # restart workers here to free up memory
    if args.restart_workers:
        client.restart()
    return params, loss
# warm-start the tuner with previously searched hyper-params when resuming
warmstart_tuples = None
if resume:
    warmstart_tuples = preload_warmstart_tuples(
        args.model,
        args.symbol,
        covar_set_id,
        hps_id,
        args.batch_size * iteration * 2,
        len(ranked_features),
    )
if warmstart_tuples is not None:
    logger.info(
        "preloaded %s historical searched hyper-params for warm-start.",
        len(warmstart_tuples),
    )
else:
    logger.info("no available historical data for warm-start")
tuner = Tuner(
    space,
    objective,
    dict(
        initial_random=n_jobs,
        batch_size=n_jobs,
        num_iteration=iteration,
        initial_custom=warmstart_tuples,
        domain_size=domain_size,
    ),
)
results = tuner.minimize()
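In case it's relevant: I've also considered capping the pool on the Dask workers only, rather than the whole process. If I understand threadpoolctl correctly, it can resize an already-loaded OpenBLAS pool at runtime, so something like the following sketch should work (again, the limit of 8 is an arbitrary guess):

from threadpoolctl import threadpool_limits

def _cap_blas_threads(limit=8):
    # threadpoolctl resizes the pool of the already-loaded OpenBLAS, so it is
    # fine to call this after numpy has been imported on the worker.
    threadpool_limits(limits=limit, user_api="blas")

client.run(_cap_blas_threads)  # execute once on every connected worker

Is there a better way to avoid both the warning and the leaked-semaphore message at shutdown without pinning everything to a single thread?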