joblib-spark
joblib-spark copied to clipboard
catboost & joblib spark
Hello, I am reaching out here in case someone can help me with setting this up and possibly spot anything that maybe obvious. I am trying to run catboost across multiple machines each with a single GPU. There are machines in total but it only seems to be running on the driver machine. If you have any thoughts it would be appreciated on how to get this to run on each machine in parallel.
from catboost import CatBoostRanker, Pool
import optuna
from optuna.trial import TrialState
from copy import deepcopy
import joblib
from joblibspark import register_spark
# disable mlflow autolog
import mlflow
mlflow.autolog(disable=True)
register_spark() # register Spark backend for Joblib
default_parameters = {
'loss_function': 'QueryRMSE',
'verbose': False,
'random_seed': 0,
'allow_writing_files': False,
'task_type':'GPU',
}
def objective(trial: optuna.Trial):
download_pool_if_not_avalible_locally('train_pool_1.pool', force_download=False)
download_pool_if_not_avalible_locally('val.pool', force_download=False)
train_pool = Pool('quantized://' + '/local_disk0/' + 'train_pool_1.pool')
val_pool = Pool('quantized://' +'/local_disk0/' + 'val.pool')
search_space = {
"learning_rate": trial.suggest_float("learning_rate", 0.10, 0.9),
"depth": 8,
'iterations': 700,
"l2_leaf_reg": trial.suggest_int("l2_leaf_reg", 1, 100),
"min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 10, 1000),
'random_strength': trial.suggest_float("random_strength", 1.0, 1.20),
'border_count': trial.suggest_categorical("border_count", [32,64,128,254])
}
bootstrap_type = trial.suggest_categorical("bootstrap_type", ['Bayesian','Bernoulli','Poisson'])
search_space['bootstrap_type'] = bootstrap_type
if bootstrap_type == 'Bayesian':
search_space['bagging_temperature'] = trial.suggest_int("bagging_temperature", 1, 100)
elif bootstrap_type == 'Poisson':
search_space['subsample'] = trial.suggest_float("subsample_poisson", 0.7,1.0)
elif bootstrap_type == 'MVS':
search_space['subsample'] = trial.suggest_float("subsample_mvs", 0.7,1.0)
elif bootstrap_type == 'Bernoulli':
search_space['subsample'] = trial.suggest_float("subsample_bernoulli", 0.7,1.0)
parameters = deepcopy(default_parameters)
parameters.update(search_space)
model = CatBoostRanker(**parameters)
model.fit(
train_pool,
eval_set=[val_pool],
verbose=0,
early_stopping_rounds=100,
)
train_metrics = model.best_score_['learn']
test_metrics = model.best_score_['validation']
Qrmse_train = train_metrics['QueryRMSE']
Qrmse_test = test_metrics['QueryRMSE']
return Qrmse_test
with joblib.parallel_backend("spark", n_jobs=-1):
study.optimize(objective, n_trials=100)
` ``