sdk icon indicating copy to clipboard operation
sdk copied to clipboard

IOSink.add throws exceptions that cannot be caught

Open Hixie opened this issue 3 years ago • 19 comments

How do I catch the exception thrown in the code below?

import 'dart:io';

void main() async {
  try {
    final Process process = await Process.start('echo', <String>[]);
    process.stdin.add([1,2,3]); // throws an exception                                                                                            
  } catch (error) {
    print('$error\n'); // does not catch it... :-(                                                                                                
  }
}

For the record the exception I'm talking about is:

Unhandled exception:
SocketException: Write failed (OS Error: Broken pipe, errno = 32), port = 0
#0      _NativeSocket.write (dart:io-patch/socket_patch.dart:1190:34)
#1      _RawSocket.write (dart:io-patch/socket_patch.dart:1906:15)
#2      _Socket._write (dart:io-patch/socket_patch.dart:2346:18)
#3      _SocketStreamConsumer.write (dart:io-patch/socket_patch.dart:2094:26)
#4      _SocketStreamConsumer.addStream.<anonymous closure> (dart:io-patch/socket_patch.dart:2068:11)
#5      _RootZone.runUnaryGuarded (dart:async/zone.dart:1618:10)
#6      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:341:11)
#7      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:7)
#8      _SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:774:19)
#9      _StreamController._add (dart:async/stream_controller.dart:648:7)
#10     _StreamController.add (dart:async/stream_controller.dart:596:5)
#11     _StreamSinkImpl.add (dart:io/io_sink.dart:136:17)
#12     _Socket.add (dart:io-patch/socket_patch.dart:2193:38)
#13     _StdSink.add (dart:io/stdio.dart:327:11)
#14     main (file:///home/ianh/dev/dsh/example/test.dart:6:19)
<asynchronous suspension>

Hixie avatar Mar 03 '22 01:03 Hixie

The add goes through a StreamController which sends the data as an event. The error is triggered at the point where that data event is received, which is in another event, so it can't throw the error back.

I don't know how much of this is intentional, and what is just an accident of implementation. Leaving that for a dart:io developer to look at.

As for catching it ... I'm not sure you can. The runZonedGuarded should prevent any error from coming back, and it's run in the root zone. You can try wrapping the code in a zone with an uncaught error handler, and see if that works.

lrhn avatar Mar 03 '22 13:03 lrhn

Greetings, @Hixie. If you call process.stdin.flush() and await the Future, you can catch the expected error.

void main() async {
  try {
    final Process process = await Process.start('echo', <String>[]);
    process.stdin.add([1, 2, 3]);
    // as per the documentation of .add you have to call .flush or .done to get
    // any errors generated by the .add call
    await process.stdin.flush();
  } catch (error) {
    print('$error\n');
  }
}

jellynoone avatar Mar 03 '22 18:03 jellynoone

Aha, indeed! Thanks!

I wonder if there's something we can do to make that clearer... you're right that it is documented, though I totally missed it when reading it before. Somehow I don't think it was obvious to me that this error was the kind of error the "add" documentation was referring to...

I wonder if we could add something to the error message that would be printed only when the exception is uncaught or something? Something like "this exception can be caught using the IOSink.done property" or something.

Hixie avatar Mar 03 '22 23:03 Hixie

From the looks of it the error is reported to the Completer.future in io_sink.dart in _StreamSinkImpl._completeDoneError.

So, to improve the error message we'd have to decorate the method in the actual _IOSinkImpl and wrap the error. But is it really worth it? The initial stacktrace / error start deep in the _NativeSocket so the source of the error probably cannot contain the additional information.

Would perhaps adding a code sample to add on how to handle the error be sufficient something like:

To handle any OS errors and other errors adding to the [IOSink] call [flush] or await [done] to capture them. 
Example:
try {
  ioSink.add([1, 2, 3]);
  await ioSink.flush();
} catch (e) {
  myErrorHandler(e)
}

Although, the doc block says: "This operation is non-blocking. See [flush] or [done] for how to get any errors generated by this call." The flush and done say nothing about the errors.

jellynoone avatar Mar 04 '22 10:03 jellynoone

However, if I add a delay in between the stdin.add() call and stdin.flush(), the error is NOT handled:

import 'dart:async';
import 'dart:io';

