
Enhance db-scheduler to support asynchronous task execution

Open amit-handda opened this issue 2 years ago • 33 comments

Thank you for maintaining db-scheduler.

What

This PR enhances db-scheduler to support asynchronous task execution. It addresses this issue.

Why

db-scheduler uses an ExecutorService backed by a thread pool to run its tasks, and tasks are executed synchronously. If a task does something asynchronous (an HTTP call, for example) and has to wait for the result, the thread-pool thread stays blocked for the duration. This may not be an issue when tasks are tiny or task throughput is low, but it can become one in some scenarios.
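For illustration, a minimal sketch (not from the PR) of the blocking pattern described above, assuming db-scheduler's Tasks.recurring builder and Java 11's HttpClient; the task name and URL are made up and the package paths are from memory, so check them against your db-scheduler version:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.github.kagkarlsson.scheduler.task.helper.RecurringTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import com.github.kagkarlsson.scheduler.task.schedule.FixedDelay;

class BlockingTaskSketch {
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    static RecurringTask<Void> blockingHttpTask() {
        return Tasks.recurring("blocking-http-task", FixedDelay.ofMinutes(5))
            .execute((taskInstance, executionContext) -> {
                HttpRequest request = HttpRequest
                    .newBuilder(URI.create("https://example.org/ping"))
                    .build();
                try {
                    // send(...) blocks the db-scheduler worker thread until the response
                    // arrives, so the thread does nothing useful while the IO is in flight
                    CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
    }
}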

Current status

  • A few test cases (TCs) are flaky due to the asynchronous implementation; looking into fixing them.

amit-handda avatar Jul 07 '22 13:07 amit-handda

Thank you! I am currently on vacation and will not be able to look at it for a couple of weeks. And it is also a critical change so I need some time to consider it

kagkarlsson avatar Jul 14 '22 06:07 kagkarlsson

I see some significant performance improvements as well. @amit-handda can you post some benchmarks as well?

x0a1b avatar Aug 01 '22 18:08 x0a1b

As expected, we have observed significant throughput gains for async tasks, as follows:

                            Current (executions/s)   Future-based (executions/s)
  1 scheduler instance
    300 threads, 10k tasks          285                      1,666
  4 scheduler instances
    20 threads, 10k tasks            80                      2,206
    100 threads, 10k tasks          359                      2,407

Task definition (attached as an image)

amit-handda avatar Aug 03 '22 23:08 amit-handda

Hello @kagkarlsson, hope you had a nice vacation. Wondering if you are back and have had time to look at the PR? Let me know if you have any queries. TY!

amit-handda avatar Aug 19 '22 21:08 amit-handda

Thank you! Yes I am back, but I struggle to get the bandwidth to review this PR unfortunately.

I have scanned through the code and noted a couple of things:

  • I am not sure how I feel about breaking the ExecutionHandler interface. If the async variant could possibly be introduced more gently, as an option rather..
  • Are we allowing more things to happen in parallel? (If so, I am not sure what the consequences of this will be)

kagkarlsson avatar Aug 24 '22 20:08 kagkarlsson

Hello @kagkarlsson, thanks for the feedback. Question 1/ how do you intend the ExecutionHandler interface to look? The PR allows the ExecutionHandler implementation to return a Future<CompletionHandler> so that the db-scheduler thread is unblocked (in case the implementation performs async ops such as HTTP calls, etc.). Thanks,

amit-handda avatar Aug 25 '22 15:08 amit-handda

1/ how do you intend the ExecutionHandler interface to look?

Well, that is the problem, I do not know currently. What options do we have 🤔

kagkarlsson avatar Aug 26 '22 13:08 kagkarlsson

As expected, we have observed significant throughput gains for async tasks, as follows:

                              Current (executions/s)   Future-based (executions/s)
    1 scheduler instance
      300 threads, 10k tasks          285                      1,666
    4 scheduler instances
      20 threads, 10k tasks            80                      2,206
      100 threads, 10k tasks          359                      2,407

I need to understand why a future-based execution will be that much faster. Feels like we are skipping a step..

kagkarlsson avatar Aug 26 '22 13:08 kagkarlsson

I need to understand why a future-based execution will be that much faster. Feels like we are skipping a step..

Hi, it is due to our task definition. The task implementation does nothing other than sleep for a second. After our changes, the sleeping tasks don't block the scheduler thread pool (since the task implementation returns futures). Before our changes, the same task implementation would block the scheduler thread pool, leading to inferior performance. Let me know if you have any questions. https://user-images.githubusercontent.com/56566242/182731346-cf142ba0-56f5-45ad-a888-69b8c4378aac.png
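For intuition only (this is not the actual benchmark, which runs on db-scheduler; the worker and task counts here are illustrative), a self-contained sketch of why releasing the worker threads changes throughput so much when the work is just waiting:

import java.util.concurrent.*;
import java.util.stream.IntStream;

public class SleepThroughputDemo {
    public static void main(String[] args) {
        int workers = 10, tasks = 100;

        // Blocking style: each task occupies a worker thread for the full second,
        // so 100 tasks on 10 threads take roughly 10 seconds.
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        long t0 = System.nanoTime();
        CompletableFuture.allOf(IntStream.range(0, tasks)
            .mapToObj(i -> CompletableFuture.runAsync(() -> sleep(1000), pool))
            .toArray(CompletableFuture[]::new)).join();
        System.out.printf("blocking:     %.1f s%n", (System.nanoTime() - t0) / 1e9);
        pool.shutdown();

        // Non-blocking style: the wait is modelled with a delayed executor, so no worker
        // thread is held while "waiting"; all 100 tasks finish in roughly 1 second.
        long t1 = System.nanoTime();
        CompletableFuture.allOf(IntStream.range(0, tasks)
            .mapToObj(i -> CompletableFuture.runAsync(() -> { },
                CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)))
            .toArray(CompletableFuture[]::new)).join();
        System.out.printf("non-blocking: %.1f s%n", (System.nanoTime() - t1) / 1e9);
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}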

amit-handda avatar Aug 29 '22 20:08 amit-handda

Well, that is the problem, I do not know currently. What options do we have 🤔

IMO, returning a CompletableFuture<CompletionHandler> is the only option to enable a task to execute async operations while unblocking the db-scheduler thread pool, and this necessitates updating the interface in a non-gentle way. :)
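To make the shape concrete, a sketch of what such an interface could look like (illustrative only; the exact types, method names, and package paths in the PR may differ):

import java.util.concurrent.CompletableFuture;

import com.github.kagkarlsson.scheduler.task.CompletionHandler;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.TaskInstance;

public interface AsyncExecutionHandler<T> {
    // The scheduler thread returns as soon as the future is created; the execution is only
    // marked as finished (and its CompletionHandler applied) once the future completes.
    CompletableFuture<CompletionHandler<T>> executeAsync(
        TaskInstance<T> taskInstance, ExecutionContext executionContext);
}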

amit-handda avatar Aug 30 '22 02:08 amit-handda

I think what @amit-handda is describing is more specific to reactive solutions (in this case Kotlin coroutines, which do not block threads). I think having this in place will unlock a bunch of reactive use cases (including for Spring framework users), letting people write their workers in a more scalable way.

maxpert avatar Sep 01 '22 05:09 maxpert

The results you are referring to, where future-based and non-future based tasks are compared, what polling-strategy are they using, and what limits?

In my mind it should not be pulling in more work before completing a significant part of the last batch. But that will be affected by polling-strategy changes.

kagkarlsson avatar Sep 01 '22 19:09 kagkarlsson

The results you are referring to, where future-based and non-future based tasks are compared, what polling-strategy are they using, and what limits?

The reactive systems I am referring to usually build on mechanisms like epoll, kqueue, etc. For example, if you are using Reactor + Netty, there are various system-level event-driven libraries it can build on; you can read the details here. Combined with Project Reactor, you get async DB libraries like R2DBC, which means you can now talk to the DB in an async manner.

In my mind it should not be pulling in more work before completing a significant part of the last batch. But that will be affected by polling-strategy changes.

If the work being done is CPU-intensive, then sure, even regular threads will be blocked. But if the work is more IO-intensive, then in the traditional model you are just spinning up threads and wasting cycles waiting for it to complete. That's where async-IO / Reactor-driven frameworks let you fully saturate your CPU. I don't think a Future-based workload messes with the polling logic.

Going into a little more detail, let's assume a simple job scheduler that sends out emails. All the job does is pull an email and then make an HTTP call to, say, SendGrid or some other external service to send it. In the traditional thread model you won't be able to spin up more than a couple of hundred threads, so you can't process more than a couple of hundred emails in parallel at a time; the async/reactor models unblock you from that. Since each job turns into a future/promise, none of the threads has to block and wait until the worker lets go of the thread. That's what event loops have unlocked at large scale, and IMO this PR will do exactly that.
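As a sketch of that email case with non-blocking IO (the SendGrid-like URL and payload handling are made up; java.net.http's async client stands in for whatever reactive HTTP client one would actually use):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncEmailSender {
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // Returns immediately; the HTTP exchange is driven by the client's IO threads,
    // so thousands of sends can be in flight without thousands of blocked threads.
    static CompletableFuture<Integer> sendEmail(String payload) {
        HttpRequest request = HttpRequest
            .newBuilder(URI.create("https://api.sendgrid.example/v3/mail/send"))
            .POST(HttpRequest.BodyPublishers.ofString(payload))
            .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.discarding())
            .thenApply(HttpResponse::statusCode);
    }

    static CompletableFuture<Void> sendAll(List<String> payloads) {
        return CompletableFuture.allOf(
            payloads.stream().map(AsyncEmailSender::sendEmail).toArray(CompletableFuture[]::new));
    }
}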

maxpert avatar Sep 02 '22 01:09 maxpert

I appreciate the explanation, but I was also concretely asking about what settings were used in the test :)

In my mind it should not be pulling in more work before completing a significant part of the last batch

was referring to my mental model of how the scheduler works

kagkarlsson avatar Sep 02 '22 06:09 kagkarlsson

I think the thing is that if the thread pool is no longer the bottleneck for how many executions we may have running in parallel, we need to consider how many are safe to allow. After they finish, there will be a backlog of executions to complete..

kagkarlsson avatar Sep 02 '22 19:09 kagkarlsson

Hi @kagkarlsson, we used the lock_and_fetch polling strategy with 0.5 (lower_limit) and 4.0 (upper_limit), both as fractions of the thread count. Please let me know if you need more info. Many thanks for your careful review feedback. Have a good day.

return Scheduler.create(primaryDS, task.make())
      // lower/upper limits are fractions of the thread count (0.5 and 4.0 in this test)
      .pollUsingLockAndFetch(lowerLimitFractionOfThreads, upperLimitFractionOfThreads)
      .threads(schConfig.schedulerThreadPool)
      .pollingInterval(Duration.ofSeconds(schConfig.schedulerPollingIntervalSecs.toLong()))
      .schedulerName(SchedulerName.Fixed(schedulerInstanceId))
      .registerShutdownHook()
      .build()

perf test config

db:
  db:
    jdbcUrl: "jdbc:postgresql://localhost:5432/pgloadtest_local"
    username: "pgloadtest_local"
    password: "testpwd"
    maxLifetimeSecs: 300
    dbconnTtlSecs: 300
    maximumPoolSize: 50
pgloadtest:
  dbscheduler:
    inactivityDurationSecs: 3600
    failRedoSecs: 4
    schedulerThreadPool: 300
    schedulerPollingIntervalSecs: 10
  task:
    failRedoSecs: 5
    inactivityDurationSecs: 600

amit-handda avatar Sep 05 '22 23:09 amit-handda

Any updates on if this can be merged? This will really unblock a lot of use-cases.

maxpert avatar Sep 13 '22 14:09 maxpert

It is taking some time because it is not a priority feature (though I am starting to see the merits) and there are still things to consider that I feel are unanswered.

My thoughts atm

  • have you verified your fork works as expected for your use-case?
    • what configuration are you hoping to run?
    • what performance are you hoping to see?
  • unexpectedly high executions/s: why is that? It should do a maximum of 4x threads/s but seems to be doing 5-6x threads/s
    • was the same config used for comparing sync vs async?
    • do we have a bug in the polling-logic? it should not pull more work until queue-length reaches 0.5xthreads again
  • does db-scheduler have any assumptions somewhere in the codebase that the execution lifecycle will be executed by the same thread? (I think not, though async does make things harder to debug)
  • if we go the async route, how would we do stuff like this: https://github.com/kagkarlsson/db-scheduler/issues/116

Also concerned about unknown unknowns. If it goes in, I think it needs to do so as an experimental feature.

kagkarlsson avatar Sep 13 '22 15:09 kagkarlsson

Would it at all be possible to enable async-mode by config-setting?

kagkarlsson avatar Sep 13 '22 17:09 kagkarlsson

@kagkarlsson

  • digging deeper into the unexpectedly high executions/s, will get back to you
  • about async-mode via config setting ... not sure about it; as discussed earlier, the PR makes breaking changes to the db-scheduler public API. I am open to ideas though.

amit-handda avatar Sep 13 '22 23:09 amit-handda

I feel conflicted about this change. It will enable higher throughput using fewer threads (for non-blocking io use-cases), which is great. On the other hand, I worry about the things it might break or make more complicated. And this is why an explicit opt-in for async-mode would be good..

I was considering whether ExecutionHandler could get a default method for the async variant that just wraps the current method in a CompletableFuture. You would override the default method when necessary.. Still, we would need to find a way to toggle the CompletableFuture code in the executors as well..
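Roughly what that default-method idea could look like (a sketch only, not the actual interface in the repository; types and package paths as assumed above):

import java.util.concurrent.CompletableFuture;

import com.github.kagkarlsson.scheduler.task.CompletionHandler;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.TaskInstance;

public interface ExecutionHandler<T> {

    // existing synchronous contract, unchanged
    CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);

    // default async variant: wraps the sync method, so existing handlers keep today's behaviour
    // (the sync body still runs on the calling scheduler thread); non-blocking handlers would
    // override this and return a future that completes later
    default CompletableFuture<CompletionHandler<T>> executeAsync(
            TaskInstance<T> taskInstance, ExecutionContext executionContext) {
        return CompletableFuture.completedFuture(execute(taskInstance, executionContext));
    }
}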

kagkarlsson avatar Sep 14 '22 17:09 kagkarlsson

Maybe it is possible to let the async version live and be released from a custom branch 🤔

kagkarlsson avatar Sep 26 '22 06:09 kagkarlsson

Thanks @kagkarlsson, I agree with this; let's let it breathe through a custom branch, please.

amit-handda avatar Sep 26 '22 14:09 amit-handda

If we can do that and release it as some sort of beta package, I can write some Reactor benchmarks as well.

maxpert avatar Sep 26 '22 15:09 maxpert

@kagkarlsson happy Friday, checking in. May I help with getting it released from the custom branch? Thanks

amit-handda avatar Oct 07 '22 17:10 amit-handda

Shall we recreate the PR against the async branch I created?

kagkarlsson avatar Oct 15 '22 21:10 kagkarlsson

Trying to merge into the async branch now, but I am getting compilation errors. Is this branch building ok?

kagkarlsson avatar Nov 18 '22 07:11 kagkarlsson

I reviewed the past comments about this issue and am curious as well. We can use code like the snippet below to implement a non-blocking task (like an HTTP request). On the other hand, whether a task should be async or not depends on the use case.

public RecurringTaskWithPersistentSchedule<PlainScheduleAndData> asyncRecurringTask() {
    return Tasks
        .recurringWithPersistentSchedule("async-recurring-task", PlainScheduleAndData.class)
        .execute((taskInstance, executionContext) -> {
            // run the (mocked) http request on a separate pool so it does not block the scheduler thread
            CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    // mock http request
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "test";
            });
            orgFuture.whenComplete((result, throwable) -> {
                // callback here; note that execute() has already returned at this point,
                // so db-scheduler treats the execution as finished before the callback runs
            });
        });
}

GithubRyze avatar Feb 17 '23 07:02 GithubRyze

Sorry for not having the bandwidth to pursue this issue. What about using virtual threads to solve this?
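For reference, a sketch of what the virtual-threads route could look like on Java 21+ (assuming the Scheduler builder accepts a custom ExecutorService; the setter name here is from memory, so double-check it against the version you use):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.sql.DataSource;

import com.github.kagkarlsson.scheduler.Scheduler;

class VirtualThreadSchedulerSketch {
    static Scheduler build(DataSource dataSource) {
        // One cheap virtual thread per execution: handlers can keep their plain blocking style
        // (JDBC, HTTP) without holding a scarce platform thread while they wait.
        ExecutorService virtualThreads = Executors.newVirtualThreadPerTaskExecutor();
        return Scheduler.create(dataSource /*, tasks ... */)
            .executorService(virtualThreads)
            .build();
    }
}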

kagkarlsson avatar Mar 27 '23 15:03 kagkarlsson

@kagkarlsson virtual threads are in EAP, and even after rollout, just like record classes, it's going to take years for people to move over. All of the existing libraries and software stacks still rely on Future-based solutions.

maxpert avatar Mar 28 '23 17:03 maxpert