mithril.js icon indicating copy to clipboard operation
mithril.js copied to clipboard

Mithril v2 streams should have a generalized `fold`.

Open dead-claudia opened this issue 5 years ago • 1 comments

Description

Something like this, where the stream doesn't emit immediately, but correctly and performantly handles folds:

var childStream = sourceStream.fold(function (value) {
	// Emit a value
	return value

	// Skip this emit
	return Stream.SKIP
})

This would, for the most part, act like sourceStream.map(func), but with the following difference: when the source stream ends, the callback is called once more with value set to Stream.SKIP to get the final emit. (This is convenient for some pass-throughs, as one example shows below, and is currently impossible to receive.)

Why

There's several userland helpers where this could simplify a lot and make it a lot easier to properly handle closure. For instance:

// function groupBy<T, K>(s: Stream<T>, k: (v: T) => K): Stream<[K, Stream<T>]>
function groupBy(s, func) {
	const acc = new Map()
	return s.map(v => [func(v), v)]).fold(([key, v]) => {
		if (v === Stream.SKIP) return Array.from(acc)
		let target = acc.get(key)
		if (target == null) {
			target = stream(v)
			acc.set(key, target)
			return target
		} else {
			target(v)
			return Stream.SKIP
		}
	})
}

// function take<T>(s: Stream<T>, n: number): Stream<T>
// Note: `n` must be greater than 0
function take(s, n) {
	return s.fold(v => {
		if (n === 0) return Stream.SKIP
		n--
		return v
	})
}

// function drop<T>(s: Stream<T>, n: number): Stream<T>
// Note: `n` must be greater than 0
function drop(s, n) {
	return s.fold(v => {
		if (n === 0) return v
		n--
		return Stream.SKIP
	})
}

// function reduce<T, U>(s: Stream<T>, f: (v: T) => U): Stream<U>
function reduce(s, func, init) {
	return s.fold(v => {
		if (v === Stream.SKIP) return init
		init = func(init)
		return Stream.SKIP
	})
}

In each case, Stream.scan is not sufficient, and they really need that fused with a conceptual reduce operation, and that's the basic functionality this fills.

Possible Implementation & Open Questions

Here's a basic (untested) userland implementation of it:

var sentinel = {}
s.fold = (func) {
	var child = sentinel
	s.end.map(function (t) {
		if (t !== true) return
		var prev = child
		child = null
		if (prev == null || prev === sentinel) return
		prev(func(Stream.SKIP))
		prev.end(true)
	})
	return child = s.map(func)
}

Of course, this would be way easier to do with access to internal state.

Open questions:

  • Should an accumulator value be involved? I'm leaning heavily towards "no" because it's not like you couldn't just maintain one yourself as mutable state.

Is this something you're interested in working on?

Yes. This is not a v2 blocker, however.

dead-claudia avatar Jul 01 '19 01:07 dead-claudia

Okay, so here's my latest idea: a stream.transform(newStream => value => ignored), where value is the received value (or Stream.SKIP on closure) and newStream is the returned stream. This would unsubscribe from the source on target closure, and it'd make for an easier time dealing with operators like "group by" and such where input doesn't map one-to-one with output. I'd then redefine stream.map in terms of this, where stream.map = f => stream.transform(s => v => v === Stream.SKIP ? s.end(true) : s(f(v))). Here's each of the functions from the initial bug implemented using this:

// function groupBy<T, K>(s: Stream<T>, k: (v: T) => K): Stream<[K, Stream<T>]>
function groupBy(s, func) {
	const acc = new Map()
	return s.transform(t => v => {
		if (v === Stream.SKIP) {
			t(Array.from(acc))
			t.end(true)
		} else {
			const key = func(v)
			const target = acc.get(key)
			if (target != null) target(v)
			else acc.set(key, Stream(v))
		}
	})
}

// function take<T>(s: Stream<T>, n: number): Stream<T>
function take(s, n) {
	if (!n) { const t = Stream(); t.end(true); return t }
	return s.transform(t => v => {
		if (v === Stream.SKIP) return t.end(true)
		t(v)
		if (!--n) t.end(true)
	})
}

// function drop<T>(s: Stream<T>, n: number): Stream<T>
function drop(s, n) {
	return !n ? s : s.transform(t => v => {
		if (v === Stream.SKIP) t.end(true)
		else if (n) n--
		else t(v)
	})
}

// function reduce<T, U>(s: Stream<T>, f: (v: T) => U): Stream<U>
function reduce(s, func, init) {
	return s.transform(t => v => {
		if (v === Stream.SKIP) { t(init); t.end(true) }
		else init = func(init)
	})
}

Yes, this is similar to RxJS's .pipe method, but it's a bit more compact and fits better with the rest of the library idiomatically.

dead-claudia avatar Oct 03 '19 09:10 dead-claudia