
Organize tasks in subfolders

Open · christiaanwesterbeek opened this issue 2 years ago · 2 comments

In my system the list of tasks is expanding, and I'm looking for a way to visually organize them.

Would you consider the following to target a task hello in folder maintenance?

```sql
select graphile_worker.add_job('maintenance/hello', json_build_object('name', 'Bobby Tables'));
```

christiaanwesterbeek avatar Jun 02 '22 12:06 christiaanwesterbeek

I don't think there's anything preventing you from doing this in the current codebase; you would need to use library mode and pass your own task list using these custom identifiers, but I think it would work (unless we have some regexp validation of task identifiers somewhere?)
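For example, something like this ought to work (a minimal, untested sketch — `TaskLike` here is a simplified stand-in for worker's real `Task` type, and the task bodies are made up):

```typescript
// Sketch: a task list keyed by slash-namespaced identifiers, suitable for
// passing to graphile-worker's library mode via `run({ taskList })`.
// `TaskLike` is a simplified stand-in for the real `Task` type.
type TaskLike = (
  payload: unknown,
  helpers: { logger: { info(msg: string): void } }
) => Promise<void>;

export const taskList: Record<string, TaskLike> = {
  'maintenance/hello': async (payload, helpers) => {
    const { name } = payload as { name: string };
    helpers.logger.info(`Hello, ${name}!`);
  },
  'emails/send': async () => {
    /* ...send the email... */
  },
};

// Library mode would then be roughly:
//   const runner = await run({ connectionString, taskList });
```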

I'm not super keen to introduce this as automatic behaviour, however - defining the tasks over multiple levels would have knock-on consequences for watch mode, startup time, and more; plus I'm not convinced that the folder structure and the task identifiers should necessarily be linked. That said, I do have a lot of user__foo.js tasks where I use a double underscore for namespacing; a / would be cleaner... I'm going to leave this open to see if it gains any interest from the wider community.

benjie avatar Jun 05 '22 13:06 benjie

I have a suspicion that any serious users are going to want to move to library mode completely; I'd be a fan of keeping CLI mode as simple as possible and showcasing a few recipes for advanced usage in this repo, or another related repo.

skabbes avatar Sep 25 '22 00:09 skabbes

(unless we have some regexp validation of task identifiers somewhere?)

So I have actually implemented storing tasks in subfolders using library mode (alongside adding type safety), but I have just encountered your suspicion above!

The place that validates the task identifier is in the cron implementation, specifically when the crontab file etc. are parsed, enforcing only underscores and alphanumeric characters.

Would you have any objections to widening/removing this check in Graphile Worker? I know there was an issue recently for Graphile Starter (that I now can't find) that had a similar request, and I totally get that Starter is editable when you implement it... however, Worker is more of a library!

P.S. In my implementation I went for `.`s instead of `/`s, but it has the same effect (`user.foo`)

JonParton avatar Oct 24 '22 15:10 JonParton

https://github.com/graphile/worker/blob/bad84128d5bcd6c12a7aa38a556971f67c832ca0/src/cronConstants.ts#L31
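For illustration, the kind of widening being discussed is tiny. The actual pattern in `src/cronConstants.ts` may differ — this is just the shape of the change:

```typescript
// Illustrative only — the real regexp in src/cronConstants.ts may differ.
// A strict identifier check like this rejects namespaced task names:
export const TASK_ID_STRICT = /^[_a-zA-Z][_a-zA-Z0-9]*$/;

// Widened to also allow `.` and `/` (the "two character change"):
export const TASK_ID_WIDENED = /^[_a-zA-Z][_a-zA-Z0-9./]*$/;

// TASK_ID_STRICT accepts 'user__foo' but rejects 'emails.send';
// TASK_ID_WIDENED accepts 'emails.send' and 'maintenance/hello' too.
```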

JonParton avatar Oct 24 '22 15:10 JonParton

(alongside adding type safety)

I assume you mean type un-safety 😉 i.e. you are declaring the type that a task accepts even though there's no validation of that on the supplying end (e.g. PostgreSQL triggers), so you cannot be sure that that's the type given? We currently use the actually type-safe `unknown`, which forces you to validate that the payload you receive is actually the one you expect (or cast it if you're being lazy/unsafe). Or do you have a different form of type safety in mind?

Would you have any objections to widening / removing this check in graphile worker?

I have no objection to adding . and / to the regexp. I definitely don't think we should remove the validation though.

benjie avatar Oct 28 '22 08:10 benjie

I assume you mean type un-safety 😉

Ha - I think I actually mean type safety 😜...

I'll break out how I am doing things from my monorepo so you can take a proper look / give me any comments on what I am doing... but I have ended up writing an essay with the key bits below anyway, which might be enough 🤓.


Originally I was going to suggest a different way of doing things Graphile Worker-wise and look at implementing it in the library, but I had read somewhere in your comments that you were hoping to allow worker files to be defined in languages other than TypeScript at some point... and what I was going to suggest would make that harder...

Basically I was inspired by tRPC and its procedure/router concepts, so originally I wanted to define tasks in a similar no-codegen type-safe manner... but to start off with and test how it worked, I created a codegen prototype.

In effect I have enforced a standard way of defining job files, which I have put into a VS Code snippet:

```ts
// tasks/test_task.ts
import { Task } from 'graphile-worker'
import { typeCheckPayload } from 'worker/utils'
import { z } from 'zod'

// Always define and export a payloadSchema with this name
// NOTE: Can add much more advanced checking of inputs such as max length, regex, records etc. using zod
export const payloadSchema = z.object({
  user: z.object({
    email: z.string().email(), // example of some more advanced validation
  }),
  message: z.string(),
  delay: z.number(),
})

// Always export the Payload type under this name
export type Payload = z.infer<typeof payloadSchema>

function sleep(ms: number) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms)
  })
}

// Always export a Task under the task name (current graphile-worker convention)
export const task: Task = async (inPayload, { job }) => {
  // Enforce payload type from being unknown!
  const payload = typeCheckPayload(payloadSchema, inPayload, job)

  await sleep(payload.delay) // p.s. I know I could just set the job to run at a later time... but wanted to test speed and long-running jobs

  console.log(`${payload.message} from ${payload.user.email}!`)
}

export default task
```

The typeCheckPayload function used above checks the incoming payload (which is rightfully unknown, as you pointed out) against the zod schema, and if it doesn't match, it permanently fails the job (as retrying it is a waste 🙂):

```ts
import { Job } from 'graphile-worker'
import { permanentlyFailJobs } from 'worker/utils'
import { z } from 'zod'

/**
 * A function that will check the job payload against a zod schema and
 * permanently fail the job if it received an invalid payload, to prevent
 * unwanted retries that will instantly fail!
 * @param payloadSchema A zod schema to check the payload with
 * @param inPayload The raw payload with unknown type!
 * @param job The current job whose payload is being checked!
 * @returns A type-safe version of the payload
 */
export function typeCheckPayload<T extends z.ZodTypeAny>(
  payloadSchema: T,
  inPayload: unknown,
  job: Job
): z.infer<T> {
  try {
    return payloadSchema.parse(inPayload)
  } catch (err) {
    // It will never work with a bad payload!
    // (fire-and-forget; the throw below still fails this attempt)
    void permanentlyFailJobs([job.id], String(err))
    throw err
  }
}
```

I then have codegen that collects together any files defined in the /tasks folder (and subfolders) and turns them into a set of input types and functions. (Note: the codegen is where I concatenate identifiers into [subfolder].[subfolder].[task_identifier].)

```ts
// This file is generated by the script in @app/worker/generate.ts
// Please do not edit directly as it will be overwritten.
import { Payload as emails_SendPayload } from '../tasks/emails/send'
import { Payload as TestTaskPayload } from '../tasks/test_task'

export type JobInputTypes = {
  ['emails.send']: emails_SendPayload
  ['test_task']: TestTaskPayload
}
```

```ts
// This file is generated by the script in @app/worker/generate.ts
// Please do not edit directly as it will be overwritten.
import { task as emails_sendTask } from '../tasks/emails/send'
import { task as testTaskTask } from '../tasks/test_task'

export const generatedTasksObject = {
  ['emails.send']: emails_sendTask,
  ['test_task']: testTaskTask,
}
```
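The generate script itself isn't shown here; the identifier-building step of such a codegen might look something like this (my own sketch with a made-up helper name, not the actual `generate.ts`):

```typescript
// Hypothetical sketch of the identifier-building step of such a codegen
// script (not the actual generate.ts): turn a task file's path, relative
// to the tasks/ folder, into a dot-namespaced identifier.
export function taskIdentifierFromPath(relativePath: string): string {
  return relativePath
    .replace(/\.(ts|js)$/, '') // drop the extension
    .split('/')                // split on subfolders
    .join('.');                // join with dots for namespacing
}

// e.g. 'emails/send.ts' becomes 'emails.send'; 'test_task.ts' stays 'test_task'.
// A real script would walk tasks/ recursively and emit the two generated
// files shown above from the collected identifiers.
```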

These generated types are then used to launch Graphile Worker in library mode like below:

(Note: I also define the shared worker options in a central file to be used across starting the worker and workerUtils.)

```ts
import { run, RunnerOptions, SharedOptions } from 'graphile-worker'
import * as path from 'path'
import { generatedTasksObject } from 'worker/generated/taskObject'
import { sharedWorkerOptions } from 'worker/utils/sharedWorkerOptions'

/**
 * Any runner-specific options
 *
 * NOTE: Shared options such as the connection string are set up in
 * `./utils/sharedWorkerOptions.ts`
 */
const runnerSpecificOptions: Omit<RunnerOptions, keyof SharedOptions> = {
  concurrency: 5,
  // Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc.
  noHandleSignals: false,
  pollInterval: 1000,
  crontabFile: `${path.join(__dirname, '..', 'crontab')}`,
  taskList: generatedTasksObject,
}

async function main() {
  // Run a worker to execute jobs:
  const runner = await run({
    ...sharedWorkerOptions,
    ...runnerSpecificOptions,
  })

  // Immediately await (or otherwise handle) the resulting promise, to avoid
  // "unhandled rejection" errors causing a process crash in the event of
  // something going wrong.
  await runner.promise

  // If the worker exits (whether through fatal error or otherwise), the above
  // promise will resolve/reject.
}

main().catch((err) => {
  console.error(err)
  process.exit(1)
})

export {}
```

I then create wrapped Graphile Worker functions that add the types (a generic would be nice here 😉 `workerUtils.addJob<JobTypes>`), which I expose from the monorepo package.

I also handle the singleton'ing of the workerUtils object so calling code doesn't have to care. (It may not be best practice connection-lifetime-wise, but all my job-adding happens in short-lived serverless API functions, so connections won't be held open long.)

```ts
// Exposed functions that have been made type safe and pre-set-up with
// connection strings etc.
import {
  makeWorkerUtils,
  SharedOptions,
  TaskSpec,
  WorkerUtils,
  WorkerUtilsOptions,
} from 'graphile-worker'
import { JobInputTypes } from 'worker/generated/JobTypes'
import { sharedWorkerOptions } from 'worker/utils'

// Use a global for the current workerUtils promise so it is treated as a
// singleton across calls... basically remove this concern from the place you
// add jobs and handle it here! (`addJob` vs `addQuickJob`)
const workerUtilsGlobal = global as typeof global & {
  workerUtilsPromise?: Promise<WorkerUtils>
}

const workerUtilsSpecificOptions: Omit<
  WorkerUtilsOptions,
  keyof SharedOptions
> = {
  // None at time of coding!
}

/**
 * This will return the currently connected raw workerUtils instance.
 *
 * NOTE: This does not have type safety around it, unlike other functions
 * exported from this package.
 * @returns the currently connected raw workerUtils!
 */
export const getWorkerUtilsRaw = (): Promise<Omit<WorkerUtils, 'release'>> => {
  if (workerUtilsGlobal.workerUtilsPromise) {
    return workerUtilsGlobal.workerUtilsPromise
  } else {
    workerUtilsGlobal.workerUtilsPromise = makeWorkerUtils({
      ...sharedWorkerOptions,
      ...workerUtilsSpecificOptions,
    })
    return workerUtilsGlobal.workerUtilsPromise
  }
}

/**
 * This is a helper function to allow us to add jobs to the worker queue
 * managed by Graphile Worker and stored in the Postgres database.
 * @param identifier The identifier of the task to be run. These should be
 * defined in the worker/tasks folder.
 * @param payload The payload to be passed to the task implementation. A type
 * for this should be defined in the task implementation file in the
 * worker/tasks folder.
 * @param spec An object of extra options to use when queuing the task.
 * @returns
 */
export async function addWorkerJob<TaskKey extends keyof JobInputTypes>(
  identifier: TaskKey,
  payload: JobInputTypes[TaskKey],
  spec: TaskSpec | undefined = {}
) {
  const workerUtils = await getWorkerUtilsRaw()
  return workerUtils.addJob(identifier, payload, spec)
}

export const permanentlyFailJobs: WorkerUtils['permanentlyFailJobs'] = async (
  ids: string[],
  reason?: string
) => {
  const workerUtils = await getWorkerUtilsRaw()
  return workerUtils.permanentlyFailJobs(ids, reason)
}

// Other worker utils methods could be added...
```

You can then add a job from any server side code using:

```ts
//[..]/pages/api/hello.ts
import type { NextApiRequest, NextApiResponse } from 'next'
import { addWorkerJob } from '@app/worker'

const api = async (req: NextApiRequest, res: NextApiResponse) => {
  // Queue an example task... all with intellisense 🥳
  const job = await addWorkerJob('test_task', {
    user: {
      email: `[email protected]`,
    },
    delay: Math.random() * 10000,
    message: 'Hello from the Worker!',
  })

  // Don't forget to respond, or the request will hang
  res.status(200).json({ jobId: job.id })
}

export default api
```

So all of that combined gives a nice dev experience when adding jobs (knowing the available identifiers and payload types), as well as handling the type checking within the defined tasks (even if the jobs came from triggers)!


Sorry for the essay... and if you have any interest in playing around with it in a code editor, I could break it out and share it like I mentioned at the start 🙂

JonParton avatar Oct 28 '22 12:10 JonParton

Also - I'll take a look at the regex when I get a chance, if you want me to do a PR? Or if you think reviewing it is more hassle than doing it yourself, let me know :)

JonParton avatar Oct 28 '22 12:10 JonParton

Ideally the two character change to the regexp and a single test case as a PR would be great. I'm focussed on PostGraphile V5 development right now, so don't have much time budget for Worker - hopefully things will ease up on that front early next year.

benjie avatar Oct 28 '22 21:10 benjie

Ha - I think I actually mean type safety 😜...

Ah; thanks for sharing! Validating the types with Zod or similar is what I'd recommend, and that you've made it work both ways is great. So many people have asked for the task function to be generic so that they can state what type it is (or similar things) that I tend to assume this is what people are talking about now! I'm not keen to add anything like Zod to Worker directly (less is more), but I'm glad you've got a solution you're happy with.

benjie avatar Oct 28 '22 22:10 benjie

I'm not keen to add anything like Zod to Worker directly (less is more), but I'm glad you've got a solution you're happy with.

Totally understand that! 👍

So many people have asked for the task function to be generic so that they can state what type it is (or similar things) that I tend to assume this is what people are talking about now!

100% agree that `unknown` on the task function is the right thing here, but there may be some value in making it easier to add types to the workerUtils object when you are actually working in TypeScript, so that what is passed in is constrained to certain payload types / a set of job identifiers?

Something like


```ts
// Define expected job types
type TaskTypes = {
  ['emails.send']: {
    email: string
    template?: string
    subject: string
    body: string
  }
  ['task_identifier']: {
    name: string
  }
}

// Able to pass in task types as a generic
const workerUtils = makeWorkerUtils<TaskTypes>({...opts})

workerUtils.addJob("task_identifier", {name: "dave"}) // Gives intellisense here

workerUtils.addJob("task_identifier", [{name: "dave"}, {name: "jane"}]) // also gives intellisense for batch jobs

// Suppose quickAddJob would also need a way...
quickAddJob<TaskTypes>({...opts}, "task_identifier", {name: "benjie"}) // Intellisense!
```

If I get a chance I might raise a separate PR looking at that, but feel free to dismiss it 🙂

p.s. If anyone else finds my pattern above useful, I can share the missing pieces as a little fully-working repo in the future (including my simple codegen etc.), but I won't do that without interest 🙂.

JonParton avatar Oct 28 '22 23:10 JonParton

Hmmn, thinking about it, an addJob is also passed into the context of worker task definitions as a helper too... so a generic there would also be helpful, but it could be messy, as you would almost want to pass it in on the Task type, and this may confuse people given the input is `unknown` there... 🤔
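As a rough sketch of what I mean (nothing like this exists in worker today — the wrapper name and the untyped signature are made up):

```typescript
// Made-up sketch: wrap an untyped addJob-like function (e.g. the addJob
// helper passed to tasks, or workerUtils.addJob) so the identifier/payload
// pair is checked at compile time. TaskTypes is the app-defined map from
// the comment above.
type TaskTypes = {
  'emails.send': { email: string; subject: string; body: string };
  task_identifier: { name: string };
};

type UntypedAddJob = (identifier: string, payload: unknown) => Promise<unknown>;

export function typedAddJob<T extends Record<string, unknown>>(
  addJob: UntypedAddJob
) {
  return <K extends keyof T & string>(identifier: K, payload: T[K]) =>
    addJob(identifier, payload);
}

// Usage (compile-time checked):
//   const addJob = typedAddJob<TaskTypes>(helpers.addJob);
//   addJob('task_identifier', { name: 'dave' }); // OK
//   addJob('task_identifier', { nam: 'dave' });  // type error
```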

JonParton avatar Oct 28 '22 23:10 JonParton

p.s. If anyone else finds my pattern above useful I can share the missing pieces as a little fully working repo in the future (including my simple code gen etc) but I won't do that without interest

Hey @JonParton, I'd be super interested in a repo when you'll get some time. Also, do you run your workers in a serverful or serverless manner?

trompx avatar Mar 22 '23 10:03 trompx

Graphile Worker is definitely a serverful affair!

At the moment I run them alongside applications hosted on Fly.io, which is container-based.

For a more serverless approach, I was looking at Azure Container Apps, which can use a KEDA autoscaler that runs a query against Postgres to decide whether the worker container should be scaled to zero (or scaled up to x many workers etc.), but it would mean a bit of startup time if your job queue hadn't been used for a while!!

For properly serverless approaches, I think you need another tool or pattern like QStash, but the fit for a true queue, with almost-instant response, is less good!

If I get time I'll crank out a repo with the above pattern too, but I'm pretty stacked at the moment!

JonParton avatar Mar 22 '23 10:03 JonParton