grpc-node
grpc-js bidirectional client hangs open when server ends stream directly following async iterator
Problem description
A server consuming a bidirectional (Duplex) stream using an async iterator and calling call.end() after the iterator loop completes leaves the client hanging open. The only workaround is to add an 'end' event handler on the server stream and call call.end() from there.
Reproduction steps
- Compile the following biditest.proto to ./proto/generated (an example command is shown after the proto definition):
syntax = "proto3";
package biditest;
message Block {
uint32 seq = 1;
}
service BidiTest {
rpc Do (stream Block) returns (stream Block);
}
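For reference, one way to generate the stubs in ./proto/generated is with the grpc-tools package. This is only a sketch; the original report does not say how the stubs were produced, and it assumes grpc-tools (which provides grpc_tools_node_protoc) and google-protobuf are installed:

mkdir -p ./proto/generated
npx grpc_tools_node_protoc \
  --js_out=import_style=commonjs,binary:./proto/generated \
  --grpc_out=grpc_js:./proto/generated \
  --proto_path=. \
  biditest.proto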
- Run the following example script:
var grpc = require('@grpc/grpc-js'),
services = require('./proto/generated/biditest_grpc_pb'),
messages = require('./proto/generated/biditest_pb');
function getDeadline(millis) {
return Date.now() + millis;
}
async function sleep(interval) {
return new Promise(resolve => {
setTimeout(() => resolve(), interval);
});
}
async function randomSleep(maxMillis) {
let millis = Math.floor(Math.random() * maxMillis);
await sleep(millis);
}
async function handleDoWithEndEvent(call) {
call.on('end', () => {
console.log("[Server1] Got end event");
console.log("[Server1] Ending");
call.end();
});
for await (const req of call) {
let seq = req.getSeq();
console.log("[Server1] Received %d", seq);
let res = new messages.Block();
res.setSeq(seq);
console.log("[Server1] Sending %d", seq);
call.write(res);
await randomSleep(100);
}
console.log('[Server1] Async iterator ended');
}
async function handleDoPureAsync(call) {
for await (const req of call) {
let seq = req.getSeq();
console.log("[Server2] Received %d", seq);
let res = new messages.Block();
res.setSeq(seq);
console.log("[Server2] Sending %d", seq);
call.write(res);
await randomSleep(100);
}
console.log('[Server2] Async iterator ended');
console.log('[Server2] Ending');
call.end();
}
async function doClient(blocks) {
var client = new services.BidiTestClient(
'localhost:50051',
grpc.credentials.createInsecure()
);
let call = client.do({deadline:getDeadline(10000)});
call.on('data', (res) => {
console.log("[Client] Received %d", res.getSeq());
});
call.on('error', (err) => {
console.error("[Client][ERROR] %s", err);
});
call.on('end', () => {
console.log("[Client] Got End");
});
for (var i=0; i<blocks; i++) {
console.log("[Client] Sending %d", i);
var block = new messages.Block();
block.setSeq(i);
call.write(block);
await randomSleep(100);
}
console.log('[Client] Ending');
call.end();
}
function main1() {
var server = new grpc.Server();
server.addService(services.BidiTestService, {
"do": handleDoWithEndEvent
});
server.bindAsync("127.0.0.1:50051",
grpc.ServerCredentials.createInsecure(),
() => {
server.start();
console.log("async-bidi-test:handleDoWithEndEvent service started on 127.0.0.1:50051");
doClient(10).then(() => server.tryShutdown(()=> main2()));
});
}
function main2() {
var server = new grpc.Server();
server.addService(services.BidiTestService, {
"do": handleDoPureAsync
});
server.bindAsync("127.0.0.1:50051",
grpc.ServerCredentials.createInsecure(),
() => {
server.start();
console.log("async-bidi-test:handleDoPureAsync service started on 127.0.0.1:50051");
doClient(10).then(() => server.tryShutdown(()=>{}));
});
}
main1();
- You will observe output similar to the following:
async-bidi-test:handleDoWithEndEvent service started on 127.0.0.1:50051
[Client] Sending 0
[Server1] Received 0
[Server1] Sending 0
[Client] Received 0
[Client] Sending 1
[Server1] Received 1
[Server1] Sending 1
[Client] Received 1
[Client] Sending 2
[Server1] Received 2
[Server1] Sending 2
[Client] Received 2
[Client] Sending 3
[Server1] Received 3
[Server1] Sending 3
[Client] Received 3
[Client] Sending 4
[Server1] Received 4
[Server1] Sending 4
[Client] Received 4
[Client] Sending 5
[Server1] Received 5
[Server1] Sending 5
[Client] Received 5
[Client] Sending 6
[Server1] Received 6
[Server1] Sending 6
[Client] Received 6
[Client] Sending 7
[Server1] Received 7
[Server1] Sending 7
[Client] Sending 8
[Client] Received 7
[Server1] Received 8
[Server1] Sending 8
[Client] Received 8
[Client] Sending 9
[Client] Ending
[Server1] Received 9
[Server1] Sending 9
[Server1] Got end event
[Server1] Ending
[Client] Received 9
[Client] Got End
async-bidi-test:handleDoPureAsync service started on 127.0.0.1:50051
[Client] Sending 0
[Server2] Received 0
[Server2] Sending 0
[Client] Received 0
[Client] Sending 1
[Client] Sending 2
[Server1] Async iterator ended
[Server2] Received 1
[Server2] Sending 1
[Client] Received 1
[Client] Sending 3
[Server2] Received 2
[Server2] Sending 2
[Client] Received 2
[Client] Sending 4
[Server2] Received 3
[Server2] Sending 3
[Client] Received 3
[Client] Sending 5
[Server2] Received 4
[Server2] Sending 4
[Client] Received 4
[Server2] Received 5
[Server2] Sending 5
[Client] Sending 6
[Client] Received 5
[Client] Sending 7
[Server2] Received 6
[Server2] Sending 6
[Client] Received 6
[Server2] Received 7
[Server2] Sending 7
[Client] Sending 8
[Client] Received 7
[Server2] Received 8
[Server2] Sending 8
[Client] Received 8
[Client] Sending 9
[Client] Ending
[Server2] Received 9
[Server2] Sending 9
[Client] Received 9
[Server2] Async iterator ended
[Server2] Ending
[Client][ERROR] Error: 4 DEADLINE_EXCEEDED: Deadline exceeded
at Object.callErrorFromStatus (/Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/call.js:31:26)
at Object.onReceiveStatus (/Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/client.js:390:49)
at Object.onReceiveStatus (/Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/client-interceptors.js:299:181)
at /Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/call-stream.js:145:78
at processTicksAndRejections (internal/process/task_queues.js:77:11) {
code: 4,
details: 'Deadline exceeded',
metadata: [Metadata]
}
[Client] Got End
Environment
- Mac OS 11.0.1 x86-64
- Node v14.17.1 and v16.4.0
- Node installed from npm/n
- @grpc/[email protected]
Additional context
The Server1 instance uses the workaround: it detects the end of the source stream by listening for the 'end' event and ends the sink stream at that point. This successfully ends the stream for the client. The Server2 instance uses a pure async iterator implementation, which I would argue is the "least astonishment" implementation. After the for await loop completes, implying the source stream has ended, it calls call.end(), ending the sink stream. This ought to end the stream for the client, but instead the client does not end until much later, with DEADLINE_EXCEEDED.
The completion of an async iterator on a Readable stream should indicate that the source stream has ended. At that point, if the Readable was a Duplex, calling the end() method should end the sink stream. Using the stream event interface alongside the async iterator should not be necessary.
As an aside, note that when pipelined like this, the ending of Server1's async iterator occurs significantly AFTER Server2 has started, which happens within Server1's tryShutdown callback. Since the async iteration happens inside the async service method, it seems likely that tryShutdown is completing before it ought to, because this handler should have to complete before tryShutdown ends the server and calls its callback. I haven't investigated this in depth, since I only discovered it while creating this repro; perhaps it ought to be a separate bug report. I'm mentioning it here on the chance that it points to a flaw in async service method handling that could be causing the main bug.
It looks to me like the problem is that the 'end' event does not cause the async iteration to end. It's hard to tell from that log, but my guess is that it is waiting for the 'close' event, which wouldn't be emitted on a Duplex stream until some time after both sides have called the end method.
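A small Node-only sketch (not gRPC, and not the grpc-js internals themselves) of the 'end' vs 'close' timing described above: on a Duplex, 'end' fires when the readable side is done, but 'close' is only emitted after the writable side has finished as well.

const { Duplex } = require('stream');

const duplex = new Duplex({
  read() {},                              // readable side is fed manually via push()
  write(chunk, encoding, callback) { callback(); }
});

duplex.on('end',    () => console.log('end    (readable side done)'));
duplex.on('finish', () => console.log('finish (writable side done)'));
duplex.on('close',  () => console.log('close  (both sides done)'));

duplex.resume();                          // flow the readable side so 'end' can fire
duplex.push(null);                        // end the readable side -> 'end'
setTimeout(() => duplex.end(), 100);      // end the writable side later -> 'finish', then 'close'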
Regarding tryShutdown, the purpose of that method is to prevent new requests from starting and to wait for existing open requests to end. At the point when you call it, that request has ended, so it's OK for tryShutdown to finish quickly. The fact that there may also be some local asynchronous cleanup work doesn't really matter.
stream.on("end") notify just the end of the readable part of the duplex stream, while the writable stream is paused (not the expected behaviour) . To force close even the writable stream is needed ta call stream.end() inside the stream.on("end") event. It's really ugly but it works.