ember-concurrency
Awkward use case: batch+debounce
After my EmberConf prezo, devs (please comment up if this was you! :) ) were explaining a use case that wasn't elegantly handled by EC. It was something like:
We want to throttle / debounce our performs, such that each `perform` of a task stashes the args in an array, and after some period of time, that array is passed to some batch processor for processing. We tried this with EC but there was an awkward issue of cancelation / entanglement that we couldn't find an elegant solution to.
I'm guessing the code looked something like:
```js
// entry point; enqueue the batched args, and "kick"
// the debounced restartable processSoon task
enqueueForProcessing(...args) {
  this.batchedArgs.push(args);
  this.get('processSoon').perform();
},

processSoon: task(function * () {
  yield timeout(500); // debounce
  let batchedArgs = this.batchedArgs;
  this.batchedArgs = [];

  // this is the weird part; this processBatchNow is still
  // part of the restartable `processSoon` task, so if
  // `enqueueForProcessing` gets called and it kicks
  // `processSoon`, it'll restart right in the middle
  // of `processBatchNow`. And even if `processBatchNow()` were NOT
  // restartable (because, say, it's just a function that returns
  // an uncancelable promise), we'd still need to honor the
  // constraint to not concurrently run `processBatchNow`.
  yield processBatchNow(batchedArgs);
}).restartable(),
```
I can't think of a way to model this with existing EC primitives that doesn't end up looking like the kind of state-machine-y crappy code EC was designed to prevent. Maybe there's an answer in adding an API to define uncancelable regions of tasks? Something like:
```js
import { task, preventCancel } from 'ember-concurrency';

// ...

processSoon: task(function * () {
  yield timeout(500); // debounce
  let batchedArgs = this.batchedArgs;
  this.batchedArgs = [];
  yield preventCancel();
  yield processBatchNow(batchedArgs);
}).restartable(),
```
Or, quite possibly, there's something just a little off about EC's concept of "entanglement" (where anything you yield gets linked to you and canceled if you get canceled, etc). Anywho, let's discuss.
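For anyone new to the entanglement concept, here's a toy sketch of what "linked cancelation" means (plain JS invented for illustration, not EC's actual internals): canceling a parent propagates to any children linked to it, which is exactly why `processBatchNow` above gets dragged down with its restartable parent.

```javascript
// Toy model of EC-style "entanglement": a canceled parent
// cancels every child that was linked to it. Illustration only;
// not ember-concurrency's real implementation.
class LinkedInstance {
  constructor(parent = null) {
    this.isCanceled = false;
    this.children = [];
    if (parent) {
      parent.children.push(this); // entangle with parent
    }
  }

  cancel() {
    if (this.isCanceled) return;
    this.isCanceled = true;
    // cancelation propagates down to linked children
    this.children.forEach((child) => child.cancel());
  }
}

const parent = new LinkedInstance();
const child = new LinkedInstance(parent); // yielded by parent, so linked
parent.cancel();
console.log(child.isCanceled); // true: the child was entangled
```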
Hey, it was @wytlytningNZ and I!
You've described the issue well. We haven't attempted it yet, but we want to use a bundling API endpoint to reduce network load: collect requests over time, request a 'bundle', then unwrap it and pass the individual responses back.
Would flagging a task as uncancelable allow other tasks to run concurrently?
I've run across this "issue" before. My scenario was similar, but I felt like I was trying to do something that didn't fit what EC was trying to help with. I wanted to debounce user actions, and after a period of time yield a potentially long-running request to the server.
At the time, I felt like the best all-around solution was to allow canceling these long-running server requests (which also meant low-level database op cancellation). This was implemented with some admittedly complex code that overrides the ajax management by using response IDs and supporting (on the server) DELETE calls when an instance is cancelled. A bit of a mess (on both the client and server) and a bit non-standard, but it works.
After thinking about it more, it seems like EC could potentially help with this scenario. Unfortunately for me, I think I would still need to do some variation of what I am doing right now. E.g. if a user decides 2 seconds into the operation that they want to change something, I shouldn't prevent them from doing that...
Anyway, I like the `preventCancel` idea. It's almost like stating that the instance has reached its point of no return and should not be restartable. I assume then that additional requests during this period would be dropped?
I guess another idea is to make `processBatchNow` another task, and first check that it is not running, but this could be a potential race condition location. E.g.
```js
import { task } from 'ember-concurrency';

// ...

processBatchNow: task(function * (batchedArgs) {
  // long-running ops ...
}).drop(),

processSoon: task(function * () {
  yield timeout(500); // debounce
  let batchedArgs = this.batchedArgs;
  this.batchedArgs = [];
  if (!this.get('processBatchNow.isRunning')) {
    yield this.get('processBatchNow').perform(batchedArgs);
  }
}).restartable(),
```
A side-note: If the child task (`processBatchNow`) has a concurrency of 1 and drops additional requests, this may work without the `isRunning` check... with a side-effect that your state arguments and actual-running arguments could be out of sync.
So, apparently I was wrong in my assumption of the child task instance orchestration. This twiddle shows that even the child task (marked with `drop`) is getting cancelled when the parent is restarted...

I need to do some more tinkering with this..
The batch processing should run concurrently; having only one instance at a time can be achieved with the `enqueue` or `keepLatest` modifier, or no modifier at all.
I'm not sure if EC should be used in this way. I don't want to muddy the API with a use case that doesn't make sense. To tie batching back to a UI I'd have a task for each operation in the batch, so cancelation would be handled upstream. That's only because in this case the operations are network requests that aren't worth the hassle to cancel. A plain promise implementation would be along the lines of:
```js
enqueueForProcessing(...args) {
  let d = RSVP.defer();
  this.queue.push([d, args]);
  run.debounce(this, 'process', 500);
  return d.promise;
},

process() {
  let batchedArgs = this.queue;
  this.queue = [];
  let deferreds = [];
  let batch = [];
  for (let [d, args] of batchedArgs) {
    deferreds.push(d);
    batch.push(args);
  }
  let results = batchProcess(batch);
  for (let i = 0; i < results.length; i++) {
    // with more error handling, reject etc
    deferreds[i].resolve(results[i]);
  }
}
```
-- pause for reflection --

Writing this out has given me another idea to try which might drive out a need around the derived state.
I've created a demo showing that passing it off to another task does work (derived state is fine). The `processSoon` task does the debounce and gather, then kicks off another task (`processNow`) without holding a reference to it. I've yet to verify all cancellation scenarios work.
```js
enqueueForProcessing(...args) {
  this.batchedArgs.push(args);
  this.get('processSoon').perform();
},

processSoon: task(function * () {
  yield timeout(500); // debounce
  let batchedArgs = this.batchedArgs;
  this.batchedArgs = [];
  // start 'isolated' task
  this.get('processNow').perform(batchedArgs);
}).restartable(),

processNow: task(function * (batchedArgs) {
  let results = yield processBatchNow(batchedArgs);
  this.results.push(results.join(', '));
}),
```
So this works, but it has a few drawbacks which turn up if you're interested in individual and/or batch task state.

One way to think about this data flow is that no task is really cancelled, just delayed until the batch task starts, so processSoon tasks are grouped and then tied to a processNow task. There could be a mechanism to make a TaskInstance reliant on a different context, where it could also pull data from it and yield for its own, final, value.

For tasks with a maxConcurrency of more than one, it would be nice to opt in to derived state for each TaskInstance.

I'll try to put up a proposed API when I'm on a computer.
I just saw that you've touched on the maxConcurrency > 1 derived state issue in #132
This is what I was thinking around how a context API could be used to make batching less awkward. It's similar to the `preventCancel` in the opening comment, but retains a reference instead of it going into a black hole of cancel-lessness.

Concerns of concurrency limiting, derived state, and cancelation are moved to the `context` so that it can be changed dynamically. It also means that it can be passed around like a Promise can, allowing flows like https://github.com/poteto/ember-pipeline
```js
import { task, context, switchContext } from 'ember-concurrency';

export default Service.extend({
  queue: [],

  init() {
    // new context that prevents children's cancels from bubbling
    this.currentContext = context({
      task: this.process,
      restartable: true,
      cancelFromDescendants: false,
    });
  },

  cancel() {
    let c = this.currentContext;
    return c.cancel().then(c.reset); // or just create a new context?
  },

  enqueue(...args) {
    // Allow all tasks to be cancelled at once
    let c = this.currentContext.derive();
    this.queue.push([c, args]);
    this.currentContext.perform();
    return c;
  },

  process: task(function * () {
    yield timeout(500); // debounce
    let batchContext = this.currentContext.derive({ restartable: false });
    // alternative: context({ restartable: false, parent: this.currentContext });
    // detach from restartable context to isolate from debounce/restart
    yield switchContext(batchContext); // or context.switch(batchContext)
    let queue = this.queue;
    this.queue = [];
    let contexts = [];
    let batchedArgs = [];
    for (let [c, a] of queue) {
      contexts.push(c);
      batchedArgs.push(a);
    }
    let results = yield processBatchNow(batchedArgs);
    for (let i = 0; i < results.length; i++) {
      contexts[i].resolve(results[i]);
    }
  }),
});
```
```js
export default Component.extend({
  process: inject.service(),

  processing: reads('task.isRunning'),
  result: reads('task.lastSuccessful.value'),

  start(...args) {
    let p = this.get('process');
    let t = p.enqueue(...args);
    this.set('task', t);
  },

  stop() {
    let t = this.get('task');
    if (!t) {
      return;
    }
    t.cancel();
  },
});
```
Backwards compatibility could be had by turning task modifiers into context modifiers, where calling `object.get('task').perform()` is really calling perform on the implicitly created context.
I see what you're saying about `context`, but it still seems to require a lot of ceremony and wouldn't feel like the rest of EC.

I wonder if adding a new task modifier called `.atomic()` might actually address this use case:
```js
export default Service.extend({
  workUnits: [],

  processSoon: task(function * (...args) {
    this.workUnits.push(args);
    // process after 5s unless restarted again
    yield timeout(5000);
    yield this.get('processNow').perform();
  }).restartable(),

  processNow: task(function * () {
    let workUnits = this.workUnits.slice();
    this.workUnits = [];
    // simulate slow processing...
    yield timeout(10000);
  }).atomic(),
});
```
`.atomic()` is like `preventCancel()` but marks the entire task as uncancelable (which I kinda like more than slicing a task into sections of cancelability).

In the above example, we use the classic EC pattern of debouncing by making `processSoon` a restartable task with a `timeout()`, and if it makes it to the end without restarting, then it calls `processNow`. `processNow` is `.atomic()`, which means it can't be canceled, and if you try to cancel a task that yields an atomic task (e.g. `processSoon`), the cancelation will be blocked until the atomic task completes.
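As a plain-JS analogy of that "blocked cancelation" semantic (everything here is invented for illustration; `CancelToken`, `enterAtomic`, and `exitAtomic` are not EC APIs), a cancel request that arrives during an atomic region could simply be deferred until the region exits:

```javascript
// Hypothetical sketch: cancelation is blocked while an atomic
// region runs, then applied once the region completes. Names are
// invented for illustration; none of this exists in EC.
class CancelToken {
  constructor() {
    this.isCanceled = false;
    this.inAtomic = false;
    this.cancelPending = false;
  }

  enterAtomic() { this.inAtomic = true; }

  exitAtomic() {
    this.inAtomic = false;
    if (this.cancelPending) {
      this.isCanceled = true; // the deferred cancel lands now
    }
  }

  cancel() {
    if (this.inAtomic) {
      this.cancelPending = true; // blocked until the atomic region ends
    } else {
      this.isCanceled = true;
    }
  }
}

const token = new CancelToken();
token.enterAtomic();
token.cancel();                // arrives mid-atomic: deferred
console.log(token.isCanceled); // false: still protected
token.exitAtomic();
console.log(token.isCanceled); // true: cancel applied afterwards
```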
I dunno, still need to flesh out some things but it feels closer to an EC API than going full on configurable context.
I still lean toward `RSVP.resolve(this.get('myTask').perform())`. It makes a lot of sense to me. That said, having an e-c API that simply wraps that concept would be ok. I think I may be too late to the game though.
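The same idea in plain promises (a sketch; `performToyTask` is an invented stand-in for a task instance, not an EC API): wrapping a cancelable thenable in `Promise.resolve()` hands back a plain promise with no `cancel` handle, so nothing upstream can be entangled with it.

```javascript
// Toy "task instance": a thenable that also exposes cancel().
// Invented for illustration; not an EC TaskInstance.
function performToyTask() {
  let cancel;
  const promise = new Promise((resolve, reject) => {
    const timer = setTimeout(() => resolve('done'), 5);
    cancel = () => {
      clearTimeout(timer);
      reject(new Error('canceled'));
    };
  });
  return {
    cancel,
    then: promise.then.bind(promise), // thenable, so Promise.resolve adopts it
  };
}

const instance = performToyTask();
// Promise.resolve() adopts the thenable and returns a *plain*
// promise: the cancel handle is gone, so the caller can no
// longer cancel (or be entangled with) the underlying work.
const unlinked = Promise.resolve(instance);
console.log(typeof instance.cancel); // 'function'
console.log(typeof unlinked.cancel); // 'undefined'
```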
@sukima that definitely works for now, but long term, I think this is the kind of thing that should be specified by the task, and RSVP.resolve() is a caller-centric solution; if you truly have a task that shouldn't be canceled, every caller has to agree to that contract and never change, which is super brittle.
I wouldn't mind though if we had an API that required caller-side changes based on some type/lifetime checking; e.g. someone on Slack suggested that in order to perform an `.atomic()` task, you have to call it with `.performUnlinked()` to make it clear what the contract is; simply calling it with `.perform()` would be a runtime error. Obviously these are prospective, theoretically nonexistent APIs that need to be fleshed out, but that's an example of a caller-side API I'd be on board with.
Basically I want EC to be more like Rust.
I could buy into the idea that you label a task as `.atomic()` and then use `performUnlinked()`. With readable assertion errors and related template helpers.
To keep this conversation up to date with discussions on Slack: it was determined that this use case is not directly related to UI state management, which is the current focus of `e-c`. Primitives to help this use case may be developed in the future, with a `context` being a strong contender.
For those wanting to use this pattern, the approach I'm using in the meantime is creating a deferred for each inbound request and returning its promise. The `args` and `deferred` get enqueued, collected after the timeout, and then passed to a plain function to avoid entanglement. That function does the batch work, then passes the value to the original request via the deferred. So the `e-c` task is only used for the debounce, and its derived state is not useful. It might be possible for the plain function to act as a bridge into another task so its derived state can be used.
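To make that interim approach concrete outside of Ember, here is a framework-free sketch (the `BatchQueue` name is mine, and `setTimeout` stands in for `timeout()`/`run.debounce`): each `enqueue` gets its own deferred, a debounced flush collects the batch, a plain function processes it without any task entanglement, and each caller's promise is settled with its own result.

```javascript
// Plain-JS sketch of the deferred-per-request batching pattern.
// BatchQueue and its method names are invented for illustration.
class BatchQueue {
  constructor(processBatch, delayMs) {
    this.processBatch = processBatch; // (argsArray) => resultsArray
    this.delayMs = delayMs;
    this.pending = [];
    this.timer = null;
  }

  enqueue(...args) {
    let deferred = {};
    deferred.promise = new Promise((resolve, reject) => {
      deferred.resolve = resolve;
      deferred.reject = reject;
    });
    this.pending.push([deferred, args]);

    // debounce: restart the timer on every enqueue
    clearTimeout(this.timer);
    this.timer = setTimeout(() => this.flush(), this.delayMs);

    return deferred.promise;
  }

  flush() {
    const batch = this.pending;
    this.pending = [];
    try {
      // plain function call: no task entanglement, so a later
      // enqueue/debounce cannot cancel work that is in flight
      const results = this.processBatch(batch.map(([, args]) => args));
      batch.forEach(([deferred], i) => deferred.resolve(results[i]));
    } catch (err) {
      batch.forEach(([deferred]) => deferred.reject(err));
    }
  }
}

// usage: sum each request's args, batched behind a 10ms debounce
const queue = new BatchQueue(
  (batch) => batch.map((args) => args.reduce((a, b) => a + b, 0)),
  10
);
```

Each caller awaits only its own promise, so per-request state stays with the caller while the batch work itself remains uncancelable.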