ember-concurrency icon indicating copy to clipboard operation
ember-concurrency copied to clipboard

Awkward use case: batch+debounce

Open machty opened this issue 7 years ago • 13 comments

After my EmberConf prezo, devs (please comment up if this was you! :) ) were explaining a use case that wasn't elegantly handled by EC. It was something like:

We want to throttle / debounce our performs, such that each perform of a task stashes the args in an array, and after some period of time, that array is passed to some batch processor for processing. We tried this with EC but there was an awkward issue of cancelation / entanglement that we couldn't find an elegant solution to.

I'm guessing the code looked something like:

    // entry point; enqueue the batched args, and "kick"
    // the debounced restartable processSoon task
    enqueueForProcessing(...args) {
      this.batchedArgs.push(args);
      this.get('processSoon').perform();
    },

    processSoon: task(function * () {
      yield timeout(500); // debounce

      let batchedArgs = this.batchedArgs;
      this.batchedArgs = [];

      // this is the weird part; this processBatchNow is still
      // part of the the restartable `processSoon` task, so if
      // `enqueueForProcessing` for processing gets called and
      // it kicks `processSoon`, it'll restart right in the middle
      // of `processBatchNow`. And even if `processBatchNow()` were NOT
      // restartable (because, say, it's just a function that returns
      // an uncancelable promise), we'd still need to honor the
      // constraint to not concurrently run `processBatchNow`.
      yield processBatchNow(batchedArgs);
    }).restartable(),

I can't think of a way to model this with existing EC primitives that doesn't end up looking like the kind of code state-machine-y crappy code EC was designed to prevent. Maybe there's an answer to adding api to define uncancelable regions of tasks? Something like:

import { task, preventCancel } from 'ember-concurrency';

// ...

    processSoon: task(function * () {
      yield timeout(500); // debounce

      let batchedArgs = this.batchedArgs;
      this.batchedArgs = [];

      yield preventCancel();
      yield processBatchNow(batchedArgs);
    }).restartable(),

Or, quite possibly, there's something just a little off about EC's concept of "entanglement" (where anything you yield gets linked to you and canceled if you get canceled, etc). Anywho, let's discuss.

machty avatar Apr 02 '17 18:04 machty

Hey, it was @wytlytningNZ and I!

You've described the issue well. We haven't attempted it yet but we're wanting to use a bundling API endpoint to reduce network load: collect requests up over time, request a 'bundle', then unwrap it and pass back individual responses

Would flagging a task as uncancelable allow other tasks to run concurrently?

martletandco avatar Apr 02 '17 20:04 martletandco

I've run across this "issue" before. My scenario was similar, but I felt like I was trying to do something that didn't fit what EC was trying to help with. I wanted to debounce user actions, and after a period of time yield a potentially long-running request to the server.

At the time, I felt like the best all-around solution was to allow canceling these long-running server requests (which also meant low-level database op cancellation). This was implemented with some albeit complex code that overrides the ajax management by using response IDs and supporting (on the server) DELETE calls when an instance is cancelled. A bit of a mess (on both the client and server) and a bit non-standard, but it works.

After thinking about it more, it seems like EC could potentially help with this scenario. Unfortunately for me, I think I would still need to do some variation of what I am doing right now. E.g. if a user decides 2 seconds into the operation that they want to change something, I shouldn't disallow them to do that...

Anyway, I like the preventCancel idea. It's almost like stating that the instance has reached its point of no return and should not be restartable. I assume then that additional requests during this period would be dropped?

I guess another idea is to make processBatchNow another task, and first check that it is not running, but this could be a potential race condition location. E.g.

import { task, preventCancel } from 'ember-concurrency';

// ...
    processBatchNow: task(function*(batchedArgs){
      // long-running ops ...
    }).drop(),

    processSoon: task(function * () {
      yield timeout(500); // debounce

      let batchedArgs = this.batchedArgs;
      this.batchedArgs = [];

      if (!this.get("processBatchNow.isRunning")){
        yield this.get("processBatchNow").perform(batchedArgs);
      }
    }).restartable(),

A side-note: If the child task (processBatchNow) has concurrency of 1 and drops additional requests, this may work without the isRunning check... with a side-effect that your state arguments and actual-running arguments could be out of sync.

