[DSIP-22][TriggerPlugin] Introduce Trigger Plugin

Open pegasas opened this issue 1 year ago • 10 comments

Search before asking

  • [X] I had searched in the issues and found no similar feature requirement.

Description

After surveying both local and large distributed job-scheduling frameworks, I found that we could leverage our excellent plugin framework design to extend a key process in job scheduling, the Trigger, which would allow users to define their own custom triggers in a distributed runtime environment.

Quartz has already considered this situation: triggers are part of its scheduler, but the concept is not extended to distributed execution semantics. See https://www.quartz-scheduler.org/documentation/quartz-2.1.7/tutorials/tutorial-lesson-04.html

In a distributed job-scheduling environment, we could unify all triggers as event triggers.

Use case

  • [Use case 1: TimerTrigger] This is the common case, currently widely supported by DolphinScheduler and all other job-scheduling frameworks. The event is a signal from a timer.
  • [Use case 2: MQTrigger] For example, after receiving a message that matches some pattern, we start a new workflow. The event is a message from an MQ.
  • [Use case 3: HTTPTrigger/TCPTrigger/SMTPTrigger] Very popular. After we make an HTTP call to dolphinscheduler-api, we start a new workflow; we could even trigger a pipeline after receiving an email from an external third-party team. The event is a network call.
  • [Use case 4: Combine Streaming & Batch Job] In common scenarios we use streaming for near-real-time OLAP jobs, which need batch jobs to backfill data in case an unexpected error happens in the stream. The event could be any condition on streaming elements.

To benefit from DolphinScheduler's plugin framework, we should consider adding a trigger SPI plugin loaded by the master server.
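A minimal sketch of what such an SPI could look like, in Java (the language DolphinScheduler is written in). `TriggerPlugin`, `TriggerEvent` and `TriggerPluginManager` are hypothetical names, not existing DolphinScheduler classes; only `java.util.ServiceLoader` is a real JDK API, used here as one possible discovery mechanism.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical event emitted by any trigger type (timer, MQ, HTTP, stream...).
record TriggerEvent(String triggerType, long workflowCode, Map<String, String> params) {}

// Hypothetical SPI: one implementation per trigger type, loaded by the master.
interface TriggerPlugin {
    String type();                 // e.g. "TIMER", "MQ", "HTTP"

    Optional<TriggerEvent> poll(); // pull-style check; empty when nothing fired
}

// Hypothetical manager that indexes trigger plugins by type.
final class TriggerPluginManager {
    private final Map<String, TriggerPlugin> plugins = new HashMap<>();

    // Discovers implementations listed under META-INF/services via the JDK ServiceLoader.
    void loadFromClasspath() {
        for (TriggerPlugin plugin : ServiceLoader.load(TriggerPlugin.class)) {
            register(plugin);
        }
    }

    // Registers a plugin; a second plugin for the same type is rejected.
    void register(TriggerPlugin plugin) {
        if (plugins.putIfAbsent(plugin.type(), plugin) != null) {
            throw new IllegalStateException("duplicate trigger type: " + plugin.type());
        }
    }

    Optional<TriggerPlugin> get(String type) {
        return Optional.ofNullable(plugins.get(type));
    }
}
```

Rejecting duplicate types at registration keeps the mapping from a trigger type to exactly one implementation unambiguous.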

Overview:

┌─────────────────────┬────────────────────┬────────────────────┬────────────────────────┬───────────────────────────────┐
│                     │                    │                    │                        │                               │
│         UI          │        API         │         DB         │       Registry         │        Master                 │
│                     │                    │                    │                        │                               │
├─────────────────────┼────────────────────┼────────────────────┼────────────────────────┼───────────────────────────────┤
│                     │                    │                    │                        │                               │
│   ┌─────────────┐   │   ┌─────────────┐  │   ┌─────────────┐  │                        │  ┌─────────────────────────┐  │
│   │             │   │   │             │  │   │             │  │                        │  │                         │  │
│   │    User     ├───┼──►│  Create     ├──┼──►│   Trigger   ├──┼──────────Pull──────────┼──► TriggerTaskThreadPool   │  │
│   │             │   │   │  Trigger    │  │   │             │  │                        │  │                         │  │
│   └─────┬───────┘   │   └─────────────┘  │   └─────────────┘  │                        │  └───────────┬─────────────┘  │
│         │           │                    │                    │                        │              │                │
│         │           │   ┌─────────────┐  │   ┌─────────────┐  │                        │              │                │
│         │           │   │             │  │   │             │  │                        │              │                │
│         └─Push──────┼──►│  Request    ├──┼──►│  Schedule   ◄──┼────────────────────────┼──────────────┘                │
│                     │   │             │  │   │             │  │                        │                               │
│                     │   └─────────────┘  │   └─────┬───────┘  │                        │                               │
│                     │                    │         │          │                        │                               │
│                     │                    │         │          │                        │   ┌────────────────────────┐  │
│                     │                    │         │          │                        │   │                        │  │
│                     │                    │         └──────────┼────────────────────────┼───►      SchedulerApi      │  │
│                     │                    │                    │                        │   │                        │  │
│                     │                    │                    │                        │   └───────────┬────────────┘  │
│                     │                    │                    │                        │               │               │
│                     │                    │   ┌─────────────┐  │                        │               │               │
│                     │                    │   │             │  │                        │               │               │
│                     │                    │   │  Command    ◄──┼────────────────────────┼───────────────┘               │
│                     │                    │   │             │  │                        │                               │
│                     │                    │   └─────┬───────┘  │                        │                               │
│                     │                    │         │          │                        │                               │
│                     │                    │         │          │                        │                               │
│                     │                    │         │          │                        │ ┌─────────────────────────┐   │
│                     │                    │         │          │                        │ │                         │   │
│                     │                    │         └──────────┼────────────────────────┼─►MasterSchedulerBootStrap │   │
│                     │                    │                    │                        │ └─────────────────────────┘   │
└─────────────────────┴────────────────────┴────────────────────┴────────────────────────┴───────────────────────────────┘
  • All triggers should fire through events.
  • Event delivery should be based on either push or pull mode.
  • For easier implementation, triggers only fire at the start of a workflow.
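The three rules above can be sketched as follows, assuming a single sink that both delivery modes feed. All class names are hypothetical; the point is only that push and pull differ in who initiates delivery, not in what happens once the event arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical event; only the fields needed for this sketch.
record TriggerEvent(String source, long workflowCode) {}

// Single sink every trigger ends up calling. Since triggers only start
// workflows, the sink just records which workflow should be started.
final class TriggerEventSink {
    private final List<Long> startedWorkflows = new ArrayList<>();

    synchronized void onEvent(TriggerEvent event) {
        startedWorkflows.add(event.workflowCode());
    }

    synchronized List<Long> started() {
        return List.copyOf(startedWorkflows);
    }
}

// Push mode: the producer (e.g. an HTTP call through the API) hands the event over directly.
final class PushDelivery {
    private final TriggerEventSink sink;

    PushDelivery(TriggerEventSink sink) { this.sink = sink; }

    void push(TriggerEvent event) { sink.onEvent(event); }
}

// Pull mode: a poller on the master asks the source whether something happened.
final class PullDelivery {
    private final TriggerEventSink sink;
    private final Supplier<Optional<TriggerEvent>> source;

    PullDelivery(TriggerEventSink sink, Supplier<Optional<TriggerEvent>> source) {
        this.sink = sink;
        this.source = source;
    }

    // One polling round; a real master would run this on a scheduled thread pool.
    void pollOnce() { source.get().ifPresent(sink::onEvent); }
}
```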

Detail Design

Controller Layer

  • TriggerController: CRUD for triggers

Compatibility

  • The scheduler has been partially refactored into scheduler-api.
  • Relationship between the trigger SPI and the scheduler API

DB Layer

  • Create a new table, t_ds_trigger.
  • Leverage the existing table t_ds_schedules.
  • Use a Quartz SimpleTrigger to fire immediately, so that the DB can be updated accordingly within the Quartz scheduler implementation.

Related issues

[DSIP-16][Task] Support stream task

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

pegasas, Dec 16 '23 14:12

Good idea, we may need to design the TriggerAPI and TriggerEvent structure first.

ruanwenjun, Dec 17 '23 05:12

In the current architecture, the Quartz scheduler uses the DB for pessimistic concurrency control; when a job executes, we insert a command into the DB.

We need to dig deeper into whether we can leverage Quartz interfaces or need to implement this ourselves in a TriggerService. If the latter, we should leverage the current distributed lock and master failover mechanisms.

pegasas, Dec 24 '23 12:12

We already have the t_ds_command table; all trigger events should be transformed into commands.

Quartz is a plugin of ScheduleTrigger; we may have other schedule plugins in the future.
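A minimal sketch of the "every trigger event becomes a command" rule. The record fields and the `START_WORKFLOW` command type are assumptions for illustration and do not mirror the actual t_ds_command schema or DolphinScheduler's own command-type enum.

```java
import java.time.Instant;
import java.util.Map;

// Hypothetical event produced by any trigger plugin, Quartz-based timers included.
record TriggerEvent(String triggerType, long workflowCode, Map<String, String> params, Instant firedAt) {}

// Hypothetical stand-in enum; not DolphinScheduler's real CommandType.
enum CommandType { START_WORKFLOW }

// Hypothetical command row; the real t_ds_command table has more columns.
record Command(CommandType type, long workflowCode, Map<String, String> startParams, Instant scheduleTime) {}

// Every trigger type funnels through one conversion, so the master only ever consumes commands.
final class TriggerEventConverter {
    static Command toCommand(TriggerEvent event) {
        return new Command(
                CommandType.START_WORKFLOW,
                event.workflowCode(),
                Map.copyOf(event.params()), // defensive copy of the user parameters
                event.firedAt());
    }
}
```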

ruanwenjun, Dec 26 '23 13:12

Thanks, Wenjun, for the continuous guidance in this thread and the very solid architecture design of DolphinScheduler!

These days I've been thinking about how responsibilities should be split between the trigger and the scheduler service in the new architecture. My current high-level design:

  1. Host two trigger-related tables, trigger -> trigger instance; this role may currently be filled by the Quartz scheduler.
  2. Push: internal signal or HTTP call -> push -> create trigger instance
  3. Pull: thread pool -> pull -> onEvent -> create trigger instance

Pull mode is more common and consumes resources, so I am thinking of implementing pull triggers as a special task type that can be dispatched by the master/worker, leveraging the current task architecture, which already supports master/server failover.

Scheduler: pull the trigger instances owned by this master host -> command
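As a sketch of that last step, assuming each trigger instance row carries the host of the master that owns it (a hypothetical column), the scheduler on one master only turns its own unconsumed instances into commands. `TriggerInstance`, `Command` and `HostScopedScheduler` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trigger instance row; "host" is the master that owns it.
record TriggerInstance(long id, long workflowCode, String host, boolean consumed) {}

// Hypothetical command produced from a trigger instance.
record Command(long triggerInstanceId, long workflowCode) {}

final class HostScopedScheduler {
    private final String localHost;

    HostScopedScheduler(String localHost) { this.localHost = localHost; }

    // Converts unconsumed instances owned by this master into commands;
    // instances owned by other masters are left for them.
    List<Command> schedule(List<TriggerInstance> instances) {
        List<Command> commands = new ArrayList<>();
        for (TriggerInstance ti : instances) {
            if (!ti.consumed() && localHost.equals(ti.host())) {
                commands.add(new Command(ti.id(), ti.workflowCode()));
            }
        }
        return commands;
    }
}
```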

Good idea, we may need to design the TriggerAPI and TriggerEvent structure first.

As you mentioned, I've been trying to design the TriggerAPI at the code level. Right now I am studying the previous task/registry/scheduler designs and code patterns.

Quartz is a plugin of ScheduleTrigger; we may have other schedule plugins in the future.

I get your point from your scheduler-api refactor, but it may be hard to do everything in one go. In the first stage, maybe we can extend the Quartz scheduler within the current architecture. For the long-term fix, I will think about the relationship and boundaries between trigger, schedule and command.

pegasas, Dec 27 '23 05:12

My current design is as below:

  • API (TriggerController, TriggerService): CRUD for triggers, and creating new trigger instances (which currently correspond to Schedule/Command in the DB).
  • Start a TriggerTaskExecutorThreadPool in the master, aligned with the logic task executor.
    • For push triggers, directly accept requests from the API (or any other signal from the master) and create a trigger_instance in the DB.
    • For pull triggers, handle them as polling tasks with priority and timeout. I suggest putting them in the master because we may not want users to write complex IO logic in triggers.
  • The scheduler takes trigger instances for scheduling, which can be scheduled by master host and priority.
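The pull-trigger handling in the second bullet (priority plus timeout) can be sketched with a JDK priority queue and `Future.get(timeout, unit)`. `PullTriggerTask` and `TriggerTaskExecutor` are hypothetical names; a real executor would also persist outcomes and re-queue triggers for their next poll.

```java
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical pull-trigger task: a condition check with a priority and a time budget.
record PullTriggerTask(String name, int priority, long timeoutMillis, Callable<Boolean> check) {}

// Assumes a single dispatcher thread calls submit/runNext (the queue is not thread-safe).
final class TriggerTaskExecutor implements AutoCloseable {
    enum Outcome { FIRED, NOT_FIRED, TIMED_OUT, FAILED }

    record Result(String name, Outcome outcome) {}

    // Highest priority is polled first.
    private final PriorityQueue<PullTriggerTask> queue =
            new PriorityQueue<>((a, b) -> Integer.compare(b.priority(), a.priority()));
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    void submit(PullTriggerTask task) {
        queue.add(task);
    }

    // Runs the highest-priority task within its timeout; empty if nothing is queued.
    Optional<Result> runNext() {
        PullTriggerTask task = queue.poll();
        if (task == null) {
            return Optional.empty();
        }
        Future<Boolean> future = pool.submit(task.check());
        Outcome outcome;
        try {
            outcome = future.get(task.timeoutMillis(), TimeUnit.MILLISECONDS)
                    ? Outcome.FIRED : Outcome.NOT_FIRED;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the slow check so it frees its thread
            outcome = Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            outcome = Outcome.FAILED; // the check itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            outcome = Outcome.FAILED;
        }
        return Optional.of(new Result(task.name(), outcome));
    }

    @Override
    public void close() {
        pool.shutdownNow();
    }
}
```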

pegasas, Dec 29 '23 07:12

We need to define how triggers are assigned to Masters. Since we use pull mode, all Masters pull triggers from the DB, so we need to make sure each trigger is consumed by only one Master.
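One common way to guarantee a single consumer is an atomic claim: each master tries to set an owner on the trigger, and only the first attempt succeeds. The sketch below uses `ConcurrentHashMap.putIfAbsent` as an in-memory stand-in; the `owner_host` column and the SQL in the comment are hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for a hypothetical owner column on the trigger table. In a
// database the same idea is a conditional update such as
//   UPDATE t_ds_trigger SET owner_host = ? WHERE id = ? AND owner_host IS NULL
// where "affected rows == 1" means this master won the claim.
final class TriggerClaims {
    private final ConcurrentHashMap<Long, String> owners = new ConcurrentHashMap<>();

    // Atomically claims the trigger; returns true only for the first master to ask.
    boolean tryClaim(long triggerId, String masterHost) {
        return owners.putIfAbsent(triggerId, masterHost) == null;
    }

    // Frees every trigger owned by a master, e.g. when it is detected as dead (failover).
    void releaseAllOwnedBy(String masterHost) {
        owners.values().removeIf(masterHost::equals);
    }

    Optional<String> ownerOf(long triggerId) {
        return Optional.ofNullable(owners.get(triggerId));
    }
}
```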

ruanwenjun, Jan 15 '24 02:01

Thanks for your support, Wenjun.

Here's the current Quartz architecture. DolphinScheduler uses Quartz, fired by timer triggers, to insert commands into the DB. The Quartz scheduler takes an exclusive lock while acquiring triggers, then fires them to get the JobDetails that are executed in its hosted thread pool.

In effect, it uses the DB as a distributed lock.

see https://github.com/quartz-scheduler/quartz/blob/main/quartz/src/main/java/org/quartz/core/QuartzSchedulerThread.java#L291-L381

Currently Quartz does not expose a public interface that would let us implement our own trigger mechanism.

Here are the high-level steps to implement this feature:

  • (Required) dolphinscheduler-ui & dolphinscheduler-api: add a trigger controller for trigger CRUD, and implement push triggers by inserting commands directly
  • (Required) dolphinscheduler-scheduler-plugin: implement a new scheduler plugin
    • acquire a distributed lock from RegistryClient
    • get triggers from the DB and submit them to a thread pool
    • if a trigger fires successfully, a callback inserts a command into the DB, where it waits to be picked up to build the workflow and process instance.
  • (Optional) Consider unifying triggers and tasks, or just let them work independently.
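The plugin steps above can be sketched as one scheduling round. `DistributedLock` is a hypothetical stand-in for whatever lock RegistryClient provides (its real API is not reproduced), and the trigger loader and command callback are plain functions so the flow can run without a database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical lock abstraction; not the RegistryClient API.
interface DistributedLock {
    boolean tryAcquire();

    void release();
}

// Hypothetical trigger definition row.
record TriggerDefinition(long id, long workflowCode) {}

final class TriggerSchedulerRound {
    private final DistributedLock lock;
    private final Supplier<List<TriggerDefinition>> loadTriggers; // e.g. a DB query
    private final Predicate<TriggerDefinition> fires;             // the trigger's own condition
    private final Consumer<Long> insertCommand;                   // callback that writes a command
    private final ExecutorService pool;

    TriggerSchedulerRound(DistributedLock lock, Supplier<List<TriggerDefinition>> loadTriggers,
                          Predicate<TriggerDefinition> fires, Consumer<Long> insertCommand,
                          ExecutorService pool) {
        this.lock = lock;
        this.loadTriggers = loadTriggers;
        this.fires = fires;
        this.insertCommand = insertCommand;
        this.pool = pool;
    }

    // Returns the number of commands inserted, or -1 if another master holds the lock.
    int runOnce() {
        if (!lock.tryAcquire()) {
            return -1;
        }
        try {
            List<Callable<Boolean>> jobs = new ArrayList<>();
            for (TriggerDefinition t : loadTriggers.get()) {
                jobs.add(() -> {
                    if (!fires.test(t)) {
                        return false;
                    }
                    insertCommand.accept(t.workflowCode()); // runs on a pool thread
                    return true;
                });
            }
            int inserted = 0;
            for (Future<Boolean> f : pool.invokeAll(jobs)) {
                try {
                    if (f.get()) {
                        inserted++;
                    }
                } catch (ExecutionException e) {
                    // a failing trigger is skipped; the rest of the round continues
                }
            }
            return inserted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 0;
        } finally {
            lock.release();
        }
    }
}
```

Holding the lock for the whole round keeps two masters from firing the same trigger definitions concurrently; the trade-off is that only one master does trigger work at a time, and the command callback must be thread-safe.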

I am still looking for a better solution that stays compatible with the Quartz scheduler in the first version, but it seems a bit hard. Luckily, DS has already taken a few steps in this direction.

pegasas, Jan 17 '24 11:01

  • Step #1: create the related table schema for review
  • Step #2: create the trigger plugin loading manager
  • Step #3: backend development; add a test API for local testing and draft the API for review
  • Step #4: frontend development

pegasas, May 20 '24 05:05

Is there any progress? ^_^

davidzollo, Jul 23 '24 11:07

  • The master fetches triggers from the t_ds_trigger_definition table.
  • It executes each trigger; if the user-defined condition is met, it inserts a command into t_ds_command. Note that this operation needs to be transactional.

Here's the question: currently, command fetching is id-based, and the key point is that when a ProcessInstance is generated, we delete the related command record (see https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java#L299). But we should not delete rows from t_ds_trigger_definition in this scenario.

One solution is to add one more table that records the fetch offset. While a master is alive, in every round it first acquires the lock, then fetches triggers from the t_ds_trigger_definition table and executes them, and finally updates the offset in the DB together with the trigger_instance in one transaction.
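A sketch of this offset approach, assuming triggers are scanned in id order in fixed-size batches and the scan wraps to the beginning when it reaches the end of the table. `TriggerStore` is a hypothetical in-memory stand-in for the DB; its `synchronized` `commit` plays the role of the single transaction that writes trigger instances and the new offset together.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical row shapes; the real tables would carry more columns.
record TriggerDefinition(long id, long workflowCode) {}

record TriggerInstance(long triggerId, long workflowCode) {}

// In-memory stand-in for the database.
final class TriggerStore {
    private final List<TriggerDefinition> definitions = new ArrayList<>(); // never deleted by the scan
    private final List<TriggerInstance> instances = new ArrayList<>();
    private long offset = 0; // hypothetical offset table: last definition id scanned

    synchronized void addDefinition(TriggerDefinition d) {
        definitions.add(d);
    }

    // Up to `limit` definitions with id > fromId, in id order.
    synchronized List<TriggerDefinition> fetchAfter(long fromId, int limit) {
        return definitions.stream()
                .filter(d -> d.id() > fromId)
                .sorted(Comparator.comparingLong(TriggerDefinition::id))
                .limit(limit)
                .toList();
    }

    // Writes new instances and the new offset together, standing in for one transaction.
    synchronized void commit(List<TriggerInstance> newInstances, long newOffset) {
        instances.addAll(newInstances);
        offset = newOffset;
    }

    synchronized long offset() { return offset; }

    synchronized int definitionCount() { return definitions.size(); }

    synchronized List<TriggerInstance> instances() { return List.copyOf(instances); }
}

final class OffsetScanRound {
    // One round; the caller is assumed to already hold the distributed lock.
    static int run(TriggerStore store, int batchSize, Predicate<TriggerDefinition> fires) {
        List<TriggerDefinition> batch = store.fetchAfter(store.offset(), batchSize);
        if (batch.isEmpty()) {
            store.commit(List.of(), 0); // end of table: the next round starts a new pass
            return 0;
        }
        List<TriggerInstance> fired = new ArrayList<>();
        for (TriggerDefinition d : batch) {
            if (fires.test(d)) {
                fired.add(new TriggerInstance(d.id(), d.workflowCode()));
            }
        }
        store.commit(fired, batch.get(batch.size() - 1).id());
        return fired.size();
    }
}
```

Because the definitions are never deleted, unlike consumed t_ds_command rows, scan progress has to live in the offset rather than be implied by which rows still exist.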

A more graceful option is to assign triggers in a consistent-hash way, as memcached and brpc do: https://github.com/apache/brpc/blob/master/src/brpc/policy/dynpart_load_balancer.cpp#L83. Considering its complexity, I will implement the first approach (the offset table).
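For comparison, a textbook consistent-hash ring with virtual nodes is sketched below. It is a generic technique, not a port of brpc's dynpart load balancer or of any DolphinScheduler code; `ConsistentHashRing` is a hypothetical name, and MD5 is used only because it ships with the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

final class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>(); // hash -> master host
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) { this.virtualNodes = virtualNodes; }

    void addMaster(String host) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(host + "#" + i), host);
        }
    }

    void removeMaster(String host) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(host + "#" + i), host); // only remove entries this host owns
        }
    }

    // Owner = first virtual node at or after the trigger's hash, wrapping around the ring.
    String ownerOf(long triggerId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no live masters");
        }
        Map.Entry<Long, String> owner = ring.ceilingEntry(hash("trigger-" + triggerId));
        return (owner != null ? owner : ring.firstEntry()).getValue();
    }

    // First 8 bytes of the MD5 digest as a long; any well-distributed hash would do.
    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

When a master leaves, only the triggers it owned move to other masters; every other trigger keeps its owner, which is what makes this attractive for master failover.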

pegasas, Aug 24 '24 13:08

@ruanwenjun Can you help guide this design?

davidzollo, Aug 31 '24 06:08