Use @inject with @pipe decorators
Talking with @skrawcz and @elijahbenizzy on Slack about a use case @skrawcz discovered experimenting that using @inject decorator in conjunction with @pipe decorator does not work.
Current behavior
@skrawcz Please add more description here about the current behavior
Expected behavior
The decorators should work in conjunction.
Stack trace:
Traceback (most recent call last):
ValueError: Cannot call NodeExpander: <class 'hamilton.function_modifiers.expanders.inject'> on more than one node. This must be called first in the DAG. Called with [<A_processed {'module': 'functions'}>, <A_processed.with_echo {'module': 'functions'}>, <A_processed.with_echo_1 {'module': 'functions'}>]
Code:
# functions.py - declare and link your transformations as functions....
import pandas as pd
from hamilton.function_modifiers import extract_fields, inject, source, pipe, step
@extract_fields(
{
"col1 == 'A' and col2 =='B'": pd.DataFrame,
"col1 == 'B' and col2 =='C'": pd.DataFrame,
}
)
def dataframe_partition(partitions: list[str], df: pd.DataFrame) -> dict:
# get a dict by grouping
# return dict of name to dataframe
return {"col1 == 'A' and col2 =='B'": pd.DataFrame({'a': [1,2,3]}),
"col1 == 'B' and col2 =='C'": pd.DataFrame({'b': [1,2,3,]})}
def _echo(value: pd.DataFrame, v: int) -> pd.DataFrame:
print(v)
return value
@pipe(
step(_echo, v=1),
step(_echo, v=2),
)
@inject(A=source("col1 == 'A' and col2 =='B'"))
# A should be applied on "A White Horse" Dataframe
def A_processed(A: pd.DataFrame) -> pd.DataFrame:
print("I've just done a ton of transformations, each one of which is a node in the DAG")
return A # it gets passed the result of transforming them
#run.py
# And run them!
import functions
from hamilton import driver
dr = driver.Driver({}, functions)
dr.display_all_functions(
"graph.dot", orient="TB", show_legend=False)
@elijahbenizzy I think what's required here is a target_= parameter, but that would require changing parameterize for inject since inject subclasses it?
@elijahbenizzy I think what's required here is a
target_=parameter, but that would require changingparameterizeforinjectsince inject subclasses it?
inject should not subclass @parameterize -- implementing it correctly would probably be the fix here. It was a quick trick to get it to work.
What it should do is modify the node that takes in the desired parameter to take in a new parameter (either fixed or from another node) -- I think this means implementing NodeModifier