
Scheduled batch supervisor

Open abhishekrb19 opened this issue 1 year ago • 3 comments

This change introduces a scheduled batch supervisor in Druid. The supervisor periodically wakes up to submit an MSQ ingest query, allowing users to automate batch ingestion directly within Druid. Think of it as simple batch task workflows natively integrated into Druid, though it doesn't replace more sophisticated workflow management systems like Apache Airflow. This is an experimental feature.

Summary of changes:

The scheduled_batch supervisor can be configured as follows:

{
    "type": "scheduled_batch",
    "schedulerConfig": {
        "type": "unix",
        "schedule": "*/5 * * * *"
    },
    "spec": {
        "query": "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY DAY"
    },
    "suspended": false
}

With this configuration, the supervisor submits the REPLACE SQL query every 5 minutes. The supervisor supports two types of cron scheduler configurations:

  1. Unix cron syntax:
    • Type must be set to unix.
    • Follows the standard Unix cron format, e.g., */5 * * * * to schedule the SQL task every 5 minutes.
    • Supports macro expressions such as @daily, @hourly, @monthly, etc.
  2. Quartz cron syntax:
    • Type must be set to quartz.
    • Offers more flexibility and control for scheduling tasks.
    • Example: 0 0 0 ? 3,6,9,12 MON-FRI to schedule tasks at midnight on weekdays during March, June, September, and December.
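
As a rough illustration of the two scheduler types, the sketch below builds supervisor payloads in Python. The helper name build_supervisor_spec and the SCHEDULER_TYPES set are hypothetical and not part of Druid; only the field names and schedule strings are taken from the example spec above.

```python
import json

# The two scheduler types described above; the set name is illustrative only.
SCHEDULER_TYPES = {"unix", "quartz"}


def build_supervisor_spec(query, schedule, scheduler_type="unix", suspended=False):
    """Return a scheduled_batch payload shaped like the example spec (hypothetical helper)."""
    if scheduler_type not in SCHEDULER_TYPES:
        raise ValueError(f"unsupported scheduler type: {scheduler_type!r}")
    return {
        "type": "scheduled_batch",
        "schedulerConfig": {"type": scheduler_type, "schedule": schedule},
        "spec": {"query": query},
        "suspended": suspended,
    }


QUERY = "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY DAY"

# Unix cron: every 5 minutes, and the @daily macro.
unix_spec = build_supervisor_spec(QUERY, "*/5 * * * *")
daily_spec = build_supervisor_spec(QUERY, "@daily")

# Quartz cron: midnight on weekdays in March, June, September, and December.
quartz_spec = build_supervisor_spec(
    QUERY, "0 0 0 ? 3,6,9,12 MON-FRI", scheduler_type="quartz"
)

print(json.dumps(quartz_spec, indent=2))
```

A payload like this would then be sent as the JSON body of a POST to the supervisor API (/druid/indexer/v1/supervisor), the same API family used by the status call shown later in this description.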

Key points:

  • Users can specify the query along with any query context in the spec. This structure is identical to what the MSQ task engine accepts.
  • Currently, the batch supervisor will repeatedly submit the DML SQL present in the spec as-is on its schedule.
  • This can be useful for recurring batch ingestion or reindexing tasks.
  • There is no parameterization or "change detection" support for input sources yet, but some simple mechanisms are planned for the future.
  • Users can configure multiple scheduled batch supervisors for the same datasource.
  • Only DML ingestion queries are currently supported by the scheduled batch supervisor.
  • Users can suspend, resume and terminate the scheduled batch supervisor. Suspending the supervisor stops new tasks from being scheduled.
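
The suspend, resume, and terminate operations map onto the standard supervisor lifecycle endpoints. The following is a minimal sketch, assuming a Router at http://localhost:8888 as in the status example in this description; the helper name lifecycle_request is hypothetical, and the request is only constructed, not sent.

```python
import urllib.request
from urllib.parse import quote

ROUTER = "http://localhost:8888"  # assumed local Router address
ACTIONS = ("suspend", "resume", "terminate")


def lifecycle_request(supervisor_id, action):
    """Build (but do not send) a POST request for a supervisor lifecycle action."""
    if action not in ACTIONS:
        raise ValueError(f"unknown action: {action!r}")
    # Percent-encode the ID so it is safe to embed as a single path segment.
    encoded_id = quote(supervisor_id, safe="")
    url = f"{ROUTER}/druid/indexer/v1/supervisor/{encoded_id}/{action}"
    return urllib.request.Request(url, method="POST")


request = lifecycle_request(
    "scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd", "suspend"
)
print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen would issue the call against a running cluster.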

Some implementation details:

  • The indexing-service module now depends on the druid-sql module. This allows the scheduled batch supervisor, running on the Overlord, to communicate with the Broker to:
    • Validate and parse the user-specified query.
    • Submit MSQ queries to the /druid/v2/sql/task/ endpoint.
  • A ScheduledBatchScheduler is injected in the Overlord, which is responsible for scheduling and unscheduling all scheduled batch instances.
  • A BrokerClient implementation has been added, leveraging the ServiceClient functionality.
  • The SqlTaskStatus class and its unit test SqlTaskStatusTest have been moved from the msq module to the sql module so that they can be reused by the BrokerClient implementation in the sql module.
  • Added an ExplainPlanInformation class, which is used to deserialize the explain plan response from the Broker.
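
To make the submission step concrete, here is a client-side sketch in Python of what a single scheduled run sends to the Broker. It is not the PR's Java implementation: the helper names, the Broker address, and the keyword-prefix DML check are assumptions for illustration (the PR validates queries through the Broker rather than by inspecting the query text).

```python
import json
import urllib.request

BROKER = "http://localhost:8082"  # assumed Broker address


def is_dml_ingest(query):
    """Crude illustration only: treat INSERT/REPLACE statements as DML ingestion."""
    words = query.lstrip().split(None, 1)
    return bool(words) and words[0].upper() in ("INSERT", "REPLACE")


def build_sql_task_request(spec):
    """Build (but do not send) the POST a scheduled run would make to the SQL task endpoint."""
    query = spec["query"]
    if not is_dml_ingest(query):
        raise ValueError("only DML ingestion queries are supported")
    # The spec's query and optional context are forwarded as-is.
    body = {"query": query, "context": spec.get("context", {})}
    return urllib.request.Request(
        f"{BROKER}/druid/v2/sql/task/",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


spec = {
    "query": "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY DAY",
    "context": {"maxNumTasks": 3},  # example MSQ context value
}
sql_request = build_sql_task_request(spec)
print(sql_request.full_url)
```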

The status API response for the supervisor contains the scheduler state along with active and completed tasks:

curl -X GET http://localhost:8888/druid/indexer/v1/supervisor/scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd/status
{
  "supervisorId": "scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd",
  "status": "SCHEDULER_RUNNING",
  "lastTaskSubmittedTime": "2024-10-18T17:35:00.000Z",
  "nextTaskSubmissionTime": "2024-10-18T17:40:00.000Z",
  "timeUntilNextTaskSubmission": "PT122.431S",
  "activeTasks": {
    "query-6a6f14e1-809a-4832-8b71-7128a23165e9": {
      "id": "query-6a6f14e1-809a-4832-8b71-7128a23165e9",
      "status": "RUNNING",
      "duration": -1,
      "errorMsg": null,
      "location": {
        "host": null,
        "port": -1,
        "tlsPort": -1
      }
    }
  },
  "completedTasks": {
    "query-c38c8c09-c101-45a1-a6d8-3dc64ce16ad3": {
      "id": "query-c38c8c09-c101-45a1-a6d8-3dc64ce16ad3",
      "status": "SUCCESS",
      "duration": 503,
      "errorMsg": null,
      "location": {
        "host": null,
        "port": -1,
        "tlsPort": -1
      }
    },
    "query-6badb07f-6ed4-49ce-bb70-8a13be53876b": {
      "id": "query-6badb07f-6ed4-49ce-bb70-8a13be53876b",
      "status": "SUCCESS",
      "duration": 518,
      "errorMsg": null,
      "location": {
        "host": null,
        "port": -1,
        "tlsPort": -1
      }
    }
  }
}
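
A client could summarize this response along the following lines. This is a minimal sketch: the function names are hypothetical, the sample dictionary is a trimmed copy of the response above, and the duration parser handles only the PT<seconds>S form that appears in it.

```python
import re
from datetime import datetime, timedelta

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def parse_seconds_duration(text):
    """Parse an ISO-8601 duration of the form PT<seconds>S (the only form in the sample)."""
    match = re.fullmatch(r"PT(\d+(?:\.\d+)?)S", text)
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    return timedelta(seconds=float(match.group(1)))


def summarize_status(status):
    """Extract the scheduler state, timing, and task IDs from a status response."""
    last = datetime.strptime(status["lastTaskSubmittedTime"], TIMESTAMP_FORMAT)
    upcoming = datetime.strptime(status["nextTaskSubmissionTime"], TIMESTAMP_FORMAT)
    completed = status.get("completedTasks", {})
    return {
        "state": status["status"],
        "interval": upcoming - last,
        "wait": parse_seconds_duration(status["timeUntilNextTaskSubmission"]),
        "active": sorted(status.get("activeTasks", {})),
        "unsuccessful": sorted(
            task_id for task_id, task in completed.items() if task["status"] != "SUCCESS"
        ),
    }


sample = {
    "status": "SCHEDULER_RUNNING",
    "lastTaskSubmittedTime": "2024-10-18T17:35:00.000Z",
    "nextTaskSubmissionTime": "2024-10-18T17:40:00.000Z",
    "timeUntilNextTaskSubmission": "PT122.431S",
    "activeTasks": {
        "query-6a6f14e1-809a-4832-8b71-7128a23165e9": {"status": "RUNNING"}
    },
    "completedTasks": {
        "query-c38c8c09-c101-45a1-a6d8-3dc64ce16ad3": {"status": "SUCCESS"},
        "query-6badb07f-6ed4-49ce-bb70-8a13be53876b": {"status": "SUCCESS"},
    },
}
summary = summarize_status(sample)
print(summary["state"], summary["interval"], summary["active"])
```

The gap between the two timestamps is five minutes, which matches the */5 * * * * schedule in the example spec.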

TODOs in this PR:

  • Track metrics of submitted/active and completed tasks per supervisor.
  • Add Javadocs for the new classes.
  • Clean up some interfaces.
  • More test coverage.

Future Improvements:

  • Scheduler State Persistence: This feature doesn't yet persist the scheduler state. Adding persistence will improve robustness (e.g., preventing missed jobs during Overlord restarts).
  • Task Limits: There is currently no limit on the maximum number of tasks allowed by the scheduled batch supervisor. We may want to consider this in the future.
  • Web console: The Supervisors page in the Druid web console should be updated to accommodate this supervisor type and, more broadly, non-streaming supervisors.

This PR has:

  • [x] been self-reviewed.
    • [x] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • [ ] added documentation for new or modified features or behaviors.
  • [ ] a release note entry in the PR description.
  • [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • [x] added or updated version, license, or notice information in licenses.yaml
  • [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [ ] added integration tests.
  • [x] been tested in a test Druid cluster.

abhishekrb19, Oct 15 '24 17:10

This sounds exciting @abhishekrb19 ! Sorry I haven't reviewed the code yet but are there extension points to modify/add the ingest query during each supervisor run? Or will the same ingest query be executed during every run?

a2l007, Oct 17 '24 19:10

I have updated the PR description to include details on the design and implementation. While the TODOs noted in the summary still need to be addressed, the changes are generally ready for review.

abhishekrb19, Oct 18 '24 18:10

@a2l007, thanks for taking a look! Yeah, the same ingest query will be repeatedly submitted during the supervisor's scheduled runs. We plan to add simple change detection or templating mechanisms for some common use cases, which will be introduced in a future patch.

abhishekrb19, Oct 18 '24 18:10

@kfaraz, thank you for the reviews! I believe I have addressed and/or responded to all of your comments.

abhishekrb19, Feb 10 '25 09:02

@abhishekrb19 , the changes look good to me. You may proceed with the merge.

When you get the time, though, please take another look at this comment (https://github.com/apache/druid/pull/17353#discussion_r1949384263). If you feel that it's valid, it can be addressed in a follow-up PR.

kfaraz, Feb 11 '25 03:02

Sounds good, just responded in the comment inline

abhishekrb19, Feb 11 '25 06:02