Dask-XGBoost non-deterministic results

Open dawilliams-nvidia opened this issue 3 years ago • 9 comments

I have been trying to train XGBoost on multiple GPUs via Dask. The training pipeline must be deterministic (for example, for comparing hyperparameter runs): loading the same data and training on the same number and type of GPUs should give exactly the same results. So far, with the code below, I can get reproducible results on 1 or 2 GPUs by manually assigning Dask partitions to workers. However, going to 3 or more GPUs results in non-determinism.

GPU: V100-32GB GPUs, DGX server
Container: rapidsai/rapidsai:22.04-cuda11.0-runtime-ubuntu20.04-py3.8
Data: Higgs dataset

$> wget https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz
$> gzip -d HIGGS.csv.gz

Code:

import os
# Disable work stealing so partitions stay on their assigned workers.
# This must also be set in the dask-scheduler's environment.
os.environ["DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING"] = "False"

from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from dask import delayed
import dask_cudf
import cudf
import xgboost as xgb

cluster = LocalCUDACluster()
client = Client(cluster)

def prepare_data(gpu_dataframes):
    """
    Concatenate a list of GPU dataframes (the partitions assigned to
    one worker) into a single GPU dataframe.

    Returns
    -------
    GPU Dataframe
    """
    import cudf
    concat_df = cudf.concat(gpu_dataframes)
    del gpu_dataframes
    return concat_df


def reproducible_persist_per_worker(df, client):
    # Assumes equally sized partitions (edit accordingly if they are not).
    workers = list(client.has_what().keys())
    num_workers = len(workers)
    # Ceiling division: each worker gets a contiguous block of partitions.
    parts_per_worker = (df.npartitions + num_workers - 1) // num_workers

    start_p = 0
    persist_ls = []
    for w in workers:
        end_p = min(start_p + parts_per_worker, df.npartitions)
        subset_delayed = delayed(prepare_data)(df.partitions[start_p:end_p].to_delayed())
        # Pin this block to worker w and forbid running it anywhere else.
        persist_ls.append(client.persist(subset_delayed, workers=w, allow_other_workers=False))
        start_p = end_p

    wait(persist_ls)

    persist_df = dask_cudf.from_delayed(persist_ls).persist()
    wait(persist_df)
    return persist_df

fname = 'HIGGS.csv'
colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
df = dask_cudf.read_csv(fname, header=None, names=colnames)
persisted_df = reproducible_persist_per_worker(df,client)

dmatrix = xgb.dask.DaskDeviceQuantileDMatrix(
    client=client,
    data=persisted_df[persisted_df.columns.difference(['label'])],
    label=persisted_df['label'])

model = xgb.dask.train(client,
                       {'verbosity': 2,
                        'tree_method': 'gpu_hist',
                        'seed': 123},
                       dtrain=dmatrix,
                       num_boost_round=3000,
                       evals=[(dmatrix, 'dtrain')])

print("Final train loss: ", model['history']['dtrain']['rmse'][-1])

y_pred = xgb.dask.predict(client, model, persisted_df[persisted_df.columns.difference(['label'])]).to_frame().compute()
y_pred = y_pred.rename({0:'score'},axis=1)

After running the above, you can see the final training loss. The first time simply save the scoring results to parquet:

y_pred.to_parquet("last_run.parquet")

Then rerun the main code. Compare the training losses, and use the following code to read the last_run.parquet results and compute the average difference between the two runs (which should be exactly 0.0 if training is deterministic):

last_pred = cudf.read_parquet("last_run.parquet")
sub=(y_pred-last_pred).abs()
print('Average relative difference: ', sub.mean()/y_pred.mean())
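
A mean ratio can come out tiny even when individual predictions differ, so for a determinism check it may help to also count exact mismatches and report the largest absolute difference. A minimal sketch in plain Python, with lists standing in for the prediction columns (`compare_runs` is a hypothetical helper, not part of XGBoost or cuDF):

```python
def compare_runs(current, previous):
    """Element-wise comparison of two prediction vectors of equal length."""
    if len(current) != len(previous):
        raise ValueError("prediction vectors differ in length")
    diffs = [abs(c - p) for c, p in zip(current, previous)]
    n_different = sum(1 for d in diffs if d != 0.0)
    return {
        "identical": n_different == 0,           # True only for bit-equal values
        "n_different": n_different,              # how many rows changed
        "max_abs_diff": max(diffs, default=0.0), # worst single-row deviation
    }

# Toy values, not real model output.
same_report = compare_runs([0.25, 0.5, 0.75], [0.25, 0.5, 0.75])
drift_report = compare_runs([0.25, 0.5, 0.75], [0.25, 0.5000001, 0.75])
print(same_report)
print(drift_report)
```

With real data the same three quantities could be computed on the two dataframes directly; the helper only shows which numbers are worth looking at.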

dawilliams-nvidia, May 20 '22 20:05

This issue is not exclusive to training using GPUs. While the example above uses cudf and GPUs to train, I have been able to reproduce this issue when training without GPUs.

Robert-Christensen-visa, May 20 '22 21:05

I reproduced it with 2 workers. The data partitioning is not deterministic.

trivialfis, May 23 '22 03:05

I'm afraid xgboost-dask cannot be fully deterministic until the dask data partitioning is deterministic.

trivialfis, May 23 '22 03:05

@trivialfis The example provided by @dawilliams-nvidia attempts to make the data partitioning deterministic. When the file is read, the first part of the data is placed on worker 0, the next part on worker 1, etc. Work stealing is also disabled, so the data will remain on the worker it is assigned to.

> I'm afraid xgboost-dask cannot be fully deterministic until the dask data partitioning is deterministic.

This is true: deterministic data partitioning is necessary. But it is not sufficient for xgboost-dask to be fully deterministic, and this issue is identifying an additional requirement.

When I have tried to replicate the issue above, I found that if data partitioning is not deterministic, you will have reproducibility issues even with 2 workers. If data partitioning is deterministic, you only start seeing issues when the number of workers is 3 or more.
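
One textbook reason an effect can show up only at three or more participants is that floating-point addition is commutative but not associative: a two-way sum gives the same value in either order, while a three-way sum depends on how it is grouped. Whether this is what happens inside XGBoost's cross-worker reduction is not established in this thread; the sketch below only illustrates the arithmetic, in plain Python with made-up values.

```python
# Hypothetical per-worker partial sums (illustrative values only).
a, b, c = 0.1, 0.2, 0.3

# Two participants: swapping the order cannot change the result.
two_way_stable = (a + b) == (b + a)

# Three participants: the grouping of the reduction matters.
left_first = (a + b) + c
right_first = a + (b + c)
three_way_stable = left_first == right_first

print("two-way sum order-independent:  ", two_way_stable)
print("three-way sum order-independent:", three_way_stable)
print(repr(left_first), repr(right_first))
```

If something like this were the cause, the fix would lie in making the reduction order fixed, not in the data placement.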

Robert-Christensen-visa, May 23 '22 14:05

@Robert-Christensen-visa Thank you for the explanation. I tested the sample and saved the data for each worker, then compared the sha256sum of the saved data for each run. The data partitioning is still not deterministic in the example.
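
A per-worker content fingerprint of the kind described here could look like the sketch below. It is not the code used in this thread; `partition_digest`, `fingerprints`, the worker names, and the toy rows are all hypothetical, and plain tuples stand in for the real dataframes.

```python
import hashlib
import struct

def partition_digest(rows):
    """SHA-256 over the rows held by one worker, in the order they are stored."""
    h = hashlib.sha256()
    for row in rows:
        # Pack each row as little-endian float64 values so the digest
        # depends only on the numeric content and the row order.
        h.update(struct.pack(f"<{len(row)}d", *row))
    return h.hexdigest()

def fingerprints(run):
    """Map each worker name to the digest of the data it holds."""
    return {worker: partition_digest(rows) for worker, rows in run.items()}

run_1 = {"worker-0": [(1.0, 0.5), (0.0, 1.5)], "worker-1": [(1.0, 2.5)]}
run_2 = {"worker-0": [(1.0, 0.5), (0.0, 1.5)], "worker-1": [(1.0, 2.5)]}
# Same rows overall, but one row landed on a different worker.
run_3 = {"worker-0": [(1.0, 0.5)], "worker-1": [(0.0, 1.5), (1.0, 2.5)]}

same_placement = fingerprints(run_1) == fingerprints(run_2)
moved_row_detected = fingerprints(run_1) != fingerprints(run_3)
print(same_placement, moved_row_detected)
```

Hashing the in-memory values instead of a saved file keeps file-format metadata out of the comparison.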

trivialfis, May 23 '22 15:05

@trivialfis Would you share the code that you used to compare the sha256sum?

mmccarty, Jun 03 '22 14:06

Possibly related ( https://github.com/dask/dask/issues/9135 )

jakirkham, Jun 03 '22 21:06

@mmccarty I did a quick hack inside XGBoost to save the binary before constructing DMatrix (along with DMatrix itself). https://github.com/dmlc/xgboost/blob/1ced6381653eed0f259cf11df8ef599274d84174/python-package/xgboost/dask.py#L810

trivialfis, Jun 06 '22 03:06

So poked around at the code above and cleaned up a bit. Here's the notebook. Rewrote the reproducible_persist_per_worker function.

Looking at the data as loaded by Dask, it appears each worker holds the same data at the start of every run, before anything goes to XGBoost. Checked this by writing out the data after persisting and comparing that over several runs (restarting the cluster and reloading the data each time). The data in each part was identical. Compared this by loading each part file with PyArrow and comparing their contents.

That said, still see different results from XGBoost's prediction. Given this am wondering if there might be something in the Dask code in XGBoost that merits further exploration.

As a side note, checksumming the parquet files themselves appears to show differences even when writing the exact same data out. Not as familiar with the Parquet format as others, but my guess (having seen similar issues with other formats like tar) is that there are metadata differences (maybe timestamps?) that vary. So not confident checksumming is a reliable strategy for validating that files are the same. Would suggest sticking to content comparison over using checksums.
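
The general effect is easy to reproduce with a format whose header is known to carry a timestamp. The sketch below uses gzip from the Python standard library as a stand-in; it says nothing about what actually differs inside the Parquet files, which remains a guess. `write_gz` and the payload are made up for the illustration.

```python
import gzip
import hashlib
import io

def write_gz(payload: bytes, mtime: int) -> bytes:
    """Compress payload into an in-memory gzip file with the given header timestamp."""
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb", mtime=mtime) as f:
        f.write(payload)
    return buf.getvalue()

payload = b"label,feature-01\n1.0,0.5\n0.0,1.5\n"

# Identical content, written at two different "times".
file_a = write_gz(payload, mtime=1)
file_b = write_gz(payload, mtime=2)

checksums_match = hashlib.sha256(file_a).hexdigest() == hashlib.sha256(file_b).hexdigest()
contents_match = gzip.decompress(file_a) == gzip.decompress(file_b)

print("file checksums match:", checksums_match)
print("decoded contents match:", contents_match)
```

The file checksums disagree while the decoded contents agree, which is the situation in which comparing content is the more trustworthy check.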

jakirkham, Jun 16 '22 04:06