grpc-node icon indicating copy to clipboard operation
grpc-node copied to clipboard

grpc-js bidirectional client hangs open when server ends stream directly following async iterator

Open sehrgut opened this issue 3 years ago • 2 comments

Problem description

A server consuming a bidirectional (Duplex) stream using an async iterator and calling call.end() after the iterator loop completes leaves the client hanging open. The only workaround is to add an end event handler on the server stream, and call call.end() from there.

Reproduction steps

  1. Compile the following biditest.proto to ./proto/generated:
// Schema for the bidirectional-streaming reproduction service.
syntax = "proto3";
package biditest;

// A single streamed message carrying one unsigned 32-bit field, `seq`.
message Block {
	uint32	seq		= 1;
}

service BidiTest {
	// Bidirectional streaming RPC: a stream of Block in, a stream of Block out.
	rpc Do (stream Block) returns (stream Block);
}
  2. Run the following example script:
var	grpc			= require('@grpc/grpc-js'),
	services		= require('./proto/generated/biditest_grpc_pb'),
	messages		= require('./proto/generated/biditest_pb');

/**
 * Build an absolute deadline from a relative offset.
 *
 * @param {number} millis - Offset from the current time, in milliseconds.
 * @returns {number} `Date.now()` plus `millis` (an epoch-millisecond timestamp).
 */
function getDeadline(millis) {
	const now = Date.now();
	return now + millis;
}

/**
 * Wait for a fixed amount of time.
 *
 * @param {number} interval - Delay handed to `setTimeout`, in milliseconds.
 * @returns {Promise<undefined>} Promise that resolves (with no value) once the timer fires.
 */
async function sleep(interval) {
  const timer = new Promise((done) => {
    // Wrap `done` so it is always invoked without arguments.
    setTimeout(() => done(), interval);
  });
  return timer;
}

/**
 * Wait for a random whole number of milliseconds.
 *
 * The delay is `Math.floor(Math.random() * maxMillis)`, so for a positive
 * `maxMillis` it is an integer in the range [0, maxMillis).
 *
 * @param {number} maxMillis - Exclusive upper bound of the delay, in milliseconds.
 * @returns {Promise<undefined>} Resolves after the delay has elapsed.
 */
async function randomSleep(maxMillis) {
	const delay = Math.floor(Math.random() * maxMillis);
	await sleep(delay);
}

/**
 * Handler for the `Do` RPC — "workaround" variant.
 *
 * Echoes the `seq` of every received Block back to the client, and ends the
 * response side of the stream from an 'end' event listener instead of after
 * the async-iterator loop.
 *
 * @param call - The server-side call object for the bidirectional stream; it is
 *   iterated with `for await`, written to, and ended here.
 */
async function handleDoWithEndEvent(call) {

	// Register the 'end' listener before iterating so it is in place when the
	// incoming stream finishes; this listener is what ends the response stream.
	call.on('end', () => {
		console.log("[Server1] Got end event");
		console.log("[Server1] Ending");
		call.end();
	});

	// Consume incoming blocks one at a time and echo each sequence number back.
	for await (const req of call) {
		let seq = req.getSeq();
		console.log("[Server1] Received %d", seq);
		let res = new messages.Block();
		res.setSeq(seq);
		
		console.log("[Server1] Sending %d", seq);
		call.write(res);
		// Random pause of up to 100 ms between messages.
		await randomSleep(100);
	}
	
	// Only logs; call.end() is NOT called here in this variant.
	console.log('[Server1] Async iterator ended');
}

/**
 * Handler for the `Do` RPC — "pure async iterator" variant.
 *
 * Echoes the `seq` of every received Block back to the client, then calls
 * `call.end()` once the `for await` loop has finished. No stream event
 * listeners are registered. This is the variant the surrounding report
 * describes as leaving the client open until DEADLINE_EXCEEDED.
 *
 * @param call - The server-side call object for the bidirectional stream; it is
 *   iterated with `for await`, written to, and ended here.
 */
async function handleDoPureAsync(call) {
	// Consume incoming blocks one at a time and echo each sequence number back.
	for await (const req of call) {
		let seq = req.getSeq();
		console.log("[Server2] Received %d", seq);
		let res = new messages.Block();
		res.setSeq(seq);
		
		console.log("[Server2] Sending %d", seq);
		call.write(res);
		// Random pause of up to 100 ms between messages.
		await randomSleep(100);
	}
	console.log('[Server2] Async iterator ended');
	
	// End the response stream directly after the iterator loop completes.
	console.log('[Server2] Ending');
	call.end();
}

