Derive methods from general accumulators?

Open dmitriz opened this issue 8 years ago • 3 comments

It seems that many methods can be derived from the following general accumulator combinator:

accum<A>(init: Behavior<A>, fnStream: Stream<Behavior<(a: A) => A>>): Behavior<Behavior<A>>

Here accum(init, fnStream) begins with init and is mapped successively through each function in the fnStream:

const accum = (init, fnStream) => t1 => t2 =>
    fnStream
        .filter(({ time }) => (time >= t1) && (time <= t2))
        .map(({ value }) => value(t))
        .reduce((acc, feed) => feed(acc), init(t))

Now we can derive:

const stepper = (init, stream) =>
    accum(Behavior.of(init), stream.map(a => Behavior.of(b => a)))

const switcher = (init, streamBeh) =>
    accum(init, streamBeh.map(beh => t => a => beh(t)))

const scan = (fn, init, stream) =>
    accum(Behavior.of(init), stream.map(a => Behavior.of(b => fn(a, b))))

Is this correct?

dmitriz avatar Jun 04 '17 19:06 dmitriz

I think that is really smart. I definitely think we should consider implementing it like that. My only worry is that it could hurt performance.

paldepind avatar Apr 18 '18 18:04 paldepind

@dmitriz I think there may be some errors in the definitions you posted above. What do you think of the code below? The type of accum and the semantics have been changed slightly.

function accum<A>(
  init: Behavior<A>, fnStream: Stream<(a: A) => Behavior<A>>
): Behavior<Behavior<A>> {
  return t1 => t2 =>
    fnStream
      .filter(({ time }) => (t1 <= time) && (time <= t2))
      .reduce((acc, {time, value}) => value(acc(time)), init)(t2);
}

const stepper = (init, stream) =>
  accum(Behavior.of(init), stream.map(a => acc => Behavior.of(a)));

const scan = (f, init, stream) => 
  accum(Behavior.of(init), stream.map(a => acc => Behavior.of(f(a, acc))));

const switcher = (init, streamB) =>
  accum(init, streamB.map(beh => acc => beh));
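
To check the definitions above, here is a minimal runnable sketch. It assumes a purely denotational model that is not hareactive's actual implementation: a Behavior is a function of time, a Stream is a time-sorted array of occurrences, and `constant` and `mapStream` are hypothetical stand-ins for `Behavior.of` and `stream.map`.

```typescript
// Assumed model for illustration only (not hareactive's real types).
type Time = number;
type Behavior<A> = (t: Time) => A;
type Occurrence<A> = { time: Time; value: A };
type Stream<A> = Occurrence<A>[]; // occurrences sorted by time

// Hypothetical stand-ins for Behavior.of and stream.map.
const constant = <A>(a: A): Behavior<A> => () => a;
const mapStream = <A, B>(f: (a: A) => B, s: Stream<A>): Stream<B> =>
  s.map(({ time, value }) => ({ time, value: f(value) }));

// Start at t1, fold every occurrence in [t1, t2], then sample the result at t2.
function accum<A>(
  init: Behavior<A>,
  fnStream: Stream<(a: A) => Behavior<A>>
): Behavior<Behavior<A>> {
  return (t1) => (t2) =>
    fnStream
      .filter(({ time }) => t1 <= time && time <= t2)
      .reduce<Behavior<A>>((acc, { time, value }) => value(acc(time)), init)(t2);
}

const stepper = <A>(init: A, stream: Stream<A>): Behavior<Behavior<A>> =>
  accum<A>(constant(init), mapStream((a: A) => (_acc: A) => constant(a), stream));

const scan = <A, B>(f: (a: A, acc: B) => B, init: B, stream: Stream<A>): Behavior<Behavior<B>> =>
  accum<B>(constant(init), mapStream((a: A) => (acc: B) => constant(f(a, acc)), stream));

const switcher = <A>(init: Behavior<A>, streamB: Stream<Behavior<A>>): Behavior<Behavior<A>> =>
  accum<A>(init, mapStream((beh: Behavior<A>) => (_acc: A) => beh, streamB));

// Example data (made up for this sketch).
const clicks: Stream<number> = [
  { time: 1, value: 10 },
  { time: 3, value: 20 },
  { time: 5, value: 30 },
];
const sum = scan((a: number, acc: number) => a + acc, 0, clicks);
const last = stepper(0, clicks);
const switched = switcher((t: Time) => t, [{ time: 2, value: (t: Time) => t * 10 }]);

console.log(sum(0)(4));      // 30: occurrences at times 1 and 3
console.log(sum(2)(6));      // 50: occurrences at times 3 and 5
console.log(last(0)(4));     // 20: latest value at or before time 4
console.log(switched(0)(3)); // 30: switched to t => t * 10 at time 2
```

In this model the outer time `t1` decides which occurrences are seen at all, which is why `sum(0)` and `sum(2)` give different results for the same stream.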

paldepind avatar Jun 01 '18 09:06 paldepind

@paldepind As I recall, that code was meant more as a general principle, so yes, there may be errors. Your code may work, though you might need to change the reduce method to take curried functions instead of binary ones.

More recently, I have been looking for the simplest possible abstraction doing the job, and this new project came out:

https://github.com/dmitriz/tiny-cps

It mentions the Pull Streams by @dominictarr as one of the inspirations, where a pull stream is implemented by a pure function called with a callback argument. A CPS function generalizes the pull stream in that it allows both the inputs and the CPS outputs to be variadic.

The main difference from other stream implementations is the clear distinction between function declarations and function calls, which seems to be missing for the other streams and observables, making those abstractions more complex.

For instance, you would not call scan on a running function, but rather on the function itself. In this case that is equivalent to starting with the initial value, updating it whenever the function returns a (CPS) value, and emitting the new value. That makes scan a pure operator between CPS functions.

In contrast, applying it to a running stream is like applying it to a running function that has already been called. The latter is what causes the problems: it is not pure and not really necessary.
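
A minimal sketch of that idea, under simplifying assumptions: a CPS function here takes a single callback, and `scanCPS` is a hypothetical name rather than the tiny-cps API. Emitting the initial value before any update is also just one possible choice.

```typescript
// Simplified CPS function (assumption): nothing happens until it is called with a callback.
type CPS<A> = (cb: (a: A) => void) => void;

// Hypothetical operator: turns one CPS function into another without running either.
const scanCPS = <A, B>(f: (acc: B, a: A) => B, init: B) =>
  (source: CPS<A>): CPS<B> =>
    (cb) => {
      let acc = init; // a fresh accumulator is created on every call
      cb(acc);        // emit the initial value first
      source((a) => {
        acc = f(acc, a);
        cb(acc);      // emit each updated value
      });
    };

// Example source (made up): emits 1, 2, 3 synchronously each time it is called.
const numbers: CPS<number> = (cb) => {
  [1, 2, 3].forEach((x) => cb(x));
};

// Declaring the scanned function runs nothing yet.
const runningSum = scanCPS((acc: number, a: number) => acc + a, 0)(numbers);

// Two independent calls, each with its own state.
const first: number[] = [];
const second: number[] = [];
runningSum((x) => { first.push(x); });
runningSum((x) => { second.push(x); });

console.log(first);  // [0, 1, 3, 6]
console.log(second); // [0, 1, 3, 6]
```

Because the accumulator lives inside the call rather than in a shared running stream, both calls see the same sequence, which is the sense in which the operator stays pure.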

It seems that many problems that streams, observables or promises solve can also be solved via CPS functions without any additional abstractions.

I wonder what your opinion about it is?

dmitriz avatar Jul 01 '18 08:07 dmitriz