Future<void> main() async {
  try {
    final process = await Process.start('echo', const <String>[]);
    process.stdin.add([1, 2, 3]);
    await Future.delayed(const Duration(seconds: 1));
    await process.stdin.flush();
  } catch (err, stackTrace) {
    // handle error
    print('handle $err\n\n$stackTrace');
  }
}

produces

$ dart main.dart
Unhandled exception:
SocketException: Write failed (OS Error: Broken pipe, errno = 32), port = 0
#0      _NativeSocket.write (dart:io-patch/socket_patch.dart:1246:34)
#1      _RawSocket.write (dart:io-patch/socket_patch.dart:2004:15)
#2      _Socket._write (dart:io-patch/socket_patch.dart:2479:18)
#3      _SocketStreamConsumer.write (dart:io-patch/socket_patch.dart:2214:28)
#4      _SocketStreamConsumer.addStream.<anonymous closure> (dart:io-patch/socket_patch.dart:2168:11)
#5      _RootZone.runUnaryGuarded (dart:async/zone.dart:1594:10)
#6      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:11)
#7      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:7)
#8      _SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:784:19)
#9      _StreamController._add (dart:async/stream_controller.dart:658:7)
#10     _StreamController.add (dart:async/stream_controller.dart:606:5)
#11     _StreamSinkImpl.add (dart:io/io_sink.dart:154:17)
#12     _Socket.add (dart:io-patch/socket_patch.dart:2321:38)
#13     _StdSink.add (dart:io/stdio.dart:350:11)
#14     main (file:///usr/local/google/home/fujino/git/tmp/dart_exception_test/main.dart:7:19)
<asynchronous suspension>

This was on:

Dart SDK version: 3.4.0-60.0.dev (dev) (Tue Jan 23 00:03:10 2024 -0800) on "linux_x64"

christopherfujino avatar Jan 25 '24 23:01 christopherfujino

