joblib-spark icon indicating copy to clipboard operation
joblib-spark copied to clipboard

catboost & joblib spark

Open timpiperseek opened this issue 5 months ago • 0 comments

Hello, I am reaching out here in case someone can help me with setting this up and possibly spot anything that maybe obvious. I am trying to run catboost across multiple machines each with a single GPU. There are machines in total but it only seems to be running on the driver machine. If you have any thoughts it would be appreciated on how to get this to run on each machine in parallel.

from catboost import CatBoostRanker, Pool
import optuna
from optuna.trial import TrialState
from copy import deepcopy

import joblib
from joblibspark import register_spark

# disable mlflow autolog 
import mlflow
mlflow.autolog(disable=True)

register_spark() # register Spark backend for Joblib

default_parameters = {
'loss_function': 'QueryRMSE',
'verbose': False,
'random_seed': 0,
'allow_writing_files': False,
'task_type':'GPU',
}


def objective(trial: optuna.Trial):
    download_pool_if_not_avalible_locally('train_pool_1.pool', force_download=False)
    download_pool_if_not_avalible_locally('val.pool', force_download=False)

    train_pool = Pool('quantized://' + '/local_disk0/' + 'train_pool_1.pool')
    val_pool = Pool('quantized://' +'/local_disk0/' + 'val.pool')

    search_space = {
        "learning_rate": trial.suggest_float("learning_rate", 0.10, 0.9), 
        "depth": 8,
        'iterations': 700,
        "l2_leaf_reg": trial.suggest_int("l2_leaf_reg", 1, 100),
        "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 10, 1000),
        'random_strength': trial.suggest_float("random_strength", 1.0, 1.20),
        'border_count': trial.suggest_categorical("border_count", [32,64,128,254])
    }
    bootstrap_type = trial.suggest_categorical("bootstrap_type", ['Bayesian','Bernoulli','Poisson']) 
    search_space['bootstrap_type'] = bootstrap_type
    if bootstrap_type == 'Bayesian':
      search_space['bagging_temperature'] = trial.suggest_int("bagging_temperature", 1, 100)
    elif bootstrap_type == 'Poisson':
      search_space['subsample'] =  trial.suggest_float("subsample_poisson", 0.7,1.0)

    elif bootstrap_type == 'MVS':
      search_space['subsample'] =  trial.suggest_float("subsample_mvs", 0.7,1.0)

    elif bootstrap_type == 'Bernoulli':
      search_space['subsample'] =  trial.suggest_float("subsample_bernoulli", 0.7,1.0)

    parameters = deepcopy(default_parameters)
    parameters.update(search_space)

    
    model = CatBoostRanker(**parameters)
    model.fit(
        train_pool,
        eval_set=[val_pool],
        verbose=0,
        early_stopping_rounds=100,
    )

    train_metrics = model.best_score_['learn']
    test_metrics = model.best_score_['validation']
    Qrmse_train = train_metrics['QueryRMSE']
    Qrmse_test = test_metrics['QueryRMSE']

    return Qrmse_test


with joblib.parallel_backend("spark", n_jobs=-1):
    study.optimize(objective, n_trials=100)


` ``

timpiperseek avatar Sep 10 '24 08:09 timpiperseek