Support user-defined aggregation and mapping functions
A frequently used feature in pandas is apply (or aggregate, or transform), which allows performing a mapping, an aggregation, or even a partial reduction operation.
PySpark introduced a way to define user-defined operations for groupby and select operations:
| function type | Operation | Input → Output | Pandas equivalent |
|---|---|---|---|
| SCALAR | Mapping | Series → Series | df.transform(...) |
| GROUPED_MAP | Group & Map | DataFrame → DataFrame | df.apply(...) |
| GROUPED_AGG | Reduce | Series → Scalar | df.aggregate(...) |
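For reference, the three pandas counterparts in the last column might look roughly like this (an illustrative sketch; the frame df and the lambdas are made up for the example):

```python
import pandas as pd

df = pd.DataFrame({"g": ["a", "a", "b"], "x": [1, 2, 3]})

# SCALAR / mapping: Series -> Series, one output row per input row
df["x"].transform(lambda s: s + 1)

# GROUPED_MAP: DataFrame -> DataFrame, applied once per group
df.groupby("g").apply(lambda sub: sub.assign(x=sub["x"] - sub["x"].mean()))

# GROUPED_AGG / reduce: Series -> scalar, one output row per group
df.groupby("g")["x"].aggregate(lambda s: s.max() - s.min())
```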
I would love to see something like this in Datatable. Maybe it would be possible to have a udf decorator such as:
```python
@udf
def my_agg(x):
    ...
    return ...
```
So, I've been thinking about how to best approach this problem, and I think we could combine decorators with type-annotations for better results. For example:
An unannotated function receives its argument x as a dt.Frame object, of whatever shape was generated by the DT[i,j] expression:
```python
@udf
def myfn1(x):
    return ...
```
This is equivalent to:
```python
@udf
def myfn1(x: dt.Frame) -> dt.Frame:
    return ...
```
However, if you specify that x is of a scalar type (one of bool, int, float, str), then the function will be invoked as many times as there are rows in the DT frame:
```python
@udf
def plus_one(x: int):
    return x + 1

DT[:, plus_one(f.x)]
```
In this case we will also verify that column x in DT is of integer type, and throw an error if it isn't.
Similarly, if you declare that argument x in the udf is a list (or a typed list such as List[int], List[str], etc.), then datatable will convert the column into a Python list before passing it into the function:
```python
@udf
def argmin(x: List):
    return min(range(len(x)), key=lambda i: x[i])
```
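Presumably, a list-typed udf like this would be evaluated once per group when combined with by(); hypothetical usage under this proposal, with made-up column names:

```python
DT[:, argmin(f.x), by(f.group)]
```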
Your function may also work with a pandas DataFrame, or a Series, or a numpy array:
```python
@udf
def myfn_using_pandas(x: pd.Series):
    return ...
```
All of these user-defined functions can, of course, take multiple arguments, or even var-args:
```python
@udf
def rowsum(*x: float, skipna=True):
    if skipna:
        return sum(v for v in x if v is not None)
    elif any(v is None for v in x):
        return None
    else:
        return sum(x)
```
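A var-arg udf like this could then presumably be applied row-wise across several columns; again, hypothetical usage with made-up column names:

```python
DT[:, rowsum(f.a, f.b, f.c)]
DT[:, rowsum(f.a, f.b, f.c, skipna=False)]
```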
Implementation
The @udf decorator will have to implement lazy evaluation: all arguments must be stored in an object, to be resolved and evaluated later within the context of a DT[i,j] expression. Thus, it may look something like this:
```python
class udf:
    def __init__(self, fn):
        self._fn = fn
        # The signature may contain types unknown to datatable, but in that
        # case those arguments cannot be bound to dt expressions.
        self._sig = _resolve_signature(fn)

    def __call__(self, *args, **kwds):
        return bound_udf(self, args, kwds)
```
The DT[i,j] evaluator would then see a bound_udf object, replace the individual entries in args and kwds with the resolved expressions (according to the annotated types), and then call the stored function. If the annotation requests an argument of scalar type, the function will be called as many times as there are rows in the Frame. If the function expects a vector type (such as list, Frame, np.array, or pd.Series), the function will be executed as many times as there are groups formed by the by() clause (or once if there is no by).
Finally, the results of all these evaluations will be row-bound together into a single column or frame. Thus, a udf is allowed to return the same variety of arguments as it received: int, float, str, list, np.array, pd.DataFrame, or a dt.Frame.
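To make the dispatch concrete, here is a standalone sketch of the idea; bound_udf, evaluate(), the use of inspect.signature in place of _resolve_signature, and plain Python lists standing in for real column expressions and groups are all hypothetical, not the actual DT[i,j] machinery:

```python
import inspect

SCALAR_TYPES = (bool, int, float, str)

class udf:
    def __init__(self, fn):
        self._fn = fn
        self._sig = inspect.signature(fn)   # stand-in for _resolve_signature()

    def __call__(self, *args, **kwds):
        return bound_udf(self, args, kwds)

class bound_udf:
    def __init__(self, parent, args, kwds):
        self._parent = parent
        self._args = args        # would hold the unevaluated f-expressions
        self._kwds = kwds

    def evaluate(self, column):
        # Pick the calling convention from the annotation of the first
        # parameter: per-row for scalar annotations, per-group otherwise.
        first = next(iter(self._parent._sig.parameters.values()))
        if first.annotation in SCALAR_TYPES:
            return [self._parent._fn(v) for v in column]   # once per row
        return self._parent._fn(column)                    # once per group/frame

@udf
def plus_one(x: int):
    return x + 1

@udf
def total(x: list):
    return sum(x)

print(plus_one(...).evaluate([1, 2, 3]))   # -> [2, 3, 4]
print(total(...).evaluate([1, 2, 3]))      # -> 6
```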
Looks really good; I especially like the use of type hints to avoid having to specify the function type, such as mapping or aggregation.