/**
 * Run one client session against the BidiTest service on localhost:50051.
 *
 * Opens a `Do` call with a 10-second deadline, logs every 'data', 'error' and
 * 'end' event, writes `blocks` Block messages (seq 0..blocks-1) with a random
 * pause after each one, and then ends the write side of the call.
 *
 * NOTE: nothing is awaited after `call.end()`, so the returned promise resolves
 * as soon as the write side has been ended — not when the call's 'end' or
 * 'error' event fires.
 *
 * @param {number} blocks - Number of Block messages to send.
 * @returns {Promise<undefined>} Resolves once all blocks are written and `call.end()` was called.
 */
async function doClient(blocks) {
	var client	= new services.BidiTestClient(
		'localhost:50051',
		grpc.credentials.createInsecure()
	);

	// Deadline is an absolute timestamp 10 000 ms from now.
	let call = client.do({deadline:getDeadline(10000)});
	
	// Log each response received from the server.
	call.on('data', (res) => {
		console.log("[Client] Received %d", res.getSeq());
	});
	
	// Log call errors (the captured output shows DEADLINE_EXCEEDED arriving here).
	call.on('error', (err) => {
		console.error("[Client][ERROR] %s", err);
	});
	
	// Log when the response stream ends.
	call.on('end', () => {
		console.log("[Client] Got End");
	});
	
	// Send the blocks sequentially, pausing up to 100 ms after each write.
	for (var i=0; i<blocks; i++) {
		console.log("[Client] Sending %d", i);
		var block = new messages.Block();
		block.setSeq(i);
		call.write(block);
		await randomSleep(100);
	}

	// End the request (write) side of the call.
	console.log('[Client] Ending');	
	call.end();

}

/**
 * First phase of the reproduction: serve `Do` with handleDoWithEndEvent.
 *
 * Binds a server on 127.0.0.1:50051, starts it, runs a 10-block client
 * session, and once doClient's promise resolves calls tryShutdown, whose
 * callback launches main2.
 */
function main1() {
	const server = new grpc.Server();
	const handlers = {
		"do": handleDoWithEndEvent
	};
	server.addService(services.BidiTestService, handlers);

	// Invoked by bindAsync once binding has completed.
	const onBound = () => {
		server.start();
		console.log("async-bidi-test:handleDoWithEndEvent service started on 127.0.0.1:50051");
		// Chain the second phase from the shutdown callback.
		const shutdownThenNext = () => server.tryShutdown(() => main2());
		doClient(10).then(shutdownThenNext);
	};

	server.bindAsync(
		"127.0.0.1:50051",
		grpc.ServerCredentials.createInsecure(),
		onBound
	);
}

/**
 * Second phase of the reproduction: serve `Do` with handleDoPureAsync.
 *
 * Binds a server on 127.0.0.1:50051, starts it, runs a 10-block client
 * session, and once doClient's promise resolves calls tryShutdown with a
 * no-op callback.
 */
function main2() {
	const server = new grpc.Server();
	const handlers = {
		"do": handleDoPureAsync
	};
	server.addService(services.BidiTestService, handlers);

	// Invoked by bindAsync once binding has completed.
	const onBound = () => {
		server.start();
		console.log("async-bidi-test:handleDoPureAsync service started on 127.0.0.1:50051");
		const shutdown = () => server.tryShutdown(() => {});
		doClient(10).then(shutdown);
	};

	server.bindAsync(
		"127.0.0.1:50051",
		grpc.ServerCredentials.createInsecure(),
		onBound
	);
}

// Entry point: run the workaround server first; main1 starts main2 from its tryShutdown callback.
main1();
  3. You will observe output similar to the following:
