
hamilton + quokka support

Open HamiltonRepoMigrationBot opened this issue 2 years ago • 4 comments

Issue by skrawcz Thursday Jan 05, 2023 at 00:48 GMT Originally opened as https://github.com/stitchfix/hamilton/pull/269


Working on how to make Hamilton handle Quokka better.


skrawcz included the following code: https://github.com/stitchfix/hamilton/pull/269/commits

Comment by skrawcz Thursday Jan 05, 2023 at 07:25 GMT


Things to think about:

  • Quokka, like Spark, relies on manipulating a central object so that it can perform query optimizations.
  • Adding support for Hamilton means that we need to manipulate this object correctly under the hood.
  • I believe a default Hamilton function gives us enough information to know what to do; it's just a matter of figuring out how to traverse the DAG so that we construct the right order of operations.
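To make the traversal point concrete, here is a minimal sketch of deriving an order of operations from function signatures alone. It is not Hamilton's actual graph code: `execution_order` is a hypothetical helper, and `charge` has been changed to depend on `disc_price` purely so that there is an edge to order.

```python
import inspect
from graphlib import TopologicalSorter


def disc_price(l_extendedprice, l_discount):
    return l_extendedprice * (1 - l_discount)


# Illustration only: this variant depends on disc_price so the DAG has an edge.
def charge(disc_price, l_tax):
    return disc_price * (1 + l_tax)


def execution_order(functions):
    """Hypothetical helper: order function names so dependencies come first."""
    by_name = {fn.__name__: fn for fn in functions}
    # A parameter whose name matches another function is an edge in the DAG;
    # any other parameter is treated as an external input (a source column).
    graph = {
        name: {p for p in inspect.signature(fn).parameters if p in by_name}
        for name, fn in by_name.items()
    }
    return list(TopologicalSorter(graph).static_order())


order = execution_order([charge, disc_price])
print(order)
```

Walking the functions in this order is what would let each step be applied to the central object in a sequence Quokka can still optimize.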

Comment by skrawcz Thursday Jan 05, 2023 at 07:46 GMT


A sketch of some code converting the hello world example:

# Data Loading
# Filtering is part of data loading -- do we also expose columns like this?
@extract_columns(*["l_quantity", "l_extendedprice", "l_discount", "l_tax", "l_returnflag", "l_linestatus"])
def lineitem(qc: QuokkaContext, path: str,
             filter: str = "l_shipdate <= date '1998-12-01' - interval '90' day") -> DataStream:
    """Loads and filters data from the lineitem table"""
    ds: DataStream = qc.read_csv(path, sep="|", has_header=True)
    if filter:
        ds = ds.filter(filter)
    return ds


# transforms we want people to write
def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


@groupby("l_returnflag", "l_linestatus", order_by=[...])
def grouped_lineitem(l_quantity: pl.Series, l_extendedprice: pl.Series,
                        disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                        l_returnflag: pl.Series, l_linestatus: pl.Series) -> GroupedDataStream:
    pass

# maybe more subtly
def grouped_lineitem(l_returnflag: pl.Series, l_linestatus: pl.Series, *,
                     l_quantity: pl.Series, l_extendedprice: pl.Series,
                     disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                     ) -> GroupedDataStream:
    pass
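One possible reading of the second variant (an assumption, since the sketch does not spell it out) is that the parameters before the bare `*` are the group-by keys and the keyword-only parameters after it are the columns to aggregate. Under that assumption, the split can be read off the signature with the standard `inspect` module; `split_group_keys` is a hypothetical helper name and type annotations are dropped for brevity.

```python
import inspect


def grouped_lineitem(l_returnflag, l_linestatus, *, l_quantity, l_extendedprice,
                     disc_price, charge, l_discount):
    pass


def split_group_keys(fn):
    """Hypothetical helper: return (group_keys, value_columns) from a signature."""
    params = list(inspect.signature(fn).parameters.values())
    # Parameters before the bare `*` are assumed to be group-by keys.
    keys = [p.name for p in params
            if p.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD]
    # Keyword-only parameters are assumed to be the columns being aggregated.
    values = [p.name for p in params
              if p.kind is inspect.Parameter.KEYWORD_ONLY]
    return keys, values


keys, values = split_group_keys(grouped_lineitem)
print(keys, values)
```

This would keep the grouping information in the function signature itself, so no decorator arguments are needed for the keys.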

Comment by skrawcz Saturday Jan 07, 2023 at 20:02 GMT


Parking a thought: what about just enabling Hamilton-style functions in place of with_column?

Basically, a datastream is the input and the output is another datastream:

import polars as pl
from hamilton import ad_hoc_utils, base, driver


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)

def main(qc, path):
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    # QuokkaGraphAdapter_V2 is the adapter being sketched in this thread
    adapter = QuokkaGraphAdapter_V2(base.DictResult())

    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")

    dr = driver.Driver({}, temp_module, adapter=adapter)
    d = dr.execute(["disc_price", "charge"])
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()

Comment by skrawcz Tuesday Feb 21, 2023 at 06:40 GMT


Tweaking the above slightly:

import polars as pl
from hamilton import ad_hoc_utils, driver


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)

def main(qc, path):
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    # QuokkaGraphAdapter_V2 is the adapter being sketched in this thread
    adapter = QuokkaGraphAdapter_V2()

    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")

    dr = driver.Driver({}, temp_module, adapter=adapter)
    # every column name maps to the same filtered datastream;
    # the default is to append columns to the passed-in dataframe
    d = dr.execute(["disc_price", "charge"], inputs={c: d for c in d.schema})
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()

The QuokkaGraphAdapter_V2 adapter would then intercept execution and massage the internals appropriately to return a datastream with the extra columns.
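As a toy illustration of the "append columns" behaviour only, here is a sketch that uses a plain dict of lists in place of a Quokka datastream. Nothing in it is Quokka's or Hamilton's API: `append_columns` is a hypothetical helper, and the functions work on Python lists rather than `pl.Series` so the example runs without either library.

```python
import inspect


def disc_price(l_extendedprice, l_discount):
    return [p * (1 - d) for p, d in zip(l_extendedprice, l_discount)]


def charge(l_extendedprice, l_discount, l_tax):
    return [p * (1 - d) * (1 + t)
            for p, d, t in zip(l_extendedprice, l_discount, l_tax)]


def append_columns(table, functions):
    """Hypothetical helper: return a copy of `table` plus one column per function."""
    result = dict(table)  # shallow copy, so the caller's table is left untouched
    for fn in functions:
        # Parameter names select the input columns, as in Hamilton functions.
        args = {name: result[name] for name in inspect.signature(fn).parameters}
        result[fn.__name__] = fn(**args)
    return result


table = {
    "l_extendedprice": [100.0, 200.0],
    "l_discount": [0.5, 0.25],
    "l_tax": [0.5, 0.0],
}
out = append_columns(table, [disc_price, charge])
print(out["disc_price"], out["charge"])
```

A real adapter would presumably build the equivalent column expressions lazily on the datastream rather than computing values eagerly as this toy does.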