mithril.js
mithril.js copied to clipboard
Mithril v2 streams should have a generalized `fold`.
Description
Something like this, where the stream doesn't emit immediately, but correctly and performantly handles folds:
// Proposed usage: derive a child stream from `sourceStream` via `fold`.
// The two `return` statements below illustrate the callback's two alternative
// outcomes; a real callback would pick one of them per invocation.
var childStream = sourceStream.fold(function (value) {
// Alternative 1: emit a value
return value
// Alternative 2: skip this emit
return Stream.SKIP
})
This would, for the most part, act like `sourceStream.map(func)`, but with the following difference: when the source stream ends, the callback is called once more with `value` set to `Stream.SKIP` to get the final emit. (This is convenient for some pass-throughs, as one example below shows, and is currently impossible to receive.)
Why
There are several userland helpers that this could simplify a lot, and it would make it much easier to properly handle closure. For instance:
// function groupBy<T, K>(s: Stream<T>, k: (v: T) => K): Stream<[K, Stream<T>]>
//
// Groups the values of `s` into one stream per key, where the key of a value
// is `func(value)`.
// - First value for a key: a new per-key stream is created and emitted.
// - Later values for that key: pushed into the existing per-key stream; the
//   outer emit is skipped.
// - Source end (callback receives `Stream.SKIP`): all `[key, stream]` pairs
//   collected so far are emitted as the final value.
function groupBy(s, func) {
  const acc = new Map()
  return s.fold(v => {
    // The end-of-stream marker must be checked before computing a key:
    // `Stream.SKIP` is not a real value and must never reach `func`.
    if (v === Stream.SKIP) return Array.from(acc)
    const key = func(v)
    let target = acc.get(key)
    if (target == null) {
      target = Stream(v)
      acc.set(key, target)
      return target
    }
    target(v)
    return Stream.SKIP
  })
}
// function take<T>(s: Stream<T>, n: number): Stream<T>
// Passes through the first `n` values of `s` and skips every emit after that.
// Note: `n` must be greater than 0
function take(s, n) {
  let remaining = n
  return s.fold(value => {
    if (remaining !== 0) {
      remaining--
      return value
    }
    return Stream.SKIP
  })
}
// function drop<T>(s: Stream<T>, n: number): Stream<T>
// Skips the first `n` emits of `s` and passes through every value after that.
// Note: `n` must be greater than 0
function drop(s, n) {
  let toSkip = n
  return s.fold(value => {
    if (toSkip !== 0) {
      toSkip--
      return Stream.SKIP
    }
    return value
  })
}
// function reduce<T, U>(s: Stream<T>, f: (acc: U, v: T) => U, init: U): Stream<U>
//
// Accumulates every value of `s` into a single result and emits that result
// once, when `s` ends (signalled by the callback receiving `Stream.SKIP`).
// `func` is called as `func(accumulator, value)`; `init` is the starting
// accumulator and is what gets emitted if `s` ends without any values.
function reduce(s, func, init) {
  return s.fold(v => {
    if (v === Stream.SKIP) return init
    // Pass the incoming value along with the accumulator; without it the
    // reducer could never see the stream's data.
    init = func(init, v)
    return Stream.SKIP
  })
}
In each case, `Stream.scan` is not sufficient: they really need it fused with a conceptual `reduce` operation, and that's the basic functionality this fills.
Possible Implementation & Open Questions
Here's a basic (untested) userland implementation of it:
// Userland sketch of `fold` on a stream `s` (untested): behaves like
// `s.map(func)`, and additionally calls `func(Stream.SKIP)` once when `s`
// ends, pushing the result into the child stream before ending it.

// Placeholder meaning "the child stream has not been created yet".
var sentinel = {}
s.fold = function (func) {
  var child = sentinel
  s.end.map(function (t) {
    // Only react to an actual end signal.
    if (t !== true) return
    var prev = child
    // Clear the reference so a repeated end signal does nothing.
    child = null
    if (prev == null || prev === sentinel) return
    // Final emit: let the callback produce a last value, then close the child.
    prev(func(Stream.SKIP))
    prev.end(true)
  })
  return child = s.map(func)
}
Of course, this would be way easier to do with access to internal state.
Open questions:
- Should an accumulator value be involved? I'm leaning heavily towards "no" because it's not like you couldn't just maintain one yourself as mutable state.
Is this something you're interested in working on?
Yes. This is not a v2 blocker, however.
Okay, so here's my latest idea: a `stream.transform(newStream => value => ignored)`, where `value` is the received value (or `Stream.SKIP` on closure) and `newStream` is the returned stream. This would unsubscribe from the source on target closure, and it'd make for an easier time dealing with operators like "group by" and such where input doesn't map one-to-one with output. I'd then redefine `stream.map` in terms of this, where `stream.map = f => stream.transform(s => v => v === Stream.SKIP ? s.end(true) : s(f(v)))`. Here's each of the functions from the initial bug implemented using this:
// function groupBy<T, K>(s: Stream<T>, k: (v: T) => K): Stream<[K, Stream<T>]>
// Routes each value of `s` into a per-key stream (key = `func(value)`),
// creating that stream on the first value for the key. When `s` closes
// (`Stream.SKIP`), emits all collected `[key, stream]` pairs and ends.
function groupBy(s, func) {
  const groups = new Map()
  return s.transform(out => value => {
    if (value !== Stream.SKIP) {
      const key = func(value)
      const existing = groups.get(key)
      if (existing == null) groups.set(key, Stream(value))
      else existing(value)
      return
    }
    // Source closed: publish the collected groups, then close the output.
    out(Array.from(groups))
    out.end(true)
  })
}
// function take<T>(s: Stream<T>, n: number): Stream<T>
// Forwards the first `n` values of `s`, then ends the output stream. A falsy
// `n` yields a fresh stream that is ended immediately.
function take(s, n) {
  if (!n) {
    const empty = Stream()
    empty.end(true)
    return empty
  }
  let remaining = n
  return s.transform(out => value => {
    // Source closed before `n` values arrived: just close the output.
    if (value === Stream.SKIP) return out.end(true)
    out(value)
    remaining--
    if (!remaining) out.end(true)
  })
}
// function drop<T>(s: Stream<T>, n: number): Stream<T>
// Discards the first `n` values of `s` and forwards the rest. A falsy `n`
// returns `s` itself unchanged.
function drop(s, n) {
  if (!n) return s
  let pending = n
  return s.transform(out => value => {
    if (value === Stream.SKIP) {
      // Source closed: close the output as well.
      out.end(true)
    } else if (pending) {
      pending--
    } else {
      out(value)
    }
  })
}
// function reduce<T, U>(s: Stream<T>, f: (acc: U, v: T) => U, init: U): Stream<U>
//
// Accumulates every value of `s` into a single result. When `s` closes
// (the callback receives `Stream.SKIP`), the result is emitted on the output
// stream, which is then ended. `func` is called as `func(accumulator, value)`;
// `init` is the starting accumulator and is emitted as-is if `s` closes
// without producing any values.
function reduce(s, func, init) {
  return s.transform(t => v => {
    if (v === Stream.SKIP) {
      t(init)
      t.end(true)
    } else {
      // Pass the incoming value along with the accumulator; without it the
      // reducer could never see the stream's data.
      init = func(init, v)
    }
  })
}
Yes, this is similar to RxJS's `.pipe` method, but it's a bit more compact and fits better with the rest of the library idiomatically.