Asynchronous / manual / human-in-the-loop tasks

Open MrSaints opened this issue 4 years ago • 10 comments

Motivation: Why do you think this is important?

There are several data / AI / ML workflows that may require asynchronous completion. That is, rather than having the task automatically scheduled on Kubernetes or Spark, it can be picked up and completed asynchronously. The intended audience for this sort of task is humans. Data / AI / ML workflows often require a "human-in-the-loop" (HITL) to audit, review, give feedback on, and/or adjust the results / outcomes of automated tasks.

Examples:

  • An ETL workflow where a snapshot of the results of an extraction, and/or transformation are assessed before being loaded to avoid polluting the data target
  • A decision-making AI / ML workflow (e.g. risk assessment, fraud detection, etc) where less confident scores are escalated to a human operator to make the final decision
  • Tasks which require completion from robotic process automation (RPA) or some external / on-premise service

Goal: What should the final outcome look like, ideally?

Ideally, it should not deviate too much from Flyte's DSL. It may be achievable with a new type of task. In Flytekit, this may be an @asynchronous_task decorator. External services can pull pending tasks, and complete them through an API.

Describe alternatives you've considered

When discussed on Slack, Ketan recommended:

There is no plugin that waits for manual input today, though conceptually we can definitely add one.
But this can be easily achieved in Flyte without any more code. This is how you could do it:
Let's say there is a workflow with task1, task2, task3, task4, task5. In this workflow, task3 actually needs human input.

Workflow1 with task1 & task2

Workflow2 with task4 & task5

You can now wait for Workflow1 to finish, do your logic in your own service/component etc., and then call the FlyteAdmin API to trigger Workflow2.

Though this works, it relies on the external service to make the appropriate callback. That external service may not be concerned about what happens next, and baking that logic into it may make the whole workflow brittle.

Flyte component

  • [x] Overall
  • [ ] Flyte Setup and Installation scripts
  • [x] Flyte Documentation
  • [ ] Flyte communication (slack/email etc)
  • [ ] FlytePropeller
  • [ ] FlyteIDL (Flyte specification language)
  • [x] #2631
  • [ ] FlyteAdmin (Control Plane service)
  • [ ] FlytePlugins
  • [ ] DataCatalog
  • [ ] FlyteStdlib (common libraries)
  • [ ] FlyteConsole (UI)
  • [ ] Other

[Optional] Propose: Link/Inline

Still thinking about this as I'm quite new to Flyte.

Additional context

I'm borrowing the language from quite a few other orchestrators / workflow engines with a similar feature / concept:

  • Uber's Cadence Workflow: https://cadenceworkflow.io/docs/07_goclient/12_activity_async_completion
  • Camunda's Zeebe: https://zeebe.io/blog/2018/11/message-correlation-working-with-messages-in-zeebe/
  • Netflix's Conductor: https://netflix.github.io/conductor/configuration/systask/#wait
  • Orchestra: https://github.com/b12io/orchestra (human-in-the-loop orchestration)

Is this a blocker for you to adopt Flyte

Not really. The alternative provided by Ketan may work, but again, it would be better to have the visibility, and control in one place. Otherwise, it'd defeat the purpose of having a workflow engine / orchestrator managing the control flow while services can focus purely on the business logic.

MrSaints avatar Mar 14 '20 20:03 MrSaints

@MrSaints thank you for filing this issue. This is a very interesting problem. A few quick thoughts: overall, I do agree that it makes sense to support this at a platform level. The implementation for this problem actually matters.

Let's consider various approaches, each with its own trade-offs. Before going to the options, it is important to understand that currently a workflow in Flyte is a Kubernetes resource. Thus an important trade-off is the number of workflows active concurrently and its impact on performance.

Solution 1: Flyte workflows wait for an async trigger or a signal, using a task plugin. This can be implemented using a plugin for the FlytePropeller backend (written as a FlytePlugin).

Pros:

  • Simple to implement (hopefully we can make it trivial), some examples already exist
  • Fits very simply into the model, just a plugin
  • Triggers can be received from a variety of sources: SQS queues, Kafka topics, databases, etc.

Cons:

  • If the task is in a wait state, FlytePropeller is still periodically checking the workflow for completion. The engine itself is unaware of the task and cannot perform any special optimizations. Thus, if there are too many workflows, it may reduce the throughput of the system. Of course this can be optimized, but the implementation of the plugin may not be as trivial. This also affects systems that do not use a sharded propeller and run in excess of a few thousand workflows per second, where the increase in latency is unacceptable.
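As a rough illustration of the polling behaviour Solution 1 implies, here is a minimal pure-Python sketch. It is not a FlytePlugin and does not use any Flyte API; `wait_for_signal`, `check_inbox`, and the in-memory `inbox` are hypothetical names standing in for a plugin that repeatedly checks an external source.

```python
import time
from typing import Callable, Optional


def wait_for_signal(
    check: Callable[[], Optional[object]],
    poll_interval_s: float = 1.0,
    timeout_s: float = 60.0,
) -> object:
    """Poll `check` until it returns a non-None value or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        value = check()
        if value is not None:
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError("no signal received before the timeout")
        time.sleep(poll_interval_s)


# Hypothetical in-memory stand-in for an SQS queue, Kafka topic, or database row.
inbox = {}
polls = []


def check_inbox() -> Optional[object]:
    polls.append(1)
    if len(polls) == 3:
        # Simulate a human responding just before the third poll.
        inbox["approved"] = True
    return inbox.get("approved")


result = wait_for_signal(check_inbox, poll_interval_s=0.01, timeout_s=1.0)
print(result, len(polls))
```

Every waiting task costs one `check` call per interval whether or not anything has changed, which is the throughput concern listed under Cons.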

Solution 2: Flyte supports pausable/resumable workflows natively. We could implement this in multiple ways; I am describing one of them. This requires modifying the Flyte backend to understand a new special type of node called a Waitable Node. This waitable node can talk to the FlyteAdmin API to check for the signal that allows it to resume.

Pros:

  • Contained and elegant system, and looks like a feature of the platform
  • As Propeller and Admin now know about the specification of wait, they can optimize the system much more easily. For example, when Propeller encounters a wait node, it simply adds a label that reduces the frequency at which the signal is checked for. When Admin receives a signal, it removes this label. The label is used to populate the right set of informers and queues within Propeller. Note that you still need to periodically check the signal to ensure that two-phase commits are not a problem; this may have the rare but undesirable effect of increasing the time to resolve the signal and resume the workflow.
  • Visibility that the signal was received on Admin
  • Nicer API

Cons:

  • Needs code changes in the platform and IDL updates (not simply a plugin)
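The label-based optimization described under Solution 2 could look roughly like the sketch below. This is a toy model under stated assumptions, not FlytePropeller or FlyteAdmin code: the label name, the two intervals, and the `Workflow` / `Admin` classes are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, Set

FAST_INTERVAL_S = 1.0    # assumed cadence for actively running workflows
SLOW_INTERVAL_S = 60.0   # assumed cadence for workflows parked on a wait node
WAITING_LABEL = "waiting-for-signal"  # hypothetical label name


@dataclass
class Workflow:
    name: str
    labels: Set[str] = field(default_factory=set)


class Admin:
    """Stands in for FlyteAdmin: records signals and clears the waiting label."""

    def __init__(self) -> None:
        self.signals: Dict[str, object] = {}

    def signal(self, wf: Workflow, value: object) -> None:
        self.signals[wf.name] = value
        wf.labels.discard(WAITING_LABEL)


def on_wait_node(wf: Workflow) -> None:
    """Stands in for Propeller reaching a wait node: mark the workflow as parked."""
    wf.labels.add(WAITING_LABEL)


def next_check_interval(wf: Workflow) -> float:
    """Parked workflows are still checked, just less often."""
    return SLOW_INTERVAL_S if WAITING_LABEL in wf.labels else FAST_INTERVAL_S


wf = Workflow("wf-1")
admin = Admin()

on_wait_node(wf)
parked_interval = next_check_interval(wf)

admin.signal(wf, True)
resumed_interval = next_check_interval(wf)

print(parked_interval, resumed_interval)
```

The slow interval is never infinite, which mirrors the note above that the signal must still be checked periodically in case the label update and the signal write get out of step.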

kumare3 avatar Mar 15 '20 02:03 kumare3

I think we should probably go with Solution 1, as it can be implemented much sooner, and then move to Solution 2 later. Now @MrSaints, Solution 1 does not even need to be implemented in the Flyte backend. You could simply add a Python flytekit plugin that waits for an external event. Actually, we already have some plugins that do something similar - https://github.com/lyft/flytekit/tree/master/flytekit/contrib/sensors - with an implementation for Hive tables and partitions landing. You could easily add one to wait for a file in S3, etc.

kumare3 avatar Mar 15 '20 02:03 kumare3

Thanks for the very prompt response @kumare3. I'll definitely look into solution 1 for the time being. With the right adjustments, and deployment, the latency overhead should not be noticeable.

MrSaints avatar Mar 16 '20 09:03 MrSaints

I want to revive this issue as we have found a great way to solve this problem and we might have bandwidth to tackle it.

Potential solution:

  1. Create a new type called Future
message Future {
   Literal v = 1; // The actual value of this future. This will start off empty
   Identifier reference = 2; // The reference is an identifier that allows FlytePropeller to look up a value
}
  2. FlyteAdmin has a new API called Signal
rpc Signal(SignalRequest) returns (SignalResponse) {
}

message SignalRequest {
   Identifier reference = 1;
   Literal v = 2;
}
  3. Whenever FlytePropeller encounters a future without a value, it will call FlyteAdmin and wait for the value to appear. This happens only when a task wants to use a future value
  4. Flytekit example
@task
def foo(f: flytetypes.future) -> flytetypes.future:
   pass
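
To make the lookup flow in steps 1 to 3 concrete, here is a small Python model of it. It is only a sketch: `Future` mirrors the proposed message, while `SignalStore` and `resolve` are hypothetical stand-ins for the FlyteAdmin Signal API and for FlytePropeller's check. For simplicity it treats `None` as "no value yet".

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class Future:
    reference: str            # identifier used to look the value up
    v: Optional[Any] = None   # starts off empty


class SignalStore:
    """Hypothetical stand-in for the values FlyteAdmin receives via Signal."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}

    def signal(self, reference: str, v: Any) -> None:
        self._values[reference] = v

    def lookup(self, reference: str) -> Optional[Any]:
        return self._values.get(reference)


def resolve(future: Future, store: SignalStore) -> bool:
    """Fill in the future from the store if possible; report whether it has a value."""
    if future.v is None:
        future.v = store.lookup(future.reference)
    return future.v is not None


store = SignalStore()
f = Future(reference="exec-1/approval")

before = resolve(f, store)             # nothing signalled yet
store.signal("exec-1/approval", 42)    # an external caller provides the value
after = resolve(f, store)

print(before, after, f.v)
```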

kumare3 avatar Feb 10 '21 00:02 kumare3

@EngHabu / @kanterov / @katrogan ^

kumare3 avatar Feb 10 '21 00:02 kumare3

On more thinking, Future is the wrong terminology. It's actually a condition variable or a Promise (not to be confused with the current promise in flytekit), and "condition" can be overloaded. Let's call it Signal for now.


BoolSignal = Signal[bool]

@workflow
def my_wf(a: int, b: BoolSignal) -> int:
   v = t1(a=a)
   bv = flyte_await(b)
   return conditional("check-signal").if_(bv == True).then(t2(a=a)).else_().fail("Bad error")
   

Now, using the Signal API, users can signal and release the await, as follows:

remote.signal("exec_id", {"b":True})

Users will also be able to signal this inline in the UI, when the await is encountered or separately ahead of time
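
Since the idea is described as a condition variable, the await/signal behaviour, including signalling ahead of time, can be sketched with Python's `threading.Condition`. This is an illustration only and says nothing about how Flyte would implement it; `Signals`, `await_`, and the plain-Python `my_wf` are hypothetical, with simple arithmetic standing in for `t2`.

```python
import threading
from typing import Any, Dict


class Signals:
    """Named values that a waiter can block on until someone provides them."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}
        self._cond = threading.Condition()

    def signal(self, name: str, value: Any) -> None:
        with self._cond:
            self._values[name] = value
            self._cond.notify_all()

    def await_(self, name: str, timeout_s: float) -> Any:
        with self._cond:
            # Returns immediately if the value was signalled ahead of time.
            arrived = self._cond.wait_for(lambda: name in self._values, timeout=timeout_s)
            if not arrived:
                raise TimeoutError(f"signal {name!r} not received")
            return self._values[name]


def my_wf(a: int, signals: Signals) -> int:
    bv = signals.await_("b", timeout_s=1.0)
    if bv is True:
        return a * 2          # stands in for t2(a=a)
    raise RuntimeError("Bad error")


# Case 1: the signal is sent before the await is reached.
early = Signals()
early.signal("b", True)
early_result = my_wf(3, early)

# Case 2: the signal arrives while the workflow is already waiting.
late = Signals()
threading.Timer(0.05, late.signal, args=("b", True)).start()
late_result = my_wf(3, late)

print(early_result, late_result)
```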

kumare3 avatar Jan 08 '22 18:01 kumare3

@MrSaints would you be interested in collaborating on this?

kumare3 avatar Jan 11 '22 17:01 kumare3

@kumare3 Hey! To be honest, I've been a bit distant from Flyte recently (haven't used it in a while), so I don't really have a compelling need for this feature now.

MrSaints avatar Jan 11 '22 17:01 MrSaints

@MrSaints no worries, I think we have had other users ask for it, so we might be working on this. definitely care about your feedback :)

kumare3 avatar Jan 11 '22 17:01 kumare3

Cc @hamersaw, let's mark it in progress?

kumare3 avatar May 09 '22 13:05 kumare3

this is the final flytekit api @hamersaw

import typing
from datetime import timedelta

from flytekit import task, workflow, approve, wait_for_input

@task
def t1(a: int) -> int: return a + 5

@task
def t2(a: int) -> int: return a + 6

# proposal
@workflow
def wf(a: int) -> typing.Tuple[int, int]:
    x = t2(a=wait_for_input(int, name="user_input_name", timeout=timedelta(hours=2)))
    
    # by default dan forgot to update the comments on this section
    y = t1(a=a)
    z = t1(a=approve(y, name="approval_name", timeout=timedelta(hours=1)))

    return x, z
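
To show how values would flow through the two gates in `wf`, here is a plain-Python walk-through that runs without Flyte. It is a sketch under stated assumptions, not flytekit's implementation: `wait_for_input_sim` and `approve_sim` are hypothetical stand-ins that read pre-supplied responses from a dict instead of pausing an execution, and timeouts are ignored.

```python
from typing import Any, Dict, Tuple, Type


def t1(a: int) -> int:
    return a + 5


def t2(a: int) -> int:
    return a + 6


def wait_for_input_sim(expected_type: Type, name: str, responses: Dict[str, Any]) -> Any:
    """Return the externally supplied value, checking it has the expected type."""
    value = responses[name]
    if not isinstance(value, expected_type):
        raise TypeError(f"{name!r} expected {expected_type.__name__}")
    return value


def approve_sim(upstream: Any, name: str, responses: Dict[str, Any]) -> Any:
    """Pass the upstream value through unchanged, but only if it was approved."""
    if responses[name] is not True:
        raise RuntimeError(f"{name!r} was not approved")
    return upstream


def wf_sim(a: int, responses: Dict[str, Any]) -> Tuple[int, int]:
    x = t2(a=wait_for_input_sim(int, "user_input_name", responses))
    y = t1(a=a)
    z = t1(a=approve_sim(y, "approval_name", responses))
    return x, z


out = wf_sim(1, {"user_input_name": 10, "approval_name": True})
print(out)  # (16, 11)
```

With `a=1` and a user input of `10`, `x` is `10 + 6`, `y` is `1 + 5`, and `z` is `y + 5` once the approval is given.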

wild-endeavor avatar Oct 03 '22 18:10 wild-endeavor

We can close this when the following are done:

  • https://github.com/flyteorg/flyte/issues/3270
  • https://github.com/flyteorg/flyte/issues/3354

cosmicBboy avatar Mar 09 '23 22:03 cosmicBboy

this should be closed now right?

kumare3 avatar May 02 '23 04:05 kumare3