Friesian FeatureTable missing OPs
Operations supported by NVTabular:
- [ ] Normalize (mean std, continuous columns)
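This is not in FeatureTable yet; a minimal sketch of mean/std normalization with plain pyspark (the column name "price" is a placeholder):

# Sketch: standardize a continuous column using its mean and standard deviation
from pyspark.sql import functions as F

stats = df.select(F.mean("price").alias("mean"), F.stddev("price").alias("std")).collect()[0]
df = df.withColumn("price", (F.col("price") - stats["mean"]) / stats["std"])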
- [x] NormalizeMinMax (min max method)
- [x] FillMedian (replaces missing values with the median value for the column)
- [x] ClipMax (may add to existing clip method)
- [ ] ColumnSimilarity (calculates the similarity between two columns using tf-idf, cosine or inner product as the distance metric)
- [x] DifferenceLag(partition_cols, shift, columns) (calculates the difference between two consecutive rows of the dataset)
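A minimal sketch of how DifferenceLag could look with a pyspark window (partition/order/column names are placeholders):

# Sketch: difference between consecutive rows within each partition
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("user").orderBy("timestamp")
df = df.withColumn("price_diff", F.col("price") - F.lag("price", 1).over(w))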
- [x] Dropna (dataframe dropna) (detects missing values and filters out rows with null values)
- [x] Filter
- [x] JoinExternal (left & inner)
- [x] JoinGroupby (groups the data by the given categorical feature(s) and calculates the desired statistics of requested continuous features (along with the count of rows in each group))
- [x] LambdaOp (apply row-level functions), e.g.
def transform_python_udf(self, in_col, out_col, udf_func)
- [x] TargetEncoding
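Target encoding could roughly be a group-by mean of the label joined back onto the table; a sketch without smoothing or out-of-fold handling (column names are placeholders):

# Sketch: mean target value per category, joined back as a new feature
from pyspark.sql import functions as F

target_means = df.groupBy("category").agg(F.mean("label").alias("category_te"))
df = df.join(target_means, on="category", how="left")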
StatOperator
- [x] gen_string_idx with more options (e.g., no freq_limit, bucketing, etc.)
- [x] Median (quantile) (calculates the median of features)
- [x] MinMax (calculates min and max statistics of features)
- [x] GroupbyStatistics ("count", "sum", "mean", "std", "var", "min", "max") (uses groupby aggregation to determine the unique groups of a categorical feature and calculates the desired statistics of requested continuous features)
- [ ] Moments (calculates statistics of features including mean, variance, standard deviation, and count)
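Moments could be computed in a single aggregation pass; a sketch with a placeholder column name:

# Sketch: mean, variance, standard deviation and count of a continuous column
from pyspark.sql import functions as F

moments = df.agg(
    F.mean("price").alias("mean"),
    F.variance("price").alias("var"),
    F.stddev("price").alias("std"),
    F.count("price").alias("count"),
).collect()[0]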
TODO
- [x] Check the updates of NVTabular
From the mlperf team: to avoid the all-to-all communication time for syncing embedding inputs, each worker needs to hold the full data of the column (or columns) corresponding to its embedding(s). Basically, a collect operation that gathers all the data of a column onto a certain node.
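One way to express this with plain Spark is to repartition the selected column(s) into a single partition, or collect them to one node; a rough sketch (the column name "user_id" is a placeholder):

# Sketch: gather all values of one column onto a single node
col_df = df.select("user_id").coalesce(1)  # one partition, held by one worker
# or bring the column to the driver as a Python list
user_ids = [row["user_id"] for row in df.select("user_id").collect()]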
Operations for RecSys
- [x] encode_string_list (encode strings for columns with lists of strings), e.g. encode each user name as an integer:

| names |
|---|
| [user1, user2, user3] |
| [user1, user4] |
| [user3, user5, user6] |

If possible, assign a larger integer to strings that appear less frequently.
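A rough sketch of building the frequency-ordered index for a list-of-strings column with pyspark (explode + count; the column name "names" follows the example above):

# Sketch: build a string -> index map ordered by descending frequency,
# so less frequent strings get larger integers
from pyspark.sql import functions as F
from pyspark.sql.window import Window

exploded = df.select(F.explode("names").alias("name"))
counts = exploded.groupBy("name").count()
w = Window.orderBy(F.desc("count"), "name")
string_idx = counts.withColumn("id", F.row_number().over(w) - 1)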
- [ ] rename for StringIndex. Currently rename for StringIndex is inherited directly from Table, and it throws an error since it does not change the col_name. It needs to be rewritten.
- [x] union two columns to generate string idx.
- [x] dtypes cast
- [x] select cols
- [x] constructor of Table from dict. E.g.
{'Quote': 0,'Retweet': 1,'TopLevel': 2} # BTW do we need to fix column names?
If the space of categories is fixed and known, it will be more efficient to directly pass a map for encoding, instead of using gen_string_idx.
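A sketch of applying such a fixed mapping directly (the column name "tweet_type" is a placeholder; the dict is from the example above):

# Sketch: encode a column with a known, fixed mapping instead of gen_string_idx
from itertools import chain
from pyspark.sql import functions as F

mapping = {'Quote': 0, 'Retweet': 1, 'TopLevel': 2}
mapping_expr = F.create_map([F.lit(x) for x in chain(*mapping.items())])
df = df.withColumn("tweet_type", mapping_expr.getItem(F.col("tweet_type")))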
- [x] LambdaOp with multiple columns. E.g. df = df.withColumn("out_column", col("in_column").cast(IntegerType()))
- [x] shape, i.e. len(df.columns)
- [ ] cache cast
- [ ] columns cast
- [ ] orderBy. Sort by specified columns.
Operations for Wechat Challenge
- [x] constructor of Table from csv file
- [x] write the dataframe of Table to a csv file
- [x] groupby and aggregation
| animal | age | height |
|---|---|---|
| cat | 1 | 6 |
| dog | 8 | 30 |
| cat | 2 | 10 |
| cat | 5 | 12 |
| dog | 3 | 13 |

- We can group by "animal" and aggregate "age" by summing and "height" by averaging.
- Recommended aggregation operation: sum, avg, min, max, median...
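A minimal sketch of that group-by with plain pyspark aggregation (using the table above):

# Sketch: group by "animal", sum "age" and average "height"
from pyspark.sql import functions as F

agg_df = df.groupBy("animal").agg(F.sum("age").alias("age_sum"),
                                  F.avg("height").alias("height_avg"))
# cat -> age_sum = 8, height_avg ~ 9.33; dog -> age_sum = 11, height_avg = 21.5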
- [x] get the column names of the Table
# Example from the pyspark documentation
# '''
# function: pyspark.sql.DataFrame.columns
# return: all column names as a list.
# '''
print(df.columns)
>>> ['animal', 'age', 'height']
- [x] create a new column with constant value (add a new column and all of its values are assigned a constant value)
- [x] add constant value to column(s) (e.g. increase all of the values of a list of columns by 1)
- [x] concatenate a list of FeatureTables into one FeatureTable along the row dimension
- [x] drop duplicate values in the Table with a specified subset
It seems pyspark cannot support drop_duplicates_with_last (which keeps the last duplicate in the table) because there is no notion of row index in pyspark.
# Example from pyspark documentation
# '''
# function: pyspark.sql.DataFrame.drop_duplicates(subset=None)
# return: a new DataFrame with duplicate rows removed, optionally only considering certain columns.
# '''
from pyspark.sql import Row
df = sc.parallelize([
Row(name='Alice', age=5, height=80),
Row(name='Alice', age=5, height=80),
Row(name='Alice', age=10, height=80)]).toDF()
df.drop_duplicates().show()
>>> +-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
|Alice| 10| 80|
+-----+---+------+
df.dropDuplicates(['name', 'height']).show()
>>> +-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
+-----+---+------+
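If keep-last is really needed, it could be emulated by attaching an explicit ordering column before deduplicating; a rough sketch (assumes the original input order is captured by monotonically_increasing_id):

# Sketch: emulate drop_duplicates keep="last" by keeping the row with the
# largest artificial row id within each duplicate group
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df_with_id = df.withColumn("_row_id", F.monotonically_increasing_id())
w = Window.partitionBy("name", "height").orderBy(F.desc("_row_id"))
deduped = (df_with_id
           .withColumn("_rank", F.row_number().over(w))
           .filter(F.col("_rank") == 1)
           .drop("_row_id", "_rank"))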
- [x] sample a fraction of rows from the Table randomly
# Example from pyspark documentation
# '''
# function: pyspark.sql.DataFrame.sample(withReplacement=None, fraction=None, seed=None)
# param withReplacement: bool; optional; Sample with replacement or not (default False).
# param fraction: float; required; Fraction of rows to generate; range [0.0, 1.0].
# param seed: int; optional; Seed for sampling (default a random seed).
# return: a sampled subset of this DataFrame.
# '''
df.sample(fraction=0.5, withReplacement=True, seed=2021)
>>> +------+---+------+
|animal|age|height|
+------+---+------+
|cat   |  1|     6|
|dog   |  3|    13|
|cat   |  1|     6|
+------+---+------+
- [x] convert the Table into a dict (same syntax as pandas.dataframe.to_dict())
- [x] convert a specified column of the Table into a list
e.g. for the Table above, tbl.col_to_list("age") should return [1, 8, 2, 5, 3]
- [x] rename the column of the Table
Dummy pipeline unsupported operations:
- [ ] interpolate fillna https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.interpolate.html
- [x] hash encode [similar to gen_string_idx with more options (e.g., no freq_limit, bucketing, etc.)]
- [x] concat https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.concat.html
- [x] cut bins https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.cut.html
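pandas.cut could roughly be covered by pyspark.ml.feature.Bucketizer; a sketch with placeholder split points and column names:

# Sketch: bin a continuous column into buckets defined by explicit split points
from pyspark.ml.feature import Bucketizer

bucketizer = Bucketizer(splits=[0.0, 18.0, 35.0, 60.0, float("inf")],
                        inputCol="age", outputCol="age_bucket")
df = bucketizer.transform(df)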
- [x] filter by frequency (just to drop some data; no gen_string_index involved)
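Filter-by-frequency could be a group-by count joined back plus a threshold filter; a sketch (column name and threshold are placeholders):

# Sketch: keep only rows whose category appears at least min_freq times
from pyspark.sql import functions as F

min_freq = 5
counts = df.groupBy("category").agg(F.count("*").alias("_freq"))
df = (df.join(counts, on="category", how="inner")
        .filter(F.col("_freq") >= min_freq)
        .drop("_freq"))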
- [x] groupby (already listed above?)
Operations for Booking Challenge
- [x] read_csv: read csv file and convert into FeatureTable
- [x] union: append more data row-wise
# Example for Feature table union method
# Find the union of two tables according to their column names
# :param tbl: feature table
# :return: Feature table
df1.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
+---+---+
df2.show()
>>> +---+---+
| y | x |
+---+---+
| c | 3 |
| d | 4 |
+---+---+
df3 = df1.union(df2)
df3.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 3 | c |
| 4 | d |
+---+---+
- [x] append_columns: append a new column with a constant value
# Append a column with a constant value to the table
# :param col: the name of the new column
# :param value: the constant value to append
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
+---+---+
df.append_columns("z",0)
>>> +---+---+---+
| x | y | z |
+---+---+---+
| 1 | a | 0 |
| 2 | b | 0 |
+---+---+---+
"""
Operations for Booking Challenge
- [x] change the value of a cell based on the index (similar to iloc in pandas)
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
+---+---+
df = df.iloc("x", 0, "d")
df.show()
>>> +---+---+
| x | y |
+---+---+
| d | a |
| 2 | b |
+---+---+
df = df.iloc("x", [0, 1], "d")
df.show()
>>> +---+---+
| x | y |
+---+---+
| d | a |
| d | b |
+---+---+
Operations for Booking Challenge
- [x] write_csv: write the FeatureTable to a csv file
- [x] factorise: factorise the given column into an output column. This is the same as category_encode.
# Factorise the given column and convert to an output column
# :param in_col: input column
# :param out_col: output column
df1.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 3 | a |
| 4 | a |
| 5 | c |
+---+---+
df2 = df1.factorise("y","z")
>>> +---+---+---+
| x | y | z |
+---+---+---+
| 1 | a | 0 |
| 2 | b | 1 |
| 3 | a | 0 |
| 4 | a | 0 |
| 5 | c | 2 |
+---+---+---+
Operations for Booking Challenge
- [x] sort the values of a table by the given column(s)
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 1 | c |
+---+---+
df = df.sort("x")
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 1 | c |
| 2 | b |
+---+---+
df = df.iloc("x",False)
df.show()
>>> +---+---+
| x | y |
+---+---+
| 2 | b |
| 1 | a |
| 1 | c |
+---+---+
df = df.iloc(["x","y"],[True,False])
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | c |
| 1 | a |
| 2 | b |
+---+---+
- [x] append a column with a given list of (index, value) pairs
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 1 | c |
+---+---+
df = df.append_list("z", [(0, 1), (1, 2), (2, 3)])
df.show()
>>> +---+---+---+
| x | y | z |
+---+---+---+
| 1 | a | 1 |
| 1 | c | 2 |
| 2 | b | 3 |
+---+---+---+
df = df.append_list("h", [(0, 1), (2,3)])
df.show()
>>> +---+---+---+---+
| x | y | z | h |
+---+---+---+---+
| 1 | a | 1 | 1 |
| 1 | c | 2 | null |
| 2 | b | 3 | 3 |
+---+---+---+---+
Operations for Booking Challenge
- [x] shift a column by a given offset into a new column (with an optional fill value)
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 1 | c |
+---+---+
df = df.shift("x","z")
df.show()
>>> +---+---+---+
| x | y | z |
+---+---+---+
| 1 | a | null |
| 1 | c | 1 |
| 2 | b | 2 |
+---+---+---+
df = df.shift("x","z",2)
df.show()
>>> +---+---+---+
| x | y | z |
+---+---+---+
| 1 | a | null |
| 1 | c | null |
| 2 | b | 1 |
+---+---+---+
df = df.shift("x","z",2,-1)
df.show()
>>> +---+---+---+
| x | y | z |
+---+---+---+
| 1 | a | -1 |
| 1 | c | -1 |
| 2 | b | 1 |
+---+---+---+
Also operations to handle timestamp, including:
- [ ] f.from_unixtime
- [ ] f.hour
- [ ] f.minute
- [ ] f.second
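These would be thin wrappers over pyspark.sql.functions; a sketch of what they could look like on a unix-timestamp column (column names are placeholders):

# Sketch: derive datetime fields from a unix timestamp column
from pyspark.sql import functions as F

df = (df.withColumn("datetime", F.from_unixtime("timestamp"))
        .withColumn("hour", F.hour("datetime"))
        .withColumn("minute", F.minute("datetime"))
        .withColumn("second", F.second("datetime")))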
Will do tomorrow.
Also operations to convert to/from pandas DataFrame, and to sort:
- [x] .to_pandas()
- [x] .from_pandas()
- [x] .sort() and order_by
Need to persist the table to avoid repeated computation.
- [x] cache()
- [x] uncache()