
Reusable subDAG components

Open elijahbenizzy opened this issue 3 years ago

Is your feature request related to a problem? Please describe. Reusing functions, helpers, etc. is all well and good. However, sometimes you want to be able to reuse large subcomponents of the DAG.

Describe the solution you'd like

Two ideas:

  1. Use the Driver to stitch bigger DAGs together
  2. Two decorators (still need to fully think this through)

This uses prefixes, but some actual namespace notion could be nice here.

@hamilton.subdag_split(
    inputs={
        'subdag_1': {'source': 'source_for_dag_1'},  # subdag 1 gets its source from a different place than subdag 2
        'subdag_2': {'source': 'source_for_dag_2'}})
def data(source: str) -> pd.DataFrame:
    return _load_data(source)

def foo(data: pd.DataFrame) -> pd.Series:
    return _do_something(data)

@hamilton.subdag_join(subdag='subdag_1')
def process_subdag_1(foo) -> pd.Series:
    return _process_some_way(foo)

The framework would then compile this in a fancy way -- ensuring that every node between the splits and the joins is turned into one copy for each subdag, under a separate namespace. TBD on how to access it.
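
To make the compilation concrete, here is a rough sketch of the node set this could produce for the example above (the naming scheme is purely illustrative; as noted, access is TBD):

# Hypothetical compiled nodes (naming scheme illustrative, not final):
#   subdag_1.data <- source_for_dag_1      subdag_2.data <- source_for_dag_2
#   subdag_1.foo  <- subdag_1.data         subdag_2.foo  <- subdag_2.data
#   process_subdag_1 <- subdag_1.foo  # the join pulls from the subdag_1 namespace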

Describe alternatives you've considered Not allowing this -- I don't have a concrete use-case blocking anyone, but we have observed one at Stitch Fix.

Additional context Thinking ahead here.

elijahbenizzy avatar Mar 12 '22 00:03 elijahbenizzy

OK, starting to think about this one. This is not something we want yet, but it is something we should plan for. Going to propose the following spec -- I think this is much cleaner. See the code example:

# marketing_module.py
def signups(raw_signup_data: pd.Series, business_line: str) -> pd.Series:
    ...

def marketing_spend(all_marketing_data: pd.Series, business_line: str) -> pd.Series:
    ...

def acquisition_cost(signups: pd.Series, marketing_spend: pd.Series) -> pd.Series:
    ...

def acquisition_cost_smoothed(acquisition_cost: pd.Series) -> pd.Series:
    ...

# dataflow.py
import marketing_module
@ubernode(  # For lack of a better name...
    load_from=marketing_module,  # modules or list of functions, cause why not?
    inject={'business_line': value('womens')},  # this parameter gets injected in
    namespace='womens',  # all nodes are named `womens.<node>`
    extract={'acquisition_cost': 'acquisition_cost_womens', 'acquisition_cost_smoothed': 'acquisition_cost_smoothed_womens'}
)
# Not sure what this function should look like. Idea would be to error out if you don't include all sources in the subDAG, then print out the signature for them to copy/paste.
def acquisition_data_womens(
    raw_signup_data: pd.Series,
    all_marketing_data: pd.Series) -> MultiNodeOutput(acquisition_cost_smoothed_womens=pd.Series, acquisition_cost_womens=pd.Series):
    pass
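
For illustration, a hedged sketch of how a driver might consume this, assuming the extracted outputs end up visible under their mapped top-level names (none of this API exists yet; dataflow is the module above):

import pandas as pd
from hamilton import driver

import dataflow  # the module containing acquisition_data_womens above

# Intermediate nodes would live under the 'womens' namespace; only the
# extracted outputs are visible under their mapped names.
dr = driver.Driver({}, dataflow)
result = dr.execute(
    ['acquisition_cost_womens', 'acquisition_cost_smoothed_womens'],
    inputs={
        'raw_signup_data': pd.Series(dtype=float),     # placeholder data
        'all_marketing_data': pd.Series(dtype=float),  # placeholder data
    },
)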

Motivation

Hamilton is a new paradigm for writing dataflow code. Nodes in a dataflow map 1:1 with functions, exactly. As opposed to a system in which some lower-level DSL (e.g. Python) is used to construct static dataflows, we specifically trade off verbosity (of which Hamilton has plenty) for readability, debuggability, ease of development, etc. We consider this to be strictly good, but it does not work for all cases, namely those of repetition and configuration (corresponding to for and if/else in a more traditional paradigm). Thus we've introduced a few decorators (parameterize and config.when) to allow for greater flexibility while preserving the benefits of Hamilton. These tend to break down, however, when one wants to apply for-loops over higher-level constructs in the DAG. For example, one might have a set of metrics that they wish to run on both the mens and womens business lines. There are a few currently feasible approaches, each of which has its trade-offs:

  1. Run separate drivers: works if materialization should be kept separate and these datums aren't joined as part of the dataflow.
  2. Parameterize everything: works if the subset of computation is small and simple enough that adding a bunch of parameterizations is manageable.
  3. Run a node that instantiates a driver and runs it (yikes).

However, none of these solve the problem of actually repeating in the same DAG. This proposal does that.

Terminology

namespace: A way to differentiate two nodes with the same name. E.g. with node baz and namespaces foo and bar, we have nodes foo.baz and bar.baz. This exists solely in conjunction with new subdags.

subdag: A subset of the DAG that is introduced by parameterizing something.

API

Calling the decorator uberdag, although we need a better name. It takes in the following:

  1. A list of functions or a module (load_from above)
  2. Parameters/config to inject (inject above). No reason it can't be the same as parameterized
  3. A namespace (namespace above -- how to refer to all the intermediate nodes)
  4. An output mapping (extract above), referring to the nodes from the subdag the user wishes to extract, and how to name them
  5. Maybe a config if we're feeling fancy -- this allows us to configure the subDAG

Then we have the function we decorate. Note that this is actually just gravy (we don't need it with the decorator), but making it show something keeps it readable:

  1. Input parameters correspond to all non-injected inputs/types from the subDAG.
  2. Output type is a MultiNodeOutput or something like that. Basically a TypedDict with all the outputs.

This is a pain, so when it's not correct we print an error message with the correct values.
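
Since MultiNodeOutput doesn't exist yet, here is a minimal sketch of what it could be, assuming it's just a TypedDict factory over the extracted outputs (names and shape are assumptions, not a spec):

from typing import TypedDict

import pandas as pd

def MultiNodeOutput(**outputs: type) -> type:
    # Sketch: build a TypedDict type whose keys are the extracted node
    # names. A real implementation would also carry validation metadata.
    return TypedDict('MultiNodeOutput', outputs)

# e.g. the return annotation from the example above:
AcquisitionOutputs = MultiNodeOutput(
    acquisition_cost_womens=pd.Series,
    acquisition_cost_smoothed_womens=pd.Series,
)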

Implementation

So this is easy enough. All we do is the following (a rough sketch follows the list):

  1. If a module is passed in, resolve it to functions
  2. If we already did (1), or functions are passed in, resolve the functions to nodes using our config
  3. Change those nodes to have the namespaced name so they can coexist
  4. Add identity nodes to ensure outputs have the right name
  5. Validate, and suggest a function signature
  6. Return all created nodes
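
Here is that rough sketch of steps (3) and (4), using a stand-in Node dataclass rather than Hamilton's actual internals (all names here are illustrative only):

import dataclasses
from typing import Callable, Dict, List

@dataclasses.dataclass
class Node:  # stand-in for Hamilton's resolved node type
    name: str
    dependencies: List[str]
    fn: Callable

def namespace_nodes(nodes: List[Node], namespace: str,
                    extract: Dict[str, str]) -> List[Node]:
    """Steps 3 & 4: rename nodes under the namespace, then add identity
    nodes so extracted outputs get their requested top-level names."""
    internal = {n.name for n in nodes}
    out = []
    for n in nodes:
        out.append(Node(
            name=f'{namespace}.{n.name}',
            # Only rewrite dependencies that point inside the subdag;
            # external inputs (e.g. injected params) keep their names.
            dependencies=[f'{namespace}.{d}' if d in internal else d
                          for d in n.dependencies],
            fn=n.fn,
        ))
    for inner_name, outer_name in extract.items():
        # Identity node: expose a namespaced node under its mapped name.
        out.append(Node(name=outer_name,
                        dependencies=[f'{namespace}.{inner_name}'],
                        fn=lambda x: x))
    return out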

Might see how I feel and prototype this soon -- it'll be <100 LOC, and it solves this problem in an extremely comprehensive, powerful, and readable way (I think).

elijahbenizzy avatar Sep 18 '22 04:09 elijahbenizzy

Alright -- prototype is here: https://github.com/stitchfix/hamilton/pull/199. I want to get this out soon for power users. Need to rethink parts of the API.

elijahbenizzy avatar Oct 29 '22 17:10 elijahbenizzy

Leaving some API thoughts here:

  1. One could just use Hamilton within Hamilton:
@extract_columns(*["video_title", "title_length", "title_word_count"])
def load_df(path: str) -> pd.DataFrame:
    _df = pd.read_csv(path)
    # inner_transforms: a module of Hamilton transform functions
    dr = driver.Driver(_df.to_dict(orient="series"), inner_transforms)
    df = dr.execute(["video_title", "title_length", "title_word_count"])
    return df
  2. We could also write a decorator that effectively does the above two lines of code:

@extract_columns(*["video_title", "title_length", "title_word_count"])
@inner_execute(inner_transforms, outputs=["video_title", "title_length", "title_word_count"])
def load_df(path: str) -> pd.DataFrame:
    _df = pd.read_csv(path)
    return _df

With (1) it would be very opaque. With (2) we would have some visibility into creating a single DAG -- with a best guess at determining inputs and no static guarantee for the "inner" DAG (it would be a runtime check). It would require some internal work though.
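
For concreteness, a minimal sketch of what such an @inner_execute decorator could look like, assuming it just wraps the function and runs an inner driver over its dataframe output (hypothetical, not an existing Hamilton API):

import functools

from hamilton import driver

def inner_execute(transform_module, outputs):
    # Hypothetical: run an inner Hamilton DAG over the decorated
    # function's dataframe output, returning the requested columns.
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            _df = fn(*args, **kwargs)
            dr = driver.Driver(_df.to_dict(orient='series'), transform_module)
            return dr.execute(outputs)
        return wrapper
    return decorator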

skrawcz avatar Jan 29 '23 07:01 skrawcz

@elijahbenizzy can you put the current API in here too?

skrawcz avatar Jan 29 '23 07:01 skrawcz


Not much of a difference between @inner_execute and what we already have with `reuse_functions` -- it just joins it. So sure, if you have a set of columns producing a dataframe it could be syntactic sugar on top. IMO running Hamilton within Hamilton is a bit of an anti-pattern: no lineage guarantees, no way to abstract execution, encapsulation is busted...

elijahbenizzy avatar Jan 29 '23 18:01 elijahbenizzy

Re: the current API -- see this: https://github.com/stitchfix/hamilton/blob/main/decorators.md#reuse_functions.

elijahbenizzy avatar Jan 29 '23 18:01 elijahbenizzy

Maybe this is a better API:

@parameterized_subdag(sub_dag,  # functions or modules
                      config={"loader": "source_3"},
                      inputs={"source_3_path": source("source_3_path")})
def source_3_combined_data(combined_data: pd.DataFrame, something_not_in_inner: pd.Series) -> pd.DataFrame:
    # function name serves as the namespace
    return combined_data

skrawcz avatar Feb 17 '23 22:02 skrawcz

We are moving repositories! Please see the new version of this issue at https://github.com/DAGWorks-Inc/hamilton/issues/23. Also, please give us a star/update any of your internal links.

elijahbenizzy avatar Feb 26 '23 17:02 elijahbenizzy