NVTabular

[BUG] TypeError when using multi-GPU with a workflow op that converts to datetime

Status: Open · liguo88 opened this issue 3 years ago · 2 comments

Describe the bug

TypeError: Improperly matched output dtypes detected in event_time_ts, int64 and datetime64[s]

(Screenshot from 2022-06-16 not preserved.)
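The message says that two different dtypes were seen for the same column: `int64` (the raw epoch-second timestamps) and `datetime64[s]` (what `cudf.to_datetime(col, unit='s')` produces). The NumPy sketch below reproduces only that dtype change on the CPU. It also shows a hypothetical consistency check (`check_output_dtypes` is our own name, not an NVTabular function) that fails in the same way when two partitions disagree. It is an illustration under those assumptions, not NVTabular's internal logic.

```python
import numpy as np

# Epoch seconds, as stored in the `event_time_ts` column (int64). Values are invented.
event_time_ts = np.array([1655420880, 1655507280], dtype="int64")

# CPU analogue of cudf.to_datetime(col, unit="s"): interpret the integers as seconds.
event_time_dt = event_time_ts.astype("datetime64[s]")


def check_output_dtypes(column, partition_dtypes):
    """Hypothetical check: every partition must yield the same dtype for `column`."""
    distinct = sorted({str(np.dtype(d)) for d in partition_dtypes})
    if len(distinct) > 1:
        raise TypeError(
            f"Improperly matched output dtypes detected in {column}, "
            + " and ".join(distinct)
        )
    return distinct[0]


print(event_time_ts.dtype, event_time_dt.dtype)  # int64 datetime64[s]

# One partition reports the raw int64, another the converted datetime64[s].
try:
    check_output_dtypes("event_time_ts", [event_time_ts.dtype, event_time_dt.dtype])
except TypeError as err:
    print(err)
```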

Steps/Code to reproduce bug

session_time = (
    session_ts >>
    nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='s')) >>
    nvt.ops.Rename(name='event_time_dt')
)

dataset = nvt.Dataset(df)
workflow = nvt.Workflow(filtered_sessions)
workflow.fit(dataset)
sessions_gdf = workflow.transform(dataset).to_ddf()

sessions_gdf.head(2)

Expected behavior

The workflow should run on 2 GPUs just as it does on 1 GPU. With 1 GPU it works well, but with 2 GPUs the error above is raised.

liguo88 · Jun 16 '22

@liguo88 Which NVT pipeline are you running? Is it from the Transformers4Rec repo examples? Can you please provide a simple reproducible example with a small synthetic dataset that you can generate?

rnyak · Jun 17 '22

@rnyak Thank you for your response. Yes, it is from the Transformers4Rec repo examples. It works well on a single GPU but raises the error above on 2 GPUs. Fake data: (screenshot from 2022-06-17 not preserved). Here is an example:

import os
import cupy as cp
import numpy as np
import cudf
import nvtabular as nvt
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask_cudf
import transformers4rec

cluster = LocalCUDACluster()  # by default, uses all GPUs on the node (two in my case)
client = Client(cluster)

df = cudf.read_parquet('test-fake.parquet')

# NVT workflow
cat_feats = ['l1_brand'] >> nvt.ops.Categorify(start_index=1)
# Create time features
session_ts = ['event_time_ts']

session_time = (
    session_ts >>
    nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='s')) >>
    nvt.ops.Rename(name = 'event_time_dt')
)

sessiontime_weekday = (
    session_time >>
    nvt.ops.LambdaOp(lambda col: col.dt.weekday) >>
    nvt.ops.Rename(name ='et_dayofweek')
)

def get_cycled_feature_value_sin(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_sin = np.sin(2*np.pi*value_scaled)
    return value_sin

def get_cycled_feature_value_cos(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_cos = np.cos(2*np.pi*value_scaled)
    return value_cos

weekday_sin = (
    sessiontime_weekday >>
    (lambda col: get_cycled_feature_value_sin(col + 1, 7)) >>
    nvt.ops.Rename(name='et_dayofweek_sin')
)
weekday_cos = (
    sessiontime_weekday >>
    (lambda col: get_cycled_feature_value_cos(col + 1, 7)) >>
    nvt.ops.Rename(name='et_dayofweek_cos')
)

time_features = (
    session_time +
    sessiontime_weekday +
    weekday_sin +
    weekday_cos
)
groupby_feats = ['user_id', 'event_time_ts'] + cat_feats + time_features
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["user_id"],
    sort_cols=["event_time_ts"],
    aggs={
        'l1_brand': ["list", "count"],
        'event_time_ts': ["first"],
        'event_time_dt': ["first"],
        'et_dayofweek_sin': ["list"],
        'et_dayofweek_cos': ["list"]
        },
    name_sep="-")

groupby_features_list = groupby_features[
    'l1_brand-list',
    'et_dayofweek_sin-list',
    'et_dayofweek_cos-list'
]
SESSIONS_MAX_LENGTH = 30
MINIMUM_SESSION_LENGTH = 3
groupby_features_trim = groupby_features_list >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH) >> nvt.ops.Rename(postfix = '_seq')
# Calculate the session day index from the 'event_time_dt-first' column
day_index = (
    groupby_features['event_time_dt-first'] >>
    nvt.ops.LambdaOp(lambda col: (col - col.min()).dt.days + 1) >>
    nvt.ops.Rename(f=lambda col: "day_index")
)

selected_features = groupby_features['user_id', 'l1_brand-count'] + groupby_features_trim + day_index
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["l1_brand-count"] >= MINIMUM_SESSION_LENGTH)

dataset = nvt.Dataset(df)

workflow = nvt.Workflow(filtered_sessions)
workflow.fit(dataset)
sessions_gdf = workflow.transform(dataset).to_ddf()

print(sessions_gdf.head(2))
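The `get_cycled_feature_value_sin`/`_cos` helpers above place the weekday on a circle, so the last day of the week sits as close to the first day as any two consecutive days. A plain-Python sketch of the same formula (using `math` instead of NumPy/cuDF, and assuming `dt.weekday` returns 0 for Monday through 6 for Sunday, as in pandas) shows the encoding:

```python
import math


def cycled_sin(value, max_value):
    # Same scaling as get_cycled_feature_value_sin in the example above.
    value_scaled = (value + 0.000001) / max_value
    return math.sin(2 * math.pi * value_scaled)


def cycled_cos(value, max_value):
    # Same scaling as get_cycled_feature_value_cos in the example above.
    value_scaled = (value + 0.000001) / max_value
    return math.cos(2 * math.pi * value_scaled)


# Weekday 0..6, shifted by +1 as in the workflow (col + 1), with a period of 7.
encoded = {
    weekday: (cycled_sin(weekday + 1, 7), cycled_cos(weekday + 1, 7))
    for weekday in range(7)
}

for weekday, (s, c) in encoded.items():
    print(weekday, round(s, 3), round(c, 3))
```

Every point lies on the unit circle. Weekday 6 maps to an angle of about 2π, which is next to weekday 0 at 2π/7, so a model sees Sunday and Monday as neighbours.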

liguo88 · Jun 17 '22
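For readers less familiar with the ops in this example, here is a rough pure-Python sketch of what `Groupby` (with `sort_cols`, the `"list"`, `"count"` and `"first"` aggregations, and `name_sep="-"`), `ListSlice(-SESSIONS_MAX_LENGTH)` and the `Filter` step do to a few rows. It illustrates the intended semantics only and is not NVTabular's implementation. The toy rows and the helper name `group_sessions` are made up.

```python
from collections import defaultdict

SESSIONS_MAX_LENGTH = 30
MINIMUM_SESSION_LENGTH = 3

# Toy interaction rows: (user_id, event_time_ts, l1_brand). Invented data.
rows = [
    (1, 30, 7), (1, 10, 5), (1, 20, 6),
    (2, 15, 3), (2, 5, 4),
]


def group_sessions(rows):
    """Hypothetical helper: group by user, sort by timestamp, then aggregate."""
    by_user = defaultdict(list)
    for user_id, ts, brand in rows:
        by_user[user_id].append((ts, brand))
    sessions = []
    for user_id, events in by_user.items():
        events.sort()  # like sort_cols=["event_time_ts"]: orders by timestamp first
        brands = [brand for _, brand in events]
        sessions.append({
            "user_id": user_id,
            "l1_brand-list": brands,               # "list" aggregation
            "l1_brand-count": len(brands),         # "count" aggregation
            "event_time_ts-first": events[0][0],   # "first" aggregation
        })
    return sessions


sessions = group_sessions(rows)

for s in sessions:
    # ListSlice(-N) keeps the last N items; Rename(postfix="_seq") renames the column.
    s["l1_brand-list_seq"] = s.pop("l1_brand-list")[-SESSIONS_MAX_LENGTH:]

# Filter keeps sessions with at least MINIMUM_SESSION_LENGTH interactions.
filtered = [s for s in sessions if s["l1_brand-count"] >= MINIMUM_SESSION_LENGTH]
print(filtered)
```

User 2 has only two interactions, so the filter drops that session and only user 1's time-ordered brand sequence remains.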

@liguo88 Can you pull the latest container, merlin-pytorch:22.10, and tell us whether the issue still occurs? Thanks.

rnyak · Nov 09 '22
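A separate point that may matter once the data is split across GPUs: as far as we understand NVTabular's Dask-based execution, a `LambdaOp` function is applied to each partition independently. If so, `col.min()` inside the `day_index` lambda is the minimum of that partition, not of the whole dataset. This is not necessarily related to the TypeError. The sketch below uses plain Python, invented day numbers, and a hypothetical two-way split to show how the resulting `day_index` values can change with partitioning:

```python
# Days since epoch of each session's first event. Invented values.
first_event_day = [100, 101, 103, 104, 106, 107]


def day_index(partition):
    # Mirrors (col - col.min()).dt.days + 1, evaluated on one partition only.
    start = min(partition)
    return [day - start + 1 for day in partition]


# One partition (for example, a single GPU): the minimum is the global one.
single = day_index(first_event_day)

# Two partitions (for example, two GPU workers): each uses its own minimum.
split = day_index(first_event_day[:3]) + day_index(first_event_day[3:])

print(single)  # [1, 2, 4, 5, 7, 8]
print(split)   # [1, 2, 4, 1, 3, 4]
```

If a dataset-wide origin is intended, subtracting a fixed, known start date instead of `col.min()` would make the result independent of how the data is partitioned.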