Can't write data to an IOSink when a flush is in progress

Open tvolkert opened this issue 8 years ago • 10 comments

https://gist.github.com/tvolkert/0967e7676686e96c54a921e26f1161b3

The example shows that you can't write data to an IOSink while a flush is in progress. Doing so results in a StateError ("StreamSink is bound to a stream").
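For readers who cannot open the gist, here is a minimal sketch of the reported behavior. It is not the gist's contents; the temp-file setup and the function name `writeDuringFlush` are assumptions made for illustration.

```dart
import 'dart:io';

/// Hypothetical reproduction: returns the message of the StateError thrown
/// by writing while a flush is in progress, or null if the write succeeds.
Future<String?> writeDuringFlush() async {
  final dir = Directory.systemTemp.createTempSync('iosink_flush_');
  final sink = File('${dir.path}/out.txt').openWrite();
  String? message;

  sink.write('first'); // Give the flush some pending data to work on.
  final flushing = sink.flush(); // Deliberately not awaited yet.
  try {
    sink.write('second'); // The sink is still busy with the flush.
  } on StateError catch (e) {
    message = e.message;
  }

  await flushing;
  await sink.close();
  dir.deleteSync(recursive: true);
  return message;
}

Future<void> main() async {
  print(await writeDuringFlush());
}
```

The second `write` is the call that fails; the first one is only there so that `flush()` has something to wait for.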

Ideally, you'd be able to write to the sink, and the flush future would just complete later than it originally would have... but in the absence of that behavior, at the very least the API docs should state that this is unsupported. https://api.dartlang.org/stable/1.20.1/dart-io/IOSink/add.html, for instance, doesn't mention anything on the subject.

tvolkert • Feb 05 '17 06:02

@zanderso

tvolkert • Feb 05 '17 06:02

The write should not delay the flush. Only data written prior to calling flush is flushed. The question is whether you need to wait for the flush to complete before writing more data, and currently you do. That should at least be reflected in the documentation.

It's obviously possible to add a buffer that delays writes added during a flush, but it's probably not worth it - the user just needs to await the flush operation.
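A minimal sketch of that advice, assuming the caller is in async code and can await: the flush is awaited before anything else is written. The file name and the function name `writeFlushWrite` are made up for the example.

```dart
import 'dart:io';

/// Hypothetical example: writes, awaits the flush, then writes again.
/// Returns what ended up in the file.
Future<String> writeFlushWrite() async {
  final dir = Directory.systemTemp.createTempSync('iosink_await_');
  final file = File('${dir.path}/out.txt');
  final sink = file.openWrite();

  sink.write('first');
  await sink.flush(); // Wait until 'first' has been written out.
  sink.write('second'); // Safe: no flush is in progress any more.
  await sink.close(); // close() writes out whatever is still buffered.

  final contents = file.readAsStringSync();
  dir.deleteSync(recursive: true);
  return contents;
}

Future<void> main() async {
  print(await writeFlushWrite());
}
```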

lrhn • Feb 06 '17 08:02

Side note: the message "StreamSink is bound to a stream" is also a little confusing, since the caller never called addStream().

tvolkert • Feb 06 '17 16:02

> the user just needs to await the flush operation

The problem with this approach is that since none of the IOSink write operations return futures, it's common / encouraged to use them in synchronous code. The pattern is "write quickly to my IOSink in this synchronous method, and I'll later await the data being written out in async code." And in this design, it's impossible to await the flush operation.

Case in point: you have a StreamSubscription whose onData callback needs to, among other things, write some data to an IOSink. StreamSubscription.onData must be a synchronous callback, so it has no way to know if the call to IOSink.write* is going to fail or not.

Thus, I actually think this is more than just a documentation issue.

tvolkert • Feb 07 '17 01:02

There is a conflict between flush actually doing what you'd expect (writing out everything before you continue) and being asynchronous. It should probably be blocking - it must flush before the next statement is executed, otherwise a stdout.flush();exit(1); won't work anyway.

The alternative - to not flush immediately, but somehow asynchronously cause a flush as-soon-as-possible - is what it's doing, and it means that you must await the flush call before doing something else. It's not really true that the API is not asynchronous - it's a non-blocking API, which means that it is exactly asynchronous. You can use it in synchronous code, but your output will be delayed, and the flush is what is needed to synchronize your code with the async code of the IOSink.
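As a rough illustration of flush acting as the synchronization point, the sketch below writes from a synchronous function and only later, in async code, awaits the flush before reading the file back. The names `record` and `recordThenRead` and the temp-file setup are assumptions for the example.

```dart
import 'dart:io';

/// Synchronous producer: queues writes without waiting for them.
void record(IOSink sink, List<String> values) {
  for (final value in values) {
    sink.writeln(value);
  }
}

/// Hypothetical example: returns the lines visible in the file once the
/// flush has completed.
Future<List<String>> recordThenRead() async {
  final dir = Directory.systemTemp.createTempSync('iosink_sync_');
  final file = File('${dir.path}/log.txt');
  final sink = file.openWrite();

  record(sink, ['a', 'b', 'c']); // Returns immediately; output is delayed.
  await sink.flush(); // Synchronize with the sink's async machinery.
  final lines = file.readAsLinesSync();

  await sink.close();
  dir.deleteSync(recursive: true);
  return lines;
}

Future<void> main() async {
  print(await recordThenRead());
}
```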

The flush also does double duty in that it returns any error that happened on an add call. If that was the only async thing it did, then it shouldn't need to block the sink.

...

I've now read the code for flush, and it's pretty far from doing direct file-descriptor work. I think the stdout class will need a complete rewrite for it to be able to be synchronous here - there are too many stream controllers between the sink and the file descriptor, and no way to manage the buffering.

lrhn • Feb 07 '17 06:02

So in the meantime, do you see any good way to handle the following scenario? In my StreamSubscription's onData callback, I need to write to sink, yet the sink may have a pending flush operation (a caller may have recently asked for my serializedValue or for my complete future).

(code has been simplified for brevity's sake)

import 'dart:async';
import 'dart:io';

class StreamWrapper {
  final StreamController<String> controller = new StreamController();
  final Completer<Null> completer = new Completer();
  final List<String> data = [];
  final Stream<String> stream;
  final File file;
  final IOSink sink;

  StreamWrapper(this.stream, this.file) : sink = file.openWrite() {
    stream.listen(
      (String value) {
        data.add(value);
        // Throws StateError if a flush started by serializedValue or
        // complete is still in progress.
        sink.write(value);
        controller.add(value);
      },
      onError: (dynamic error, StackTrace stackTrace) {
        controller.addError(error, stackTrace);
      },
      onDone: () {
        // Subject to the same restriction as write().
        sink.close();
        completer.complete();
        controller.close();
      },
    );
  }

  Stream<String> get rawValue => controller.stream;

  List<String> get recordedValue => data;

  Future<String> get serializedValue async {
    await sink.flush();
    return '!${file.path}';
  }

  Future<Null> get complete async {
    // Future.wait yields a List, so await it instead of returning it.
    await Future.wait([completer.future, sink.flush()]);
  }
}

tvolkert • Feb 07 '17 18:02

Not sure if precisely this will work, but maybe it will give you an idea:

Can you record the Future from sink.flush() somewhere, then in onData: check if it's null. If it's null, directly do sink.write(), otherwise add a .then() to the future that does sink.write(). Basically use the Future from sink.flush() as a buffering mechanism.
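One way this idea might look, as a sketch only: `DeferredWriter` is a hypothetical helper, not part of dart:io, and it assumes every write and flush goes through the helper rather than the raw sink. Error handling is kept minimal; a write queued behind a failed flush is simply skipped.

```dart
import 'dart:async';
import 'dart:io';

/// Hypothetical helper, not part of dart:io: queues writes that arrive while
/// a flush is in progress behind that flush's Future.
class DeferredWriter {
  DeferredWriter(this._sink);

  final IOSink _sink;

  /// Non-null while a flush, or writes queued behind one, is outstanding.
  Future<void>? _pending;

  /// Callable from synchronous code such as a Stream onData callback.
  void write(String value) {
    final pending = _pending;
    if (pending == null) {
      _sink.write(value);
    } else {
      // Chain onto the tail so deferred writes keep their order.
      _track(pending.then((_) => _sink.write(value)));
    }
  }

  Future<void> flush() {
    final pending = _pending;
    return _track(pending == null
        ? _sink.flush()
        : pending.then((_) => _sink.flush()));
  }

  Future<void> _track(Future<void> tail) {
    _pending = tail;
    void clear() {
      if (identical(_pending, tail)) _pending = null;
    }

    // Errors still reach whoever awaits `tail`; this only resets state.
    tail.then((_) => clear(), onError: (Object _) => clear());
    return tail;
  }
}

/// Demo: the second write arrives mid-flush and is deferred, not rejected.
Future<String> demoDeferredWrites() async {
  final dir = Directory.systemTemp.createTempSync('iosink_defer_');
  final file = File('${dir.path}/out.txt');
  final sink = file.openWrite();
  final writer = DeferredWriter(sink);

  writer.write('first');
  final flushed = writer.flush(); // Not awaited yet.
  writer.write('second'); // Queued behind the flush.
  await flushed;
  await writer.flush(); // Makes sure 'second' has been written out.
  await sink.close();

  final contents = file.readAsStringSync();
  dir.deleteSync(recursive: true);
  return contents;
}

Future<void> main() async {
  print(await demoDeferredWrites());
}
```

Chaining each deferred write onto the previous tail, rather than onto the original flush Future, is a design choice to keep the write order explicit.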

zanderso • Feb 07 '17 18:02

Yeah that may work - I'll give it a shot.

tvolkert • Feb 07 '17 18:02

This issue, and particularly the "bound to a stream" message, cost me quite some time today. There's nothing about it in the docs. I was looking for anywhere that was using "addStream" and there wasn't one; it was only when I happened to check the implementation of "flush" that I worked it out.

+1 for some improvement here please :)

davidmorgan • Feb 16 '24 17:02

FYI @brianquinlan

zanderso • Feb 16 '24 19:02