dolphinscheduler
[DSIP-22][TriggerPlugin] Introduce Trigger Plugin
Search before asking
- [X] I have searched the issues and found no similar feature request.
Description
After looking through local and large-scale distributed job-scheduling frameworks, I found that we could leverage our excellent plugin framework design to extend a key process in job scheduling, the Trigger, which would allow users to define their own custom triggers in a distributed runtime environment.
Quartz has clearly considered this situation as part of the master scheduler, but it does not extend triggers into distributed execution semantics. See https://www.quartz-scheduler.org/documentation/quartz-2.1.7/tutorials/tutorial-lesson-04.html
In a distributed job-scheduling environment, we could unify all triggers as event triggers.
Use case
- [Use case 1: TimerTrigger] This is the common case, currently widely supported by DolphinScheduler and all other job-scheduling frameworks. The event is a signal from a timer.
- [Use case 2: MQTrigger] For example, after receiving a message matching some pattern, we should start a new workflow. The event is a message from MQ.
- [Use case 3: HTTPTrigger/TCPTrigger/SMTPTrigger] Very popular. After we make an HTTP call to dolphinscheduler-api, we should start a new workflow. We could even trigger a pipeline after receiving an email from an external third-party team. The event is a network call.
- [Use case 4: Combine Streaming & Batch Job] In common scenarios, we often use streaming for near-real-time OLAP jobs, which need batch jobs as a backfill supplement in case some unexpected error happens in streaming. The event could be any condition on the streaming elements.
To benefit from DolphinScheduler's excellent plugin design, we should consider adding a trigger SPI plugin loaded by the master server.
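A minimal sketch of what such a trigger SPI could look like, assuming every trigger is reduced to a single event type as proposed above. `TriggerEvent`, `TriggerPlugin`, `TimerTriggerPlugin`, `MqTriggerPlugin` and `TriggerPluginManager` are hypothetical names, not existing DolphinScheduler classes; in the real plugin framework the plugins would be discovered through SPI loading rather than registered by hand.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical unified event: whatever the source, a trigger only ever sees one of these.
final class TriggerEvent {
    final String source;  // e.g. "timer", "mq", "http"
    final String payload; // raw signal content: tick time, message body, request body...

    TriggerEvent(String source, String payload) {
        this.source = source;
        this.payload = payload;
    }
}

// Hypothetical SPI: each plugin decides whether an incoming event should start the workflow.
interface TriggerPlugin {
    String type(); // unique key used by the plugin manager

    boolean shouldFire(TriggerEvent event); // true -> start a new workflow instance
}

// Use case 1: every timer tick fires.
final class TimerTriggerPlugin implements TriggerPlugin {
    public String type() { return "TIMER"; }

    public boolean shouldFire(TriggerEvent event) { return "timer".equals(event.source); }
}

// Use case 2: fire only for MQ messages that match a user-supplied pattern.
final class MqTriggerPlugin implements TriggerPlugin {
    private final Pattern pattern;

    MqTriggerPlugin(String regex) { this.pattern = Pattern.compile(regex); }

    public String type() { return "MQ"; }

    public boolean shouldFire(TriggerEvent event) {
        return "mq".equals(event.source) && pattern.matcher(event.payload).matches();
    }
}

// Stand-in for a plugin manager owned by the master; plugins are registered by hand here.
final class TriggerPluginManager {
    private final Map<String, TriggerPlugin> plugins = new HashMap<>();

    void register(TriggerPlugin plugin) {
        if (plugins.putIfAbsent(plugin.type(), plugin) != null) {
            throw new IllegalStateException("duplicate trigger type: " + plugin.type());
        }
    }

    Optional<TriggerPlugin> find(String type) {
        return Optional.ofNullable(plugins.get(type));
    }
}
```

Keeping the decision (`shouldFire`) separate from how events arrive means the same plugin contract can serve both push and pull delivery.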
Overview:
┌─────────────────────┬────────────────────┬────────────────────┬────────────────────────┬───────────────────────────────┐
│ │ │ │ │ │
│ UI │ API │ DB │ Registry │ Master │
│ │ │ │ │ │
├─────────────────────┼────────────────────┼────────────────────┼────────────────────────┼───────────────────────────────┤
│ │ │ │ │ │
│ ┌─────────────┐ │ ┌─────────────┐ │ ┌─────────────┐ │ │ ┌─────────────────────────┐ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ User ├───┼──►│ Create ├──┼──►│ Trigger ├──┼──────────Pull──────────┼──► TriggerTaskThreadPool │ │
│ │ │ │ │ Trigger │ │ │ │ │ │ │ │ │
│ └─────┬───────┘ │ └─────────────┘ │ └─────────────┘ │ │ └───────────┬─────────────┘ │
│ │ │ │ │ │ │ │
│ │ │ ┌─────────────┐ │ ┌─────────────┐ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ └─Push──────┼──►│ Request ├──┼──►│ Schedule ◄──┼────────────────────────┼──────────────┘ │
│ │ │ │ │ │ │ │ │ │
│ │ └─────────────┘ │ └─────┬───────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ │ │ │ ┌────────────────────────┐ │
│ │ │ │ │ │ │ │ │
│ │ │ └──────────┼────────────────────────┼───► SchedulerApi │ │
│ │ │ │ │ │ │ │
│ │ │ │ │ └───────────┬────────────┘ │
│ │ │ │ │ │ │
│ │ │ ┌─────────────┐ │ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ Command ◄──┼────────────────────────┼───────────────┘ │
│ │ │ │ │ │ │ │
│ │ │ └─────┬───────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ │ │ │ │
│ │ │ │ │ │ ┌─────────────────────────┐ │
│ │ │ │ │ │ │ │ │
│ │ │ └──────────┼────────────────────────┼─►MasterSchedulerBootStrap │ │
│ │ │ │ │ └─────────────────────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
└─────────────────────┴────────────────────┴────────────────────┴────────────────────────┴───────────────────────────────┘
- All triggers should be fired by events.
- Event delivery should be based on either push or pull mode.
- Triggers only act at the start of a workflow, to keep the implementation simple.
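The three rules above can be sketched as follows, assuming a hypothetical `StartEvent` that only names the workflow to start (no task-level target, matching the "start of the workflow only" rule). Push mode hands an event straight to a sink; pull mode runs one polling round over the registered triggers. `StartEvent`, `PullTrigger` and `EventDelivery` are illustrative names, not existing DolphinScheduler types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical event: it only names the workflow to start, because triggers act at workflow start.
final class StartEvent {
    final long workflowCode;
    final String reason; // e.g. "http", "mq", "timer"

    StartEvent(long workflowCode, String reason) {
        this.workflowCode = workflowCode;
        this.reason = reason;
    }
}

// Pull-mode trigger: the master asks it whether its condition was met since the last poll.
interface PullTrigger {
    Optional<StartEvent> poll();
}

final class EventDelivery {
    private final Consumer<StartEvent> sink; // e.g. "create a trigger instance / command"

    EventDelivery(Consumer<StartEvent> sink) { this.sink = sink; }

    // Push mode: an API request, webhook or internal signal hands the event over directly.
    void push(StartEvent event) { sink.accept(event); }

    // Pull mode: one polling round over the registered pull triggers.
    // Returns the number of events delivered in this round.
    int pollOnce(List<PullTrigger> triggers) {
        int delivered = 0;
        for (PullTrigger trigger : triggers) {
            Optional<StartEvent> event = trigger.poll();
            if (event.isPresent()) {
                sink.accept(event.get());
                delivered++;
            }
        }
        return delivered;
    }
}
```

Both modes end in the same sink, so everything downstream of the event (trigger instance, command) does not need to know how the event arrived.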
Detail Design
Controller Layer
- TriggerController: CRUD for triggers
Compatibility
- The scheduler has been partially refactored into scheduler-api.
- Relationship between the trigger SPI and the scheduler API
DB Layer
- Create a new table t_ds_trigger
- Leverage the current table t_ds_schedules
- Use SimpleTrigger to execute immediately, so that we can update the DB accordingly in the Quartz scheduler implementation.
Related issues
[DSIP-16][Task] Support stream task
Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Good idea, we may need to design the TriggerAPI and TriggerEvent structure first.
In the current architecture, the Quartz scheduler uses the DB for pessimistic concurrency control; when a job is executed, we insert a command into the DB.
We should dig into whether we can leverage Quartz's interfaces or need to implement this ourselves in TriggerService. If we implement it ourselves, we should reuse the current distributed lock and master failover mechanism.
We have the t_ds_command table; all trigger events should be transformed into commands.
Quartz is a plugin of ScheduleTrigger; we may have other schedule plugins in the future.
Thanks, Wenjun, for the continuous guidance in this thread and the very solid architecture design in DolphinScheduler!
These days I've been thinking about how responsibilities are split between the trigger and the scheduler service in the new architecture. My current high-level design:
- Host two trigger-related tables, trigger -> trigger instance; this role is currently played by the Quartz scheduler.
- Push: internal signal or HTTP call -> push -> create trigger instance
- Pull: thread pool -> pull -> onEvent -> create trigger instance
  Pull mode is more common and consumes resources, so I am thinking of integrating pull triggers as a special task type that can be dispatched by master/worker, leveraging the current task architecture, which already handles master/server failover.
- Scheduler: pull trigger instances on this master host -> command
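A rough sketch of the scheduler step, assuming hypothetical in-memory stand-ins for a trigger_instance table and t_ds_command: each master only turns the trigger instances assigned to its own host into commands, and removes them from the pending list so they are not scheduled twice. `TriggerInstance`, `Command` and `TriggerInstanceScheduler` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical trigger_instance row: a fired trigger waiting to be scheduled.
final class TriggerInstance {
    final long id;
    final long workflowCode;
    final String masterHost; // master responsible for scheduling this instance

    TriggerInstance(long id, long workflowCode, String masterHost) {
        this.id = id;
        this.workflowCode = workflowCode;
        this.masterHost = masterHost;
    }
}

// Hypothetical t_ds_command row produced from a trigger instance.
final class Command {
    final long workflowCode;
    final long triggerInstanceId;

    Command(long workflowCode, long triggerInstanceId) {
        this.workflowCode = workflowCode;
        this.triggerInstanceId = triggerInstanceId;
    }
}

final class TriggerInstanceScheduler {
    private final String localHost;

    TriggerInstanceScheduler(String localHost) { this.localHost = localHost; }

    // Converts every pending instance owned by this master into a command, appends it to the
    // command table and drops it from the pending list so it is never scheduled twice.
    List<Command> schedule(List<TriggerInstance> pending, List<Command> commandTable) {
        List<Command> created = new ArrayList<>();
        Iterator<TriggerInstance> it = pending.iterator();
        while (it.hasNext()) {
            TriggerInstance instance = it.next();
            if (!localHost.equals(instance.masterHost)) {
                continue; // another master schedules this one
            }
            Command command = new Command(instance.workflowCode, instance.id);
            commandTable.add(command);
            created.add(command);
            it.remove();
        }
        return created;
    }
}
```

In a real implementation the append and the removal would need to happen in one DB transaction; the in-memory lists only show the data flow.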
> Good idea, we may need to design the TriggerAPI and TriggerEvent structure first.
As you mentioned, I've been trying to design the TriggerAPI at the code level. Right now I am studying the existing task/registry/scheduler design and code patterns.
> Quartz is a plugin of ScheduleTrigger; we may have other schedule plugins in the future.
I got your point from your scheduler API refactor, but it may be hard to do it in multiple steps. In the first stage, maybe we can extend the Quartz scheduler within the current architecture. For the long-term fix, I will think about the relationship and boundary between trigger, schedule and command.
My current design is as below:
┌─────────────────────┬────────────────────┬────────────────────┬────────────────────────┬───────────────────────────────┐
│ │ │ │ │ │
│ UI │ API │ DB │ Registry │ Master │
│ │ │ │ │ │
├─────────────────────┼────────────────────┼────────────────────┼────────────────────────┼───────────────────────────────┤
│ │ │ │ │ │
│ ┌─────────────┐ │ ┌─────────────┐ │ ┌─────────────┐ │ │ ┌─────────────────────────┐ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ User ├───┼──►│ Create ├──┼──►│ Trigger ├──┼──────────Pull──────────┼──► TriggerTaskThreadPool │ │
│ │ │ │ │ Trigger │ │ │ │ │ │ │ │ │
│ └─────┬───────┘ │ └─────────────┘ │ └─────────────┘ │ │ └───────────┬─────────────┘ │
│ │ │ │ │ │ │ │
│ │ │ ┌─────────────┐ │ ┌─────────────┐ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ └─Push──────┼──►│ Request ├──┼──►│ Schedule ◄──┼────────────────────────┼──────────────┘ │
│ │ │ │ │ │ │ │ │ │
│ │ └─────────────┘ │ └─────┬───────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ │ │ │ ┌────────────────────────┐ │
│ │ │ │ │ │ │ │ │
│ │ │ └──────────┼────────────────────────┼───► SchedulerApi │ │
│ │ │ │ │ │ │ │
│ │ │ │ │ └───────────┬────────────┘ │
│ │ │ │ │ │ │
│ │ │ ┌─────────────┐ │ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ Command ◄──┼────────────────────────┼───────────────┘ │
│ │ │ │ │ │ │ │
│ │ │ └─────┬───────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ │ │ │ │
│ │ │ │ │ │ ┌─────────────────────────┐ │
│ │ │ │ │ │ │ │ │
│ │ │ └──────────┼────────────────────────┼─►MasterSchedulerBootStrap │ │
│ │ │ │ │ └─────────────────────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
└─────────────────────┴────────────────────┴────────────────────┴────────────────────────┴───────────────────────────────┘
- API (TriggerController, TriggerService): CRUD for triggers, and create new trigger instances (which currently correspond to Schedule/Command in the DB).
- Start a TriggerTaskExecutorThreadPool in the master, aligned with the logic task executor.
- For push triggers, directly accept requests from the API (or any other signal from the master) and create a trigger_instance directly in the DB.
- A pull trigger should be handled as a poll task with priority and timeout. I suggest putting it in the master, because we may not want users to write complex I/O logic in triggers.
- The scheduler takes trigger instances for scheduling; they can be scheduled by master host and priority.
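One way the pull-trigger handling could work, sketched under assumptions: `PollTask`, `PollResult` and `TriggerTaskExecutor` are hypothetical, priority is an int where a higher value runs first, and each poll is awaited with its own timeout and cancelled (interrupted) if it overruns. For simplicity the tasks of one round run one after another; the pool thread exists so a hung poll can be abandoned.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical pull-trigger poll: check returns true when the trigger condition is met.
final class PollTask {
    final String name;
    final int priority;       // higher value runs first
    final long timeoutMillis; // a poll running longer than this is cancelled
    final Callable<Boolean> check;

    PollTask(String name, int priority, long timeoutMillis, Callable<Boolean> check) {
        this.name = name;
        this.priority = priority;
        this.timeoutMillis = timeoutMillis;
        this.check = check;
    }
}

enum PollResult { FIRED, NOT_FIRED, TIMED_OUT, FAILED }

final class TriggerTaskExecutor {
    private final ExecutorService pool;

    TriggerTaskExecutor(int threads) { this.pool = Executors.newFixedThreadPool(threads); }

    // One polling round: tasks run one after another in priority order, each bounded by its
    // own timeout. Results are returned in execution order.
    Map<String, PollResult> runRound(List<PollTask> tasks) {
        PriorityQueue<PollTask> queue =
                new PriorityQueue<>((a, b) -> Integer.compare(b.priority, a.priority));
        queue.addAll(tasks);
        Map<String, PollResult> results = new LinkedHashMap<>();
        while (!queue.isEmpty()) {
            PollTask task = queue.poll();
            results.put(task.name, await(pool.submit(task.check), task.timeoutMillis));
        }
        return results;
    }

    private static PollResult await(Future<Boolean> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS) ? PollResult.FIRED : PollResult.NOT_FIRED;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the slow poll so it releases the pool thread
            return PollResult.TIMED_OUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return PollResult.FAILED;
        } catch (ExecutionException e) {
            return PollResult.FAILED; // the poll itself threw
        }
    }

    void shutdown() { pool.shutdownNow(); }
}
```

Because cancellation relies on interruption, a poll implementation that ignores interrupts would still occupy a pool thread after timing out, which is one more reason to keep user-defined I/O out of triggers.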
We need to define how triggers are assigned to Masters. Since we use pull mode, every Master will pull triggers from the DB, so we need to make sure each trigger is consumed by only one Master.
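A sketch of one possible answer, swapping in a claim-by-conditional-update approach that this thread does not prescribe: the first master whose conditional update succeeds owns the trigger. The `owner` column, the SQL in the comment and the `TriggerClaimTable` class are assumptions; `putIfAbsent` stands in for the database update so the example runs on its own.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for a hypothetical owner column on the trigger table. putIfAbsent plays the
// role of a conditional update such as
//   UPDATE t_ds_trigger SET owner = ? WHERE id = ? AND owner IS NULL
// where "1 row affected" means this master won the claim.
final class TriggerClaimTable {
    private final ConcurrentMap<Long, String> owners = new ConcurrentHashMap<>();

    // Atomic: only the first master to claim a trigger gets true.
    boolean tryClaim(long triggerId, String masterHost) {
        return owners.putIfAbsent(triggerId, masterHost) == null;
    }

    // Failover: free every trigger held by a master that left the registry.
    int releaseAll(String deadMasterHost) {
        int released = 0;
        for (Long triggerId : owners.keySet()) {
            if (owners.remove(triggerId, deadMasterHost)) {
                released++;
            }
        }
        return released;
    }

    String ownerOf(long triggerId) {
        return owners.get(triggerId);
    }
}
```

The release step is where the existing master failover mechanism would plug in: when the registry reports a master as gone, its triggers become claimable again.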
Thanks for Wenjun's support.
Here's the current Quartz architecture. DolphinScheduler uses Quartz, fired by timer triggers, to insert commands into the DB. The Quartz scheduler takes an exclusive lock while acquiring triggers, fires them, and gets the JobDetails to execute in its hosted thread pool.
In effect, it uses the DB as its distributed lock.
See https://github.com/quartz-scheduler/quartz/blob/main/quartz/src/main/java/org/quartz/core/QuartzSchedulerThread.java#L291-L381
Currently Quartz does not expose a public interface that would let us implement our own trigger mechanism.
Here are the high-level steps to implement this feature:
- (Required) dolphinscheduler-ui & dolphinscheduler-api: add a trigger controller for CRUD on triggers, and implement push triggers by inserting commands directly
- (Required) dolphinscheduler-scheduler-plugin: implement a new scheduler plugin
  - acquire a distributed lock from RegistryClient
  - get triggers from the DB and wrap them into a thread pool
  - if a trigger fires successfully, a callback inserts a command into the DB, waiting to be picked up to build the workflow & process instance
- (Optional) Consider unifying trigger and task, or keep them working independently.
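The scheduler-plugin steps above, sketched as one round under stated assumptions: `DistributedLock` is a hypothetical interface (the plan would back it with RegistryClient, whose API is not reproduced here), `LocalLock` is a single-JVM stand-in, `TriggerDefinition` and `TriggerSchedulerLoop` are illustrative names, and the command table is an in-memory list. The lock is held until every trigger evaluated in the round has finished.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical lock abstraction; the plan would back it with RegistryClient.
interface DistributedLock {
    boolean tryAcquire();

    void release();
}

// Single-JVM stand-in so the sketch runs without a registry.
final class LocalLock implements DistributedLock {
    private final ReentrantLock lock = new ReentrantLock();

    public boolean tryAcquire() { return lock.tryLock(); }

    public void release() { lock.unlock(); }
}

// Hypothetical trigger row fetched from the DB.
final class TriggerDefinition {
    final long id;
    final long workflowCode;

    TriggerDefinition(long id, long workflowCode) {
        this.id = id;
        this.workflowCode = workflowCode;
    }
}

final class TriggerSchedulerLoop {
    private final DistributedLock lock;
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    // In-memory stand-in for t_ds_command: workflow codes waiting to be picked up.
    private final List<Long> commandTable = Collections.synchronizedList(new ArrayList<>());

    TriggerSchedulerLoop(DistributedLock lock) { this.lock = lock; }

    // One round: lock -> evaluate fetched triggers in the pool -> callback inserts command -> unlock.
    // Returns false when another master holds the lock and the round is skipped.
    boolean runOnce(List<TriggerDefinition> fetched, Predicate<TriggerDefinition> condition) {
        if (!lock.tryAcquire()) {
            return false;
        }
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (TriggerDefinition def : fetched) {
                futures.add(pool.submit(() -> {
                    if (condition.test(def)) {
                        onFired(def);
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // keep the lock until every trigger of this round has finished
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            lock.release();
        }
    }

    // Success callback: insert a command so the master can build the process instance later.
    private void onFired(TriggerDefinition def) {
        commandTable.add(def.workflowCode);
    }

    List<Long> commands() {
        synchronized (commandTable) {
            return new ArrayList<>(commandTable);
        }
    }

    void shutdown() { pool.shutdown(); }
}
```

Holding the lock for the whole round mirrors the Quartz pattern described earlier (lock, acquire triggers, fire), at the cost of serializing rounds across masters.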
I am still looking for a better solution that stays compatible with the Quartz scheduler in the first version, but it seems a bit hard. Luckily, DS has already taken a few steps toward it.
Step #1: create the related table schema for review
Step #2: create a plugin manager to load triggers
Step #3: backend development; add a test API for local testing and draft the API for review and testing
Step #4: frontend development
Is there any progress? ^_^
- The master fetches triggers from the t_ds_trigger_definition table.
- Execute the trigger; if it meets the user-defined condition, insert a command into t_ds_command. Note that this operation needs to be transactional.
Here's the question: currently our command fetching is based on an id-based algorithm, and the key point is that when a ProcessInstance is generated, we delete the related command record.
https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java#L299
But we should not delete t_ds_trigger_definition records in this scenario.
One solution is to add one more table that records the fetch offset. While a master is alive, in every round it first acquires the lock, then fetches triggers from the t_ds_trigger_definition table and executes them, and finally updates the offset in the DB together with the trigger_instance records within one transaction.
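Method #1 (the offset table) could look roughly like this. It is a sketch with in-memory stand-ins for t_ds_trigger_definition, the proposed offset table and trigger_instance; `OffsetFetchStore` is a hypothetical name. It assumes definition ids are positive, that the caller already holds the master lock, and that once the last definition is reached the next round wraps back to the beginning (the thread does not specify this). The offset and the new trigger instances are written together at the end, where the real design would use a single DB transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// In-memory stand-ins for t_ds_trigger_definition, the proposed offset table and trigger_instance.
final class OffsetFetchStore {
    final TreeMap<Long, Long> definitions = new TreeMap<>(); // definition id -> workflow code
    long offset = 0L;                                         // last definition id processed
    final List<Long> triggerInstances = new ArrayList<>();    // workflow codes of fired triggers

    // One master round; the caller is assumed to hold the master lock already.
    // Evaluates up to batchSize definitions after the stored offset, then writes the new offset
    // and the new trigger instances together (one DB transaction in the real design).
    void runRound(int batchSize, Predicate<Long> shouldFire) {
        // Wrap around once every definition has been visited (assumes ids are positive).
        long start = definitions.higherKey(offset) == null ? 0L : offset;
        List<Long> fired = new ArrayList<>();
        long newOffset = start;
        int seen = 0;
        for (Map.Entry<Long, Long> def : definitions.tailMap(start, false).entrySet()) {
            if (seen++ == batchSize) {
                break;
            }
            if (shouldFire.test(def.getKey())) {
                fired.add(def.getValue());
            }
            newOffset = def.getKey();
        }
        // "Commit": both writes happen together, after all evaluation has finished.
        triggerInstances.addAll(fired);
        offset = newOffset;
    }
}
```

Unlike commands, definitions are never deleted here; only the offset moves, which is exactly the difference from the id-based command fetching described above.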
For a more graceful approach, we could use consistent hashing, as memcached and brpc do: https://github.com/apache/brpc/blob/master/src/brpc/policy/dynpart_load_balancer.cpp#L83
Considering its complexity, I will use method #1 for the implementation.
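For comparison, the consistent-hash alternative mentioned above would assign each trigger to a master without a shared offset. This is a textbook hash ring with virtual nodes, not a port of brpc's load balancer; the `ConsistentHashRing` class, the FNV-1a hash and the virtual-node count are assumptions. Its useful property is that removing a master only reassigns the triggers that master owned.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Textbook consistent-hash ring with virtual nodes (illustrative, not brpc's implementation).
final class ConsistentHashRing {
    private final TreeMap<Integer, String> ring = new TreeMap<>(); // point on ring -> master
    private final int virtualNodes;

    ConsistentHashRing(Collection<String> masters, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        for (String master : masters) {
            addMaster(master);
        }
    }

    void addMaster(String master) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(master + "#" + i), master);
        }
    }

    // Removes only points still owned by this master, leaving other masters' points untouched.
    void removeMaster(String master) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(master + "#" + i), master);
        }
    }

    // Owner = first point clockwise from the trigger's hash, wrapping to the start of the ring.
    String ownerOf(long triggerId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no live master");
        }
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash(Long.toString(triggerId)));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    // 32-bit FNV-1a: spreads near-identical keys such as "master-a#1" and "master-a#2"
    // across the ring better than String.hashCode, whose values for them differ by 1.
    private static int hash(String key) {
        int h = 0x811c9dc5;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x01000193;
        }
        return h;
    }
}
```

Each master could evaluate only the triggers for which `ownerOf` returns its own host, using the live master list from the registry; the extra cost is keeping every master's view of that list consistent, which is part of why the offset approach is simpler to start with.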
@ruanwenjun Can you help guide this design?