faas-flow

Question/Enhancement/Research: Batch Management

Open kwojcicki opened this issue 4 years ago • 7 comments

TL;DR: I'm wondering what place faas-flow/faas-tower has in batch management, i.e. whether batch management should be built on top of faas-flow or kept completely separate. What I call batch management (a "job" here is a function call that is part of a batch, not a k8s job): viewing batch/job states, pausing/canceling jobs/batches, notification when a batch/job fails, exponential backoff of failed functions, and caching of function results.

Longer version: I've recently been doing some PoC work with OpenFaaS at work. Aside from using it as a plain FaaS, many people are interested in moving their batch processing to OpenFaaS. I don't think that's necessarily wrong, and OpenFaaS itself may eventually add some batch-management functionality (https://github.com/openfaas/faas/issues/657). In the meantime, I was looking to build a small management layer that could handle the tasks listed above.

I was hoping to get some input from you (@s8sg) on whether it's best to build on top of faas-tower to get this functionality (using the distributed tracing you have already built), or to add some extra functionality to faas-flow that could facilitate batch management :smile:

kwojcicki • Jul 29 '19 21:07

Hi @kwojcicki, thank you for creating the issue. It makes a lot of sense to have a batch management system that utilizes OpenFaaS. Since OpenFaaS is a runtime for functions, I'm not entirely convinced it needs to build batch support itself. Conversely, a batch management system may offer several constructs for batch jobs, of which function execution is only one.

The current direction of Faas-flow is to provide an SDK for constructing a workflow as a DAG of Operations, plus an Executor that implements the controller and executes the Operations in order.
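A minimal Go sketch of "a DAG of Operations executed in order", for illustration only: `Dag`, `AddNode`, `AddEdge`, `Order` and `Run` are hypothetical names, not the faas-flow SDK API. It orders nodes with Kahn's topological sort, so each node runs only after all of its predecessors; a root node receives the flow input, and any other node receives its predecessors' outputs concatenated (an arbitrary choice made for this sketch).

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Operation is a hypothetical unit of work attached to one DAG node.
type Operation func(input []byte) ([]byte, error)

// Dag is a hypothetical minimal DAG of named nodes (not the faas-flow API).
type Dag struct {
	ops   map[string]Operation
	edges map[string][]string // node id -> successor ids
}

func NewDag() *Dag {
	return &Dag{ops: map[string]Operation{}, edges: map[string][]string{}}
}

func (d *Dag) AddNode(id string, op Operation) { d.ops[id] = op }
func (d *Dag) AddEdge(from, to string)        { d.edges[from] = append(d.edges[from], to) }

// Order returns node ids in topological order (Kahn's algorithm).
func (d *Dag) Order() ([]string, error) {
	indeg := map[string]int{}
	for id := range d.ops {
		indeg[id] = 0
	}
	for from, succs := range d.edges {
		if _, ok := indeg[from]; !ok {
			indeg[from] = 0
		}
		for _, s := range succs {
			indeg[s]++
		}
	}
	var ready []string
	for id, n := range indeg {
		if n == 0 {
			ready = append(ready, id)
		}
	}
	sort.Strings(ready) // deterministic start order
	var order []string
	for len(ready) > 0 {
		id := ready[0]
		ready = ready[1:]
		order = append(order, id)
		for _, s := range d.edges[id] {
			indeg[s]--
			if indeg[s] == 0 {
				ready = append(ready, s)
			}
		}
	}
	if len(order) != len(indeg) {
		return nil, errors.New("dag contains a cycle")
	}
	return order, nil
}

// Run executes every node after all of its predecessors have finished.
func (d *Dag) Run(input []byte) (map[string][]byte, error) {
	order, err := d.Order()
	if err != nil {
		return nil, err
	}
	preds := map[string][]string{}
	for from, succs := range d.edges {
		for _, s := range succs {
			preds[s] = append(preds[s], from)
		}
	}
	out := map[string][]byte{}
	for _, id := range order {
		op, ok := d.ops[id]
		if !ok {
			return nil, fmt.Errorf("node %q has no operation", id)
		}
		in := input // root nodes get the flow input
		if ps := preds[id]; len(ps) > 0 {
			sort.Strings(ps)
			in = nil
			for _, p := range ps {
				in = append(in, out[p]...)
			}
		}
		res, err := op(in)
		if err != nil {
			return nil, fmt.Errorf("node %q: %w", id, err)
		}
		out[id] = res
	}
	return out, nil
}

func main() {
	d := NewDag()
	d.AddNode("fetch", func(in []byte) ([]byte, error) { return append([]byte("fetched:"), in...), nil })
	d.AddNode("store", func(in []byte) ([]byte, error) { return append(in, []byte(":stored")...), nil })
	d.AddEdge("fetch", "store")
	out, err := d.Run([]byte("job-1"))
	fmt.Println(string(out["store"]), err) // prints: fetched:job-1:stored <nil>
}
```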

Currently, the SDK is highly coupled with OpenFaaS, but I recently started refactoring it; more details are in Issue #72. It will take a week or two to complete. cc: @vtolstov.

In the near future it will look like this:

  1. Faas-Flow SDK (completely decoupled from OpenFaaS), providing the interfaces:
    * Operation
    * ExecutionHandler
    * ExecutionRuntime
    * StateStore
    * DataStore
    * EventHandler
  2. Use-case-specific implementations, for example the faas-flow template (which utilizes OpenFaaS).

In short, the SDK is meant to provide extensibility. An Operation implements any kind of operation. The Executor uses an ExecutionHandler, which handles operation execution, and an ExecutionRuntime, which implements the runtime. The StateStore implements the execution-coordination backend for a distributed ExecutionRuntime, and the DataStore implements intermediate data storage. The EventHandler reports flow events to the monitoring backend.

For example, Operations could be a FaaS Function call, a Closure, a request to a MicroService, etc. ExecutionHandler can be implemented for executing the Operations using HTTP, RPC call, etc. ExecutionRuntime could be OpenFaaS or just as a standalone Golang server. StateStore can be any 3rd party synchronous store and DataStore could be any object storage. The workflow itself could be a BatchTemplate, an ETL pipeline, an Application Logic or a distributed SAGA. Monitoring can be done with EventHandler which can be implemented with OpenTracing.
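One possible shape for these interfaces, sketched in Go under the assumption that they are plain Go interfaces. Every method name and signature below is hypothetical; the refactored SDK may define them differently. The in-memory `memStateStore` only shows how an implementation plugs in; a distributed runtime would back the StateStore with a shared store instead.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical interface shapes; the real SDK signatures may differ.
type Operation interface {
	ID() string
	Execute(data []byte, options map[string]interface{}) ([]byte, error)
}

type ExecutionHandler interface {
	HandleExecution(op Operation, data []byte) ([]byte, error)
}

type ExecutionRuntime interface {
	// ScheduleNode hands a node to the runtime for (possibly async) execution.
	ScheduleNode(requestId, nodeId string, data []byte) error
}

type StateStore interface {
	Get(key string) (string, error)
	Set(key, value string) error
}

type DataStore interface {
	Store(key string, value []byte) error
	Retrieve(key string) ([]byte, error)
}

type EventHandler interface {
	ReportNodeStart(nodeId, requestId string)
	ReportNodeEnd(nodeId, requestId string)
	ReportNodeFailure(nodeId, requestId string, err error)
}

// memStateStore is an in-memory StateStore, only suitable for tests or a
// single-process runtime.
type memStateStore struct {
	mu sync.Mutex
	m  map[string]string
}

func newMemStateStore() *memStateStore { return &memStateStore{m: map[string]string{}} }

func (s *memStateStore) Get(key string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.m[key]
	if !ok {
		return "", fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

func (s *memStateStore) Set(key, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key] = value
	return nil
}

// Compile-time check that memStateStore satisfies StateStore.
var _ StateStore = (*memStateStore)(nil)

func main() {
	var store StateStore = newMemStateStore()
	_ = store.Set("req-1.node-1", "completed")
	v, _ := store.Get("req-1.node-1")
	fmt.Println(v)
}
```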

As of now, Faas-flow supports some of the features you mentioned, but not all. For example, it doesn't support pausing/canceling a job, exponential backoff of failed functions, or caching of function results out of the box, although these could be implemented inside the ExecutionHandler and ExecutionRuntime.
Similarly, viewing batch/job states and notifying when a batch/job fails can be implemented with the FlowEventHandler.

With all that, the Faas-flow SDK stays the same. We can build an OpenFaaS Batch Management System by packaging everything related to batch execution on OpenFaaS into a function template that provides a simpler wrapper on top of the core SDK.

Currently we have only one implementation, the Faas-Flow OpenFaaS template, where an Operation is a function call (plus a few other kinds), the ExecutionHandler handles the calls to functions (we could add caching here), and the ExecutionRuntime utilizes OpenFaaS (pausing/canceling could be implemented here). The FlowEventHandler is implemented with OpenTracing, and we let the user define the StateStore and DataStore. For batch execution, we may instead restrict the user from choosing a different StateStore and DataStore.
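Caching in the ExecutionHandler could be as simple as memoizing successful results per (function, input) pair. This sketch assumes the cached functions are deterministic (output depends only on input); `Invoke` and `cachingHandler` are hypothetical names, and in practice the map would need an eviction policy or an external backing store.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sync"
)

// Invoke is a hypothetical stand-in for calling an OpenFaaS function.
type Invoke func(function string, data []byte) ([]byte, error)

// cachingHandler memoizes successful results per (function, input) pair.
type cachingHandler struct {
	invoke Invoke
	mu     sync.Mutex
	cache  map[string][]byte
}

func newCachingHandler(invoke Invoke) *cachingHandler {
	return &cachingHandler{invoke: invoke, cache: map[string][]byte{}}
}

// cacheKey combines the function name with a SHA-256 digest of the input.
func cacheKey(function string, data []byte) string {
	sum := sha256.Sum256(data)
	return function + ":" + hex.EncodeToString(sum[:])
}

func (h *cachingHandler) Call(function string, data []byte) ([]byte, error) {
	key := cacheKey(function, data)
	h.mu.Lock()
	if out, ok := h.cache[key]; ok {
		h.mu.Unlock()
		return out, nil
	}
	h.mu.Unlock()

	out, err := h.invoke(function, data)
	if err != nil {
		return nil, err // failures are never cached
	}
	h.mu.Lock()
	h.cache[key] = out
	h.mu.Unlock()
	return out, nil
}

func main() {
	calls := 0
	h := newCachingHandler(func(fn string, data []byte) ([]byte, error) {
		calls++
		return []byte(fn + "(" + string(data) + ")"), nil
	})
	h.Call("resize", []byte("img-1"))
	out, _ := h.Call("resize", []byte("img-1"))
	fmt.Println(string(out), "invocations:", calls) // prints: resize(img-1) invocations: 1
}
```

Two concurrent calls with the same input may both miss the cache and invoke the function; that is fine for a cache but would not be enough for deduplication.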

Now, coming to Faas-flow-tower: initially it was only meant to support monitoring of OpenFaaS function composition built with the Faas-flow OpenFaaS template. We can add more functionality for batch job management and better monitoring. At the same time, it's possible to provide a fixed stack that includes something like:

  1. Jaeger as the monitoring backend
  2. Minio for data storage
  3. ETCD for the coordination controller

I went on at length here; let me know your thoughts.

s8sg • Jul 30 '19 02:07

Thanks for giving such a detailed response @s8sg ^_^

> In short, the SDK supposed to provide extensibility. Operation implements any operations. The Executor uses ExecutionHandler which handle operation execution and ExecutionRuntime which implements the runtime. StateStore implements execution coordinator backend in a distributed ExecutionRuntime and DataStore implements the intermediate data storage. EventHandler handles the flow event with the monitoring backend.

Thanks for the great explanation of where faas-flow is progressing to!

> Currently, we have only one implementation as Faas-Flow OpenFaaS template. Where Operations is a function call + some, ExecutionHandler handles the calls to function (we can add caching here), ExecutionRuntime utilizes OpenFaaS (pausing/canceling can be implemented here). FlowEventHandler is implemented with OpenTracing. And we allow the user to define StateStore and DataStore. Now we may restrict the user from choosing different StateStore and DataStore for a batch execution.

Based on your explanation of where faas-flow is going, plus the explanation above, it seems like much of what I (and others) would ask of a batch management platform is almost there.

I think a minimal viable batch management system would let you view the status of a batch and of the jobs within it, cancel a batch/job, and maybe retry a job (though it may be tough to define when a job should be restarted, and from where).

  • status viewing: this doesn't seem too difficult; flows already push their state to Jaeger. When a user invokes a function, they could pass a flow-batch-id that gets included in the StateStore. Functions could then be grouped on that key in the faas-flow-tower UI. Functions could also be queued in a faas-flow-queue service so we know where in the queue a job/batch is.

  • canceling: when an ExecutionHandler is about to execute an Operation, it can check the StateStore to see if the job has been canceled. The user can cancel a job with an HTTP request, either to faas-flow-tower or to some faas-flow-job-server component, which updates the StateStore accordingly. This model can only cancel a job when it is transitioning across an edge of the DAG, but that should be okay.

  • retrying: in a sync world using the of-watchdog runtime this may not be too bad. If the ExecutionHandler receives a non-2xx response from a function, it could try again (I believe it already does this), and after, say, 5 attempts it could give up and mark the job as Failed in the StateStore, where a user could manually restart the entire flow later. I'm not sure how this would work in an async world.

I think batches will be a common use case for OpenFaaS, and these are some good first steps toward supporting them :) Let me know what you think.

kwojcicki • Jul 31 '19 22:07

@kwojcicki A few questions:

status viewing:

  1. Currently, for each request the Faas-flow executor generates a `ReqId` for the whole flow. Can the same be used as the `flow-batch-id`?
  2. `Functions can be grouped on that key in the faas-flow-tower UI.`
    Does that mean listing and checking the status of each function in a batch by batch id?
  3. `Functions could be queued in a faas-flow-queue service to know where in the queue a job/batch is.`
    I'm not sure what exactly the reason for queuing is here.

Canceling

  1. `When a ExecutionHandler is about to execute an Operation it can check the StateStore to see if the job has been canceled.` The check part is already implemented, although to cancel explicitly one needs to know how to handle the StateStore. One option is to provide the cancel logic inside the Executor in the SDK. Then we can add an API to faas-flow so that one can cancel a request by its request ID just by calling the flow function.

Retrying

  1. `it could try again (I believe it already does this) and after lets say 5 times it could give up mark the job as Failed in the StateStore where a user could manually restart the entire flow at a later date` This is not implemented; there is an open issue, #73.
  2. `Not sure how this would work in an async world` Operation retry happens only in sync mode. We may add a Node retry option, in which case the whole Node would be retried asynchronously.

s8sg • Aug 02 '19 04:08

status viewing:

1. currently for each request Faas-flow executor generate a `ReqId` for the whole flow. Can the same be used for `flow-batch-id`?

I think it would be best if users could provide a flow-batch-id themselves, so they can specify that function calls x, y, z are in flow-batch-id "batch1" while function calls q, w, e are in flow-batch-id "batch2".

2. `Functions can be grouped on that key in the faas-flow-tower UI.`
   Does that mean listing and checking the Status for each function in a batch by batch id ?

Yup. Going from my previous example, it would be neat if faas-flow-tower knew there are 2 batches and could show a grid with 2 rows, one per batch, where each row can be expanded to show the individual job statuses.
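Grouping for that grid can be done from per-job records tagged with a user-supplied batch id. A sketch, assuming the tower can read such records from the StateStore; `JobStatus`, `BatchRow`, and `GroupByBatch` are hypothetical names. Each row carries per-state counts for the collapsed view and the job list for the expanded view.

```go
package main

import (
	"fmt"
	"sort"
)

// JobStatus is a hypothetical per-job record: one function call (job),
// tagged with the user-supplied batch id.
type JobStatus struct {
	ReqID   string
	BatchID string
	State   string // e.g. "queued", "running", "finished", "failed"
}

// BatchRow is one grid row; Jobs is what an expanded row would show.
type BatchRow struct {
	BatchID string
	Counts  map[string]int
	Jobs    []JobStatus
}

// GroupByBatch builds one row per batch id, sorted by batch id.
func GroupByBatch(jobs []JobStatus) []BatchRow {
	rows := map[string]*BatchRow{}
	for _, j := range jobs {
		r, ok := rows[j.BatchID]
		if !ok {
			r = &BatchRow{BatchID: j.BatchID, Counts: map[string]int{}}
			rows[j.BatchID] = r
		}
		r.Counts[j.State]++
		r.Jobs = append(r.Jobs, j)
	}
	out := make([]BatchRow, 0, len(rows))
	for _, r := range rows {
		out = append(out, *r)
	}
	sort.Slice(out, func(i, k int) bool { return out[i].BatchID < out[k].BatchID })
	return out
}

func main() {
	jobs := []JobStatus{
		{"x", "batch1", "finished"}, {"y", "batch1", "running"}, {"z", "batch1", "finished"},
		{"q", "batch2", "queued"}, {"w", "batch2", "failed"}, {"e", "batch2", "queued"},
	}
	for _, row := range GroupByBatch(jobs) {
		fmt.Println(row.BatchID, row.Counts)
	}
}
```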

3. `Functions could be queued in a faas-flow-queue service to know where in the queue a job/batch is.`
   Not sure what exactly the reason for queuing here ?

I think having the batch jobs go through a faas-flow-queue may help in identifying all batches. Say I queue 2 batches, each with 5,000 jobs (i.e. 10,000 function calls). As each function call is queued, faas-flow (not necessarily this specific library, but some faas-flow-batch component) can record in the state store that the function with reqid: xyz and batch-id: abc is queued. That way the UI has visibility into each function's progress: queued => starting => node 1 => node 2 => ... => node-x => finished.
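The progression queued => starting => node 1 => ... => finished can be enforced as a small state machine, so that a late or duplicated event can't move a finished job backwards. In this sketch `validTransition` and `JobRecord` are hypothetical, and allowing `failed`/`canceled` from any non-terminal stage is an assumption, not something decided in this thread.

```go
package main

import (
	"fmt"
	"strings"
)

// validTransition encodes the hypothetical lifecycle
// queued => starting => node-1 => ... => node-x => finished,
// with failed/canceled reachable from any non-terminal stage.
func validTransition(from, to string) bool {
	terminal := func(s string) bool { return s == "finished" || s == "failed" || s == "canceled" }
	isNode := func(s string) bool { return strings.HasPrefix(s, "node-") }
	switch {
	case terminal(from):
		return false
	case to == "failed" || to == "canceled":
		return true
	case from == "queued":
		return to == "starting"
	case from == "starting":
		return isNode(to)
	case isNode(from):
		return isNode(to) || to == "finished"
	}
	return false
}

// JobRecord is what a hypothetical faas-flow-batch component could store
// per queued function call. History must start with the initial stage.
type JobRecord struct {
	ReqID, BatchID string
	History        []string
}

func (j *JobRecord) Advance(to string) error {
	from := j.History[len(j.History)-1]
	if !validTransition(from, to) {
		return fmt.Errorf("job %s (batch %s): invalid transition %s => %s", j.ReqID, j.BatchID, from, to)
	}
	j.History = append(j.History, to)
	return nil
}

func main() {
	job := &JobRecord{ReqID: "xyz", BatchID: "abc", History: []string{"queued"}}
	for _, stage := range []string{"starting", "node-1", "node-2", "finished"} {
		if err := job.Advance(stage); err != nil {
			fmt.Println(err)
		}
	}
	fmt.Println(strings.Join(job.History, " => "))
	fmt.Println(job.Advance("node-3")) // rejected: the job already finished
}
```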

Canceling

1. `When a ExecutionHandler is about to execute an Operation it can check the StateStore to see if the job has been canceled.`
   Here the check part is implemented, although for canceling in explicitly one need to know how to handle the statestore.

By "one need to know how to handle the statestore", do you mean how to know when a function has been canceled, or what should be written to the statestore after a flow has received a cancel signal and successfully canceled itself?

One option is providing the cancel logic inside Executor in sdk. Then we can add an API to faas-flow so that once can cancel a request by request-ID by just calling the flow function.

I assume that when you say "call the flow function" you mean calling some component that updates the statestore, not the actual function endpoint itself, because calling the function endpoint would spawn another faas-flow, I believe?

Retrying

1. `it could try again (I believe it already does this) and after lets say 5 times it could give up mark the job as Failed in the StateStore where a user could manually restart the entire flow at a later date`
   This is not implemented, there a issue open #73

2. `Not sure how this would work in an async world`
   Operation retry happen only in sync. We may add a Node retry option. In that case the whole Node will be retried again asynchronously.

kwojcicki • Aug 02 '19 15:08

Great inputs. Let's open a separate issue for each of these items. I'm not very experienced with UI work, so PRs will be great.

For queuing:
let's assume each Job is a Node that executes a function, and a batch is a flow. When a request comes in, the flow dynamically decides how many jobs to execute (with foreach) and executes them.
When I send requests out to 5000 Nodes asynchronously, by default the execution requests get queued in NATS. I don't know the limit, but it's configurable. I'm wondering whether we still need another external queueing solution.
In the new SDK, before putting a request in the queue, the executor will emit a monitoring event for the next node execution, carrying the next Node Id and the request Id:

ReportExecutionForward(nodeId string, requestId string)

We can handle this call in the template and store that information in the monitoring backend; for that we need changes in the template.
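On the template side, a handler for `ReportExecutionForward` could record which nodes have been forwarded to the queue but not yet started, which is enough to show a "queued" count per request without a separate queueing service. Only the `ReportExecutionForward(nodeId, requestId)` signature comes from the comment above; `queueMonitor`, `Started`, and `Queued` are hypothetical, and the real event-handler interface may contain more methods.

```go
package main

import (
	"fmt"
	"sync"
)

// forwardReporter holds only the callback quoted above.
type forwardReporter interface {
	ReportExecutionForward(nodeId string, requestId string)
}

// queueMonitor is a hypothetical template-side handler that remembers which
// nodes were handed to the queue (e.g. NATS) but have not started yet.
type queueMonitor struct {
	mu      sync.Mutex
	pending map[string][]string // requestId -> forwarded node ids
}

func newQueueMonitor() *queueMonitor {
	return &queueMonitor{pending: map[string][]string{}}
}

func (m *queueMonitor) ReportExecutionForward(nodeId string, requestId string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pending[requestId] = append(m.pending[requestId], nodeId)
}

// Started removes a node from the pending list once its execution begins.
func (m *queueMonitor) Started(nodeId, requestId string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	nodes := m.pending[requestId]
	for i, n := range nodes {
		if n == nodeId {
			m.pending[requestId] = append(nodes[:i], nodes[i+1:]...)
			break
		}
	}
}

// Queued reports how many forwarded nodes of a request are still waiting.
func (m *queueMonitor) Queued(requestId string) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.pending[requestId])
}

var _ forwardReporter = (*queueMonitor)(nil)

func main() {
	m := newQueueMonitor()
	for i := 1; i <= 3; i++ {
		m.ReportExecutionForward(fmt.Sprintf("job-%d", i), "req-1")
	}
	m.Started("job-1", "req-1")
	fmt.Println("queued for req-1:", m.Queued("req-1")) // prints: queued for req-1: 2
}
```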

I assume when you say call the flow function you mean call some component that updates the statestore not the actual function endpoint itself because if you call the function endpoint that would spawn another faas-flow I believe?

Not necessarily. We can call a faas-flow with different params and handle the request based on them. We already do this for getting the flow definition from the function with generate_dag=true. Similarly, we can call the flow function with

stop_request=<req_id>

s8sg • Aug 02 '19 16:08

@kwojcicki I'm considering a separate template, faasflow-of-batch or similar, which would provide a default StateStore and DataStore, making it much easier to get started.

Now that the SDK is refactored, it makes much more sense to have separate use-case-specific templates. I may put in some input during this week.

s8sg • Aug 06 '19 06:08

I have some questions about batch jobs. For example, I have a flow with 20 nodes, each executing one operation. First, I need an upper bound on how many jobs can run concurrently. A buffered channel is not good, because I need to put the jobs in the DB first. Second, I need some locking for concurrently running jobs on the same resource. For example, a job affecting the resource with uuid = 1 is running, and now someone pushes a job that updates the same resource (uuid = 1). The second job needs to wait for the first to complete before it starts. How can this best be implemented with the faas-flow SDK? If another job is running, I need to suspend the current execution...

vtolstov • Sep 14 '19 21:09