nest
nest copied to clipboard
Detect disconnect of a GRPC stream
Hi there,
We use a GRPC stream with Subject + Observable as described here: https://docs.nestjs.com/microservices/grpc
Things work well, as long as the network connection is never interrupted. However, a subscription that was setup successfully once never emits an error or a 'completed' event. Even if you unplug the network cable, the GRPC stream seems to be 'up and running' - the 'closed' property of the subscription never becomes 'true'.
What is the correct way to detect the actual status of the GRPC stream?
Simplified code example:
this.remoteService = this.client.getService<RemoteService>('ExampleService');
const observable = this.remoteService.subscribeEvents(request);
const subscription = observable.subscribe(message => {
Logger.l.log('Received a message from cloud server');
}, error => {
Logger.l.error('Subscribing events from cloud server ended with error: ' + error.toString());
}, () => {
Logger.l.warn('Subscribing events from cloud server ended.');
});
Can you share a minimal repo that reproduces your issue? Would love to take a look
Thanks for your fast response.
I have publish a quick example here: https://github.com/sjkummer/nestjs-grpc-stream-helloworld
See the README for details how to setup run the GRPC client and server
Important: The server MUST run on another machine than the client
After unplugging the network cable of the client, the current output is something similar as:
2019-10-31 15:31:15 [INFO] Received a message from cloud server:
2019-10-31 15:31:15 [INFO] {
"message": "14"
}
2019-10-31 15:31:15 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:31:16 [INFO] Received a message from cloud server:
2019-10-31 15:31:16 [INFO] {
"message": "15"
}
2019-10-31 15:31:17 [INFO] Received a message from cloud server:
2019-10-31 15:31:17 [INFO] {
"message": "16"
}
2019-10-31 15:31:20 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:31:25 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:31:30 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:31:35 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:31:40 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:31:45 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:31:50 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:31:55 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:00 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:05 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:10 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:15 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:20 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:25 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:30 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:35 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:40 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:45 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:50 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:32:55 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:33:00 [DEBUG] Watchdog: GRPC stream subscription is up and running
Expected output would be the same as when the service is terminated gracefully (CTRL - C):
2019-10-31 15:38:09 [INFO] Received a message from cloud server:
2019-10-31 15:38:09 [INFO] {
"message": "431"
}
2019-10-31 15:38:10 [INFO] Received a message from cloud server:
2019-10-31 15:38:10 [INFO] {
"message": "433"
}
2019-10-31 15:38:11 [DEBUG] Watchdog: GRPC stream subscription is up and running
2019-10-31 15:38:11 [INFO] Received a message from cloud server:
2019-10-31 15:38:11 [INFO] {
"message": "435"
}
2019-10-31 15:38:11 [ERROR] Subscribing events from cloud server ended with error: Error: 2 UNKNOWN: Stream removed
2019-10-31 15:38:11 [ERROR] undefined
2019-10-31 15:38:11 [ERROR] {
"code": 2,
"metadata": {
"_internal_repr": {},
"flags": 0
},
"details": "Stream removed"
}
Hi there,
Can you clarify if this is actually a bug or just a lack of documentation?
Best regards, Sebastian
Any updated on this?
Also: The other way round does not work either. The server does not get an 'error' on 'completed' event after a client-grpc-stream stopps/disconnects (I am using the Observable approach here).
Sample repo: https://github.com/sjkummer/nestjs-grpc-stream-helloworld
I know this issue is pretty old, but I wanted to share some developments I've been working on for a similar setup using bidirecctional streaming. I've found that adding teardown logic with RxJS's Subscription.add method allows me to easily reconnect when the subscription is terminated (due to network errors, host server reboots, etc.). Currently this code will try to reconnect to the stream after 5s, though arguably adding some backoff logic would be better to ensure you're not continually trying to connect to a broken host.
export class WeatherService {
private readonly logger = new Logger(WeatherService.name);
private readonly weatherConnectionService: weatherClient;
private weatherSubscription: Subscription;
private readonly weatherRequests = new Subject<WeatherRequest>();
private weatherResponses: Subject<WeatherResponse>;
constructor(@Inject('WEATHER_PACKAGE') private client: ClientGrpc) {
this.weatherConnectionService =
this.client.getService<weatherClient>(WEATHER_SERVICE_NAME);
}
onModuleInit() {
this.initConnection();
}
initConnection() {
// Terminate existing subscriptions
this.weatherSubscription?.unsubscribe();
this.weatherResponses?.complete();
// Recreate subject and subscription
this.weatherResponses = new Subject<WeatherResponse>();
this.weatherSubscription = this.weatherConnectionService
.weather(this.weatherRequests.asObservable())
.subscribe(this.weatherResponses);
// Add finalizer (called when connection is terminated)
this.weatherSubscription.add(() => {
setTimeout(() => this.initConnection(), 5000);
});
}
// ...
}
The only way I can think of for Nest.js to handle this more gracefully would be with a fully managed stream implementation (such as with a @Stream() decorator) that uses similar reconnection logic internally. Otherwise, these streams are handled entirely in the user space and there would be no easy way for Nest.js to manage the disconnection status or automate reconnections.