hamilton + quokka support
Issue by skrawcz
Thursday Jan 05, 2023 at 00:48 GMT
Originally opened as https://github.com/stitchfix/hamilton/pull/269
Working on how to make Hamilton handle Quokka better.
skrawcz included the following code: https://github.com/stitchfix/hamilton/pull/269/commits
Comment by skrawcz
Thursday Jan 05, 2023 at 07:25 GMT
Things to think about:
- Quokka, like Spark, relies on a central object that gets manipulated so that it can perform query optimizations.
- Adding Hamilton support means we need to manipulate this object correctly under the hood.
- I believe a standard Hamilton function gives us enough information to know what to do; it's just a matter of figuring out how to traverse the DAG to construct the right order of operations.
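One way to read the last point: a Hamilton function's parameter names are its dependencies, so the order in which to apply operations to the central object falls out of a topological sort. Below is a minimal sketch, assuming plain undecorated functions and Python 3.9+ (for the standard-library graphlib). `execution_order` is a hypothetical helper, not part of Hamilton's API, and `charge` is deliberately rewritten to reuse `disc_price` so that there is an ordering constraint to resolve (in the thread the two are independent).

```python
import inspect
from graphlib import TopologicalSorter
from typing import Callable, Dict, List


def execution_order(functions: List[Callable], available: List[str]) -> List[str]:
    """Return function names ordered so that every dependency comes first.

    `available` lists columns that already exist on the central object
    (e.g. the loaded DataStream); they need no step of their own.
    """
    by_name: Dict[str, Callable] = {fn.__name__: fn for fn in functions}
    graph: Dict[str, set] = {}
    for name, fn in by_name.items():
        deps = set(inspect.signature(fn).parameters)
        missing = deps - set(by_name) - set(available)
        if missing:
            raise ValueError(f"{name} depends on unknown columns: {sorted(missing)}")
        # Only edges between functions matter for ordering.
        graph[name] = deps & set(by_name)
    return list(TopologicalSorter(graph).static_order())


# Hypothetical variant: charge reuses disc_price instead of recomputing it.
def disc_price(l_extendedprice, l_discount):
    return l_extendedprice * (1 - l_discount)


def charge(disc_price, l_tax):
    return disc_price * (1 + l_tax)


print(execution_order([charge, disc_price], ["l_extendedprice", "l_discount", "l_tax"]))
# ['disc_price', 'charge']
```

Whatever order comes out is the order in which the adapter would add columns to the central object, leaving Quokka free to optimize the resulting plan.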
Comment by skrawcz
Thursday Jan 05, 2023 at 07:46 GMT
Sketch of some code converting the hello world example
import polars as pl
from hamilton.function_modifiers import extract_columns
# QuokkaContext, DataStream and GroupedDataStream come from pyquokka.
# `groupby` is the proposed decorator being sketched here, not an existing Hamilton one.

# Data loading
# Filtering is part of data loading; do we also expose columns like this?
@extract_columns("l_quantity", "l_extendedprice", "l_discount", "l_tax", "l_returnflag", "l_linestatus")
def lineitem(qc: QuokkaContext, path: str,
             filter: str = "l_shipdate <= date '1998-12-01' - interval '90' day") -> DataStream:
    """Loads and filters data from the lineitem table"""
    ds: DataStream = qc.read_csv(path, sep="|", has_header=True)
    if filter:
        ds = ds.filter(filter)
    return ds


# Transforms we want people to write
def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


@groupby("l_returnflag", "l_linestatus", order_by=[...])
def grouped_lineitem(l_quantity: pl.Series, l_extendedprice: pl.Series,
                     disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                     l_returnflag: pl.Series, l_linestatus: pl.Series) -> GroupedDataStream:
    pass


# Maybe more subtly (no decorator):
def grouped_lineitem(l_returnflag: pl.Series, l_linestatus: pl.Series, *,
                     l_quantity: pl.Series, l_extendedprice: pl.Series,
                     disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                     ) -> GroupedDataStream:
    pass
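The keyword-only variant could let a decorator (or adapter) recover everything it needs from the signature alone: parameters before the bare `*` would be the group-by keys, and keyword-only parameters the columns to aggregate. A minimal sketch with the standard-library inspect module; `split_groupby_signature` is a hypothetical helper, and this says nothing about how Hamilton or Quokka would actually implement grouping.

```python
import inspect
from typing import Callable, List, Tuple


def split_groupby_signature(fn: Callable) -> Tuple[List[str], List[str]]:
    """Split fn's parameters into (group-by keys, aggregated columns)."""
    keys: List[str] = []
    values: List[str] = []
    for param in inspect.signature(fn).parameters.values():
        if param.kind is inspect.Parameter.KEYWORD_ONLY:
            values.append(param.name)  # after the bare `*`: columns to aggregate
        elif param.kind in (inspect.Parameter.POSITIONAL_ONLY,
                            inspect.Parameter.POSITIONAL_OR_KEYWORD):
            keys.append(param.name)  # before the bare `*`: group-by keys
        else:
            raise TypeError(f"*args/**kwargs are not supported: {param.name}")
    return keys, values


def grouped_lineitem(l_returnflag, l_linestatus, *, l_quantity, l_extendedprice,
                     disc_price, charge, l_discount):
    pass


print(split_groupby_signature(grouped_lineitem))
# (['l_returnflag', 'l_linestatus'], ['l_quantity', 'l_extendedprice', 'disc_price', 'charge', 'l_discount'])
```

Parameter order is preserved by inspect.signature, so the keys could also double as a default order_by.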
Comment by skrawcz
Saturday Jan 07, 2023 at 20:02 GMT
Parking a thought: what about just enabling Hamilton-style functions instead of with_column?
Basically, a DataStream is the input and another DataStream is the output.
import polars as pl
from hamilton import ad_hoc_utils, base, driver


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


def main(qc, path):
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    adapter = QuokkaGraphAdapter_V2(base.DictResult())  # the Quokka adapter under discussion
    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
    dr = driver.Driver({}, temp_module, adapter=adapter)
    d = dr.execute(["disc_price", "charge"])
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()
Comment by skrawcz
Tuesday Feb 21, 2023 at 06:40 GMT
Tweaking the above slightly:
import polars as pl
from hamilton import ad_hoc_utils, driver


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


def main(qc, path):
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    adapter = QuokkaGraphAdapter_V2()  # the Quokka adapter under discussion
    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
    dr = driver.Driver({}, temp_module, adapter=adapter)
    # By default, the new columns are appended to the passed-in DataStream.
    d = dr.execute(["disc_price", "charge"], inputs={c: d for c in d.schema})
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()
The QuokkaGraphAdapter_V2 adapter then intercepts execution and adjusts the internals so that it returns a DataStream with the extra columns (disc_price, charge) appended.
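To make the "append columns to the passed-in stream" behaviour concrete without depending on Quokka internals, here is a toy sketch. `LazyStream` is a hypothetical stand-in for a DataStream that only records column additions and evaluates them on `collect()`, and `append_columns` plays the role an adapter like QuokkaGraphAdapter_V2 might play: it resolves each function's inputs by parameter name and adds its output as a new column. Everything here is illustrative and assumes the functions are already in dependency order; none of it is Quokka's or Hamilton's API.

```python
import inspect
from typing import Callable, Dict, List, Tuple


class LazyStream:
    """Toy stand-in for a DataStream: column additions are recorded, not run."""

    def __init__(self, data: Dict[str, list], plan: Tuple = ()):
        self._data = data
        self._plan = plan  # tuple of (column name, function, input names)

    @property
    def schema(self) -> List[str]:
        return list(self._data) + [name for name, _, _ in self._plan]

    def with_column(self, name: str, fn: Callable, inputs: List[str]) -> "LazyStream":
        # Return a new stream so the original plan is left untouched.
        return LazyStream(self._data, self._plan + ((name, fn, tuple(inputs)),))

    def collect(self) -> Dict[str, list]:
        out = {k: list(v) for k, v in self._data.items()}
        n_rows = len(next(iter(out.values()), []))
        for name, fn, inputs in self._plan:
            # Row-wise evaluation; earlier plan steps are visible to later ones.
            out[name] = [fn(*(out[i][row] for i in inputs)) for row in range(n_rows)]
        return out


def append_columns(stream: LazyStream, functions: List[Callable]) -> LazyStream:
    """Add one column per Hamilton-style function (assumed dependency-ordered)."""
    for fn in functions:
        inputs = list(inspect.signature(fn).parameters)
        stream = stream.with_column(fn.__name__, fn, inputs)
    return stream


def disc_price(l_extendedprice, l_discount):
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice, l_discount, l_tax):
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


lineitem = LazyStream({"l_extendedprice": [100.0, 200.0],
                       "l_discount": [0.5, 0.0],
                       "l_tax": [0.5, 0.25]})
result = append_columns(lineitem, [disc_price, charge])
print(result.schema)
# ['l_extendedprice', 'l_discount', 'l_tax', 'disc_price', 'charge']
print(result.collect()["charge"])
# [75.0, 250.0]
```

Because nothing runs until collect(), a real engine would see the whole chain of added columns at once, which is what leaves room for the query optimizations mentioned at the start of the thread.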