async-bidi-test:handleDoWithEndEvent service started on 127.0.0.1:50051
[Client] Sending 0
[Server1] Received 0
[Server1] Sending 0
[Client] Received 0
[Client] Sending 1
[Server1] Received 1
[Server1] Sending 1
[Client] Received 1
[Client] Sending 2
[Server1] Received 2
[Server1] Sending 2
[Client] Received 2
[Client] Sending 3
[Server1] Received 3
[Server1] Sending 3
[Client] Received 3
[Client] Sending 4
[Server1] Received 4
[Server1] Sending 4
[Client] Received 4
[Client] Sending 5
[Server1] Received 5
[Server1] Sending 5
[Client] Received 5
[Client] Sending 6
[Server1] Received 6
[Server1] Sending 6
[Client] Received 6
[Client] Sending 7
[Server1] Received 7
[Server1] Sending 7
[Client] Sending 8
[Client] Received 7
[Server1] Received 8
[Server1] Sending 8
[Client] Received 8
[Client] Sending 9
[Client] Ending
[Server1] Received 9
[Server1] Sending 9
[Server1] Got end event
[Server1] Ending
[Client] Received 9
[Client] Got End
async-bidi-test:handleDoPureAsync service started on 127.0.0.1:50051
[Client] Sending 0
[Server2] Received 0
[Server2] Sending 0
[Client] Received 0
[Client] Sending 1
[Client] Sending 2
[Server1] Async iterator ended
[Server2] Received 1
[Server2] Sending 1
[Client] Received 1
[Client] Sending 3
[Server2] Received 2
[Server2] Sending 2
[Client] Received 2
[Client] Sending 4
[Server2] Received 3
[Server2] Sending 3
[Client] Received 3
[Client] Sending 5
[Server2] Received 4
[Server2] Sending 4
[Client] Received 4
[Server2] Received 5
[Server2] Sending 5
[Client] Sending 6
[Client] Received 5
[Client] Sending 7
[Server2] Received 6
[Server2] Sending 6
[Client] Received 6
[Server2] Received 7
[Server2] Sending 7
[Client] Sending 8
[Client] Received 7
[Server2] Received 8
[Server2] Sending 8
[Client] Received 8
[Client] Sending 9
[Client] Ending
[Server2] Received 9
[Server2] Sending 9
[Client] Received 9
[Server2] Async iterator ended
[Server2] Ending
[Client][ERROR] Error: 4 DEADLINE_EXCEEDED: Deadline exceeded
    at Object.callErrorFromStatus (/Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/call.js:31:26)
    at Object.onReceiveStatus (/Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/client.js:390:49)
    at Object.onReceiveStatus (/Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/client-interceptors.js:299:181)
    at /Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/call-stream.js:145:78
    at processTicksAndRejections (internal/process/task_queues.js:77:11) {
  code: 4,
  details: 'Deadline exceeded',
  metadata: [Metadata]
}
[Client] Got End

Environment

  • Mac OS 11.0.1 x86-64
  • Node v14.17.1 and v16.4.0
  • Node installed from npm/n
  • @grpc/[email protected]

Additional context

The Server1 instance uses the workaround, checking for the end of the source stream by listening to the end event, and ending the sink stream at that point. This successfully ends the stream for the client. The Server2 instance uses a pure async iterator implementation, which I would argue is a "least astonishment" implementation. After await returns from the async iterator, implying the source stream has ended, it calls call.end(), ending the sink stream. This ought to end the stream for the client, but instead the client does not end until much later, with DEADLINE_EXCEEDED.

The completion of an async iterator on a Readable stream should indicate the source stream has ended. At that point, if the Readable was a Duplex, calling the end() method should end the sink stream. Using the stream event interface alongside the async iterator should not be necessary.

As an aside, note that when pipelined like this, the ending of Server1's async iterator occurs significantly AFTER Server2 has started, which happens within the Server1 tryShutdown callback. Since the async iteration occurs inside the async service method, it seems likely that tryShutdown is completing before it ought to, because this handler should have to complete before tryShutdown ends the server and calls its callback. I haven't investigated this in depth, since I only discovered it while creating this repro, but perhaps it ought to be a separate bug report. I'm mentioning it here on the chance that it points to a flaw in async service method handling that could be causing the main bug.

sehrgut avatar Jul 01 '21 00:07 sehrgut

It looks to me like the problem is that the end event does not cause the async iteration to end. It's hard to tell from that log, but my guess is that it is waiting for the close event, which wouldn't be emitted on a Duplex stream until some time after both sides have called the end method.

Regarding tryShutdown, the purpose of that method is to prevent new requests from starting and to wait for existing open requests to end. At the point when you call it, that request has ended, so it's OK for tryShutdown to finish quickly. The fact that there may also be some local asynchronous cleanup work doesn't really matter.

murgatroid99 avatar Jul 08 '21 15:07 murgatroid99

stream.on("end") only signals the end of the readable side of the duplex stream, while the writable side is paused (not the expected behaviour). To force the writable side to close as well, you need to call stream.end() inside the stream.on("end") handler. It's really ugly, but it works.

ttessarolo avatar Feb 23 '23 11:02 ttessarolo