async icon indicating copy to clipboard operation
async copied to clipboard

Feature request: Flattening StreamQueue

Open lexaknyazev opened this issue 4 years ago • 6 comments

When dealing with unaligned or highly-compressed binary inputs, it's usually convenient to request data byte-by-byte. Of course, doing this directly on IO level is quite limiting, so an intermediate caching layer is desired.

Ad-hoc combination of StreamQueue with a simple wrapper helps with this like:

Stream<List<int>> chunkStream; // input stream from file or network IO

Stream<int> _byteStream(Stream<List<int>> chunkStream) async* {
  await for (final chunk in chunkStream) {
    for (final byte in chunk) {
      yield byte;
    }
  }
}

final byteQueue = StreamQueue<int>(_byteStream(chunkStream));

It would be nice to have this functionality properly implemented internally without extra wrapping overhead:

factory StreamQueue.flatten(Stream<Iterable<T>> source); 

Similar feature is already implemented in package:collection for regular iterables, see CombinedIterableView.

Similar feature may also be implemented as a wrapper for StreamIterator from the core SDK for those who do not need queues.

lexaknyazev avatar Jan 22 '20 15:01 lexaknyazev

final byteQueue = StreamQueue(chunkStream.expand((l) =>l));

Similarly CombinedIterableView can be replaced with iterable.expand((e) => e). I'm not sure how much value it buys to name these concepts...

natebosch avatar Jan 23 '20 00:01 natebosch

CombinedIterableView is slightly more performant (~6% using JIT) than expand((e) => e).

Also, stream.expand has a slightly different behavior than the snippet above: it adds all elements of the inner iterable to the event sink without waiting: https://github.com/dart-lang/sdk/blob/44316eaadef1150b2e9d5375f30fba8004a468aa/sdk/lib/async/stream_pipe.dart#L248-L252

lexaknyazev avatar Jan 23 '20 12:01 lexaknyazev

Makes sense. I think we could accept a PR for this.

natebosch avatar Jan 23 '20 19:01 natebosch

Having a method which flattens a Stream<List<T>> into a Stream<T> would be fine, but embedding it into StreamQueue would probably not be worth it. The use case is too specialized for the functionality to carry its own weight.

lrhn avatar Oct 02 '20 12:10 lrhn

We added Iterable<T> get flattened for iterables. An extension in this package for Stream<T> get flattened would be a good addition.

natebosch avatar Oct 02 '20 18:10 natebosch

Interestingly, we can actually have both of:

extension X1<T> on Stream<Stream<T>> {
  Stream<T> flatten() async* {
    await for (var s in this) yield* s;
  }
}
extension X2<T> on Stream<Iterable<T>> {
  Stream<T> flatten() async* {
    await for (var it in this) for (var e in it) yield e;
  }
}

Can't do that with instance methods!

lrhn avatar Oct 02 '20 19:10 lrhn