
Parameterized sub dag from config

Open janhurst opened this issue 2 years ago • 12 comments

Is your feature request related to a problem? Please describe.
I would like to define a DAG that uses a list of inputs to "fan out" to sub-DAGs, and then collect the results of the sub-DAGs back together. At present, the syntax to do this feels a bit awkward, but that may also reflect my inexperience with Hamilton.

I am passing a list of inputs via config and using @resolve and parameterized_subdag:

from hamilton.function_modifiers import ResolveAt, parameterized_subdag, resolve

import sub_dag_module  # the module housing the sub-DAG's Hamilton functions

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs_from_config: parameterized_subdag(
        sub_dag_module, **_subdag_inputs(inputs_from_config)
    ),
)
def pipeline(sub_dag_fcn: MyObj) -> MyObj:
    return sub_dag_fcn

My _subdag_inputs "helper" looks something like:

from hamilton.function_modifiers import value

def _subdag_inputs(inputs: dict) -> dict[str, dict]:
    # one subdag per (foo, bar) pair, named "foo.bar", fed literal inputs
    return {
        f"{foo}.{bar}": {
            "inputs": {
                "foo": value(foo),
                "thing": value(bar),
            }
        }
        for foo, bar in inputs.items()
    }
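
For concreteness, a minimal sketch of what this helper expands to, with hypothetical config values:

# hypothetical config: maps each foo to a bar
inputs_from_config = {"us": "sales", "eu": "ops"}

_subdag_inputs(inputs_from_config)
# == {
#     "us.sales": {"inputs": {"foo": value("us"), "thing": value("sales")}},
#     "eu.ops": {"inputs": {"foo": value("eu"), "thing": value("ops")}},
# }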

I then collect the sub DAG outputs back together using @resolve and inject:

from typing import Any

from hamilton.function_modifiers import group, inject, source

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs_from_config: inject(
        result=group(
            **{
                f"{foo}.{bar}": source(f"{foo}.{bar}")
                for foo, bar in inputs_from_config.items()
            }
        )
    ),
)
def all_sub_dag_results(result: dict[str, Any]) -> dict[str, Any]:
    """A collector."""
    return result
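
For completeness, a minimal driver-side sketch of wiring this up (my_module is a hypothetical stand-in for the module holding these functions; if I recall the API correctly, @resolve also requires enabling Hamilton's power-user mode in the config):

from hamilton import driver, settings

import my_module  # hypothetical: the module containing the functions above

config = {
    "inputs_from_config": {"us": "sales", "eu": "ops"},  # hypothetical values
    settings.ENABLE_POWER_USER_MODE: True,  # needed for @resolve
}
dr = driver.Driver(config, my_module)
print(dr.execute(["all_sub_dag_results"]))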

Apologies, the example is somewhat obfuscated from my real-world use case. I hope it makes sense.

Describe the solution you'd like
A decorator that can parameterize a subdag from config, plus a way of templating (?) the input-dict generator. I'm trying to "express" that structure and avoid needing to do mental arithmetic around dict expansions to understand where the sub-DAG inputs come from. Perhaps along the lines of:

@parameterized_subdag_from_config(my_config_inputs)
def pipeline(...)

with my_config_inputs being something like [{"foo.bar": { "foo": "something", "bar": "else" }}]

And a related:

@collect_sub_dags_from_config(my_config_inputs)
...

Describe alternatives you've considered
I've started to move some of the dict comprehensions out into functions to try and clean up the decorator calls, and I'm starting to investigate some sort of custom decorator.

janhurst avatar Apr 28 '23 16:04 janhurst

@janhurst thanks for the issue!

A clarifying question:

  1. Do you want everything to be a part of a single DAG?

Otherwise, with a new decorator like this, we like to iterate from a documentation-first perspective: e.g., what would the docs look like if you were to explain this to someone? What would an example look like? We find this helps us try out the new API/UX before trying to figure out how to make it work underneath. Do you have any strong thoughts here?

Lastly, just to park a half-thought for myself and anyone reading: would a class-based approach work better here? Or some more structured way to go from the driver to expressing this functionality?

skrawcz avatar May 02 '23 06:05 skrawcz

For my current use case, yes, everything is conceptually part of one single DAG.

To try and be more complete... I want my input list to fan out to a sub-DAG per list item, and then have several collections that may potentially overlap. I.e., items 1, 2, and 3 may be "joined" back into group 1; items 4, 5, and 6 joined back into group 2; and finally, say, items 1, 3, and 5 joined into a group 3.
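
As a hypothetical sketch of that grouping (all names invented):

# collections of sub-DAG results; note they may overlap
groups = {
    "group_1": ["item_1", "item_2", "item_3"],
    "group_2": ["item_4", "item_5", "item_6"],
    "group_3": ["item_1", "item_3", "item_5"],  # overlaps groups 1 and 2
}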

I do also have further sub-DAGs after the collections, but I have physical system boundaries that probably mean it's much easier to separate these out completely.

I'm not too sure what you have in mind for a class-based approach...

janhurst avatar May 02 '23 15:05 janhurst

... then have several collections that may potentially overlap. I.e., items 1, 2, and 3 may be "joined" back into group 1; items 4, 5, and 6 joined back into group 2; and finally, say, items 1, 3, and 5 joined into a group 3.

Clarification question: would you expect this to be done in the subdag? In the "collect" function? Or somewhere else?

Spitballing some more APIs - let me know what you think.

# Option 1:
@config.parameterized_subdag(python_modules, config_key="my_config_inputs")
def pipeline(...) -> ...:
    return ...

@config.collect(config_key="my_config_inputs")
def single_collect_node(collection: ...) -> ...:
    return ...

# Option 2:
@parameterized_subdag_from_config(config_key="my_config_inputs")
def pipeline(...) -> ...:
    return ...

@collect_sub_dags_from_config(config_key="my_config_inputs")
def single_collect_node(collection: ...) -> ...:
    return ...

I'm also brainstorming whether named tuples, or some object that you have to create driver-side, would help here at all? Dictionaries within dictionaries can get messy...
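
E.g., a hypothetical sketch of a driver-side spec object, in place of nested dicts:

from typing import NamedTuple

# hypothetical: one spec per sub-DAG, constructed driver-side
class SubdagSpec(NamedTuple):
    name: str  # e.g. "foo.bar"
    inputs: dict  # e.g. {"foo": "something", "bar": "else"}

my_config_inputs = [SubdagSpec("foo.bar", {"foo": "something", "bar": "else"})]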

skrawcz avatar May 03 '23 05:05 skrawcz

Clarification question: would you expect this to be done in the subdag? In the "collect" function? Or somewhere else?

I was thinking on the inject... I want to ask for group 1 and have the driver execute the subdags that group 1 is collecting. At this stage I would only ask for a single group at any time.
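
Roughly, with a driver dr built over this DAG (node name hypothetical):

# ask for a single collected group; only the sub-DAGs it depends on should execute
result = dr.execute(["group_1"])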

Spitballing some more APIs - let me know what you think.

I think the second option feels more natural to me.

I'm also brainstorming whether named tuples, or an object that you have to create driver side would help here at all? Dictionaries within dictionaries can get messy...

My inputs_from_config is ideally just a list... and the _subdag_inputs helper is just trying to adapt the list into whatever format the parameterized_subdag decorator is expecting. I'd have to think/play with it a bit more...

janhurst avatar May 03 '23 06:05 janhurst

One idea -- is this something that could be grouped into one? (will need a larger name)

@inject_subdags(config="subdag_config")
def collect(subdag_results: Collection[...]) -> ...:
    ...

I think the trickier part is determining what the config item should look like...
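
E.g., one hypothetical shape for "subdag_config", following the examples above:

# hypothetical: one entry per sub-DAG, keyed by name, mapping to that sub-DAG's inputs
subdag_config = {
    "foo.bar": {"foo": "something", "bar": "else"},
    "baz.qux": {"foo": "other", "bar": "thing"},
}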

elijahbenizzy avatar May 03 '23 16:05 elijahbenizzy

Ok, we've been mulling over some ideas. Here's what I've thought through. This still has some of the complexity of the framework, but it's hidden in an object you manage. The key is that we need to convert the config shapes into the structure we want -- either we assume a standard shape or bury the complexity...

# Generator class that generates one subdag for each field in a dict.
# We could imagine helpers here that take in a common shape --
# in this case, though, you'd be responsible for mapping the shape.
class MySubdagGenerator(OneForEachField):
    def __init__(self, config_field: str):
        super().__init__(config_field)

    def create_subdag(self, key: str, value_: Any) -> Subdag:
        return Subdag(
            name=key,
            result='sub_dag_fcn',
            inputs={
                "foo": value(value_['foo']),
                "thing": value(value_['bar']),
            },
        )

# One pass:
# repeat the subdag, then collect.
@repeat_subdag_and_collect(
    sub_dag_module,
    MySubdagGenerator(config_field='inputs'),
)
def pipeline(objects: Dict[str, MyObj]) -> MyObj:
    ...  # combine the per-subdag objects here

# Multi pass:
# repeat the subdag, expose the results.
@repeat_subdag(
    sub_dag_module,
    MySubdagGenerator(config_field='inputs'),
)
def pipeline__subdag(result: MyObj) -> MyObj:
    return result

# Multi pass:
# collect it all later.
# Imagine we have a `filter` argument in the decorator, TBD what that would look like --
# but that way you could collect multiple times.
@collect_subdag('pipeline')
def pipeline__collect(results: Dict[str, MyObj]) -> MyObj:
    return custom_collect(results)

elijahbenizzy avatar May 09 '23 18:05 elijahbenizzy

@janhurst @amosaikman just to play the full spectrum of solutions. Did you consider doing something like this? Would be interested in your feedback here :)

Using multiple drivers and not making one uber-DAG.

Hamilton code:

  1. We have modules that define a DAG, i.e. the subdag we want to run with different inputs; e.g. sub_dag_module houses the Hamilton functions here.
  2. We have another section of the DAG that we feed the outputs of (1) into; e.g. rest_of_dag_modules houses the other Hamilton functions. We could have one function that looks like:
def all_sub_dag_results(subdag_results: dict[str, Any]) -> dict[str, Any]:
    """A collector."""
    ...
    return subdag_results

Then we'd have a run.py that could look like the following:

# run.py
from hamilton import driver

import rest_of_dag_modules  # houses the other Hamilton functions
import sub_dag_module  # houses the sub-DAG's Hamilton functions

inputs = read_from_yaml(...)

# what you want to parameterize (assumption: it's parameterized on inputs)
subdag = driver.Driver({}, sub_dag_module)

def subdag_runs(subdag: driver.Driver, inputs: dict) -> dict[str, MyObj]:
    """Runs the subdag driver multiple times with different inputs."""
    return {
        f"{foo}_{bar}": subdag.execute(
            ["sub_dag_fcn"],
            inputs={
                "foo": foo,
                "thing": bar,
            },
        )
        for foo, bar in inputs.items()
    }

rest_of_dag = driver.Driver(
    {"subdag_results": subdag_runs(subdag, inputs)}, rest_of_dag_modules
)
result = rest_of_dag.execute(
    ["all_sub_dag_results"],
    inputs={...},
)

Thoughts? (Right now the above code should work -- but it would eagerly evaluate. If we wanted to create a single DAG from it, we could come up with a way to chain "drivers"...).

skrawcz avatar May 09 '23 21:05 skrawcz

Did you consider doing something like this?

My first inclination was something similar, and this is where I started.... I built a function that generated my inputs as an exploded dict/list, which was essentially a list of sub-DAGs with the right sub-DAG inputs. I then filtered that for the portion of the DAG I cared about for any given run. In fact, I just went back to doing this because building the DAG was a bit slow.

I was really just trying to say here is my big full list of everything I want in my workflow, but for this next run I only care about this section of the DAG.

The way I was deciding about "this section", and what I was doing with the filter bit, was partly a run-time problem and partly a caching problem... each sub-DAG is expensive (analogous to calling a DB... so slow), and if I have done it before, don't do it again unless I can determine it has changed.

But... my workflow is along the lines of: here is my big full DAG; please give me this output, and figure out whether anything has changed in the nodes along the way to the output node.

I hope that makes sense... I'm very worried I am just overthinking this!

Some of my rationale was: oh great, Hamilton can just work this out when I press the go button.

# One pass
# Repeat the subdag then collect
@repeat_subdag_and_collect(

Whilst I like this, it is only true for my current state and not my to-be state. I don't always want to collect all of the sub-DAGs.

# Imagine we have a `filter` argument in the decorator, TBD what that would look like
# But that way you could collect multiple times
@collect_subdag('pipeline') 

If I followed it right, this does sound nice!

I'm about to go back and refactor my pipelines. It will take me a day or two to tinker (this stuff is not my primary work :D), but I am thinking of how I can change this to almost sub-DAGs within sub-DAGs, or some type of fan out, collect, fan out again, with multiple selective collects... and in particular take a look at how Dask visualizes such a setup. Will post back here once I've done so!

janhurst avatar May 11 '23 16:05 janhurst

Did you consider doing something like this?

My first inclination was something similar, and this is where I started.... I built a function that generated my inputs as an exploded dict/list, which was essentially a list of sub-DAGs with the right sub-DAG inputs. I then filtered that for the portion of the DAG I cared about for any given run. In fact, I just went back to doing this because building the DAG was a bit slow.

Interesting. How many nodes is your DAG overall then? It shouldn't be that slow. Otherwise we could add a way to chain execution in this "procedural" way that would allow you to construct everything, and then only ask for the subset you need... Would that be of interest?

I was really just trying to say here is my big full list of everything I want in my workflow, but for this next run I only care about this section of the DAG.

Yep makes sense. So given a large DAG, only compute this section I'm interested in.

The way I was deciding about "this section", and what I was doing with the filter bit, was partly a run-time problem and partly a caching problem... each sub-DAG is expensive (analogous to calling a DB... so slow), and if I have done it before, don't do it again unless I can determine it has changed.

Nice, yep. We've thought about it and definitely think Hamilton has the hooks to do it nicely, i.e. skip computing nodes if it's "cached", or skip if "things haven't changed" since the last run. But there are a few ways to implement it, and so we haven't decided what to do here yet.

But... my workflow is along the lines of: here is my big full DAG; please give me this output, and figure out whether anything has changed in the nodes along the way to the output node.

I hope that makes sense... I'm very worried I am just overthinking this!

I think so? Maybe we should draw up a requirements doc just to ensure we're on the same page?

Some of my rationale was: oh great, Hamilton can just work this out when I press the go button.

# One pass
# Repeat the subdag then collect
@repeat_subdag_and_collect(

Whilst I like this, it is only true for my current state and not my to-be state. I don't always want to collect all of the sub-DAGs.

# Imagine we have a `filter` argument in the decorator, TBD what that would look like
# But that way you could collect multiple times
@collect_subdag('pipeline') 

If I followed it right, this does sound nice!

Devil's in the details of the "filter" part though 😆.

I'm about to go back and refactor my pipelines. It will take me a day or two to tinker (this stuff is not my primary work :D), but I am thinking of how I can change this to almost sub-DAGs within sub-DAGs, or some type of fan out, collect, fan out again, with multiple selective collects... and in particular take a look at how Dask visualizes such a setup. Will post back here once I've done so!

Sounds good. I think some more functional requirements could help us determine whether a more procedural approach (e.g. driver chaining), a declarative approach (e.g. special decorators), or both could be useful to have in Hamilton. Side note: I'm interested in whether Dask helps or not -- it could be a good blog post if it does.

skrawcz avatar May 12 '23 03:05 skrawcz

What @skrawcz said, but I also want to say that I don't think you're overthinking it. Complex DAGs are just that (complex), and this is why I'm really excited about Hamilton. IMO nobody has ever simplified DAG development enough to make it worthwhile at a fine-grained level, and I think we're onto the solution here.

And yep! The filter was a catch-all for your specific case 😆 I should flesh it out more -- could come from a config, be a regex, etc...

elijahbenizzy avatar May 12 '23 03:05 elijahbenizzy

Interesting. How many nodes is your DAG overall then? It shouldn't be that slow.

My config is a dict[Tuple[str,str], dict[str, pd.Timestamp]] with about 1750 elements.
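
For illustration, a hypothetical entry or two of that shape (key and field names invented):

import pandas as pd

config: dict[tuple[str, str], dict[str, pd.Timestamp]] = {
    ("us", "sales"): {"start": pd.Timestamp("2023-01-01"), "end": pd.Timestamp("2023-03-31")},
    ("eu", "ops"): {"start": pd.Timestamp("2023-01-01"), "end": pd.Timestamp("2023-03-31")},
    # ... ~1750 such entries
}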

The driver takes a little over 20 seconds to build the DAG (on a "prosumer" desktop). It's bearable, just a little sluggish?

Currently this is an "EL" pipeline, and I'm about to start building out the transforms in between. I expect to see the node count jump up a fair bit, but it's also quite reasonable for me to split the transforms out as a fully separate second DAG, as the transforms are all pretty simple/cheap aggregation-y things.

janhurst avatar May 12 '23 23:05 janhurst

Interesting. How many nodes is your DAG overall then? It shouldn't be that slow.

My config is a dict[Tuple[str,str], dict[str, pd.Timestamp]] with about 1750 elements.

The driver takes a little over 20 seconds to build the DAG (on a "prosumer" desktop). It's bearable, just a little sluggish?

Currently this is an "EL" pipeline, and I'm about to start building out the transforms in between. I expect to see the node count jump up a fair bit, but it's also quite reasonable for me to split the transforms out as a fully separate second DAG, as the transforms are all pretty simple/cheap aggregation-y things.

I'd be surprised if we couldn't optimize that quite a bit. It's unfortunately hard to reproduce, but if you have some generic representation that could repro the problem, I'd be happy to take a stab and see if I can make it build faster (no guarantees, but I'd be shocked if there wasn't some low-hanging fruit that could benefit everybody as well...).

Otherwise, I think the filter is the key here -- we'll report back with some ideas!

elijahbenizzy avatar May 13 '23 17:05 elijahbenizzy

Any need for this feature still? Will close for now unless there are some updates :)

skrawcz avatar Jul 18 '24 19:07 skrawcz