Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.

Open 0xfabioo opened this issue 3 years ago • 2 comments

Summary

A fan-in inside a sub-graph that is mapped over a dynamic output raises the exception: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.

Possible duplicate of #6906?

Reproduction

from dagster import DynamicOut, DynamicOutput, OpExecutionContext, op, graph
from random import randint


@op(out=DynamicOut(int))
def get_random_numbers(context: OpExecutionContext, nb_samples: int):
    context.log.info(f"Sample size: {nb_samples}")
    for idx in range(0, nb_samples):
        rand_num = randint(0, 100)
        yield DynamicOutput(rand_num, mapping_key=str(idx))

@op
def add_one(context: OpExecutionContext, number: int) -> int:
    context.log.info(f"Number: {number}")
    return number + 1

@op
def add_two(context: OpExecutionContext, number: int) -> int:
    context.log.info(f"Number: {number}")
    return number + 2

@op
def add_three(context: OpExecutionContext, number: int) -> int:
    context.log.info(f"Number: {number}")
    return number + 3

@op
def add_all(context: OpExecutionContext, numbers: list[int]) -> int:
    all_sum = sum(numbers)
    context.log.info(f"Sum: {all_sum}")
    return all_sum

@op
def calc_average(context: OpExecutionContext, numbers: list[int]) -> float:
    # sum / len yields a float, so the output is annotated as float, not int.
    average = sum(numbers) / len(numbers)
    context.log.info(f"Average: {average}")
    return average

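@graph
def do_stuff(number: int):
    # Branch: three ops consume the same mapped input...
    cal = [add_one(number), add_two(number), add_three(number)]
    # ...and fan back in through a list input. This fan-in inside the mapped
    # sub-graph is what raises the "Unexpected dynamic output dependency" error.
    return add_all(numbers=cal)

@graph
def test_pipeline():
    numbers = get_random_numbers()  # dynamic fan-out: one output per sample
    res = numbers.map(do_stuff)  # run the sub-graph once per dynamic output
    calc_average(res.collect())  # gather the mapped results back together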
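For reference, a plain-Python sketch (no Dagster involved) of what the reproduction is meant to compute: each dynamic item runs through the three branch ops, the sub-graph fans their results into one sum, and the top-level graph collects the per-item sums and averages them. The function names mirror the ops above; run_pipeline is a hypothetical stand-in for test_pipeline that takes the numbers directly instead of drawing them at random.

```python
# Plain-Python stand-ins for the ops in the reproduction (no Dagster involved).
def add_one(number: int) -> int:
    return number + 1


def add_two(number: int) -> int:
    return number + 2


def add_three(number: int) -> int:
    return number + 3


def do_stuff(number: int) -> int:
    # Branch: three independent computations on the same input...
    branches = [add_one(number), add_two(number), add_three(number)]
    # ...then fan them back in, like add_all(numbers=cal) in the sub-graph.
    return sum(branches)


def run_pipeline(numbers: list[int]) -> float:
    # "map": run the sub-graph once per dynamic item.
    per_item = [do_stuff(n) for n in numbers]
    # "collect": gather every mapped result and average them (calc_average).
    return sum(per_item) / len(per_item)


print(run_pipeline([1, 2, 3]))  # prints 12.0 (per-item sums 9, 12, 15)
```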

Dagit UI/UX Issue Screenshots

[Two screenshots, not preserved.]

Additional Info about Your Environment

pip list | grep "dag"
dagit                  0.14.3
dagster                0.14.3
dagster-graphql        0.14.3

Message from the maintainers:

Impacted by this bug? Give it a 👍. We factor engagement into prioritization.

0xfabioo, Mar 15 '22 08:03

I have two types of @dagster.job:

  1. The first is a non-partitioned job, triggered by a sensor, that processes the data one item at a time.
  2. The second is a partitioned job (with static partitions), where each partition key represents a range of data (e.g. 1-10).

To simplify the code, I extracted a new graph from job (1) and reused it in job (2). I want to map that graph over each item in a partition's range, but I get the same error.

Is there any alternative to this fan-in sub-graph approach?

I'm new to Dagster, thank you 🙏
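One possible alternative (a suggestion, not something confirmed in this thread) is to share the per-item logic as a plain Python function instead of a sub-graph: job (1) calls it once per item, and job (2) loops over the partition's range inside a single op, so there is no dynamic mapping and no fan-in inside a mapped graph. The trade-off is losing per-item parallelism and per-item steps in the UI. A minimal sketch, assuming range keys look like "1-10" with both bounds inclusive; process_item, parse_range_key, and process_partition are hypothetical names.

```python
def process_item(item: int) -> int:
    # Hypothetical per-item logic shared by both jobs (placeholder: double it).
    return item * 2


def parse_range_key(partition_key: str) -> range:
    # Assumption: keys look like "1-10", with both bounds inclusive.
    start_text, end_text = partition_key.split("-")
    return range(int(start_text), int(end_text) + 1)


def process_partition(partition_key: str) -> list[int]:
    # Body of a single op in the partitioned job: it loops over the range
    # itself, so nothing is mapped and no fan-in happens inside a mapped graph.
    return [process_item(item) for item in parse_range_key(partition_key)]


print(process_partition("1-3"))  # [2, 4, 6]
```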

pyk, Feb 21 '23 14:02

I've also hit this bug recently.

To help a bit in debugging, I set up a reproduction repo: https://github.com/hobofan/dagster-fan-in-repro

Running a job in which a DynamicOut is mapped over a graph that contains branching and a fan-in fails with the error:

dagster._check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.

Removing that error check reveals another error that says more about the root cause:

dagster._check.ParameterCheckError: Param "step_output_handle" is not a StepOutputHandle.
Got UnresolvedStepOutputHandle(unresolved_step_handle=UnresolvedStepHandle(node_handle=NodeHandle(name='duplicate_foo_text', parent=NodeHandle(name='analyze_foo_or_bar', parent=NodeHandle(name='analyzed_many', parent=None)))), output_name='unified', resolved_by_step_key='analyzed_many.many_foo_bar_fanout', resolved_by_output_name='result') which is type <class 'dagster._core.execution.plan.outputs.UnresolvedStepOutputHandle'>.

So the problem is that the fan-in inside the graph receives, as inputs, unresolved outputs that originate from the DynamicOutput (resolved_by_step_key='analyzed_many.many_foo_bar_fanout').
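A toy model of the mismatch the traceback reports (plain Python, not Dagster's plan-building code): inside a mapped graph, each upstream output is only an unresolved handle that records which dynamic step will resolve it, while the regular fan-in path expects concrete handles. The class names mirror the traceback; their fields, the resolve method, and the "name[mapping_key]" step-key format are simplifying assumptions.

```python
from dataclasses import dataclass


# Toy stand-ins named after the handles in the traceback; NOT Dagster's classes.
@dataclass(frozen=True)
class StepOutputHandle:
    step_key: str
    output_name: str


@dataclass(frozen=True)
class UnresolvedStepOutputHandle:
    step_name: str
    output_name: str
    resolved_by_step_key: str  # the dynamic fan-out step that must run first

    def resolve(self, mapping_key: str) -> StepOutputHandle:
        # After the fan-out runs, each mapping key yields a concrete step.
        return StepOutputHandle(f"{self.step_name}[{mapping_key}]", self.output_name)


def fan_in_sources(handles: list) -> list[StepOutputHandle]:
    # A regular fan-in only accepts concrete handles, mirroring the failing check.
    for handle in handles:
        if not isinstance(handle, StepOutputHandle):
            raise TypeError(f"{handle!r} is not a StepOutputHandle")
    return list(handles)


inside_map = [
    UnresolvedStepOutputHandle(f"add_{n}", "result", "get_random_numbers")
    for n in ("one", "two", "three")
]
try:
    fan_in_sources(inside_map)  # fails: the handles are not resolved yet
except TypeError as err:
    print("fails before resolution:", type(err).__name__)
print(fan_in_sources([h.resolve("0") for h in inside_map])[0].step_key)  # add_one[0]
```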

This is quite annoying because it hurts clean composability. It currently forces us into a workaround: instead of proper branching, we execute all branches with Optional values and return early when a branch does not apply. That makes the UI view of a run execution much busier and makes it harder to see which branches were actually evaluated.
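A plain-Python sketch of the Optional-values workaround described above, not the poster's actual code: every branch runs for every item, returns None early when it does not apply, and a merge step keeps the single non-None result. The branch functions and the even/odd routing are hypothetical. Giving merge one parameter per branch, rather than one list, is presumably what lets the Dagster version avoid a list fan-in; that wiring is an assumption.

```python
from typing import Optional


def branch_even(value: int) -> Optional[str]:
    # Early return: this branch only applies to even values.
    if value % 2 != 0:
        return None
    return f"even:{value}"


def branch_odd(value: int) -> Optional[str]:
    # Early return: this branch only applies to odd values.
    if value % 2 == 0:
        return None
    return f"odd:{value}"


def merge(*results: Optional[str]) -> str:
    # Exactly one branch should have produced a value; the rest returned None.
    produced = [r for r in results if r is not None]
    if len(produced) != 1:
        raise ValueError(f"expected exactly one branch result, got {produced!r}")
    return produced[0]


def process(value: int) -> str:
    # Both branches always run (and would show up as steps), even the no-op one.
    return merge(branch_even(value), branch_odd(value))


print([process(v) for v in range(4)])  # ['even:0', 'odd:1', 'even:2', 'odd:3']
```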

(I think https://github.com/dagster-io/dagster/issues/5924 is also related; it hasn't actually been resolved and was marked as a duplicate of the wrong issue.)

hobofan, May 07 '24 12:05

Just ran into this; it's exactly the problem @hobofan described. I need the sub-graph to do the conditional branching and the fan-in of the branches.

If this is caused only by introducing the sub-graph, I'm happy to inline it into the main graph in the meantime, but I don't know how to do the conditional branching and its fan-in when the upstream op has a dynamic output. The map/collect syntax doesn't seem to allow for conditional branching; is that right?

E.g.:

@graph
def foo():
    dyn_output = dyn_output_producing_op()
    # Pseudocode: map_2 (route each item to one branch) and what_now (merge) are hypothetical.
    res_to_collect = (
        dyn_output.map(conditional_branch_producing_op)
        .map_2({"branch_1": branch_1_op, "branch_2": branch_2_op})  # ...more branches
        .what_now(...)
    )
    merge_all(res_to_collect.collect())

So in that sketch, I have no idea how to do map_2 and what_now without a sub-graph; with one, it would simply be res_to_collect = dyn_output.map(subgraph), and all that complexity would move into the sub-graph.
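Until branching plus fan-in works inside a mapped graph, one possible workaround (not something the thread confirms) is to collapse the routing into a single mapped op that picks its branch from a dispatch table, so the graph only needs map and collect. A plain-Python sketch; choose_branch, branch_1, branch_2, and route_and_run are hypothetical stand-ins for conditional_branch_producing_op, branch_1_op, branch_2_op, and the combined op. The cost is that the chosen branch no longer appears as its own step in the run view.

```python
from typing import Callable


def branch_1(value: int) -> int:
    return value * 10


def branch_2(value: int) -> int:
    return -value


def choose_branch(value: int) -> str:
    # Hypothetical routing rule standing in for conditional_branch_producing_op.
    return "branch_1" if value >= 0 else "branch_2"


BRANCHES: dict[str, Callable[[int], int]] = {
    "branch_1": branch_1,
    "branch_2": branch_2,
}


def route_and_run(value: int) -> int:
    # Body of the single mapped op: select a branch, then call only that one.
    return BRANCHES[choose_branch(value)](value)


def merge_all(results: list[int]) -> int:
    # Stand-in for the op that receives res_to_collect.collect().
    return sum(results)


items = [3, -2, 5]
print(merge_all([route_and_run(v) for v in items]))  # 30 + 2 + 50 = 82
```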

spandan-sharma, Jul 31 '24 03:07