tzellman avatar Apr 04 '17 08:04 tzellman

So, apparently I was wrong in my assumption of the child task instance orchestration. This twiddle shows that it appears that even the child task (marked with drop) is getting cancelled when the parent is restarted...

I need to do some more tinkering with this..

tzellman avatar Apr 04 '17 09:04 tzellman

The batch processing should run concurrently, having only one instance at a time can be achieved with the enqueue or keepLatest modifier, or no modifier at all

I'm not sure if EC should be used in this way. I don't want to muddy the API with a use case that doesn't make sense. To tie batching back to a UI I'd have a task for each operation in the batch, so cancelation would be handled upstream. That's only because in this case the operations are network requests that aren't worth the hassle to cancel. A plain promise implementation would be along the lines of:

  enqueueForProcessing(...args) {
    let d = RSVP.defer();

    this.queue.push([d, args]);
    run.debounce(this, 'process', 500);

    return d.promise;
  },

  process() {
    let batchedArgs = this.queue;
    this.queue = [];

    let deferreds = [];
    let batch = [];
    for(let [d, args] in batchedArgs) {
      deferreds.push(d);
      batch.push(args);
    }

    let results = batchProcess(batch);
    for(let i = 0; i < results.length; i++) {
      // with more error handling, reject etc
      deferreds[i].resolve(results[i]);
    }
  } 

-- pause for reflection --

Writing this out as given me another idea to try which might drive out a need around the derived state

martletandco avatar Apr 04 '17 09:04 martletandco

I've created a demo showing that passing it off to another task does work (derived state is fine). The processSoon task does the debounce and gather, then kicks of another task (processNow) without holding a reference to it. I've yet to verify all cancellation scenarios work

  enqueueForProcessing(...args) {
    this.batchedArgs.push(args);
    this.get('processSoon').perform();
  },

  processSoon: task(function * () {
    yield timeout(500); // debounce

    let batchedArgs = this.batchedArgs;
    this.batchedArgs = [];
    
    // start 'isolated' task
    this.get('processNow').perform(batchedArgs);
  }).restartable(),
  
  processNow: task(function * (batchedArgs) {
    let results = yield processBatchNow(batchedArgs);
    
    this.results.push(results.join(', '));
  }),

martletandco avatar Apr 04 '17 11:04 martletandco

So this works but it has a few drawbacks which turn up if you're interested in individual and/or batch task state

One way to think about this data flow is the no task is really cancelled, just delayed until the batch task starts, so processSoon tasks are grouped then tied to a processNow task. There could be a mechanism to make a TaskInstance reliant on a different context, where it could also pull data from and yield for it's own, final, value

For tasks with a maxConcurrency of more than one, it would be nice to opt in to derived state for each TaskInstance

I'll try put up a proposed API when I'm on a computer

martletandco avatar Apr 05 '17 05:04 martletandco

I just saw that you've touched on the maxConcurrency > 1 derived state issue in #132

martletandco avatar Apr 05 '17 09:04 martletandco

This is what I was thinking around how a context API could be used to make batching less awkward. It's similar to the preventCancel in the opening comment but retains a reference, instead of it going into a blackhole of cancel-lessness

Concerns of concurrency limiting, derived state, and cancelation are moved to the context so that it can be changed dynamically. It also means that it can be passed around like Promise can, allowing flows like https://github.com/poteto/ember-pipeline

import { task, context, switchContext }  from 'ember-concurrency';