On my macOS machine, @christopherfujino 's example is racy:

  1. The output is either the same OR
  2. There is no output at all (presumably that is https://github.com/dart-lang/sdk/issues/54735)

brianquinlan avatar Jan 25 '24 23:01 brianquinlan

This code seems to handle the exception:

Future<void> main() async {
  runZonedGuarded(() async {
    final process = await Process.start('echo', const <String>[]);
    process.stdin.add([1, 2, 3]);
    await Future.delayed(const Duration(seconds: 1));
    await process.stdin.flush();
  }, (_, __) {
    print('handle!');
  });
}

christopherfujino avatar Jan 25 '24 23:01 christopherfujino

I see the same exception without the flush call i.e.

import 'dart:async';
import 'dart:io';

Future<void> main() async {
  try {
    final process = await Process.start('echo', const <String>[]);
    process.stdin.add([1, 2, 3]);
    await Future.delayed(const Duration(seconds: 1));
//    await process.stdin.flush();
  } catch (err, stackTrace) {
    // handle error
    print('handle $err\n\n$stackTrace');
  }
}

=>

Unhandled exception:
SocketException: Write failed (OS Error: Broken pipe, errno = 32), port = 0
#0      _NativeSocket.write (dart:io-patch/socket_patch.dart:1246:34)
#1      _RawSocket.write (dart:io-patch/socket_patch.dart:2004:15)
#2      _Socket._write (dart:io-patch/socket_patch.dart:2481:18)
#3      _SocketStreamConsumer.write (dart:io-patch/socket_patch.dart:2216:28)
#4      _SocketStreamConsumer.addStream.<anonymous closure> (dart:io-patch/socket_patch.dart:2168:11)
#5      _RootZone.runUnaryGuarded (dart:async/zone.dart:1594:10)
#6      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:11)
#7      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:7)
#8      _SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:784:19)
#9      _StreamController._add (dart:async/stream_controller.dart:658:7)
#10     _StreamController.add (dart:async/stream_controller.dart:606:5)
#11     _StreamSinkImpl.add (dart:io/io_sink.dart:154:17)
#12     _Socket.add (dart:io-patch/socket_patch.dart:2323:38)
#13     _StdSink.add (dart:io/stdio.dart:350:11)
#14     main (file:///Users/bquinlan/dart/dart-sdk1/samples/listsync/process.dart:7:19)
<asynchronous suspension>

brianquinlan avatar Feb 01 '24 20:02 brianquinlan

Here is the problem:

brianquinlan avatar Feb 01 '24 23:02 brianquinlan

As far as I can see await process.stdin.flush() doesn't actually do anything.

brianquinlan avatar Feb 01 '24 23:02 brianquinlan

As far as I can see await process.stdin.flush() doesn't actually do anything.

lol

christopherfujino avatar Feb 02 '24 00:02 christopherfujino

But flush does seem to affect whether or not runZonedGuarded() will catch the exception (although it's possible this is racy too...).

This code handles the SocketException:

Future<void> main() async {
  final process = await Process.start('echo', const <String>[]);
  runZonedGuarded<void>(
    () {
      process.stdin.add([1, 2, 3]);
      process.stdin.flush();
    },
    (_, __) => print('handled'),
  );
}

While this does not:

Future<void> main() async {
  final process = await Process.start('echo', const <String>[]);
  runZonedGuarded<void>(
    () {
      process.stdin.add([1, 2, 3]);
      //process.stdin.flush();
    },
    (_, __) => print('handled'),
  );
}

christopherfujino avatar Feb 02 '24 01:02 christopherfujino

In my theoretical world, the flush is necessary because it keeps the runZonedGuarded alive until the Stream gets cleaned-up and the exception generated. But I don't know why Future.delayed is not sufficient for that.

brianquinlan avatar Feb 05 '24 18:02 brianquinlan

In my theoretical world, the flush is necessary because it keeps the runZonedGuarded alive until the Stream gets cleaned-up and the exception generated. But I don't know why Future.delayed is not sufficient for that.

Would not the ErrorZone continue to exist as long as the stream created from that zone does? (I am confident I have no idea what's going, just trying to explain my current mental model).

christopherfujino avatar Feb 05 '24 23:02 christopherfujino

Keeping a zone alive is not relevant. They stay alive as long as something want to run in them.

A bigger question is which zone the code that triggers the error is running in. If it's triggered by process.stdin.flush(), it may run in that error zone. If it's triggered by something else, it might happen outside of that zone.

Reporting an error in a future risks it getting caught at the boundary of the error zone of the runZonedGuarded, if the future and the listener were created on different sides of that boundary.

I don't think the stdin API is designed around zones, so it might be quite accidental in which zone something happens. Might be worth giving it a check, and see if there's something we can do to at least be consistent about which zone things happen in. (Say, have Process.stdin remember Zone.current where it's created, and do everything in terms of that. In which case this code should create the process inside the zone, if it wants to capture the done future error.)

lrhn avatar Feb 06 '24 12:02 lrhn

My plan right now is to capture the error from the socket and then surface it in process.stdin.flush().

brianquinlan avatar Feb 06 '24 18:02 brianquinlan

I traced through this a bit more and the exception is lost in _StreamSinkImpl<T>._completeDoneError, which is called from _StreamSinkImpl<T>_controller.

So I can work around the issue with:

  process.stdin.done
      .onError((error, stackTrace) => print('done error: $error'));

brianquinlan avatar Feb 09 '24 00:02 brianquinlan

    if (_modeHasStdio(_mode)) {
      // stdin going to process.
      _stdin = new _StdSink(new _Socket._writePipe().._owner = this);
+     _stdin!.done.ignore();

      // stdout coming from process.
      _stdout = new _StdStream(new _Socket._readPipe().._owner = this);
      // stderr coming from process.
      _stderr = new _StdStream(new _Socket._readPipe().._owner = this);
    }

Provides reasonable semantics, I think:

  • Process.stdin.flush, Process.stdin.done will still throw if there was an error
  • There will never be any Zone errors.

I'm going to see if this breaks anything and write some new tests to see if this works on all platforms.

brianquinlan avatar Feb 09 '24 01:02 brianquinlan

I have a repo with a minimal repro that I think more accurately models what we're hitting in the flutter tool: it has a C program that called via Process.start() that will close its stdin file handle and then sleep for 5 seconds, so that the Dart program will attempt to write to it before dart:io does any housekeeping for an exited application: https://github.com/chris-wdi-assignments/dart_stdin_broken_pipe_test

christopherfujino avatar Feb 15 '24 19:02 christopherfujino

@brianquinlan I think this is the cause for some flaky tests in the debug adapter (https://github.com/dart-lang/sdk/issues/55313). I see you reverted your most recent change so I wonder what the current status is - is this broken in a way we can't currently handle, or are there ways that we can ensure we catch these errors?

DanTup avatar May 15 '24 14:05 DanTup