Derive methods from general accumulators?
It seems that many methods can be derived from the following general accumulator combinator:
accum<A>(init: Behavior<A>, fnStream: Stream<Behavior<(a: A) => A>>): Behavior<Behavior<A>>
Here accum(init, fnStream) begins with init and is mapped successively through each function in the fnStream:
const accum = (init, fnStream ) => t1 => t2 =>
fnStream
.filter({ time } => (time >= t1) && (time <= t2))
.map({ value } => value(t))
.reduce((acc, feed)=>feed(acc), init(t))
Now we can derive:
const stepper = (init, stream) =>
accum(Behavior.of(init), stream.map(a => Behavior.of(b => a)))
const switcher = (init, streamBeh) =>
accum(init, streamBeh.map(beh => t => a => beh(t)))
const scan = (fn, init, stream) =>
accum(Behavior.of(init), stream.map(a => Behavior.of(b => fn(a, b)))
Is this correct?
I think that is really smart. I definitely think we should consider implementing it like that. The only thing I'd be worried about is it would affect performance negatively.
@dmitriz I think there may be some errors in the definitions you posted above. What do you think of the code below? The type of accum and the semantics have been changed slightly.
function accum<A>(
init: Behavior<A>, fnStream: Stream<(a: A) => Behavior<A>>
): Behavior<Behavior<A>> {
return t1 => t2 =>
fnStream
.filter({ time } => (t1 <= time) && (time <= t2))
.reduce((acc, {time, value}) => value(acc(time)), init)(t2);
}
const stepper = (init, stream) =>
accum(Behavior.of(init), stream.map(a => acc => Behavior.of(a)));
const scan = (f, init, stream) =>
accum(Behavior.of(init), stream.map(a => acc => Behavior.of(f(a, acc))));
const switcher = (init, streamB) =>
accum(init, streamB.map(beh => acc => beh));
@paldepind
I recall that code was more of a general principle, so yes, there may be errors.
Your code may work, where you might need to change the reduce method to take the curried functions instead of the binary ones.
More recently, I have been looking for the simplest possible abstraction doing the job, and this new project came out:
https://github.com/dmitriz/tiny-cps
It mentions the Pull Streams by @dominictarr as one of the inspirations, where a pull stream is implemented by a pure function called with a callback argument, and a CPS function generalizes the pull stream, in which it allows for variadic both inputs and cps-outputs.
The main difference with other stream implementations is the clear distinction between function declarations and function call,s which seems to be missing for the other streams and observables, making those abstractions more complex.
For instance, you would not call scan on a running function, rather on the function itself. Which in this case is equivalent to starting with initial value and update whenever the function returns a (cps-)value and emit the new value.
That makes scan a pure operator between CPS functions.
In contrast, applying it to a running stream is like applying to a running function that has been already called. The latter is what causing the problems, is not pure and is not really necessary.
It seems that many problems that streams, observable or promises solve, can be also solved via the CPS functions without any additional abstractions.
I wonder what is your opinion about it?