feat: adds jobs queue

Open jmikrut opened this issue 1 year ago • 2 comments

Adds a jobs queue to Payload.

TODO:

  • [x] Docs, w/ examples for Vercel Cron, additional services
  • [x] Type the job using GeneratedTypes in JobRunnerArgs (@AlessioGr)
  • [x] Write the runJobs function
  • [x] Allow for some type of payload.runTask
  • [x] Open up a new bin script for running jobs
  • [x] Determine strategy for runner endpoint to either await jobs successfully or return early and stay open until job work completes (serverless ramifications here)
  • [x] Allow for job runner to accept how many jobs to run in one invocation
  • [x] Make a Payload local API method for creating a new job easily (payload.createJob) or similar which is strongly typed (@AlessioGr)
  • [x] Make payload.runJobs or similar (@AlessioGr)
  • [x] Write tests for retrying up to max retries for a given step
  • [x] Write tests for dynamic import of a runner
  • ~~Add a runJobsOnAdminLoad which will run any jobs that are found when the admin UI loads, for cases where cron is not available. This is similar to what WP does~~ No
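
Two of the items above (retrying a step up to its max retries, and letting the runner accept how many jobs to run in one invocation) can be sketched roughly as follows. This is a minimal in-memory illustration, not the implementation in this PR: the `Job` shape, `runWithRetries`, and this version of `runJobs` are hypothetical names invented for the example.

```typescript
// Hypothetical job shape for illustration only; not Payload's actual types.
type Job = {
  id: number
  run: () => Promise<void>
  retries: number // maximum number of retries after the first attempt
  attempts: number
  status: 'pending' | 'succeeded' | 'failed'
}

// Run a single job, retrying until it succeeds or its retries are exhausted.
async function runWithRetries(job: Job): Promise<void> {
  const maxAttempts = job.retries + 1
  while (job.attempts < maxAttempts) {
    job.attempts += 1
    try {
      await job.run()
      job.status = 'succeeded'
      return
    } catch {
      // Ignore the error and try again if attempts remain.
    }
  }
  job.status = 'failed'
}

// Process at most `limit` pending jobs in one invocation.
// Returns the number of jobs that were picked up.
async function runJobs(queue: Job[], limit: number): Promise<number> {
  const batch = queue.filter((job) => job.status === 'pending').slice(0, limit)
  for (const job of batch) {
    await runWithRetries(job)
  }
  return batch.length
}
```

Capping the batch size keeps a single invocation short, which matters for the serverless case mentioned above; jobs beyond the limit simply stay pending for the next invocation.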

Feature planning 9/27/24 outcomes

The shape of the config should permit the definition of steps separate from the job workflows themselves.

const config = {
  // Not sure if we need this property anymore
  queues: {
  },
  // A job is an instance of a workflow, stored in DB
  // and triggered by something at some point
  jobs: {
    // Be able to override the jobs collection
    collectionOverrides: () => {},

    // Workflows are groups of tasks that handle
    // the flow from task to task.
    // When defined on the config, they are considered as predefined workflows
    // BUT - in the future, we'll allow for UI-based workflow definition as well.
    workflows: [
      {
        slug: 'job-name',
        // Temporary name for this
        // should be able to pass function 
        // or path to it for Node to dynamically import
        controlFlowInJS: '/my-runner.js',

        // Temporary name as well
        // should be able to eventually define workflows
        // in UI (meaning they need to be serialized in JSON)
        // Should not be able to define both control flows
        controlFlowInJSON: [
          {
            task: 'myTask',
            next: {
              // etc
            }
          }
        ],

        // Workflows take input
        // which are a group of fields
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
      },
    ],

    // Tasks are defined separately as isolated functions
    // that can be retried on fail
    tasks: [
      {
        slug: 'myTask',
        retries: 2,
        // Each task takes input
        // Used to auto-type the task func args
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        // Each task produces output
        // Used to auto-type the function signature
        output: [
          {
            name: 'success',
            type: 'checkbox',
          }
        ],
        onSuccess: () => {},
        onFail: () => {},
        run: myRunner,
      },
    ]
  }
}
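
For illustration, the `myRunner` referenced in the config above might look something like this. The planning notes do not define the runner's signature, so the argument shape (`{ input }`), the return shape, and the `MyTaskInput` / `MyTaskOutput` type names are all assumptions; the point is only that the `input` and `output` field definitions would drive the function's types.

```typescript
// Hypothetical types mirroring the `input` and `output` fields of `myTask`.
// With maxDepth: 0, the `post` relationship is assumed to arrive as an ID.
type MyTaskInput = { post: string | number; message: string }
type MyTaskOutput = { success: boolean }

// Assumed signature: the runner receives typed input and resolves to typed output.
const myRunner = async ({ input }: { input: MyTaskInput }): Promise<MyTaskOutput> => {
  // A real task would do its work here (send an email, update a document, etc.).
  // This placeholder reports success only when a non-empty message was provided.
  const success = input.message.trim().length > 0
  return { success }
}
```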

payload.createJob

This function should allow for the creation of jobs based on either a workflow (group of tasks) or an individual task.

To create a job using a workflow:

const job = await payload.createJob({
  // Accept the `name` of a workflow so we can match to either a 
  // code-based workflow OR a workflow defined in the DB
  // Should auto-type the input
  workflowName: 'myWorkflow',
  input: {
    // typed to the args of the workflow by name
  }
})

To create a job using a task:

const job = await payload.createJob({
  // Accept the `name` of a task
  task: 'myTask',
  input: {
    // typed to the args of the task by name
  }
})
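
One possible way to get the strong typing described above is to derive the accepted arguments from a map of task and workflow names to their input types. The sketch below is an in-memory stand-in, not Payload's API: `JobTypes`, `CreateJobArgs`, `StoredJob`, and the `jobs` array are hypothetical, and in practice the map would presumably come from generated types.

```typescript
// Hypothetical registry of inputs, keyed by task / workflow name.
type JobTypes = {
  tasks: { myTask: { post: string; message: string } }
  workflows: { myWorkflow: { post: string; message: string } }
}

// For each name, pair it with its own input type, then union the results.
type TaskArgs = {
  [K in keyof JobTypes['tasks']]: { task: K; input: JobTypes['tasks'][K] }
}[keyof JobTypes['tasks']]

type WorkflowArgs = {
  [K in keyof JobTypes['workflows']]: { workflowName: K; input: JobTypes['workflows'][K] }
}[keyof JobTypes['workflows']]

type CreateJobArgs = TaskArgs | WorkflowArgs
type StoredJob = CreateJobArgs & { id: number }

// In-memory stand-in for the jobs collection.
const jobs: StoredJob[] = []

// Accepts either a task or a workflow name; `input` is typed by that name.
async function createJob(args: CreateJobArgs): Promise<StoredJob> {
  const job = { ...args, id: jobs.length + 1 }
  jobs.push(job)
  return job
}
```

With this shape, passing an unknown task name or an `input` that does not match the named task fails at compile time rather than at run time.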

jmikrut avatar Sep 15 '24 18:09 jmikrut

@jmikrut this is a much-needed feature. I created a plugin for Payload inspired by WooCommerce's Action Scheduler, but I haven't updated it for the latest version. Now that the job queue will be part of Payload core, I might not need to finish this plugin :D

https://github.com/mpresecan/payload-action-scheduler

mpresecan avatar Sep 30 '24 13:09 mpresecan

We are using node-schedule in our Next.js app on a self-hosted server running PM2 in cluster mode. To ensure each job runs only once, jobs are always executed on the primary node. The primary node is selected on startup and lives until its process dies, at which point a new primary is initialized.

We need a similar mechanism for the Payload job queue. Jobs can be created on any node, but they have to be synchronized (at the database level?) and scheduled on the primary node only.

cbratschi avatar Sep 30 '24 16:09 cbratschi

A few things necessary to the state of this PR:

Done: 1. Simplify the changes to payload-types.ts

No need to strongly type the taskStatus virtual field. Not worth it as of now, we can revisit in the future if we want. But very few people will be interacting directly with payload-jobs data, and we can type the input / output within payload.jobs.queue() and similar.

Done: 2. Remove all import-generation tactics in payload-types

We can add it back when necessary but for now we should not use it.

Done: 3. Simplify the types generated:

Before:

  jobs?: {
    tasks: {
      CreateSimple: {
        input: TaskCreateSimpleInput;
        output: TaskCreateSimpleOutput;
      };
    };
    workflows?: {
      updatePost?: {
        input: WorkflowupdatePostInput;
      };
    };
  };

After:

  jobs?: {
    tasks: {
      CreateSimple: TaskcreateSimple;
    };
    workflows?: {
      updatePost?: WorkflowupdatePost;
    };
  };

In this scenario, TaskcreateSimple will be a type with input and output keys. Same with workflows.
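
As a rough sketch of how the simplified shape could still yield typed input and output, indexed access types can pull the `input` and `output` keys back out. The field contents below are invented for the example, the keys are made required to keep the lookups short, and `GeneratedTypes`, `TaskInput`, and `TaskOutput` are hypothetical stand-ins rather than the generated code itself.

```typescript
// Hypothetical generated types following the "After" shape.
interface TaskcreateSimple {
  input: { message: string }
  output: { success: boolean }
}

interface WorkflowupdatePost {
  input: { post: string; message: string }
}

interface GeneratedTypes {
  jobs: {
    tasks: { CreateSimple: TaskcreateSimple }
    workflows: { updatePost: WorkflowupdatePost }
  }
}

// Look up a task's input or output type by its name.
type TaskName = keyof GeneratedTypes['jobs']['tasks']
type TaskInput<K extends TaskName> = GeneratedTypes['jobs']['tasks'][K]['input']
type TaskOutput<K extends TaskName> = GeneratedTypes['jobs']['tasks'][K]['output']

// Both values are checked against the types derived above.
const exampleInput: TaskInput<'CreateSimple'> = { message: 'hello' }
const exampleOutput: TaskOutput<'CreateSimple'> = { success: true }
```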

Done: 4. Add a mechanism for naming types properly

Clearly, the type names above are nasty with the camelCase situation going on.

We need a way to name these auto-generated types in a better way. Can we automate it, and then allow someone to override how it works? If automated, would we just uppercase the first letter, if it were lowercase? And if the person didn't like it, they could use interfaceName or similar?

Options for auto-naming:

  1. Task_createSimple - underscore, take it as it comes
  2. TaskCreateSimple - capitalize first letter regardless - best option at the moment
  3. createSimpleTask - less ideal, types should be capitalized out of convention, etc
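
Option 2 with an optional override could be sketched like this. `toTypeName` is a hypothetical helper written for this example, not the function used in the PR, and the `interfaceName` parameter simply mirrors the override idea raised above.

```typescript
// Hypothetical helper: build a type name from a prefix and a slug,
// capitalizing the slug's first letter unless an explicit name is given.
function toTypeName(
  prefix: 'Task' | 'Workflow',
  slug: string,
  interfaceName?: string,
): string {
  if (interfaceName) {
    return interfaceName
  }
  return prefix + slug.charAt(0).toUpperCase() + slug.slice(1)
}
```

Under this sketch, `toTypeName('Task', 'createSimple')` and `toTypeName('Task', 'CreateSimple')` both give `TaskCreateSimple`. Slugs containing characters that are not valid in identifiers (for example `job-name`) would need extra sanitizing, which this helper does not attempt.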

Done: 5. Simplify folder structure

We really only have one operation right now for jobs. But the operations folder is Flarped.

I'd suggest flattening / simplifying as much as we can there. Will defer to you on specifics.

jmikrut avatar Oct 16 '24 16:10 jmikrut

🚀 This is included in version v3.0.0-beta.121

github-actions[bot] avatar Oct 30 '24 18:10 github-actions[bot]

@cbratschi I start node-schedule's cron task in instrumentation.ts and add jobs to a BullMQ queue.

smoothdvd avatar Oct 31 '24 01:10 smoothdvd

> @cbratschi I start node-schedule's cron task in instrumentation.ts and add jobs to a BullMQ queue.

We are also using node-schedule in instrumentation.ts and bind it to the primary PM2 node. I had a look at Payload's implementation, and it should be easy to bind it to a single node too. Payload's workflows and tasks are probably a better fit than node-schedule. In instrumentation.ts we could use a timer to trigger Payload's task handling. I will check it next week after the Payload 3.x release; right now we are still on Payload 2.x.

cbratschi avatar Oct 31 '24 10:10 cbratschi