dagster
Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
Summary
A fan-in inside a sub-graph that is mapped over a dynamic output raises the exception: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
Possibly a duplicate of #6906?
Reproduction
from random import randint

from dagster import DynamicOut, DynamicOutput, OpExecutionContext, op, graph


@op(out=DynamicOut(int))
def get_random_numbers(context: OpExecutionContext, nb_samples: int):
    context.log.info(f"Sample size: {nb_samples}")
    for idx in range(0, nb_samples):
        rand_num = randint(0, 100)
        yield DynamicOutput(rand_num, mapping_key=str(idx))


@op
def add_one(context: OpExecutionContext, number: int) -> int:
    context.log.info(f"Number: {number}")
    return number + 1


@op
def add_two(context: OpExecutionContext, number: int) -> int:
    context.log.info(f"Number: {number}")
    return number + 2


@op
def add_three(context: OpExecutionContext, number: int) -> int:
    context.log.info(f"Number: {number}")
    return number + 3


@op
def add_all(context: OpExecutionContext, numbers: list[int]) -> int:
    all_sum = sum(numbers)
    context.log.info(f"Sum: {all_sum}")
    return all_sum


@op
def calc_average(context: OpExecutionContext, numbers: list[int]) -> int:
    average = sum(numbers) / len(numbers)
    context.log.info(f"Average: {average}")
    return average


@graph()
def do_stuff(number: int):
    # Fan-in inside the sub-graph: three branches collected into one list input.
    cal = [add_one(number), add_two(number), add_three(number)]
    return add_all(numbers=cal)


@graph
def test_pipeline():
    numbers = get_random_numbers()
    # Mapping the sub-graph over the dynamic output triggers the error.
    res = numbers.map(do_stuff)
    calc_average(res.collect())
Additional Info about Your Environment
pip list | grep "dag"
dagit 0.14.3
dagster 0.14.3
dagster-graphql 0.14.3
Message from the maintainers:
Impacted by this bug? Give it a 👍. We factor engagement into prioritization.
I have two types of @dagster.job:
- the first type is a non-partitioned job, triggered by a sensor, which processes the data items one by one
- the second type is a partitioned job (with static partitions). Each partition key represents a range of data (e.g. 1-10).
To simplify the code, I extracted a new graph from job (1) and reused it in job (2). I want to map the partition key to its individual data items, but I get the same error.
Is there any alternative to this fan-in subgraph approach?
I'm new to dagster, thank you 🙏
I've also hit this bug recently.
To help a bit in debugging, I set up a reproduction repo: https://github.com/hobofan/dagster-fan-in-repro
When running the job that contains a DynamicOut, which is mapped with a graph that contains branching and a fan-in, it breaks with the error:
dagster._check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
When peeling back that error check, we can see another error that gives a bit more information about the root cause of this issue:
dagster._check.ParameterCheckError: Param "step_output_handle" is not a StepOutputHandle.
Got UnresolvedStepOutputHandle(unresolved_step_handle=UnresolvedStepHandle(node_handle=NodeHandle(name='duplicate_foo_text', parent=NodeHandle(name='analyze_foo_or_bar', parent=NodeHandle(name='analyzed_many', parent=None)))), output_name='unified', resolved_by_step_key='analyzed_many.many_foo_bar_fanout', resolved_by_output_name='result') which is type <class 'dagster._core.execution.plan.outputs.UnresolvedStepOutputHandle'>.
So the problem is that the fan-in internal to the graph receives, as inputs, unresolved outputs that originate from the DynamicOutput (resolved_by_step_key='analyzed_many.many_foo_bar_fanout').
This is quite annoying because it hurts clean composability. It currently forces us into a workaround: instead of doing proper branching, we execute all branches with Optional values and return early in the branches that do not apply. That makes the UI view of a run much busier and makes it harder to see which branches were actually evaluated.
(I think https://github.com/dagster-io/dagster/issues/5924 is also a related issue that was never actually resolved and was marked as a duplicate of the wrong issue.)
Just ran into this. Exact problem described by @hobofan. I need the sub-graph to do the conditional branching and the fan in of the branches.
If this is only due to the introduction of the sub-graph I'm happy to live with expanding that into the main graph in the meantime, but I just don't know how to do the conditional branching and its fan-in when the upstream op has dynamic output. The map/collect syntax doesn't seem to allow for conditional branching, is that right?
Eg:
@graph
def foo():
    dyn_output = dyn_output_producing_op()
    # map_z and what_now are placeholders for the missing steps, not real Dagster methods.
    res_to_collect = dyn_output.map(conditional_branch_producing_op).map_z({'branch_1': branch_1_op, 'branch_2': branch_2_op, ...}).what_now(...)
    merge_all(res_to_collect.collect())
So in that example, I have no clue how to do the map_z and what_now steps without using a subgraph, in which case it would simply be res_to_collect = dyn_output.map(subgraph) and all that complexity would move into the subgraph.