string_categorical_encoders icon indicating copy to clipboard operation
string_categorical_encoders copied to clipboard

Expose row parallelism for very large datasets

Open FedericoV opened this issue 5 years ago • 0 comments

In very large datasets, encoding the categorical variables often dominates the preprocessing time compared to scaling numerical columns. In that case, it's often faster to parallelize across rows rather than across columns.

As a trivial example, I have this code for a custom categorical encoder I wrote, which uses dask to do row wide parallelization:

def encode_categorical_columns(
    target_df,
    categorical_encoding,
    cat_columns=None,
    n_partitions=120,
    copy=False,
    use_dask=True,
):
    """
    Given an existing encoding scheme from calculate_categorical_encodings, encode a list
    of categorical columns into integers so they can be used in an embedding layer in a
    neural network.

    :param target_df: DataFrame that we want to encode with a given encoding
    :type target_df: pd.DataFrame
    :param categorical_encoding: Dictionary that specifies how to map categorical variables to integers
    :type categorical_encoding: Dict[Str, Dict[Str, Int]]
    :param cat_columns: A list containing the subset of categorical columns we want to encode.  If None,
        we encode every column present in the categorical_encoding.
    :type cat_columns: List[Str]
    :param n_partitions: The number of dask partitions to use to parallelize (across rows) the encoding
        step.  The default value works well for a dataset of around 40 gigs with a machine with 40 cores.
        Default: 120.
    :type n_partitions: Int
    :param copy: Whether to modify the data frame in place or to return a copy.  This is useful in a
        multi-threading context because otherwise we will get an error.
    :return: A dataframe with the categorical columns encoded
    :rtype: pd.DataFrame
    """
    if copy:
        target_df = target_df.copy()

    if cat_columns is None:
        # Categorical encodings is an ordered dict so column order should stay fine
        cat_columns = list(categorical_encoding.keys())

    if use_dask:
        dd_test_data_df = dd.from_pandas(
            target_df[cat_columns], npartitions=n_partitions
        )

    # I did some experiments, and row-based parallelism with dask ends up being much faster than
    # executing the encoding in parallel across columns with joblib.
    for col_name in cat_columns:
        col_encoding = categorical_encoding[col_name]

        # Annoyingly, we cannot use replace (which would be much faster) because it does not like
        # dictionaries with loops.  Very annoying.
        if use_dask:
            meta = pd.Series(index=target_df.index, name=col_name, dtype=np.int32)
            result = dd_test_data_df[col_name].map(col_encoding, meta=meta)
            target_df[col_name] = result.compute()
        else:
            target_df[col_name] = target_df[col_name].map(col_encoding)

    return target_df

FedericoV avatar Aug 08 '19 00:08 FedericoV