Scheduled batch supervisor
This change introduces a scheduled batch supervisor in Druid. The supervisor periodically wakes up to submit an MSQ ingest query, allowing users to automate batch ingestion directly within Druid. Think of it as simple batch task workflows natively integrated into Druid, though it doesn't replace more sophisticated workflow management systems like Apache Airflow. This is an experimental feature.
Summary of changes:
The `scheduled_batch` supervisor can be configured as follows:
```json
{
  "type": "scheduled_batch",
  "schedulerConfig": {
    "type": "unix",
    "schedule": "*/5 * * * *"
  },
  "spec": {
    "query": "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY DAY"
  },
  "suspended": false
}
```
The supervisor will submit the `REPLACE` SQL query every 5 minutes. The supervisor supports two types of cron scheduler configurations:
- Unix cron syntax:
  - Type must be set to `unix`.
  - Follows the standard Unix cron format, e.g., `*/5 * * * *` to schedule the SQL task every 5 minutes.
  - Supports macro expressions such as `@daily`, `@hourly`, `@monthly`, etc.
- Quartz cron syntax:
  - Type must be set to `quartz`.
  - Offers more flexibility and control for scheduling tasks.
  - Example: `0 0 0 ? 3,6,9,12 MON-FRI` to schedule tasks at midnight on weekdays during March, June, September, and December.
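For reference, a supervisor spec using the Quartz scheduler type might look like the following (illustrative only; the datasource and query are placeholders carried over from the earlier example):

```json
{
  "type": "scheduled_batch",
  "schedulerConfig": {
    "type": "quartz",
    "schedule": "0 0 0 ? 3,6,9,12 MON-FRI"
  },
  "spec": {
    "query": "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY DAY"
  },
  "suspended": false
}
```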
Key points:
- Users can specify the `query` along with any context parameters in the `spec`. This structure is identical to what the MSQ task engine accepts.
- Currently, the batch supervisor repeatedly submits the DML SQL present in the `spec` as-is on its schedule. This can be useful for recurring batch ingestion or reindexing tasks.
- There is no parameterization or "change detection" support for input sources yet, but some simple mechanisms are planned for the future.
- Users can configure multiple scheduled batch supervisors for the same datasource.
- Only DML ingestion queries are currently supported by the scheduled batch supervisor.
- Users can suspend, resume, and terminate the scheduled batch supervisor. Suspending the supervisor stops new tasks from being scheduled.
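These lifecycle operations go through the standard Druid supervisor endpoints; for example (the supervisor ID below is illustrative):

```shell
# Suspend the supervisor: stops new task submissions
curl -X POST http://localhost:8888/druid/indexer/v1/supervisor/scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd/suspend

# Resume a suspended supervisor
curl -X POST http://localhost:8888/druid/indexer/v1/supervisor/scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd/resume

# Terminate the supervisor permanently
curl -X POST http://localhost:8888/druid/indexer/v1/supervisor/scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd/terminate
```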
Some implementation details:
- The `indexing-service` module now depends on the `druid-sql` module. This allows the scheduled batch supervisor, running on the Overlord, to communicate with the Broker to:
  a. Validate and parse the user-specified query.
  b. Submit MSQ queries to the `/druid/v2/sql/task/` endpoint.
- A `ScheduledBatchScheduler` is injected in the Overlord and is responsible for scheduling and unscheduling all scheduled batch instances.
- A `BrokerClient` implementation has been added, leveraging the `ServiceClient` functionality.
- `SqlTaskStatus` and its unit test `SqlTaskStatusTest` have been moved from the MSQ module to the SQL module so they can be reused by the `BrokerClient` implementation in the SQL module.
- Added an `ExplainPlanInformation` class, which is used to deserialize the explain plan response from the Broker.
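To give a rough feel for the scheduling logic, here is a minimal, hypothetical Python sketch of how a scheduler can compute the next submission time for a fixed-interval schedule like `*/5 * * * *`. The actual implementation uses a proper cron library and runs inside the Overlord; the function name here is invented for illustration.

```python
from datetime import datetime, timedelta

def next_submission_time(now: datetime, period_minutes: int = 5) -> datetime:
    """Next wall-clock boundary aligned to the period (e.g. */5 * * * *)."""
    base = now.replace(second=0, microsecond=0)
    minutes_past = base.minute % period_minutes
    return base + timedelta(minutes=period_minutes - minutes_past)

# At 17:37:21, the next */5 boundary is 17:40:00.
now = datetime(2024, 10, 18, 17, 37, 21)
print(next_submission_time(now))  # 2024-10-18 17:40:00
```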
The status API response for the supervisor contains the scheduler state along with active and completed tasks:
```shell
curl -X GET http://localhost:8888/druid/indexer/v1/supervisor/scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd/status
```

```json
{
  "supervisorId": "scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd",
  "status": "SCHEDULER_RUNNING",
  "lastTaskSubmittedTime": "2024-10-18T17:35:00.000Z",
  "nextTaskSubmissionTime": "2024-10-18T17:40:00.000Z",
  "timeUntilNextTaskSubmission": "PT122.431S",
  "activeTasks": {
    "query-6a6f14e1-809a-4832-8b71-7128a23165e9": {
      "id": "query-6a6f14e1-809a-4832-8b71-7128a23165e9",
      "status": "RUNNING",
      "duration": -1,
      "errorMsg": null,
      "location": {
        "host": null,
        "port": -1,
        "tlsPort": -1
      }
    }
  },
  "completedTasks": {
    "query-c38c8c09-c101-45a1-a6d8-3dc64ce16ad3": {
      "id": "query-c38c8c09-c101-45a1-a6d8-3dc64ce16ad3",
      "status": "SUCCESS",
      "duration": 503,
      "errorMsg": null,
      "location": {
        "host": null,
        "port": -1,
        "tlsPort": -1
      }
    },
    "query-6badb07f-6ed4-49ce-bb70-8a13be53876b": {
      "id": "query-6badb07f-6ed4-49ce-bb70-8a13be53876b",
      "status": "SUCCESS",
      "duration": 518,
      "errorMsg": null,
      "location": {
        "host": null,
        "port": -1,
        "tlsPort": -1
      }
    }
  }
}
```
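The `timeUntilNextTaskSubmission` field is an ISO-8601 duration string. As an aside, a small client-side helper (hypothetical, not part of this PR) could convert such values to seconds:

```python
import re

def iso_duration_seconds(duration: str) -> float:
    """Parse a simple ISO-8601 time duration like 'PT122.431S' or 'PT5M' into seconds."""
    m = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?", duration)
    if m is None:
        raise ValueError(f"unsupported duration: {duration}")
    hours, minutes, seconds = m.groups()
    return int(hours or 0) * 3600 + int(minutes or 0) * 60 + float(seconds or 0)

print(iso_duration_seconds("PT122.431S"))  # 122.431
```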
TODOs in this PR:
- Track metrics of submitted, active, and completed tasks per supervisor.
- Add javadocs for the new classes.
- Clean up some interfaces.
- More test coverage.
Future Improvements:
- Scheduler State Persistence: This feature doesn't yet persist the scheduler state. Adding persistence will improve robustness (e.g., preventing missed jobs during Overlord restarts).
- Task Limits: There is currently no limit on the maximum number of tasks allowed by the scheduled batch supervisor. We may want to consider this in the future.
- The Supervisors page in the Druid web console should be updated to accommodate this change (and, more broadly, non-streaming supervisors).
This PR has:
- [x] been self-reviewed.
- [x] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [x] added or updated version, license, or notice information in licenses.yaml
- [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [ ] added integration tests.
- [x] been tested in a test Druid cluster.
I have updated the PR description to include details on the design and implementation. While the TODOs noted in the summary still need to be addressed, the changes are generally ready for review.
This sounds exciting @abhishekrb19 ! Sorry I haven't reviewed the code yet but are there extension points to modify/add the ingest query during each supervisor run? Or will the same ingest query be executed during every run?
@a2l007, thanks for taking a look! Yeah, the same ingest query will be repeatedly submitted during the supervisor's scheduled runs. We plan to add simple change detection or templating mechanisms for some common use cases, which will be introduced in a future patch.
@kfaraz, thank you for the reviews! I believe I have addressed and/or responded to all of your comments.
@abhishekrb19 , the changes look good to me. You may proceed with the merge.
When you get the time though, please take another look at this comment https://github.com/apache/druid/pull/17353#discussion_r1949384263 If you feel that it's valid, it can be addressed in a follow up PR.
Sounds good, just responded in the comment inline