
Support for on-demand execution of jobs using its own scheduler

Open kushsharma opened this issue 3 years ago • 3 comments

Is your feature request related to a problem? Please describe. At the moment, Optimus works just as an orchestrator that transpiles basic job instructions to Airflow-parsable DAGs. It doesn't have anything to execute jobs on its own. Although Airflow is pretty stable and widely used, it is built as a generic job executor. I believe we can do a lot more if we have our own scheduling & execution engine. But for now we will start with having just the execution engine. Note: I am not suggesting we deprecate the use of Airflow; this will work in tandem with it to enhance the existing workflow.

Describe the solution you'd like I have already decided to call this prime(long back) and it will be used as follows:

  • User should be able to simply request a run on demand via optimus job run <job-name> and the job should start executing.
  • Optimus API will allow any external service to execute on demand jobs instead of scheduling them via airflow if needed.
  • Users should be able to leverage on-demand execution for adhoc operations like DML queries, one time syncs, cleanups, etc.
  • User should be able to run complete pipeline of jobs end-to-end, this is when a job is dependent on a different job and so on.
  • Running a whole pipeline will allow users to write tests for data pipelines, which can execute in a different execution environment than production, similar to how we write integration tests for code.
  • Running a pipeline will allow replay execution to be done on Optimus clusters.
  • Optimus should work as a distributed cluster and from the start should be capable of scaling on-demand horizontally.
  • Optimus should be able to run as highly-available(except postgres db).
  • No extra moving parts, like airflow has Redis, Celery, etc.
  • Local development of Optimus will be really easy, as developers won't need anything other than Optimus and a postgres db (we should start supporting sqlite as well, to get rid of even postgres).

Describe alternatives you've considered There are no alternatives to this at the moment.

Additional context We can break this into different milestones as follows:

v1

  • user will be able to run a single job and get the final status
  • job can have multiple hooks, and it should execute everything in that job in correct order
  • we will start with supporting local docker host as the execution engine
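
To make the v1 scope concrete, here is a minimal sketch of running a single job with its hooks in order and returning only a final status. Everything in it is an assumption for illustration: the `Job`, `Hook` and `Executor` types, the pre/post hook split, and the `fakeExecutor` that stands in for a local Docker host are hypothetical, not existing Optimus code.

```go
package main

import (
	"errors"
	"fmt"
)

// HookType says whether a hook runs before or after the main task (assumed model).
type HookType string

const (
	PreHook  HookType = "pre"
	PostHook HookType = "post"
)

// Hook and Job are hypothetical shapes, not the real Optimus models.
type Hook struct {
	Name string
	Type HookType
}

type Job struct {
	Name  string
	Task  string
	Hooks []Hook
}

// Executor runs one unit of a job; a Docker-backed implementation
// would start a container here.
type Executor interface {
	Run(unit string) error
}

// Plan returns the units of a job in execution order:
// pre hooks, then the task, then post hooks.
func Plan(j Job) []string {
	var pre, post []string
	for _, h := range j.Hooks {
		if h.Type == PreHook {
			pre = append(pre, h.Name)
		} else {
			post = append(post, h.Name)
		}
	}
	steps := append(pre, j.Task)
	return append(steps, post...)
}

// RunJob executes the plan, stops at the first failing unit,
// and reports the final status plus the units it attempted.
func RunJob(j Job, ex Executor) (status string, ran []string) {
	for _, unit := range Plan(j) {
		ran = append(ran, unit)
		if err := ex.Run(unit); err != nil {
			return "failed", ran
		}
	}
	return "success", ran
}

// fakeExecutor fails only for the unit named in failOn.
type fakeExecutor struct{ failOn string }

func (f fakeExecutor) Run(unit string) error {
	if unit == f.failOn {
		return errors.New("unit failed: " + unit)
	}
	return nil
}

func main() {
	job := Job{Name: "sample", Task: "bq2bq", Hooks: []Hook{
		{Name: "notify", Type: PostHook},
		{Name: "prepare", Type: PreHook},
	}}
	status, ran := RunJob(job, fakeExecutor{})
	fmt.Println(status, ran)
}
```

Keeping the executor behind an interface is what would let Kubernetes be added in v3 without touching the ordering logic.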

v2

  • support for executing set of jobs interdependent on each other
  • proper user commands to interact with optimus server to fetch things like logs of execution.

v3

  • support for kubernetes as the execution engine
  • running replay on optimus cluster

v4

  • support for writing testable pipelines
  • support for our own scheduling engine, capable of doing everything Airflow can do except the UI

v5 (although this will go out of scope of this feature request but wanted to list the vision)

  • get rid of postgres and keep Optimus state in local filesystem to make Optimus truly independent and highly available.

kushsharma avatar Apr 19 '22 16:04 kushsharma

Hey Kush, thank you for helping with work on executor, for now we will concentrate on v1 for the proposal and define the technical and business specifications of the executor.

Outcome for executor:

User should be able to run their jobs manually on optimus and check details specific to the run including

  1. Details for the job run instance
  2. Compiled assets for the job run
  3. Any relevant information like logs to understand and debug job failure

Questions as a user

  • Where can I see the job run details, including compiled assets?
  • Can I see the logs, to understand job failures?
  • Does the run retry some of the failures or does it have to be done manually?
  • How do I get the final status, as there is no UI like Airflow's?
  • Does this support only bq2bq jobs or other jobs also?
  • Can I check when the last manual run was?
  • How can I compare changes with the previous run?
  • How long can I expect to see my previous logs?

Technical design of proposal

Can we also discuss the technical solution proposed for the same. How is optimus server interacting with the executor? If we have a better understanding of the interactions we can understand the code better.

Please let us know if you have any questions related to this

sbchaos avatar Apr 26 '22 13:04 sbchaos

Sure, let's get the ball rolling for v1.

Questions as a user

Where can I see the job run details, including compiled assets ?

Job run details should be fetched very similarly to how we fetch replay details, via the CLI. I don't get what you mean by compiled assets. Optimus already supports asset compilation, right?

Can I see the logs, to understand job failures ?

I was thinking we can take this up in the 2nd revision; for now the user (who I believe is a superuser or developer) should be able to get the logs manually. We will only return the final job status in v1.

Does the run retry some of the failures or it has to be done manually ?

I don't think we should support retry for now, this looks like an advanced feature which we can take later.

How do I get the final status as there is no ui like airflow.

This is spot on, we should allocate developer bandwidth to work on a UI; we have been working with only the CLI for too long. For now we will have to work with the CLI.

Does this support only bq2bq jobs or other jobs also ?

It should support all the tasks as well as hooks.

Can I check, when was the last manual run ?

Yes, just like replay, we will have these details in db so we should be able to list the status. But as a starter, the UX might not be as smooth as we think it should be.

How can I compare changes with previous run. ?

I didn't get this question, what do you mean by previous run? What I think is each run should be independent for now.

How long can I expect to see my previous logs ?

Again, we will handle this in v2, but it should not be treated as a previous run; it should be treated as a run, because these are adhoc one-off runs. I think this idea of "previous" is inherited from a proper scheduler like Airflow. We will make it like Airflow in future revisions, but for now let's not compare the two. Coming back to your question, I understand managing logs comes with its own challenges, things like retention, storage management, cleanup, etc., but I think these can evolve slowly. The first draft could simply fetch the logs from the downstream executor: if it's docker, we will query logs from docker; if it's kubernetes, we will query logs from kube. Depending on the downstream service, this method won't be a scalable solution, so we will try to sink these logs to an external filesystem like s3. I don't believe we need to support heavy log management in v2. We can either start with caching them in the db or in a locally attached filesystem of the node with a retention of 15 days, or sink them to s3 if we are okay with introducing another dependency on an external file system.

Technical design

Technical design will evolve slowly with each revision, but I can briefly discuss the base implementation. The basic premise of this design is to not have any new hard external dependency, and even to remove existing dependencies (like postgres) in future if needed. Let's discuss it as follows:

Highly available

We can't run Optimus as a single-node application; I don't think we even recommend doing that in the existing design. There is a load balancer in front, and all calls will be redirected to nodes arbitrarily (round robin, lowest traffic first, etc.; the policy doesn't really matter).

Distributed consensus

When we run with more than 1 node, say 3 nodes, we need at least one of them to work as a coordinator. This coordinator will accept user requests, do the processing and respond back. We can either make all the nodes coordinators or make one node the coordinator (leader) and the rest followers. If we go with the first approach, which is to make everyone a leader, each leader will work independently on every request it receives and there will be no coordination. Without coordination, requests can get duplicated and we won't be able to know who is doing what. To fix this we can think of sharing a common state between all the leaders and passing this state around after every modification. Of course we will not pass the whole state, only the delta since the last state change, but you get the point. This helps in coordination, so each node is aware of what the rest of the nodes are doing, but we need some kind of lock so that nobody overwrites this state while someone else is reading or writing it, as this will be done in parallel. So to share this state:

  1. An easy way is to use the existing database as a locking system. In this approach we can think of a table such as:

     request          node
     execute task 1   node1
     execute task 2   node2

Using this table, each leader node can figure out who is responsible for doing what, and we can use database locks so that only one node modifies this table at a time. Although this approach looks easy, it requires us to introduce a hard dependency on the database from the base. The whole cluster will be useless if there is db downtime, and we won't be able to refactor this dependency without rewriting the consensus from scratch.
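
A minimal sketch of this first option, assuming an in-memory map in place of the real table and a `sync.Mutex` in place of a database lock. `AssignmentTable` and `Claim` are hypothetical names used only for illustration; a real version would take the lock inside a database transaction.

```go
package main

import (
	"fmt"
	"sync"
)

// AssignmentTable mimics the request -> node table.
// The mutex stands in for a database lock (assumption for this sketch).
type AssignmentTable struct {
	mu   sync.Mutex
	rows map[string]string
}

func NewAssignmentTable() *AssignmentTable {
	return &AssignmentTable{rows: make(map[string]string)}
}

// Claim records node as the owner of request unless another node
// already claimed it. It returns the current owner and whether
// this call was the one that claimed the request.
func (t *AssignmentTable) Claim(request, node string) (owner string, claimed bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if existing, ok := t.rows[request]; ok {
		return existing, false
	}
	t.rows[request] = node
	return node, true
}

func main() {
	table := NewAssignmentTable()
	var wg sync.WaitGroup
	wins := make(chan string, 3)

	// Three "leader" nodes race to claim the same request.
	for _, node := range []string{"node1", "node2", "node3"} {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			if _, ok := table.Claim("execute task 1", n); ok {
				wins <- n
			}
		}(node)
	}
	wg.Wait()
	close(wins)
	fmt.Println("nodes that won the claim:", len(wins))
}
```

Exactly one node wins each request, which is the deduplication property we want, but every claim goes through the single lock holder, which is the hard dependency described above.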

  2. Using an external dependency like zookeeper, consul, etcd, etc. This is what most applications do when they start out: Kubernetes uses etcd, Kafka used to use zookeeper, etc. This comes with its own problems. First of all, it breaks our base premise of not introducing any new external dependency. Second, it introduces the maintenance overhead of another tool, which itself needs to run in high availability to keep your application running in HA. Kafka rewrote their design to remove zookeeper and do on its own what zookeeper was doing, for this reason.

  3. Do in Optimus what etcd/zookeeper would do, which is to have a consensus protocol that distributes this state delta among nodes. When we do this, we will need to change our design from having all leaders to having one leader and the rest followers. There are 2 popular consensus algorithms in the wild, paxos and raft. I don't even want to start discussing paxos here because, unless you are a research scholar, understanding all the cogs and wheels will take you a couple of months, and implementing it is a different challenge of its own. The other one is raft, introduced in the last decade, which is easy to read and understand in one evening. So the choice looks obvious now: raft is the way forward, and it is also what etcd uses to manage its distributed state.

Implementation

My design has selected raft to keep Optimus open for future development. We will have a finite state machine in the leader that will apply these delta modifications to the state and share the changes with all followers. The Optimus cluster will be able to self-heal. If a follower dies, nothing really changes. If the leader dies, re-election will cause a new leader to be selected. This state will be stored on the local file system of each node. You will notice I do use the db to update the final states of job instances, but it is only modified by the cluster leader, so we have kept our dependency to a minimum, which allows us to make Optimus truly independent if needed.
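
To illustrate only the finite state machine part, here is a rough sketch under stated assumptions. It is not raft: leader election, log replication and quorum are left out, and the `Delta` shape, the `FSM` type and the status strings are hypothetical. It shows why applying the same ordered deltas on every node gives every node the same state.

```go
package main

import "fmt"

// Delta is one state change carried in the replicated log (hypothetical shape).
type Delta struct {
	Index  uint64 // position in the log, starting at 1
	JobRun string
	Status string
}

// FSM holds job-run state built by applying deltas in log order.
type FSM struct {
	lastApplied uint64
	runs        map[string]string
}

func NewFSM() *FSM {
	return &FSM{runs: make(map[string]string)}
}

// Apply applies a delta once. Entries at or below lastApplied are
// ignored, so re-delivery of an old entry does not change the state.
// A real implementation would also reject gaps in the index.
func (f *FSM) Apply(d Delta) bool {
	if d.Index <= f.lastApplied {
		return false
	}
	f.runs[d.JobRun] = d.Status
	f.lastApplied = d.Index
	return true
}

// Status returns the last applied status of a job run ("" if unknown).
func (f *FSM) Status(jobRun string) string {
	return f.runs[jobRun]
}

func main() {
	entries := []Delta{
		{Index: 1, JobRun: "run-a", Status: "accepted"},
		{Index: 2, JobRun: "run-a", Status: "running"},
		{Index: 3, JobRun: "run-a", Status: "success"},
	}
	leader, follower := NewFSM(), NewFSM()
	for _, d := range entries {
		leader.Apply(d)
		follower.Apply(d)
	}
	follower.Apply(entries[1]) // a re-delivered entry is ignored
	fmt.Println(leader.Status("run-a"), follower.Status("run-a"))
}
```

In practice we would not write the protocol ourselves; an existing raft library would own the log and call an apply function like this one on each node.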

To explain the flow: when a new request to run a job is received, it will be stored in the database. This database will be accessed by the cluster leader, which will assign the job to one of the cluster nodes to actually execute. Followers will work as worker nodes (the leader will also execute jobs, it just has the extra responsibility of managing the cluster as well) and execute the job using one of the many configured executors. Once the instance state of a job run changes, the request is broadcasted^1 to all the nodes (including the leader) and the leader will update the state in db/raft log/etc.

Note: broadcasted^1: we can have a proper discussion on this as well why is it broadcasted? Why don't we directly send this to leader? Why don't we directly modify this in db? etc.

Although basic queries like job status can be answered by any node as they will be kept in db as well as raft log, things like logs of running instance can only be answered by the worker node which is executing the instance at that time(unless we sync these logs at real time to some external filesystem).
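
The assignment step in this flow could look roughly like the sketch below. The policy is my assumption, since nothing above fixes one: the leader picks the least-loaded node, itself included, and breaks ties by node name. `Cluster`, `Assign` and `Finish` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// Cluster tracks how many job runs each node is executing.
// The leader is a worker too, so it appears in the load map.
type Cluster struct {
	Leader   string
	load     map[string]int
	assigned map[string]string // job run -> node
}

func NewCluster(leader string, followers ...string) *Cluster {
	c := &Cluster{
		Leader:   leader,
		load:     map[string]int{leader: 0},
		assigned: make(map[string]string),
	}
	for _, n := range followers {
		c.load[n] = 0
	}
	return c
}

// Assign picks the least-loaded node for a job run. Ties go to the
// lexicographically smallest node name so the result is deterministic.
func (c *Cluster) Assign(jobRun string) string {
	names := make([]string, 0, len(c.load))
	for n := range c.load {
		names = append(names, n)
	}
	sort.Strings(names)
	best := names[0]
	for _, n := range names[1:] {
		if c.load[n] < c.load[best] {
			best = n
		}
	}
	c.load[best]++
	c.assigned[jobRun] = best
	return best
}

// Finish releases capacity once a job run reaches a final state.
func (c *Cluster) Finish(jobRun string) {
	if node, ok := c.assigned[jobRun]; ok {
		c.load[node]--
		delete(c.assigned, jobRun)
	}
}

func main() {
	cluster := NewCluster("node1", "node2", "node3")
	for _, run := range []string{"run-1", "run-2", "run-3", "run-4"} {
		fmt.Println(run, "->", cluster.Assign(run))
	}
}
```

Only the leader would call `Assign`, and each assignment would be one of the deltas written to the raft log, so a newly elected leader can rebuild the same load picture.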

@sbchaos We can discuss all of this in more detail on call if you want. Feel free to ask more questions. @ravisuhag @sravankorumilli I think we scheduled a community hour meeting of ODPF sometime back, correct me if I am wrong but I don't see that happening, is it still up?

kushsharma avatar Apr 27 '22 07:04 kushsharma

@kushsharma We will start the meetup from this month.

ravisuhag avatar May 01 '22 09:05 ravisuhag