
Splitting a stream into separately transducible substreams

Open • jackfirth opened this issue on Nov 13 '19 • 8 comments

Key idea: combine a bunch of subtransducers into a single transducer that operates on a stream of variant? values, and uses each variant's tag to figure out which subtransducer to feed it to. Emissions should also be tagged variants, so you can tell which subtransducer produced which values. Signature:

(forking [#:<kw> subtransducer transducer?] ...+) -> transducer?

Example:

> (transduce (list (variant #:up "Hello")
                   (variant #:down "world")
                   (variant #:up "Goodbye")
                   (variant #:up "world"))
             (forking #:up (mapping string-upcase)
                      #:down (mapping string-downcase))
             #:into into-list)
(list (variant #:up "HELLO")
      (variant #:down "world")
      (variant #:up "GOODBYE")
      (variant #:up "WORLD"))
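
For context, a tagged stream like this might be produced with an ordinary mapping step over untagged values. The snippet below uses only operations that already exist in Rebellion; the #:even and #:odd tags are just for illustration:

> (transduce (in-range 5)
             (mapping (lambda (n)
                        (if (even? n) (variant #:even n) (variant #:odd n))))
             #:into into-list)
(list (variant #:even 0)
      (variant #:odd 1)
      (variant #:even 2)
      (variant #:odd 3)
      (variant #:even 4))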

jackfirth · Nov 13 '19 07:11

If they both emit an element, which one ends up being emitted first?

In the middle of the stream, my first guess would be that it goes like this:

  1. We see that the next input element belongs to the #:up variant.
  2. Transducer #:up receives that element.
  3. Transducer #:up emits however many elements it wants to emit, until it's blocked on an input again.
  4. We see that the next input element belongs to the #:down variant.
  5. Transducer #:down receives that element.
  6. Transducer #:down emits however many elements it wants to emit, until it's blocked on an input again.
  7. ...

This approach is optimal for expressiveness if there can be loops in the transducer composition: It ensures that as much information as possible is passed downstream before we block, which can equip other parts of the system with the information they need to unblock us again.
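
To make that ordering concrete (hypothetically -- forking doesn't exist yet, and this is only my guess at its behavior): if the #:up branch emits several elements per input, all of them would come out before the next #:down input is even consumed. append-mapping and mapping already exist in Rebellion; the output below is what the steps above would imply, tagged as in the proposal, not something I've actually run:

> (transduce (list (variant #:up "ab")
                   (variant #:down "CD"))
             (forking #:up (append-mapping string->list)
                      #:down (mapping string-downcase))
             #:into into-list)
(list (variant #:up #\a)
      (variant #:up #\b)
      (variant #:down "cd"))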

On the other hand, it has a property that may be surprising: If one transducer begins to emit an infinite list, the other is never visited again. I don't suppose transducers are expressive enough to capture the idea of an "event emitter transformer," but if you were planning to use them that way, you might like an interleaving operation that interleaves events based on their timestamps or uses some kind of race condition arbiter.

The beginning or end of the stream is another story: At the beginning, if both transducers emit an element before even one element is consumed, then the above guess of mine does nothing to determine which one of the emits should be handled first. At the end, every transducer will be informed that the stream has ended, and in response they'll all try to emit their final elements.

I suppose my first guess is that in these ambiguous cases, the leftmost transducer passed to the forking call is processed first, or perhaps that the first variant name in alphabetical order is processed first, or something like that.

For a transducer library that's otherwise so deterministic, these tiebreakers would seem a bit out of place. It seems more like something that should be decided using an explicit argument to forking. Or maybe forking should just raise an exception if two transducers try to emit at the beginning or two try to emit at the end... but that seems like an awkward contract to express.

In summary:

  1. How does forking interleave one transducer's emits with the other transducer's reads and emits? I'm guessing it eagerly emits elements until all the transducers are blocked on a read, but I don't know if this is what you have in mind.

  2. That guess of mine doesn't account for what happens at the beginning or end of the stream, when more than one transducer may be unblocked at a time. If multiple transducers are trying to emit at the start (before reading a single element) or at the end (after reading the end of the stream), what kind of interleaving do those emits undergo?

rocketnia · Nov 13 '19 10:11

> On the other hand, it has a property that may be surprising: If one transducer begins to emit an infinite list, the other is never visited again.

This seems obvious to me, though that may be because I have it ingrained in my brain that transducers have deterministic control flow: at no point does the next step in a transduce pipeline depend on whether one blocking I/O operation finishes faster than another. You'd need something like reactive streams for that. Under the constraint of deterministic control flow, forking cannot sensibly interleave emissions and an infinitely-emitting transducer must clog the pipeline.

I think the name forking is too suggestive that there's hidden parallelism or concurrency going on. Maybe something like selecting-variant or choosing-variant?

For the beginning and the end of the stream... I honestly have no idea. Consider the use case in #348 combined with the insertion transducers in #346. What if the transducers insert things at the beginning or end of the stream?

(transduce (in-range 100)
           (splitting-at
               50
               (transducer-pipe (taking 5)
                                (inserting-before 'a)
                                (inserting-after 'b))
               (transducer-pipe (taking 3)
                                (inserting-before 'x)
                                (inserting-after 'y)))
           #:into into-list)

...what should that be? My gut tells me most people would expect it to be (list 'a 0 1 2 3 4 'b 'x 50 51 52 'y), but I don't see how an implementation based on forking could possibly do that.

I'm going to mark this as needs api design for now, since I think more use cases are necessary to figure this out.

jackfirth · Nov 13 '19 19:11

Minor update: I created the needs use cases label for issues like this.

jackfirth · Nov 13 '19 19:11

> Under the constraint of deterministic control flow, forking cannot sensibly interleave emissions and an infinitely-emitting transducer must clog the pipeline.

The following behavior would be highly arbitrary, but it would still be deterministic:

Associate each transducer with a "pending input queue" that starts out empty.
Reading each input value until they run out:
    Find the appropriate transducer for this value's variant.
    Add the value to its pending input queue.
    Run the "tick" subroutine.
Until every transducer has run out of pending emissions:
    Run the "tick" subroutine.
End the output.

The "tick" subroutine processes a little bit of each transducer, like so:
    For each transducer, in the alphabetical order of their variant names:
        Let N begin as 0.
        While N is less than 1 plus the UTF-8 byte length of the variant name:
            If the transducer is blocked on an input:
                If the transducer's pending input queue is nonempty:
                    Pass in a value from the queue.
                Else:
                    Break the while loop, thus continuing the for loop.
            Else if the transducer is emitting something:
                Emit that thing.
                Increment N.
            Else:
                Break the while loop, thus continuing the for loop.

> I think the name forking is too suggestive that there's hidden parallelism or concurrency going on.

This whole streaming library embodies a kind of concurrency! The very fact that the input stream is made of interleaved substreams with variant tags suggests that those substreams are being made somewhere out there in some kind of concurrent way.

More specifically, looking at this as a stream-valued reactive programming language, this operation is a conditional branch. It takes a stream of events (the subject of the match), does fan-out to various subexpressions according to what branch each event is a match for, and does fan-in of the result streams those subexpressions produce. So maybe a good name would be branching-on-variant or matching-variant.

The inputs forking receives can be interleaved in any order. This means we don't have very much to go on when deciding what order the outputs should be interleaved in. If only we knew each variant's inputs were received in a sorted order, we would know we should sort the output messages likewise, but we don't have that luxury.

But it seems you and I at least share the intuition that, in the middle of a stream, forking should emit all the emissions that remain in the transducer it just unblocked. To put it another way, it should only block as a last resort. If we take that as a settled matter, then the only part we have left to determine is the behavior at the beginning and the end of the stream.

Suppose we consider a subset of transducers that are well-behaved enough that they don't encounter that gray area at all: The only time they emit is in response to some recent "receive" -- never before their first "receive" or after their "half-close."

A surprising number of operations preserve that protocol: Of particular note are transducer-pipe, mapping, peeking, folding, and enumerating, a set of operations which even preserve the stricter protocol "receive; emit; receive; emit; ...; receive; emit" for synchronous reactive programming. Then there are append-mapping, taking, taking-while, dropping, dropping-while, deduplicating, deduplicating-consecutive, and even splitting-at.

Operations that don't obey this protocol seem relatively rare, but they include sorting, batching, inserting-before, and inserting-after.
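
As a concrete contrast, using only operations that already exist in Rebellion (so this should be runnable as-is): mapping emits each result immediately after receiving its input, while sorting cannot emit anything until it has seen the whole input, so all of its emissions happen after its half-close -- even though both produce ordinary lists in the end:

;; One emission per receive, each immediately after the corresponding input:
> (transduce (list 3 1 2) (mapping add1) #:into into-list)
(list 4 2 3)
;; No emissions until the input is exhausted; everything comes out after the half-close:
> (transduce (list 3 1 2) (sorting) #:into into-list)
(list 1 2 3)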

Considering all the operations that preserve this protocol, I suppose this protocol might see a lot of use, so I really like the idea of enforcing it in a contract. I suppose forking would be merely the first operation in Rebellion where that contract makes sense. In general, I bet it'll be useful for any transducer operation that performs a fan-in from many streams to one.

rocketnia · Nov 14 '19 08:11

> The following behavior would be highly arbitrary, but it would still be deterministic:

When I say that forking cannot sensibly interleave emissions, the word "sensibly" is doing a lot of work. I don't find the behavior you described sensible. It breaks the invariant that I should be able to rename statically-chosen names (such as keywords) without changing runtime behavior.

> This whole streaming library embodies a kind of concurrency!

No, it doesn't. At no point is there any dependency on time, and given a known sequence of inputs the execution schedule of each transducer is always fixed in a deterministic way. Deterministic concurrency would be if the outputs are deterministic, but the execution schedule isn't and can be chosen arbitrarily by some scheduler.

Granted, I haven't actually written down the rules I use to decide whether some schedule is "correct" or not, but there are rules, such as "call the finisher as soon as possible" and "only consume as a last resort". The rebellion/streaming libraries are not concurrency libraries.

> Suppose we consider a subset of transducers that are well-behaved enough that they don't encounter that gray area at all: The only time they emit is in response to some recent "receive" -- never before their first "receive" or after their "half-close."

I have thought about it, and explored defining various subtypes of transducers. But I couldn't find a subtype that had enough use cases of its own to justify the added API complexity of a transducer type hierarchy. There would need to be multiple useful combinators that are only possible on transducers of that subtype, and those combinators would need to be able to express useful real-world stream processing tasks.

jackfirth · Nov 14 '19 19:11

> > The following behavior would be highly arbitrary, but it would still be deterministic:
>
> When I say that forking cannot sensibly interleave emissions, the word "sensibly" is doing a lot of work. I don't find the behavior you described sensible. It breaks the invariant that I should be able to rename statically-chosen names (such as keywords) without changing runtime behavior.

Agreed.

I suppose I wasn't sure what would budge first: The name-invariance, the argument-order-invariance, the determinism, the contract, or the inclusion of forking in the library altogether.

I get the impression some streaming libraries are designed to process asynchronous events, which would make race conditions a likely source of indeterminism. I didn't expect you to want indeterminism, but I figured I oughta say something before you tried to tackle another streaming library's use cases and got it by accident. :)

> > This whole streaming library embodies a kind of concurrency!
>
> No, it doesn't. At no point is there any dependency on time, and given a known sequence of inputs the execution schedule of each transducer is always fixed in a deterministic way. Deterministic concurrency would be if the outputs are deterministic, but the execution schedule isn't and can be chosen arbitrarily by some scheduler.

I suppose you prefer a stricter definition of "concurrency" than I'm used to seeing. I can hardly object, since I like a much more strict definition of "declarative" than basically anyone else uses. I actually prefer a very wide-ranging definition of concurrency, but that probably doesn't matter right now. XD

As for what I'm used to seeing, I've heard people refer to coroutines as a kind of standard example of non-parallel concurrency. Coroutines have no indeterminism in outputs and no indeterminism in scheduling. Nevertheless, the execution timelines of two coroutines can overlap. During the overlap, their computations-in-progress are concurrent with each other.

If we take a system of coroutines, name one of them the "scheduler," and pretend it isn't part of the system, then we've essentially created an indeterministic system out of a deterministic one. I suppose that's roughly the time you'd start to call it concurrency, right?


Idea:

Hmm, if forking is close to being a conditional branch on well-behaved transducers, what would a conditional branch for all transducers look like? I think I've got it: The input stream currently carries just the elements, but it needs to also carry "open" and "close" events that let us know where in the stream a transducer's first and last emissions should be placed:

(open #:up)
(element #:up "Hello,")
(open #:down)
(element #:down "Goodbye")
(close #:down)
(element #:up "World")
(close #:up)
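
To sketch the intent (purely hypothetically, since both forking and the inserting-after transducer from #346 are only proposals): suppose the #:down branch were (inserting-after 'y) and the #:up branch just passed elements through. The (close #:down) event would tell the forking-like operation that 'y belongs right after "Goodbye" and before "World", rather than at whatever tiebreak position the end of the whole stream would dictate:

(element #:up "Hello,")
(element #:down "Goodbye")
(element #:down 'y)
(element #:up "World")

Whether the open and close events themselves would be re-emitted downstream is left open here.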

rocketnia · Nov 15 '19 03:11

> The input stream currently carries just the elements, but it needs to also carry "open" and "close" events that let us know where in the stream a transducer's first and last emissions should be placed

That would certainly work, but I don't see any use cases for it.

jackfirth · Nov 15 '19 04:11

I don't either. I guess that makes it a solution in search of a problem at this point.

rocketnia · Nov 16 '19 01:11