Workflow Processor - DAG Execution Ordering

Open jem-davies opened this issue 1 year ago • 13 comments

Issue Description

The Benthos-provided workflow processor executes a DAG of nodes, "performing them in parallel where possible".

However, the current implementation uses this dependency solver, which resolves the DAG into a series of steps: the steps are performed sequentially, and the nodes within each step are performed in parallel.

This means a node can be left waiting for all of the nodes in the previous step to finish, even though all of its own dependencies are already complete.

Consider the following DAG, from the workflow processor docs:

      /--> B -------------|--> D
     /                   /
A --|          /--> E --|
     \--> C --|          \
               \----------|--> F

The dependency solver would resolve the DAG into: [ [ A ], [ B, C ], [ E ], [ D, F ] ]. When we consider the node E, we can see that its full dependency chain is A -> C -> E. However, the step before [ E ] contains the node B, so in the current Benthos workflow implementation E would not execute until B has completed, even though E has no dependency on B.
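
For reference, here is a rough sketch (branch bodies omitted, purely for illustration) of how that resolved ordering would look if it were written out using the existing workflow processor's explicit order field:

pipeline:
  processors:
    - workflow:
        # The solver's output written out explicitly: each inner list is a
        # stage, stages run sequentially, and nodes within a stage run in
        # parallel. E sits in its own stage after [ B, C ], so it waits for
        # B to finish even though it only depends on A and C.
        order: [ [ A ], [ B, C ], [ E ], [ D, F ] ]

        branches:
          A:
            processors:
              #...
          B:
            processors:
              #...
          # C, D, E and F are defined in the same way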

jem-davies avatar May 18 '24 19:05 jem-davies

~~Added Draft MR #2600 to resolve / gather feedback.~~

jem-davies avatar May 18 '24 20:05 jem-davies

Closed Draft MR https://github.com/benthosdev/benthos/pull/2600 as the commit messages didn't contain a signed-off-by line.

Opened new Draft MR https://github.com/benthosdev/benthos/pull/2607 for feedback.

jem-davies avatar May 25 '24 14:05 jem-davies

The MR is a draft because I am making breaking changes to the way workflow is configured. Rather than:


pipeline:
  processors:
    - workflow:
        order: [ [ A ], [ B ] ]

        branches:
          A:
            processors:
              #...

          B:
            processors:
              #...

the configuration would instead look like:

pipeline:
  processors:
    - workflow_v2:

        branches:
          A:
            processors:
              #...

          B:
            dependency_list: ["A"]
            processors:
              #...
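
Under the proposed workflow_v2 config, the DAG from the issue description could be expressed along these lines (a sketch based on the proposed dependency_list field, with dependencies taken from the example DAG above and the branch bodies omitted):

pipeline:
  processors:
    - workflow_v2:

        branches:
          A:
            processors:
              #...
          B:
            dependency_list: ["A"]
            processors:
              #...
          C:
            dependency_list: ["A"]
            processors:
              #...
          E:
            dependency_list: ["C"]
            processors:
              #...
          D:
            dependency_list: ["B", "E"]
            processors:
              #...
          F:
            dependency_list: ["C", "E"]
            processors:
              #...

With the dependencies stated per node, E only has to wait for A and C before it can start, regardless of how long B takes.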

jem-davies avatar Jun 02 '24 14:06 jem-davies

Can we then please also get rid of request_map and result_map?

AndreasBergmeier6176 avatar Jun 04 '24 07:06 AndreasBergmeier6176

@AndreasBergmeier6176 - won't we still want to be able to use request_map & result_map to work on a subset of the message in that branch?

jem-davies avatar Jun 04 '24 10:06 jem-davies

@AndreasBergmeier6176 - won't we still want to be able to use request_map & result_map to work on a subset of the message in that branch?

So far I do not really understand why you would want request_map in the first place. Why not simply use mapping?

AndreasBergmeier6176 avatar Jun 04 '24 11:06 AndreasBergmeier6176

The original workflow processor "executes a topology of branch processors".

In a branch, the request_map and result_map fields enable you to work on a new message built from those mappings - it's kind of like a different context to the main message that is being passed along the stream.

A mapping processor, by contrast, will replace the message with the result of the mapping.
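
As an illustrative sketch (the mappings below are made up), a branch works on the message produced by its request_map, and only what its result_map assigns is written back onto the main message:

pipeline:
  processors:
    - branch:
        # Build a reduced message containing only what this branch needs.
        request_map: |
          root.name = this.user.name
        processors:
          #...
        # Explicitly map the branch result back onto the main message.
        result_map: |
          root.user.enriched = this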

I don't wish to alter this as a part of the PR.

jem-davies avatar Jun 04 '24 11:06 jem-davies

In a branch, the request_map and result_map fields enable you to work on a new message built from those mappings - it's kind of like a different context to the main message that is being passed along the stream. A mapping processor, by contrast, will replace the message with the result of the mapping.

Maybe it is me not being a native speaker, but I did not get that difference from the docs, nor from the attribute names.

Would it make sense to rename e.g. request_map to local_message_map?

AndreasBergmeier6176 avatar Jun 04 '24 11:06 AndreasBergmeier6176

I think you might find that a number of the names perhaps don't make sense - one in particular is the unarchive processor, which I feel is poorly named.

You could submit a PR to do a renaming.

EDIT: Thanks for checking out my PR 😄

jem-davies avatar Jun 04 '24 11:06 jem-davies

Within this new design you are now explicitly "drawing" the graph edges via dependency_list. If you are doing that anyway, I think I would find it more transparent to state what data is passed and where:

pipeline:
  processors:
    - workflow_v2:

        branches:
          A:
            processors:
              #...
            push_mapping: |
              branch("B") = this.foo

          B:
            dependency_list: ["A"]
            processors:
              #...
            push_branch: ["C"] # Passes message as is to C
          C:
            processors:
              #...

AndreasBergmeier6176 avatar Jun 04 '24 14:06 AndreasBergmeier6176

@AndreasBergmeier6176

I think you might be confused about how the original workflow processor worked - and how this v2 works as well - as I haven't changed anything about what data is passed where.

In the original workflow processor and in this v2, all of the nodes get the message passed to them - the message isn't passed along the nodes of the DAG in the way you perhaps are thinking.

What you can do with the original workflow processor is use the request_map field to let redpanda-connect infer the DAG - but doing this will still resolve it into a 2D array, and will still have the problem that in certain DAGs a node isn't executed as early as it could be.

When a node is ready to run it gets the contents of the entire message as it is at that time - you can use request_map and result_map to create a separate "context", where fields from the incoming message are mapped into that new context and afterwards explicitly mapped back onto the message in the data stream.

To fix the DAG execution, a different data structure is required than the one used by the original workflow processor. The proposed data structure is a mapping of nodes -> dependency_list; it could be kept separate from the branches part of the config, but to avoid duplication I have included it as a new field in the definition of a branch.
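
For comparison, the "kept separate from the branches" alternative mentioned above might have looked something like this - note that this is purely hypothetical, and the dag field name below is invented for illustration; it is not part of the PR:

pipeline:
  processors:
    - workflow_v2:
        # Hypothetical: the DAG declared in one place, apart from the branches.
        dag:
          B: ["A"]
          E: ["C"]
        branches:
          A:
            processors:
              #...
          B:
            processors:
              #...
          E:
            processors:
              #...

Keeping the dependency list inside each branch instead avoids repeating the node names in two places, which is the duplication referred to above.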

jem-davies avatar Jun 04 '24 18:06 jem-davies

I am also thinking that it would be better - though I am happy to be told otherwise - if we got rid of inferring the DAG from request_maps in the new workflow_v2 processor.

I think it makes more sense to have the user explicitly state the dependencies of the DAG in the config.

jem-davies avatar Jun 04 '24 18:06 jem-davies

I have started to think that making this alteration to the way the config is specified is better, and that the PR should be resolved with a new workflow_v2 processor.

jem-davies avatar Jun 04 '24 18:06 jem-davies