Reusable subDAG components
Is your feature request related to a problem? Please describe. Reusing functions, helpers, etc. is all well and good. However, sometimes you want to be able to reuse large subcomponents of the DAG.
Describe the solution you'd like
Two ideas (still need to fully think this through):
- Use the Driver to stitch bigger DAGs together
- Two decorators
This uses prefixes, but some actual namespace notion could be nice here.
@hamilton.subdag_split(
    inputs={
        'subdag_1': {'source': 'source_for_dag_1'},  # subdag 1 gets its source from a different place than subdag 2
        'subdag_2': {'source': 'source_for_dag_2'}})
def data(source: str) -> pd.DataFrame:
    return _load_data(source)

def foo(data: pd.DataFrame) -> pd.Series:
    return _do_something(data)

@hamilton.subdag_join(subdag='subdag_1')
def process_subdag_1(foo: pd.Series) -> pd.Series:
    return _process_some_way(foo)
The framework would then compile this in a fancy way -- ensuring that every node between the splits and the joins is turned into one for each subdag, under a separate namespace. TBD on how to access it.
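To make that concrete, here is a hypothetical sketch (node names as plain strings) of the node set such a compilation might produce from the example above; none of these names are a committed API:

```python
# Hypothetical node set after compiling the subdag_split/subdag_join example:
# every node between the split and the join is duplicated once per subdag,
# each copy living under its own namespace.
expanded_nodes = {
    "subdag_1.data", "subdag_1.foo",  # copies fed by source_for_dag_1
    "subdag_2.data", "subdag_2.foo",  # copies fed by source_for_dag_2
    "process_subdag_1",               # the join: depends on subdag_1.foo only
}
```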
Describe alternatives you've considered Not allowing this -- I don't have a concrete use-case blocking anyone, but we have observed one at Stitch Fix.
Additional context Thinking ahead here.
OK, starting to think about this one. This is not something we want yet, but it is something we should plan for. Going to propose the following spec; I think this is much cleaner. See the code example:
# marketing_module.py
def signups(raw_signup_data: pd.Series, business_line: str) -> pd.Series:
    ...

def marketing_spend(all_marketing_data: pd.Series, business_line: str) -> pd.Series:
    ...

def acquisition_cost(signups: pd.Series, marketing_spend: pd.Series) -> pd.Series:
    ...

def acquisition_cost_smoothed(acquisition_cost: pd.Series) -> pd.Series:
    ...
# dataflow.py
import marketing_module

@ubernode(  # for lack of a better name...
    load_from=marketing_module,  # modules or a list of functions, because why not?
    inject={'business_line': value('womens')},  # this parameter gets injected
    namespace='womens',  # all nodes are named `womens.<node>`
    extract={
        'acquisition_cost': 'acquisition_cost_womens',
        'acquisition_cost_smoothed': 'acquisition_cost_smoothed_womens'},
)
# Not sure what this function should look like. The idea would be to error out if you
# don't include all sources in the subDAG, then print out the signature for them to copy/paste.
def acquisition_data_womens(
        raw_signup_data: pd.Series,
        all_marketing_data: pd.Series,
) -> MultiNodeOutput(acquisition_cost_smoothed_womens=pd.Series, acquisition_cost_womens=pd.Series):
    pass
Motivation
Hamilton is a new paradigm for writing dataflow code. Nodes in a dataflow map 1:1 with functions, exactly. As opposed to a system in which some lower-level DSL (e.g. Python) is used to construct static dataflows, we specifically trade off verbosity (of which Hamilton has plenty) for readability, debuggability, ease of development, etc. We consider this to be strictly good, but it does not work for all cases, namely those of repetition and configuration (corresponding to for and if/else in a more traditional paradigm). Thus we've introduced a few decorators (parameterize and config.when) to allow for greater flexibility while preserving the benefits of Hamilton. These tend to break down, however, when one wants to apply for-loops over higher-level constructs in the DAG. For example, one might have a set of metrics to run on both the men's and women's business lines. There are a few currently feasible approaches, each with its trade-offs:
- Run separate drivers: works if materialization should be kept separate and these datums aren't joined as part of the dataflow.
- Parameterize everything: works if the subset of computation is small and tractable enough that adding a bunch of parameterizations is manageable.
- Run a node that instantiates a driver and runs it (yikes).
However, none of these solves the problem of actually repeating computation within the same DAG. This proposal does.
Terminology
namespace A way to differentiate two nodes with the same name. E.g. with node `baz` and namespaces `foo` and `bar`, we have nodes `foo.baz` and `bar.baz`. This exists solely in conjunction with new subdags.
subdag A subset of the DAG that is introduced by parameterizing something.
API
Calling the decorator `uberdag`, although we need a better name. It takes the following:
- A list of functions or a module (`load_from` above)
- Parameters/config to inject (`inject` above). No reason it can't be the same as `parameterized`
- A namespace (`namespace` above / how to refer to all the intermediate nodes)
- Output mapping (`extract_outputs` above), referring to the nodes from the subdag the user wishes to extract, and how to name them.
- Maybe a `config` if we're feeling fancy -- this allows us to configure the subDAG.
Then we have the function we decorate. Note that this is actually just gravy (we don't need it with the decorator), but let's make it show something so that it reads well:
- Input parameters correspond to all non-injected inputs/types from the subDAG.
- Output type is a `MultiNodeOutput` or something like that. Basically a typed dict with all the outputs.

This is a pain, so we have an error message when it's not correct that prints the correct values.
Implementation
So this is easy enough. All we do is:
- If a module is passed in, resolve it to functions
- If we already did (1), or functions are passed in, resolve the functions to nodes using our config
- Change those nodes to have the namespaced name so they can coexist
- Add identity nodes to ensure outputs have the right names
- Validate, and suggest the function signature
- Return all created nodes.
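A minimal sketch of these steps, with plain functions standing in for Hamilton's real node abstraction (the helper name `build_subdag_nodes` and its signature are hypothetical, and validation/signature suggestion is elided):

```python
import types

def build_subdag_nodes(load_from, namespace, extract):
    """Hypothetical sketch: resolve, namespace, and re-export subdag nodes."""
    # 1. If a module is passed in, resolve it to its public functions.
    if isinstance(load_from, types.ModuleType):
        functions = [fn for name, fn in vars(load_from).items()
                     if callable(fn) and not name.startswith("_")]
    else:
        functions = list(load_from)
    # 2. Resolve functions to "nodes" (here, simply name -> function).
    nodes = {fn.__name__: fn for fn in functions}
    # 3. Namespace the node names so duplicated subdags can coexist.
    namespaced = {f"{namespace}.{name}": fn for name, fn in nodes.items()}
    # 4. Add identity nodes so extracted outputs get the requested names.
    for inner_name, outer_name in extract.items():
        namespaced[outer_name] = namespaced[f"{namespace}.{inner_name}"]
    return namespaced
```

For example, `build_subdag_nodes([signups, acquisition_cost], "womens", {"acquisition_cost": "acquisition_cost_womens"})` would yield the keys `womens.signups`, `womens.acquisition_cost`, and `acquisition_cost_womens`.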
Might see how I feel and prototype this soon -- it'll be <100 LOC, and it solves this problem in an extremely comprehensive, powerful, and readable way (I think).
Alright -- prototype is here: https://github.com/stitchfix/hamilton/pull/199. I want to get this out soon for power users. Need to rethink parts of the API.
Leaving some API thoughts here:
- One could just use Hamilton within hamilton.
@extract_columns(*["video_title", "title_length", "title_word_count"])
def load_df(path: str) -> pd.DataFrame:
    _df = pd.read_csv(path)
    dr = driver.Driver(_df.to_dict(orient="series"), inner_transforms)
    df = dr.execute(["video_title", "title_length", "title_word_count"])
    return df
- We could also create a decorator that effectively does the above two lines of code ^
@extract_columns(*["video_title", "title_length", "title_word_count"])
@inner_execute(inner_transforms, outputs=["video_title", "title_length", "title_word_count"])
def load_df(path: str) -> pd.DataFrame:
    _df = pd.read_csv(path)
    return _df
With (1) it would be very opaque. With (2) we would have some visibility into creating a single DAG -- with a best guess at determining inputs and no static guarantee on the "inner" DAG (it would be a runtime check). It would require some internal work though.
@elijahbenizzy can you put the current API in here too?
Not much of a difference between @inner_execute and what we already have with `reuse_functions`. It just joins it. So sure, if you have a set of columns producing a dataframe, it could be syntactic sugar on top. IMO running Hamilton within Hamilton is a bit of an anti-pattern: no lineage guarantees, no way to abstract execution, encapsulation is busted...
Re: the current API -- see this: https://github.com/stitchfix/hamilton/blob/main/decorators.md#reuse_functions.
maybe this is a better API:
@parameterized_subdag(
    sub_dag,  # functions or modules
    config={"loader": "source_3"},
    inputs={"source_3_path": source("source_3_path")})
def source_3_combined_data(combined_data: pd.DataFrame, something_not_in_inner: pd.Series) -> pd.DataFrame:
    # function name as namespace
    return combined_data
We are moving repositories! Please see the new version of this issue at https://github.com/DAGWorks-Inc/hamilton/issues/23. Also, please give us a star/update any of your internal links.