odd-jobs

Resource-limited jobs

Open TomMD opened this issue 4 years ago • 22 comments

Problem context and a hack-solution

I have a situation where the jobs need to be limited by a resource of some type. For example, a resource should not have more than K jobs running at a time. Resources could be identified by a text column.

It appears the best way forward is to schedule all jobs, keep my own table for resource consumption, and reject the excess jobs by re-queuing with a status of Retry. This enforces the resource limit and makes the excess job(s) continually re-scheduled on some interval (depending on jobRunAt). Even then, it isn't clear how to set a job to be marked for Retry... perhaps just a direct call to saveJobIO.

Proposing a better solution

We could solve this and, to the high level user, we would only introduce two fields:

  • jobResource :: Maybe Text column describing the resource consumed by this job.
  • jobMaxResourceCount :: Maybe Int column describing the max resource consumption.

The listener and poller would have to include another filter. Something like SELECT * FROM $table outer WHERE jobMaxResourceCount > ( SELECT COUNT(*) FROM $table WHERE jobResource = outer.jobResource ).
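To make the intent of that filter concrete, here is a small pure Haskell model of it. This is a sketch, not odd-jobs code: the JobRow record and the rowId and rowRunning fields are hypothetical, and it assumes the count should cover only jobs that are currently running, which the SQL sketch leaves implicit.

```haskell
-- Hypothetical stand-in for a row of the jobs table with the two proposed columns.
data JobRow = JobRow
  { rowId               :: Int
  , rowRunning          :: Bool          -- True while a worker holds the job
  , jobResource         :: Maybe String
  , jobMaxResourceCount :: Maybe Int
  } deriving (Show, Eq)

-- Number of running jobs that consume the given resource.
runningFor :: [JobRow] -> String -> Int
runningFor table res =
  length [ () | r <- table, rowRunning r, jobResource r == Just res ]

-- A queued job is eligible when it names no resource, names no limit,
-- or its limit is strictly greater than the running count for its resource.
eligible :: [JobRow] -> JobRow -> Bool
eligible table job =
  not (rowRunning job) &&
  case (jobResource job, jobMaxResourceCount job) of
    (Just res, Just limit) -> limit > runningFor table res
    _                      -> True

-- Ids of all jobs that the poller would be allowed to pick up.
eligibleIds :: [JobRow] -> [Int]
eligibleIds table = map rowId (filter (eligible table) table)
```

With two userA jobs running under a limit of 2, a third queued userA job is filtered out, while queued jobs for other resources (or with no resource at all) remain eligible.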

Thoughts? It would be great to hear where you're planning on taking this library.

Other Thoughts

Odd jobs looks wonderful, thanks for open-sourcing it. A few aspects feel framework-ish (i.e. it gets to feeling prescriptive such as on the logging) and that makes me wonder how hard it might be to rework my queue to use odd jobs. That said, the point in the design space is spot on as well as the exposed level of abstraction. Much appreciated!

TomMD avatar May 19 '20 06:05 TomMD

I'd really like to see this come to fruition. Is the behavior I propose something you'd accept in the library?

TomMD avatar Jun 02 '20 23:06 TomMD

@TomMD sorry about missing this completely. for some strange reason I had unwatched this project, while fiddling with some settings!

saurabhnanda avatar Jun 23 '20 17:06 saurabhnanda

@TomMD you raise a number of good points:

Even then, it isn't clear how to set a job to be marked for Retry... perhaps just a direct call to saveJobIO.

Currently cfgJobRunner is simply Job -> IO () and I had wondered if it made sense to change it to Job -> IO (Maybe JobStatus) for explicitly forcing a job to retry, or to mark it immediately as "permanently failed". Right now, if you want to do this, throw an unhandled exception from the job-runner and the job will be retried cfgDefaultMaxAttempts times. However, you won't be able to control the jobRunAt.

So, yes -- I think changing the type-signature of cfgJobRunner to enable such use-cases is a good idea at this early stage (to avoid breaking changes in the future).

I have a situation where the jobs need to be limited by a resource of some type. [..] We could solve this and, to the high level user, we would only introduce two fields:

Does a job need to be limited by a resource, or do you mean a job-type? Do you have some concrete use-case in mind? External API that is rate-limited, perhaps? Or something else?

Odd jobs looks wonderful, thanks for open-sourcing it.

Thank you for the kind words. Sorry about being unresponsive. I was wondering about the inactivity around the project and didn't realize that I had ended up unwatching it myself!

A few aspects feel framework-ish (i.e. it gets to feeling prescriptive such as on the logging) and that makes me wonder how hard it might be to rework my queue to use odd jobs.

Which is why I added defaultLogStr. It seems most people would be happy to get plain-text logging out-of-the-box and call it a day. But, I see a real use-case in emitting json logs and putting some sort of graphs / analytics on top of it.

saurabhnanda avatar Jun 23 '20 18:06 saurabhnanda

Does a job need to be limited by a resource, or do you mean a job-type? Do you have some concrete use-case in mind? External API that is rate-limited, perhaps? Or something else?

A JobType would need to be limited by a resource. My use-case would be to limit the number of concurrent jobs being served on a per-user basis - so very similar to rate limiting except that the rate doesn't matter only the number being served right now.

UserA -> DoThing  -- consumed = [ (userA, 1) ]
UserA -> DoThing  -- consumed = [ (userA, 2) ]
UserB -> DoThing  -- consumed = [ (userA, 2), (userB, 1) ]
UserA -> DoThing  -- consumed = [ (userA, 2), (userB, 1) ]  backlog = [ (userA, 1) ]
Finish thing      -- consumed = [ (userA, 1), (userB, 1) ]  backlog = [ (userA, 1) ]
Pop backlog       -- consumed = [ (userA, 2), (userB, 1) ]
Finish thing x 3  -- consumed = [ ]
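The bookkeeping in that trace can be sketched as a small pure state machine. This is only an illustration with hypothetical names (Queue, submit, finish, popBacklog), assuming a per-user limit k and a FIFO backlog; it is not how odd-jobs stores jobs.

```haskell
import qualified Data.Map.Strict as M

type User = String

-- Running-job count per user plus a FIFO backlog of users whose jobs are waiting.
data Queue = Queue
  { consumed :: M.Map User Int
  , backlog  :: [User]
  } deriving (Show, Eq)

emptyQueue :: Queue
emptyQueue = Queue M.empty []

-- Start the job if the user is under the limit k, otherwise park it in the backlog.
submit :: Int -> User -> Queue -> Queue
submit k u q
  | M.findWithDefault 0 u (consumed q) < k =
      q { consumed = M.insertWith (+) u 1 (consumed q) }
  | otherwise =
      q { backlog = backlog q ++ [u] }

-- One of the user's running jobs finished; drop the entry when it reaches zero.
finish :: User -> Queue -> Queue
finish u q = q { consumed = M.update dec u (consumed q) }
  where
    dec n = if n > 1 then Just (n - 1) else Nothing

-- Start the oldest backlogged job whose user is under the limit, if any.
popBacklog :: Int -> Queue -> Queue
popBacklog k q =
  case break underLimit (backlog q) of
    (_, [])             -> q
    (waiting, u : rest) ->
      q { consumed = M.insertWith (+) u 1 (consumed q)
        , backlog  = waiting ++ rest
        }
  where
    underLimit u = M.findWithDefault 0 u (consumed q) < k
```

Replaying the trace with k = 2 (three submits for userA interleaved with one for userB, then a finish, a pop, and three more finishes) goes through the same consumed and backlog states and ends with both empty.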

TomMD avatar Jun 23 '20 18:06 TomMD

A JobType would need to be limited by a resource. My use-case would be to limit the number of concurrent jobs being served on a per-user basis - so very similar to rate limiting except that the rate doesn't matter only the number being served right now.

Can this be built on top of DynamicConcurrency with a slight change?

Use this to dynamically determine if the next job should be picked-up, or not. This is useful to write custom-logic to determine whether a limited resource is below a certain usage threshold (eg. CPU usage is below 80%). Caveat: This feature has not been tested in production, yet.

Instead of DynamicConcurrencyControl (IO Bool), what if the type signature is changed to DynamicConcurrencyControl (Job -> IO Bool). So, odd-jobs would somehow pick the next job (without locking it), and pass it along to this action. This action could pick the userId from jobPayload and run a query to find how many other jobs for the same userId are running. If it's below the pre-determined threshold, it evaluates to True, else False.

The challenge in this approach is implementing a "peek" function without thrashing the DB or unnecessarily locking and unlocking jobs in quick succession.
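A rough sketch of what such a per-job gate could look like follows. The Job record, the ConcurrencyGate wrapper, and peekNext are simplified, hypothetical stand-ins rather than the real odd-jobs types, and the running counts are held in an IORef where a real check would query the database.

```haskell
import Data.IORef (IORef, newIORef, readIORef)
import qualified Data.Map.Strict as M

-- Hypothetical, simplified stand-ins; these are not the odd-jobs types.
data Job = Job { jobId :: Int, jobUser :: String } deriving (Show, Eq)

newtype ConcurrencyGate = DynamicConcurrencyControl (Job -> IO Bool)

-- Gate that admits a job only while its user has fewer than k running jobs.
-- The running counts live in an IORef here; a real check would query the DB.
perUserGate :: Int -> IORef (M.Map String Int) -> ConcurrencyGate
perUserGate k runningRef = DynamicConcurrencyControl $ \job -> do
  running <- readIORef runningRef
  pure (M.findWithDefault 0 (jobUser job) running < k)

-- "Peek" at the queue in order and return the first job the gate admits,
-- without removing or locking anything.
peekNext :: ConcurrencyGate -> [Job] -> IO (Maybe Job)
peekNext _ [] = pure Nothing
peekNext gate@(DynamicConcurrencyControl admit) (j : js) = do
  ok <- admit j
  if ok then pure (Just j) else peekNext gate js
```

The sketch sidesteps the hard part named above: between the peek and the eventual lock, another worker may start a job for the same user, so the check and the lock would still need to happen atomically in the database.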

The other approach could be to change the type-sig of jobRunner to Job -> IO (Maybe JobStatus) (which is probably a good idea for other reasons as well). And use it in the following way:

{-# LANGUAGE LambdaCase, NamedFieldPuns #-}

jobRunner :: Job -> IO (Maybe JobStatus)
jobRunner Job{jobPayload} =
  case jobPayload of
    -- Re-queue the job when the limit is hit; otherwise run it.
    RateLimitedJob x ->
      ensureRateLimit x >>= \case
        False -> pure (Just JobRetry)
        True  -> runRateLimitedJob x
    RegularJob x -> runRegularJob x

ensureRateLimit x =
  -- Extract the userId from x, query the DB to check how many jobs are
  -- currently running for the same userId, and evaluate to True or False.
  undefined

The second one sounds cleaner, right?

saurabhnanda avatar Jun 23 '20 18:06 saurabhnanda

@TomMD any thoughts on https://github.com/saurabhnanda/odd-jobs/issues/38#issuecomment-648340999 ? Will that solve your problem?

saurabhnanda avatar Jun 24 '20 04:06 saurabhnanda

@saurabhnanda I'll need to give it proper thought but am a bit busy - will take some days to look into DynamicConcurrency.

TomMD avatar Jun 24 '20 04:06 TomMD

yes I believe DynamicConcurrency with the proposed change would work ok, but the second is closer to what I had in mind. Is there one you'd prefer to see in the library and one you'd rather not support?

TomMD avatar Jun 28 '20 01:06 TomMD

I would really like to see this feature implemented! I have the user-rate-limiting use case as well, but I also have a use case for throttling requests against an external resource, where multiple job types might access a particular resource. I would really like to be able to control the number of jobs of multiple types that are concurrently accessing an abstract resource.

mrputty avatar Sep 18 '20 04:09 mrputty

@TomMD @mrputty let's solve this then. What're the thoughts on https://github.com/saurabhnanda/odd-jobs/issues/38#issuecomment-648340999 ?

Actually, now that I think about this more, neither of the two approaches would prevent "thrashing" of the job-queue. If a particular set of jobs is being rate-limited, odd-jobs is going to pick them up (lock them for executing), and then almost immediately reschedule them.

In your use-cases, once a set of jobs has been rate-limited, how long do you want to wait before attempting to execute them again?

saurabhnanda avatar Sep 18 '20 04:09 saurabhnanda

I think rate-limiting is separate from retrying. I haven't fully thought this through, but I do think we need to come up with a solution that avoids thrashing or locking/unlocking, and avoids any concept of waiting or retrying.

In the absence of transactions, this will require something that can be evaluated in the update statements that select and lock job rows. I personally think this feature should be managed entirely within the library. I also think the resource for a job needs to be tracked separately from the job type. I have some very generic jobs that might be associated with many resources, and I'd like to be able to regulate concurrency by both system level resources and user/account level resources.

What I'm thinking of is similar to what @TomMD originally proposed above, but it separates the specification of resource limits from job submission, which is important for my operational requirements. So maybe it's a hybrid of his two initial suggestions.

In any case, my inclination is to connect the resource limit to the resource rather than the job. This implies the addition of a new table, which I'll call resource for this discussion, and which I think should be defined by the library.

I'm imagining something like this:

create table resource
 ( resource_id text primary key
 , resource_limit bigint
 );

alter table job add resource_id text;

Assume a new Config field defaultResourceLimit :: Int64, and that the createJob and scheduleJob functions take a new Maybe ResourceId parameter. There might also be some convenience functions for manipulating resource records.

The key change would be in the dequeueing update statements, where the inner select would have to apply logic like this:

  • If job.resource_id is null then behave as today.

  • Otherwise, only select the job if the count of locked jobs with a matching resource_id is less than the resource_limit determined by:

    1. If there is no resource.resource_id matching job.resource_id assume the defaultResourceLimit, or
    2. If there is a resource.resource_id matching job.resource_id use resource.resource_limit if it is not null, or
    3. If the matching resource.resource_limit is null assume an infinite limit.

That's not going to be the prettiest SQL, but I think it can be done, and with reasonable performance.
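The rules above can be written down as a pure function, which may help when checking the eventual SQL against them. This is a sketch under stated assumptions: the names Limit, ResourceTable, effectiveLimit, and mayDequeue are hypothetical, the resource table and locked-job counts are modelled as in-memory maps, and Int stands in for the proposed Int64/bigint.

```haskell
import qualified Data.Map.Strict as M

type ResourceId = String

-- Result of resolving a job's limit: either a finite cap or no cap at all.
data Limit = Limited Int | Unlimited deriving (Show, Eq)

-- Model of the resource table: resource_id mapped to a nullable resource_limit.
type ResourceTable = M.Map ResourceId (Maybe Int)

-- Apply the three rules: no row -> default limit, row with a limit -> that
-- limit, row with a null limit -> unlimited.
effectiveLimit :: Int -> ResourceTable -> ResourceId -> Limit
effectiveLimit defaultLimit table rid =
  case M.lookup rid table of
    Nothing       -> Limited defaultLimit
    Just (Just n) -> Limited n
    Just Nothing  -> Unlimited

-- A job may be dequeued when it has no resource, or when the number of
-- currently locked jobs for its resource is below the effective limit.
mayDequeue :: Int -> ResourceTable -> M.Map ResourceId Int -> Maybe ResourceId -> Bool
mayDequeue _ _ _ Nothing = True
mayDequeue defaultLimit table locked (Just rid) =
  case effectiveLimit defaultLimit table rid of
    Unlimited -> True
    Limited n -> M.findWithDefault 0 rid locked < n
```

In the real feature this decision would have to be made inside the dequeueing update statement itself, so that the count and the lock are part of the same implicit transaction.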

This would meet all my project's requirements, and I think it also makes sense from a library perspective, as it would require the user to opt in to resource-limiting, while still giving operationally useful fine-grained dynamic control over resource utilization, but keeping all the queueing machinery entirely in the library code. It does introduce a new table, which is not a small thing at this point, but otherwise it does not seem like it adds undue complexity.

One thing this doesn't do is allow multiple job queues/tables to share resource limits. I think it would be possible to do that by moving the accounting into the resources table, but it would be a fair bit harder (it would require explicit transactions amongst other complexities).

(On a tangent, this did make me wonder if the library might, in the face of an increasing number of related database objects, want to be a bit more opinionated about database object management. One way to do this would be ask for a schema name instead of individual table names, and only allow one instance of odd-jobs to run in a given schema. The table names could then be fixed, with only the schema qualifier being variable. Of course that makes shared resource accounting across queues even less likely, but that's not really an important feature to me.)

mrputty avatar Sep 25 '20 05:09 mrputty

@mrputty thank you for your comment at https://github.com/saurabhnanda/odd-jobs/issues/44#issuecomment-699034092 Are you and @TomMD working on the same problem, or are you talking about two separate, but similar use-cases?

saurabhnanda avatar Sep 28 '20 05:09 saurabhnanda

So, here are the problem constraints:

  1. Dequeuing the job only to realize that it's rate-limited and re-queuing it again is a wasteful operation and should be avoided as much as possible.
  2. For certain cases you don't really need to dequeue a job to realize that it can't really be executed right now (the rate-limits are known inside the DB itself, so to speak)
  3. Whereas in certain cases you need to dequeue and call the jobHandler which will decide whether to execute or re-queue the job (eg. the rate-limits are decided by a system which is not accessible via a simple SQL query).

To tackle 3 we need a simpler/straightforward way for the jobHandler to tell odd-jobs to re-queue a job. This is discussed in https://github.com/saurabhnanda/odd-jobs/issues/38#issuecomment-648340999

To tackle 1 & 2, here's a simpler solution, which doesn't introduce more moving parts into the library:

  • Make jobPollingSql (and possibly tryLockingJob) configurable for library users

@TomMD @mrputty does it make sense to you?

saurabhnanda avatar Sep 28 '20 05:09 saurabhnanda

That makes sense. It allows me to package up my pattern as odd-jobs-queue or some such to wrap up things with a nice API. However, I don't know @mrputty so his issue and constraints might be different.

TomMD avatar Sep 28 '20 05:09 TomMD

Keeping the details of the resource limiting outside of the library does seem possible, but I have some concerns I'd like to talk through.

First I'll point out something that I should perhaps have called out more explicitly in my initial comment: the consistency of my proposal (in the absence of explicit transactions) assumes the addition of a resource_id column to the library's existing job table.

As a library consumer, I would feel very uncomfortable adding a new column to a library defined table. In the absence of that, in addition to a new resource table there would need to be a new user-defined table, which I'll call job_resource, in order to track the resource associated with each job.

The way the library is set up today, this would have two consequences:

  1. The job_resource table will have to be set up carefully. It will need a FK reference to job, which must have on delete cascade. This may have a negative impact on future migrations (see JobId should be Int64).

  2. The user would have to create their own wrappers around createJob and scheduleJob. These would have to start a transaction, call the wrapped function, get the id out of the returned job, and create the corresponding row in job_resource.

An alternative to this would be to implement explicit transactions in the library. Among other things, this would require user callbacks such as onJobStart to run inside the transaction (which would require their signatures to change to something like Connection -> Job -> IO ()), and/or an explicit onJobDelete function would need to be added.

In any case, for both constraints 1 and 2, both jobPollingSql and tryLockingJob would have to be configurable in order for any approach like this to work.

Doing all this in the library is less flexible (for example, with the external approach, I could easily add multiple resources per job, which is appealing in some of my use cases), but it side-steps these issues.

For constraint 3 (the try/re-queue scenario), if/when that is tackled, I think explicit transactions will also be required. There should at least be an option for this to take place within a database transaction. That gives the user a means to safely use a more complex database-backed solution, and in that case it would also transform the "re-queue" into a "rollback", which could avoid some of the "thrashing".

FWIW, I think explicit transactions would be a good idea anyway, but I think they become necessary in the face of implementing resource-limited jobs outside of the simple "single update" case.

My core requirements are 1) a mechanism to optionally associate a resource with each job and limit the number of concurrent jobs running for a given resource, and 2) transactionally safe management of job processing. The proposal I made does introduce a new moving part, but it otherwise fits into the current structure of the library. @saurabhnanda, your counter proposal keeps these moving parts out of the library but it seems to me that it makes the library consumer's job harder, and I can't see a way to do this without introducing a new level of coupling to library and database details.

I don't have a strong preference between the two approaches we're currently discussing, as long as we can keep all queue operations atomic in the database. In any case, it seems that @TomMD and I have similar requirements around this, which I think is a good indication that this is a good problem to solve!

mrputty avatar Sep 28 '20 17:09 mrputty

@mrputty @TomMD if it doesn't divulge any proprietary information, would you be able to share on what basis are the jobs being resource-limited? Is it supposed to be something like:

  • X jobs per min per user
  • X jobs per min per tenant/company/account
  • X jobs per min per external API
  • something else?

This will allow us to figure-out the following:

  1. does an abstract job_resources table (dictated by the library) make sense
  2. or, does providing a way to add more columns to the core jobs table make sense
  3. or, can this be added to the job_payload JSON data (it's anyways extensible)
  4. or, something else

Btw, at Vacation Labs, we needed to add extra auth information to each job payload and we used approach 3 mentioned above (add it to the JSON payload)

Here are my initial thoughts about using a job_resources table -- I suspect it might add more complexity without any advantages. I'm not sure if rate limits are applied at an "abstract level" in real life. It is always in the context of an object/data-model that otherwise holds some domain/business significance. Eg. per user, or per account, or per api key, etc. This means that there would already be a table in the DB to hold said domain objects. Library users would appreciate a reference from the jobs table to one of these existing domain-tables, instead of an abstract job_resources table, that doesn't mean much to them.

What are these "domain tables" in your cases? @TomMD @mrputty

saurabhnanda avatar Sep 30 '20 06:09 saurabhnanda

I have several use cases for resource-limited job execution. However, I have been thinking of this in terms of concurrency, not in throttling of jobs per unit time. I do have a use case for that too, but I think handling that in the queue is very hard (rate limiting is about the spacing between jobs, which is hard to track, and rules about it are hard to define). As long as we can manage concurrency, it seems more natural to manage the rate limiting in other ways.

Here are my concurrency limiting use cases:

  • X concurrent jobs per user

    The resource limit should be configurable, for example based on subscription level, but having a default resource limit is also very useful, as it means I only need to touch the configuration when a user limit needs to be increased over the baseline.

  • X concurrent jobs per external API

    We have a wide range of job sizes and execution times, as well as multiple external APIs with different throttling requirements. Expressing these throttling requirements in odd-jobs would be a lot of work, and seems beyond the scope of this library. However, having the ability to limit on concurrency gives us enough control to implement the specific throttling rules inside the job runners.

    To orchestrate some of our more involved processes, we have jobs that themselves create new jobs. Some of these processes may generate 1000s of sub-jobs. Currently, these are being wrapped up in a job that contains a large list (e.g. of ids that need to be processed). Being able to limit the concurrency of these would mean we could create a bunch of smaller jobs (preferable for error handling and recovery) while still limiting the number of concurrent jobs. Sometimes the sub-jobs share the same logical resource, and sometimes they are for a different resource. Being able to define abstract resource ids with default resource limits is a powerful tool in this space.

  • X concurrent jobs per internal resource

    We have jobs that are interacting with internal resources as well (e.g. loading data into a warehouse database vs. loading data into a user-facing datamart), and these resources have different capacities and performance constraints. Having concurrency limits enables us to create new jobs in a natural, bursty way, while limiting the maximum impact they may have on any given subsystem.

Note that each of these categories has unique job types, but there are also job types that are shared between them. This means the resource limit must be separate from job type. Further, there are multiple types of external entities that I want to treat as resources, and while some are represented in the database (e.g. as a user), not all of them are, so there aren't always 'domain tables' in my case.

I agree that the rate limits are not abstract in practice. They are indeed tied to various domain entities, however there may be multiple domains, and in my case at least, they are definitely not all tied to a concrete entity in the database. This leads me to conclude that the library must treat them in an abstract way, and let the user manage them as they see fit. I think that the interpretation of the resource_id should be opaque to the library. If, as a user, I want to put user ids and API keys in there, that's fine, as long as I define a resource_id syntax to keep them separate.

So while these resource-job relationships may seem like foreign key relationships, I think that is not a good way to think of them. I don't think we should encourage explicit coupling between odd-jobs and user tables. I'd like to see a very clear boundary between library-owned concepts (jobs, resource limits) and user-owned concepts (job payloads/types, resources).

In any case, here are the core requirements I see:

  • Define resources, which are opaque resource_ids and an integral resource_limit, which defines the maximum number of concurrent jobs that can be running for a job with that resource_id.
  • Optionally attach a resource_id to each jobs record when it is created.
  • When dequeuing jobs, in a transactionally safe and efficient way, only dequeue jobs which are within the resource_limit of the associated resource_id, using a default resource_limit if one is not supplied, and applying no limit if no resource_id is specified with the job.

As for how we achieve this, I'd break this down slightly differently:

  1. Library-defined: The library defines a resources table, and extends the jobs table to link up to one resource to each job. This is by far the simplest approach that I can think of that meets the requirements I state. Yes, it does introduce a new "moving part" in the resources table, but it is the now

  2. User-defined: The library allows the user to override the two dequeueing update statements to include resource-limiting restrictions. The user is responsible for defining the resources and job_resources table, as well as transaction-managing wrappers for createJob and scheduleJob.

  3. Payload-managed user-defined: As a variant of option 2, the library allows the user to override the two dequeuing update statements to allow user-defined logic to inspect the payload and if it has resource limits, inspect the payloads of all other running jobs to see if the resource limit is exceeded. Of course a user-defined resources table could be created to specify the limits.

From the library's perspective, I think option 3 is just like option 2, but I personally find option 3 very unappealing. As I said earlier, I don't think the resource should be tied to the job type, but the structure of the payload is the job type. In any case, I think the payload should be opaque to the SQL layer, but beyond that, I think this has significant performance concerns.

I find option 2 to introduce far more complexity, but it does offer additional power, and I'd certainly consider it a viable approach. For me, option 1 is clearly the simplest. It offers sufficient power for my use cases, while still being both easy to implement and easy to use.

@saurabhnanda, can you give me some more insight into the concerns you have with this approach?

mrputty avatar Sep 30 '20 22:09 mrputty

So, what I didn't realize is that, some users may want to limit concurrency based on different domain tables. I was assuming that either you'd do it on a per-user basis, or a per api-key basis, but not both (in the same app). If that is accepted (which seems reasonable), we have the following choices:

  1. use a job_resources table, as you have outlined and have a jobs.job_resource_id column which has an FK constraint to the job_resources.id
  2. De-normalize, and have two columns in the jobs table itself: concurrency_key and concurrency_limit

can you give me some more insight into the concerns you have with this approach?

So, my concern with approach 1 is a theoretical concern wrt perf. I'm not sure how PG would behave if jobPollingSql is changed to introduce a join with job_resources table. In some ways I am hoping that the de-normalized approach would alleviate these unknown concerns, but even there I am not sure. In both the cases, the decision whether to lock a job for execution, or not, will now depend on looking at other rows that share the same concurrency_key (or job_resource_id). How would one write such SQL efficiently?

After having this discussion, I am not opposed to either approach, but would like to benchmark the perf impact of introducing this change to the core jobPollingSql statement. Perhaps it is time to write this benchmark suite - something that I've been procrastinating about.

@mrputty since you are evaluating odd-jobs for a much higher scale, have you, by any chance written a benchmark that you would be willing to contribute upstream?

saurabhnanda avatar Oct 01 '20 06:10 saurabhnanda

Also, I realized an advantage of having a separate job_resources table. It can have the following columns: id, resource_name, concurrency_limit, and current_job_count

current_job_count can be kept up-to-date every time a job is started or ended/failed. This can make jobPollingSql simpler.

But even then, the impact on perf should be measured.

saurabhnanda avatar Oct 01 '20 06:10 saurabhnanda

Yeah, for me, keeping the resource limits flexible, and not tied to a single domain object, is pretty important. Tying them to a single domain object would dramatically limit the benefit of this feature.

Having the concurrency limit attached to the job does not make sense to me. That would mean two jobs attached to the same resource could have different limits. The whole point of this for me is to have a separately defined limit that applies to every job attached to a given resource.

I am not proposing using a field like the current_job_count. Yes, this might make the dequeueing statements simpler, but I think doing it right would be much more complicated overall, and it would require the introduction of a transaction around the dequeue operations. Instead, I'm proposing just doing the sum on the fly in the update queries, so they will be part of the implicit transaction around those statements.

I don't have big concerns about the performance impact of adding this join, assuming things are indexed properly (so that only the other rows with a matching resource_id would have to be examined), and I believe it can be done in a way that would only impact jobs actually using a resource. Compared to the overhead of some of the other solutions we've discussed, this seems like a natural solution in the Postgres space. That said, benchmarking would be great.

I don't have any kind of benchmark lying around, but I'm happy to contribute to this effort. However, perhaps what I could contribute would be the dequeueing mechanism, rather than the benchmarking. I've spent some time thinking about it, and I would be happy to implement this, or even the whole resource-limiting feature as I've proposed it.

@TomMD, this was originally your request. I would love to hear your take on the recent discussion, and especially whether the approach I'm proposing 1) seems reasonable, and 2) addresses your needs.

mrputty avatar Oct 01 '20 15:10 mrputty

I don't have any kind of benchmark lying around, but I'm happy to contribute to this effort. However, perhaps what I could contribute would be the dequeueing mechanism, rather than the benchmarking. I've spent some time thinking about it, and I would be happy to implement this, or even the whole resource-limiting feature as I've proposed it.

Instead, I'm proposing just doing the sum on the fly in the update queries, so they will be part of the implicit transaction around those statements.

I can write the benchmarks if you're interested in implementing the feature. But, if the benchmarks take a hit (especially for users who don't need this feature) it might not end-up getting merged. Is that alright?

saurabhnanda avatar Oct 11 '20 16:10 saurabhnanda

Yes, that makes sense to me. I'm happy to implement this knowing it must not negatively affect performance for non-limited use cases.

mrputty avatar Oct 13 '20 17:10 mrputty

Closed by #80 .

ivb-supercede avatar Feb 16 '23 11:02 ivb-supercede