export default Service.extend({
  queue: [],

  init(){
    // new context that prevents children's cancels from bubbling
    this.currentContext = context({
      task: this.process,
      restartable: true,
      cancelFromDecendants: false,
    });
  },

  cancel() {
    let c = this.currentContext;
    return c.cancel().then(c.reset); // or just create a new context?
  },

  enqueue(...args){
    // Allow all tasks to be cancelled at once
    let c = this.currentContext.derive();
    this.queue.push([c, args]);
    this.currentContext.perform();
    return c;
  },

  process: task(function * () {
    yield timeout(500); // debounce

    let batchContext = this.currentContext.derive({ restartable: false });
    // alternative:  context({ restartable: false, parent: this.currentContext });
    
    // detach from restartable context to isolate from debounce/restart
    yield switchContext(batchContext); // or context.switch(batchContext)

    let queue = this.queue;
    this.queue = [];

    let contexts = [];
    let batchedArgs = [];
    for(let [c, a] of queue) {
      contexts.push(c);
      batchedArgs.push(a);
    }
    
    let results = yield processBatchNow(batchedArgs);
    
    for(let i = 0; i < results.length; i++) {
      let r = results[i];
      let c = contexts[i];
      c.resolve(r);
    }
  }),
});
export default Component.extend({
  process: inject.service(),

  processing: reads('task.isRunning'),
  result: reads('task.lastSuccessful.value'),

  start(...args) {
    let p = this.get('process');
    let t = p.enqueue(...args);
    this.set('task', t);
  },

  stop() {
    let t = this.get('task');
    if(!t) {
      return;
    }

    t.canel();
  },
});

Backwards compatibility could be had by turning task modifiers into context modifiers where calling object.get('task').perform() is really calling perform on the implicitly created context

martletandco avatar Apr 15 '17 09:04 martletandco

I see what you're saying about context but it still seems to require a lot of ceremony and wouldn't feel like the rest of EC.

I wonder if adding a new task modifier called .atomic() might actually address this use case:

export Service.extend({
  workUnits: [],

  processSoon: task(function * (...args) {
    this.workUnits.push(args);

    // process after 5s unless restarted again
    yield timeout(5000);

    yield this.get('processNow').perform();
  }).restartable(),

  processNow: task(function * () {
    let workUnits = this.workUnits.slice();
    this.workUnits = [];

    // simulate slow processing...
    yield timeout(10000);

  }).atomic(),
});

.atomic() is like preventCancel() but marks the entire task as uncancelable (which I kinda like more than slicing a task into sections of cancelability).

In the above example, we use the classic EC pattern of debouncing by making processSoon a restartable task with a timeout(), and if it makes it to the end without restarting, then it calls processNow. Process now is .atomic(), which means it can't be canceled, and if you try to cancel a task that yields an atomic task (e.g. processSoon), the cancelation will be blocked until the atomic task completes.

I dunno, still need to flesh out some things but it feels closer to an EC API than going full on configurable context.

machty avatar Jun 14 '17 03:06 machty

I still lean to RSVP.Resolve(this.get('myTask').perform()). It makes a lot of sense to me. That said having a e-c API that simply wraps that concept would be ok. I think I may be too late to the game though.

sukima avatar Jun 19 '17 15:06 sukima

@sukima that definitely works for now, but long term, I think this is the kind of thing that should be specified by the task, and RSVP.resolve() is a caller-centric solution; if you truly have a task that shouldn't be canceled, every caller has to agree to that contract and never change, which is super brittle.

I wouldn't mind though if we had an API that required caller-side changes based on some type/lifetime checking, e.g. someone on Slack suggested that in order to perform an .atomic() task, you have to call it with .performUnlinked() to make it clear what the contract is; simply calling it with .perform() would be a runtime error. Obviously these are prospective theoretically nonexistent APIs that need to be fleshed out, but that's an example of a caller-side API I'd be on board with.

Basically I want EC to be more like Rust.

machty avatar Jun 19 '17 15:06 machty

I could buy into the idea that you label a task as .atomic() and then use performUnlinked(). With readable assertion errors and related template helpers.

sukima avatar Jun 20 '17 13:06 sukima

To keep this conversation up to date with discussions on Slack: It was determined that this use case is not directly related to UI state management which is the current focus of e-c. Primitives to help this use case maybe developed in the future with a context being a strong contender

For those wanting to use this pattern, the approach I'm using in the meantime is creating a deferred for each inbound request and returning its promise. The args and deferred get enqueued, collected after the timeout, and then passed to a plain function to avoid entanglement. That function does the batch work then passes the value to the original request via the deferred. So the e-c task is only used for the debounce and its derived state is not useful. It might be possible for the plain function to act as a bridge into another task so it's derived state can be used

martletandco avatar Jul 12 '17 23:07 martletandco