
Parallelizable cannot aggregate or return multiple Collects

JamesArruda opened this issue on Mar 05 '24 · 3 comments

Current behavior

A DAG with one Parallelizable and two Collect statements cannot return results from both Collect nodes.

Stack Traces

KeyError: 'Key metric_2 not found in cache'

Steps to replicate behavior

import logging

from hamilton import driver
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.htypes import Collect, Parallelizable
import pandas as pd


ANALYSIS_OB = tuple[tuple[str,...], pd.DataFrame]
ANALYSIS_RES = dict[str, str | float]


def split_by_cols(full_data: pd.DataFrame, columns: list[str]) -> Parallelizable[ANALYSIS_OB]:
    for idx, grp in full_data.groupby(columns):
        yield (idx, grp)


def sub_metric_1(split_by_cols: ANALYSIS_OB, number: float=1.0) -> ANALYSIS_RES:
    idx, grp = split_by_cols
    return {"key": idx, "mean": grp["spend"].mean() + number}


def sub_metric_2(split_by_cols: ANALYSIS_OB) -> ANALYSIS_RES:
    idx, grp = split_by_cols
    return {"key": idx, "mean": grp["signups"].mean()}


def metric_1(sub_metric_1: Collect[ANALYSIS_RES], columns: list[str]) -> pd.DataFrame:
    data = [[k for k in d["key"]] + [d["mean"], "spend"] for d in sub_metric_1]
    cols = list(columns) + ["mean", "metric"]
    return pd.DataFrame(data, columns=cols)


def metric_2(sub_metric_2: Collect[ANALYSIS_RES], columns: list[str]) -> pd.DataFrame:
    data = [[k for k in d["key"]] + [d["mean"], "signups"] for d in sub_metric_2]
    cols = list(columns) + ["mean", "metric"]
    return pd.DataFrame(data, columns=cols)


def all_agg(metric_1: pd.DataFrame, metric_2: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([metric_1, metric_2])


if __name__ == "__main__":
    from hamilton.execution import executors
    import __main__

    from hamilton.log_setup import setup_logging
    setup_logging(log_level=logging.DEBUG)

    local_executor = executors.SynchronousLocalTaskExecutor()

    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_modules(__main__)
        # .with_remote_executor(remote_executor)
        .with_local_executor(local_executor)
        .build()
    )
    df = pd.DataFrame(
        index=pd.date_range('20230101', '20230110'),
        data={  
            "signups": [1, 10, 50, 100, 200, 400, 700, 800, 1000, 1300],
            "spend": [10, 10, 20, 40, 40, 50, 100, 80, 90, 120],
            "region": ["A", "B", "C", "A", "B", "C", "A", "B", "C", "X"],
        }
    )
    ans = dr.execute(
        ["all_agg"],
        inputs={
            "full_data": df,
            "number": 3.1,
            "columns": ["region"],
        }
    )
    print(ans["all_agg"])

Library & System Information

Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:40:50) [MSC v.1937 64 bit (AMD64)] on win32

>>> hamilton.__version__
(1, 49, 2)

Expected behavior

I would expect to be able to retrieve the results of any Collect nodes. Requesting metric_1 or metric_2 one at a time succeeds.
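
Concretely, with the driver built in the repro above (a sketch, not verified output):

# One Collect at a time works with the driver built above.
inputs = {"full_data": df, "number": 3.1, "columns": ["region"]}
print(dr.execute(["metric_1"], inputs=inputs))  # succeeds
print(dr.execute(["metric_2"], inputs=inputs))  # succeeds
# Requesting all_agg, which depends on both Collects, raises the KeyError above.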

Thank you in advance for your help!

JamesArruda · Mar 05 '24 20:03

Hey! Thanks, this is a known limitation (see point 5 here) -- https://github.com/DAGWorks-Inc/hamilton/issues/301. That said, there's an easy workaround -- you can group them before running Collect, e.g.:

def all_metrics(sub_metric_1: ANALYSIS_RES, sub_metric_2: ANALYSIS_RES) -> ANALYSIS_RES:
    return ... # join the two dicts in whatever way you want

def all_agg(all_metrics: Collect[ANALYSIS_RES]) -> pd.DataFrame:
    return ... # join them all into a dataframe

While it's not ideal, it should just mean adding one extra function! Note that this works in the above case, where both metrics operate over the same partitions. If they don't, you'll want two separate parallelizations.
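
Filled out for the example in this issue, that workaround might look something like this (a sketch based on the repro above, not verified):

def all_metrics(sub_metric_1: ANALYSIS_RES, sub_metric_2: ANALYSIS_RES) -> dict[str, ANALYSIS_RES]:
    # Bundle both per-partition results so that a single Collect suffices.
    return {"spend": sub_metric_1, "signups": sub_metric_2}


def all_agg(all_metrics: Collect[dict[str, ANALYSIS_RES]], columns: list[str]) -> pd.DataFrame:
    # Flatten each collected bundle into rows of (group keys, mean, metric name).
    rows = []
    for bundle in all_metrics:
        for metric_name, res in bundle.items():
            rows.append(list(res["key"]) + [res["mean"], metric_name])
    return pd.DataFrame(rows, columns=list(columns) + ["mean", "metric"])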

Going to reference this issue from the other one -- this is a good one to keep around/I can spend some time scoping out a fix.

elijahbenizzy · Mar 05 '24 20:03


Adding to the above: if you want the flexibility to compute only one of them, you can use @config.when (docs) to define a few variants of the functions and pass the appropriate configuration at driver build time to shape the DAG accordingly.
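
A minimal sketch of that pattern (the metrics_to_compute config key is made up for illustration):

from hamilton.function_modifiers import config


@config.when(metrics_to_compute="spend_only")
def final_metrics__spend(metric_1: pd.DataFrame) -> pd.DataFrame:
    # Variant included when the driver config sets metrics_to_compute="spend_only".
    return metric_1


@config.when(metrics_to_compute="both")
def final_metrics__both(metric_1: pd.DataFrame, metric_2: pd.DataFrame) -> pd.DataFrame:
    # Variant included when the driver config sets metrics_to_compute="both".
    return pd.concat([metric_1, metric_2])

# Then at build time: driver.Builder()...with_config({"metrics_to_compute": "both"})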

skrawcz · Mar 05 '24 21:03

@elijahbenizzy, @skrawcz Thank you both for the help!

This is where I'm taking the workaround; in case it's useful for you or anyone else looking at the ticket, the following is successful:

from hamilton.function_modifiers import ResolveAt, group, inject, resolve, source


@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda metric_names: inject(sub_metrics=group(*[source(x) for x in metric_names])),
)
def all_metrics(sub_metrics: list[ANALYSIS_RES], columns: list[str]) -> pd.DataFrame:
    frames = []
    for a in sub_metrics:
        frames.append(_to_frame(a, columns))
    return pd.concat(frames)
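
(_to_frame isn't shown above; judging from metric_1/metric_2 earlier in the thread, it presumably does something like the following. This is a guess, not the author's actual helper.)

def _to_frame(res: ANALYSIS_RES, columns: list[str]) -> pd.DataFrame:
    # Hypothetical reconstruction: one row of group keys plus the metric value.
    data = [list(res["key"]) + [res["mean"]]]
    return pd.DataFrame(data, columns=list(columns) + ["mean"])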

Don't forget:

from hamilton import settings
_config = {settings.ENABLE_POWER_USER_MODE: True}
_config["metric_names"] = ["sub_metric_1", "sub_metric_2"]

# Then in the driver building:
.with_config(_config)
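
Put together with the builder from the original repro, that ends up roughly like this (a sketch):

dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_modules(__main__)
    .with_config(_config)  # power-user mode plus the metric_names list
    .with_local_executor(local_executor)
    .build()
)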

JamesArruda · Mar 06 '24 14:03

Closing this issue for now.

skrawcz · Jul 18 '24 20:07