grpc-dart
server.shutdown never finishes when streaming data
When the server is streaming data to a client and server.shutdown() is called, the call hangs because the connection never finishes.
  grpc:
    dependency: "direct main"
    description:
      name: grpc
      url: "https://pub.dartlang.org"
    source: hosted
    version: "2.1.3"
Repro steps
Replace the helloworld.proto, server.dart and client.dart files with the ones below and re-run
protoc --dart_out=grpc:lib/src/generated -Iprotos protos/helloworld.proto
Run dart bin/server.dart and then quickly run dart bin/client.dart (the server attempts to shut down after 5 seconds).
The server never fully shuts down because the connection.finish() call never completes, so both the server and the client hang until the server process is closed.
If you change https://github.com/grpc/grpc-dart/blob/master/lib/src/server/server.dart#L147 to
final done = _connections.map((connection) => connection.terminate()).toList();
then it works as I'd expect, though this is probably not the proper fix for this.
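The difference between the two calls can be sketched without grpc at all. The block below is only an illustration under stated assumptions: FakeConnection and shutdownWithDeadline are hypothetical names, not part of grpc-dart or the http2 package, and the fake only mimics the behaviour described in this report (a graceful finish() that waits for open streams, and a terminate() that does not). It shows one possible middle ground: try the graceful path first and fall back to terminating after a grace period.

```dart
import 'dart:async';

/// Hypothetical stand-in for a server connection. It is NOT the real
/// grpc-dart/http2 type; it only models the behaviour described above.
class FakeConnection {
  final Completer<void> _closed = Completer<void>();
  bool hasOpenStream;
  bool terminated = false;

  FakeConnection({this.hasOpenStream = false});

  /// Graceful close: completes only once no stream is open.
  Future<void> finish() {
    if (!hasOpenStream && !_closed.isCompleted) _closed.complete();
    return _closed.future;
  }

  /// Forced close: drops any open stream and completes immediately.
  Future<void> terminate() {
    terminated = true;
    hasOpenStream = false;
    if (!_closed.isCompleted) _closed.complete();
    return _closed.future;
  }
}

/// Tries finish() on every connection and falls back to terminate()
/// for any connection that has not closed within [grace].
Future<void> shutdownWithDeadline(
  Iterable<FakeConnection> connections, {
  Duration grace = const Duration(seconds: 2),
}) async {
  await Future.wait(connections.map((connection) {
    return connection.finish().timeout(
          grace,
          onTimeout: () => connection.terminate(),
        );
  }));
}

Future<void> main() async {
  final idle = FakeConnection();
  final streaming = FakeConnection(hasOpenStream: true);
  await shutdownWithDeadline(
    [idle, streaming],
    grace: const Duration(milliseconds: 100),
  );
  print('idle terminated: ${idle.terminated}'); // false
  print('streaming terminated: ${streaming.terminated}'); // true
}
```

With only finish(), the streaming connection in this sketch would never complete, which matches the hang reported here. Whether a grace period is the right behaviour for the real Server.shutdown() is a design question for the maintainers.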
Details
helloworld.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}
// The response message containing the greetings
message HelloReply {
  string message = 1;
}
server.dart
import 'dart:async';
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pb.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
class GreeterService extends GreeterServiceBase {
  @override
  Stream<HelloReply> sayHello(ServiceCall call, HelloRequest request) {
    return Stream.periodic(Duration(seconds: 1), (value) {
      return HelloReply()..message = 'Hello, ${request.name}!';
    });
  }
}
Future<void> main(List<String> args) async {
  final server = Server([GreeterService()]);
  await server.serve(port: 50051);
  print('Server listening on port ${server.port}...');
  await Future.delayed(Duration(seconds: 5));
  print('shutting down');
  await server.shutdown();
  print('shutdown');
}
client.dart
import 'dart:async';
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pb.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
Future<void> main(List<String> args) async {
  final channel = ClientChannel(
    'localhost',
    port: 50051,
    options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
  );
  final stub = GreeterClient(channel);
  final name = args.isNotEmpty ? args[0] : 'world';
  try {
    final response = await stub.sayHello(HelloRequest()..name = name);
    response.listen((value) {
      print('Greeter client received: ${value.message}');
    }, onError: (error) {
      print(error);
      channel.shutdown();
    });
  } catch (e) {
    print('Caught error: $e');
  }
}
This is still a bug. shutdown() does not force open streams to close; it waits for them to finish. The only workaround is to make sure the streams are closed before calling shutdown(). However, I find that call.sendTrailers() must be called manually at the end of the stream generator because, contrary to the documentation, it is not called automatically when the stream ends.
The workaround I have at the moment looks like this in the service RPC implementation:
// sendTrailers is not always called automatically when the stream ends,
// despite what the documentation says.
// Without calling this, the grpc stream will hang and never close.
final controller = StreamController<Event>(
  onCancel: () => call.sendTrailers(),
);
// When the upstream stream is done, close this one
controller.addStream(stream).then((_) => controller.close());
return controller.stream;
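The same idea can be tried in isolation with dart:async only. This is a rough sketch, not the commenter's exact code: withCompletionHook is a hypothetical helper name, and its onDone callback stands in for call.sendTrailers(). It forwards events by hand instead of using addStream, and guards the callback so it runs at most once whether the source ends on its own or the listener cancels early.

```dart
import 'dart:async';

/// Wraps [source] so that [onDone] runs at most once, either when the source
/// completes or when the listener cancels early.
/// [onDone] is a hypothetical stand-in for `call.sendTrailers()`.
Stream<T> withCompletionHook<T>(Stream<T> source, void Function() onDone) {
  var fired = false;
  void fireOnce() {
    if (fired) return;
    fired = true;
    onDone();
  }

  StreamSubscription<T>? subscription;
  late final StreamController<T> controller;
  controller = StreamController<T>(
    // Subscribe to the source lazily, when the wrapped stream is listened to.
    onListen: () {
      subscription = source.listen(
        controller.add,
        onError: controller.addError,
        onDone: () {
          fireOnce();
          controller.close();
        },
      );
    },
    onPause: () => subscription?.pause(),
    onResume: () => subscription?.resume(),
    // Listener went away: run the hook and stop the source.
    onCancel: () {
      fireOnce();
      return subscription?.cancel();
    },
  );
  return controller.stream;
}

Future<void> main() async {
  var trailersSent = 0;
  final source =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i);
  final wrapped = withCompletionHook(source, () => trailersSent++);

  // take(3) cancels the subscription after three events.
  final firstThree = await wrapped.take(3).toList();
  print(firstThree); // [0, 1, 2]
  print(trailersSent); // 1
}
```

In a service method, the returned stream would take the place of controller.stream above. Whether calling sendTrailers() this way is the intended usage has not been confirmed by the maintainers in this thread.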