Enable mutation of the output of nodes in a linear fashion via decorators
**Is your feature request related to a problem? Please describe.**
This is similar to #701 -- but a distributed version. People don't want to change column/dataframe/artifact names, which conflicts with Hamilton's requirement that functions (and thus nodes) have unique names. Is there some API we can support?

**Describe the solution you'd like**
One idea is that you pipe transforms together and have the framework group things so that there isn't a renaming issue.

E.g.
```python
def data_set(...) -> pd.DataFrame:
    ...
    return df


@mutate
def _data_set(data_set: pd.DataFrame) -> pd.DataFrame:
    # some mutation
    return df


@mutate
def _data_set(data_set: pd.DataFrame) -> pd.DataFrame:
    # some other mutation
    return df
```
Notes:

- Python modules can only expose one function with a given name -- the last one defined wins. This means that anything we want to use downstream can only be defined once.
- The mutating functions in the above are prefixed with `_`, which is reserved for private functions. That's fine, I think, because these transform functions aren't useful by themselves and shouldn't be exposed directly. It also gets around the naming issue of (1) -- we can have the decorator register and capture these. Open decision as to what "declares" the connection to the function: the first argument name? the name of the function? or something else?
- Order matters. The idea is that the decorator builds up an ordered list of transforms. This allows one to experiment with commenting out functions etc. as they're developing.
- When Hamilton inspects this module, it then pulls `data_set` and checks what was registered against it via `@mutate`. One initial constraint we can have is that `@mutate` has to be in the same module; we should design for multi-module, but as a first pass constrain to the same module.
- Hamilton would then render this correctly, exposing those nodes in the graph, and expose `data_set` as the result of applying all those transforms.
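For illustration, here's a minimal sketch of what the caller-facing experience could be, assuming the mutations are wired in transparently (the module name `my_module` is hypothetical):

```python
from hamilton import driver

import my_module  # hypothetical module containing data_set and the @mutate functions above

dr = driver.Builder().with_modules(my_module).build()
# Downstream code still asks for "data_set"; the framework applies the registered
# mutations in order and exposes the final result under the original name.
result = dr.execute(["data_set"])
```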
**Describe alternatives you've considered**
Alternative / additional decorator use could be:
```python
@mutate("data_set")
def _some_func_name(arg1: pd.DataFrame) -> pd.DataFrame:
    # assumes arg1 maps to data_set ?
    # some other mutation
    return df
```
This would enable function names that don't have to match the target. It could then help one write mutations like the following -- which I think is a potential vote for allowing multi-module registration, using module order to imply the order in which transforms are applied.
```python
for helper_func in helper_funcs:
    mutate.register("data_set", helper_func)
```
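A minimal sketch of what such a `register` hook could look like, assuming a module-level registry keyed by target node name (all names here are hypothetical):

```python
import collections
from typing import Callable

# Hypothetical registry: target node name -> ordered list of mutations to apply.
_mutation_registry = collections.defaultdict(list)


def register(target: str, fn: Callable) -> None:
    """Registers `fn` as a mutation of `target`; registration order implies application order."""
    _mutation_registry[target].append(fn)
```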
**Additional context**
Here's some code that proves you can intercept and register the functions in the `mutate` decorator.
```python
import collections
import functools

function_registry = collections.defaultdict(list)


def mutate(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    qualified_key = func.__module__ + "." + func.__name__
    # Modify the function's name
    func.__name__ = func.__name__ + str(len(function_registry[qualified_key]))
    wrapper.__name__ = func.__name__
    # Register the function
    function_registry[qualified_key].append(wrapper)
    return wrapper
```
Then:

```python
from decorator import mutate
import pandas as pd


def my_function(input: pd.Series) -> pd.Series:
    return input


@mutate
def _my_function(my_function: pd.Series) -> pd.Series:
    return my_function + 1


@mutate
def _my_function(my_function: pd.Series) -> pd.Series:
    return my_function * 2
```
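Tracing the proof of concept above: both definitions register under the same qualified key, and the numeric suffix keeps their exposed names distinct. A quick way to see that (the module name printed is illustrative):

```python
from decorator import function_registry

for key, fns in function_registry.items():
    print(key, [fn.__name__ for fn in fns])
# e.g. transforms._my_function ['_my_function0', '_my_function1']
```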
This would be a really cool functionality to have!

I'm considering the following scenario: I've already built a DAG that gives me baseline functionality, or produced a deliverable I was chasing a deadline for. Now I want to experiment by tweaking some nodes, but I'd prefer it to be non-invasive and easy to turn on or off -- something like A/B testing, but without needing to decorate the code with `config.when`. If my transforms work, I can make them more permanent; otherwise I just delete the one function in one place without needing to do anything else.

We could even add something like turning mutations on or off at build time to test how mutations affect the result.
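For contrast, here's roughly what the "toggle an experiment" workflow looks like today with `config.when`, which is the boilerplate this proposal would avoid (names are illustrative):

```python
import pandas as pd
from hamilton.function_modifiers import config


@config.when(experiment="off")
def data_set(raw_data: pd.DataFrame) -> pd.DataFrame:
    return raw_data


@config.when(experiment="on")
def data_set__tweaked(raw_data: pd.DataFrame) -> pd.DataFrame:
    # the tweak under test
    return raw_data.dropna()
```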
-- I prefer the alternative implementation for the following reasons:

- I think we should leave the original functions alone, i.e. not add any decorators to them or impose additional naming on them (otherwise the functionality is already there by using other decorators like `pipe_*` / wiring a DAG in the canonical way).
- This restriction makes it hard to map the mutating transformation to more than one original function, and we lose the ability to apply the same transformation to multiple nodes.
- The alternative solution allows us to pass in a list of function names that would get the same function, to pass in `value()` arguments, etc. (I guess this flips the original idea that you only need to comment out the particular function name from the list to cancel the mutation on it, instead of commenting out a whole function.)
- We still need to think about order, but the naming issue goes away completely since the transforms can be named anything and the decorator binds them.

Let me know if I misunderstood or missed something from the original idea.
Do you have a code example of what it would look like and the updates you'd need to make?
I think the key here is determining what workflows we want this to work best for, along with showing the friction points we currently have. E.g. is it notebook dev? Or is it reusing transforms across multiple data sets? Or something else? What does the current code look like to achieve that outcome?
This will then help guide how one registers these additional functions.
Now that I think about it, this also seems pretty close to what `@with_columns` does; note we haven't made a generic `with_columns` that works over any dataframe (https://github.com/DAGWorks-Inc/hamilton/issues/1158).
Here is a very rough sketch of what I had in mind:

```python
# sample_module.py
import pandas as pd

from dev import mutate


def my_function(input: pd.Series) -> pd.Series:
    return input


def other_function(input: pd.Series) -> pd.Series:
    return input


def no_mutations(input: pd.Series) -> pd.Series:
    return input


@mutate(functions=["my_function", "other_function"])
def _transform_1(input_function: pd.Series) -> pd.Series:
    return input_function + 1


@mutate(functions=["other_function"])
def _transform_2(input_function: pd.Series) -> pd.Series:
    return input_function * 2
```
Here is a very crude implementation, but the essential idea is that we find the right functions and build a `pipe_output` for each one in the background (I reached for `pipe_output` here to fast-jump to a working solution; there should be a better way of implementing this).
```python
# I think we need to touch only two places
import collections
import functools
import inspect
from types import ModuleType
from typing import Callable, List, Tuple

from hamilton.function_modifiers.macros import pipe_output, step
from hamilton.graph_utils import find_functions, is_submodule

import sample_module


# the actual decorator #############################################################################
def mutate(functions):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapped_fn(*args, **kwargs):
            return fn(*args, **kwargs)

        # We make all mutable functions hidden
        if not fn.__name__.startswith("_"):
            # Probably change __qualname__ as well?
            wrapped_fn.__name__ = f"_mutate_{fn.__name__}"
        else:
            wrapped_fn.__name__ = f"_mutate_{fn.__name__[1:]}"
        wrapped_fn.functions_to_mutate = functions
        return wrapped_fn

    return decorator


# somewhere in graph.py ############################################################################
def find_mutations(function_module: ModuleType) -> List[Tuple[str, Callable]]:
    """Determines the set of mutation functions registered in a module.

    This iterates through the function module and grabs all mutation definitions.
    :return: list of tuples of (func_name, function).
    """

    def valid_fn(fn):
        return (
            inspect.isfunction(fn)
            and "_mutate_" in fn.__name__
            and is_submodule(inspect.getmodule(fn), function_module)
        )

    return [f for f in inspect.getmembers(function_module, predicate=valid_fn)]


def create_mutation_pipeline(functions, mutations):
    """Maps each target function name to the ordered list of mutations to apply to it."""
    pipelines = collections.defaultdict(list)
    for mutation in mutations:
        f = mutation[1]
        for target_function in f.__dict__["functions_to_mutate"]:
            pipelines[target_function].append(f)
    return pipelines


def attach_pipelines(functions, pipelines):
    """Wraps each targeted function with a pipe_output built from its mutations."""
    new_functions = []
    for f in functions:
        if f[0] not in pipelines:
            new_functions.append((f[0], f[1]))
            continue
        steps = []
        for transform in pipelines[f[0]]:
            # place to bind additional args and stuff you can put into step
            steps.append(step(transform))
        new_functions.append((f[0], pipe_output(*steps)(f[1])))
    return new_functions


if __name__ == "__main__":
    functions = sum([find_functions(module) for module in [sample_module]], [])
    mutations = sum([find_mutations(module) for module in [sample_module]], [])
    print(functions)
    print(mutations)
    pipelines = create_mutation_pipeline(functions=functions, mutations=mutations)
    print(pipelines)
    new_functions = attach_pipelines(functions=functions, pipelines=pipelines)
    print(new_functions)
```
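Tracing this against `sample_module`, `create_mutation_pipeline` should come back with roughly the following mapping, which `attach_pipelines` then turns into one `pipe_output` per targeted function (the real values are the wrapped mutation functions, shown by name here for readability):

```python
expected_pipelines = {
    "my_function": ["_mutate_transform_1"],
    "other_function": ["_mutate_transform_1", "_mutate_transform_2"],
}
```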
re: with_columns -- I'll take a look tomorrow, I wasn't aware of it!
Another use case sketch -- I want to pull data but keep it under the same name...

```python
def cust_data(...) -> pd.DataFrame:
    ...  # pull data


@mutate(cust_data)
@check_output(...)
def _filter(cust_data: pd.DataFrame) -> pd.DataFrame:
    ...  # filter it


@mutate(cust_data)  # do we assume the target is the first argument?
def _join(cust_data: pd.DataFrame, foo_dep: pd.DataFrame) -> pd.DataFrame:
    ...  # join; we would allow other DAG dependencies I think


def aggregate_data(cust_data: pd.DataFrame) -> pd.DataFrame:
    ...  # this depends on the final cust_data node -- a caveat for users is that the functions below still apply
    ...  # (maybe we would have to have a linter to sort files with @mutate)


@mutate(cust_data, some_other_data)  # some_other_data is another function; these don't have to be function pointers, they could be strings
def _sort(generic_df: pd.DataFrame) -> pd.DataFrame:
    ...  # this is a generic helper that is applied to two functions...


@mutate(cust_data, target_="some_data")  # required for combined decorator usage?
@with_columns(...)
def _features(some_data: pd.DataFrame) -> pd.DataFrame:
    ...  # with_columns adds features...
```
Things to think about:

1. Unit testing -- it is possible this way.
2. Is the first argument name meaningful, or not?
3. Do we allow inter-module, or only intra-module? We like forcing people to curate code. Inter-module could make that tricky... it could allow people to "break encapsulation" -- not sure that's a good thing (it would require good errors and introspection).
4. This makes it fuzzier to find what is upstream of a node -- with a function pointer in the decorator it's simpler; with a string name, a little harder...
Nice, that makes a strong point for the use case that we shouldn't limit `@mutate` to impacting a single function.

- (2) I would bind it the same way as `pipe`: the first argument is the output of the function you want to mutate; other arguments can be passed in via `source`/`value` (maybe even a dict in the case of multiple functions, so we can use different values? but this sounds complicated... see the sketch below).
- (3) Yes; on the other hand it would allow a single module with only `mutate` functions as a central place to check out all the data processing stuff. Not sure which is better in practice. I guess that boils down to the debate of who the end user is: data scientists who need to transform something, or their engineering counterparts who want to force them to ship somewhat organised code?
- (4) Agreed, but this depends on the decision around 3. If we allow inter-module, then you would need to import all the functions, which becomes a hassle compared to just writing a list of strings. Can't we assume upstream to be from the "mother" node that gets transformed? (If we are wiring them together internally, we should be able to force sharing the namespace?)
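To make the binding idea concrete, here's a hypothetical sketch (the decorator signature is invented for illustration, mirroring how `step` binds extra arguments for `pipe` today):

```python
import pandas as pd
from hamilton.function_modifiers import source, value


@mutate(cust_data, some_other_data, how=value("inner"), lookup=source("lookup_table"))
def _enrich(df: pd.DataFrame, lookup: pd.DataFrame, how: str) -> pd.DataFrame:
    # `df` is the output of the function being mutated; `lookup` is pulled from the
    # DAG node "lookup_table"; `how` is a literal bound at declaration time.
    return df.merge(lookup, how=how)
```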
Will work on it more tomorrow to get something concrete out.
Some random thoughts:

`mutate` should only be able to work with a select few other decorators:

- `check_output` & `check_output_custom`
- subdag-like ones, potentially, such as `subdag` & `with_columns` -- i.e. ones that can only return a single output.

Other decorators I don't think make much sense.

Things to think through: the tricky part is that we have a "step" that turns into a "node", and that's what we want to potentially apply the `check_output` and `subdag` operators on. We could constrain this initial feature to not enable that, so as to keep the project from getting too big -- but it should be taken into account.
The other way to think about this is what code it is replacing:

```python
def raw_cust_data(...) -> pd.DataFrame:
    ...  # pull data


@check_output(...)
def cust_data_filtered(raw_cust_data: pd.DataFrame) -> pd.DataFrame:
    ...  # filter it


def cust_data_joined(cust_data_filtered: pd.DataFrame, foo_dep: pd.DataFrame) -> pd.DataFrame:
    ...  # join


def aggregate_data(cust_data_features: pd.DataFrame) -> pd.DataFrame:
    ...  # aggregate


def cust_data_sorted(cust_data_filtered: pd.DataFrame) -> pd.DataFrame:
    ...  # sort


def some_data_sorted(some_data: pd.DataFrame) -> pd.DataFrame:
    ...


@with_columns(...)
def cust_data_features(cust_data_sorted: pd.DataFrame) -> pd.DataFrame:
    ...  # with_columns adds features...
```
So it replaces / gives us:

- no more having to invent unique names and then change wiring if you want to add/remove/replace something.
- more verb-like names on functions.
- potentially simpler "reuse" of transform functions across DAG paths...
Closing since #1160 was merged