apisix
apisix copied to clipboard
bug: The GRPC Proxy test shows that the GRPC Client connection is blocked
Current Behavior
Hi, I would like to use GRPC proxy, but there are some problems in pressure measurement GRPC Client 100 concurrent requests, 500 or 1000 requests GRPC Client connections will be closed. Apisix logs do not exist. The Apisix log level is WARN. The failure rate increases with the number of requests
Expected Behavior
No response
Error Logs
io.grpc.StatusRuntimeException: UNAVAILABLE: HTTP/2 error code: NO_ERROR Received Goaway at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:235) at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:216) at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:141) at com.infra.spring.example.grpc.common.GreeterGrpc$GreeterBlockingStub.sayHello(GreeterGrpc.java:448) at com.infra.spring.example.grpc.client.GreeteHelloGrpcPerfTest.batchUserDeviceRequest(GreeteHelloGrpcPerfTest.java:145) at com.infra.spring.example.grpc.client.GreeteHelloGrpcPerfTest$1.run(GreeteHelloGrpcPerfTest.java:68) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
Steps to Reproduce
-
The operating environment: Java environment:1.8.0_291 grpc version: 1.24.2
-
helloworld.proto syntax = "proto3"; option java_multiple_files = true; option java_package = "com.infra.spring.example.grpc.common"; option java_outer_classname = "HelloWorldProto"; package helloworld;
// The greeting service definition. service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) { } }
// The request message containing the user's name. message HelloRequest { string name = 1; int64 age = 2; }
// The response message containing the greetings message HelloReply { int32 success = 1; string message = 2; int64 age = 3; string time = 4; }
-
grpc server @GRpcService public class GreeterGrpcImpl extends GreeterGrpc.GreeterImplBase { private static final Logger LOGGER = LoggerFactory.getLogger(GreeterGrpcImpl.class); @Override public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) { LOGGER.info("GreeterGrpc.GreeterImplBase name: {}", request.getName()); HelloReply resp = HelloReply.newBuilder().setMessage("hello," + (request.getName())).setSuccess(200).setAge(request.getAge()). setTime(String.valueOf(System.currentTimeMillis())).build(); responseObserver.onNext(resp); responseObserver.onCompleted(); } }
-
GRPC pressure test client package com.infra.spring.example.grpc.client; import com.infra.spring.example.grpc.common.GreeterGrpc; import com.infra.spring.example.grpc.common.HelloReply; import com.infra.spring.example.grpc.common.HelloRequest; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder;
import java.text.NumberFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors;
public class GreeteHelloGrpcPerfTest { static int count = 0; static int num = 0; //总访问量是clientNum,并发量是threadNum private final int threadNum; private final int clientNum;
private float successRate;
private float avgExecTime;
private float sumExecTime;
private long firstExecTime = Long.MAX_VALUE;
private long lastDoneTime = Long.MIN_VALUE;
private float totalExecTime;
private int successNum;
private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub stub;
public GreeteHelloGrpcPerfTest(int threadNum, int clientNum) {
this.threadNum = threadNum;
this.clientNum = clientNum;
// channel = ManagedChannelBuilder.forAddress("ip", port).usePlaintext().build(); //server
channel = ManagedChannelBuilder.forAddress("ip", 9081).usePlaintext().build(); // apisix proxy
this.stub = GreeterGrpc.newBlockingStub(channel);
}
public void run() {
final ConcurrentHashMap<Integer, ThreadRecord> records = new ConcurrentHashMap<Integer, ThreadRecord>();
// 建立ExecutorService线程池
final ExecutorService exec = Executors.newFixedThreadPool(threadNum);
// thread_num个线程可以同时访问
// 模拟client_num个客户端访问
final CountDownLatch doneSignal = new CountDownLatch(clientNum);
final Random random = new Random();
int uid = random.nextInt();
if (batchUserDeviceRequest(uid) == 200) {
for (int i = 0; i < clientNum; i++) {
Runnable run = new Runnable() {
public void run() {
int index = getIndex();
boolean sflag = false;
int uidNum = random.nextInt();
long st = System.currentTimeMillis();
try {
if (batchUserDeviceRequest(uidNum) == 200) {
sflag = true;
}
} catch (Exception e) {
e.printStackTrace();
}
records.put(index, new ThreadRecord(st, System.currentTimeMillis(), sflag));
doneSignal.countDown();//每调用一次countDown()方法,计数器减1
}
};
exec.execute(run);
}
try {
//计数器大于0 时,await()方法会阻塞程序继续执行
doneSignal.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行完成,开始计算结果!");
ArrayList<Long> recordsList = new ArrayList<>();
for (int i : records.keySet()) {
ThreadRecord r = records.get(i);
sumExecTime += ((double) (r.et - r.st));
recordsList.add(r.et - r.st);
if (r.sflag) {
successNum = getNum();
}
if (r.st < firstExecTime) {
firstExecTime = r.st;
}
if (r.et > lastDoneTime) {
this.lastDoneTime = r.et;
}
}
List<Long> rList = recordsList.stream().sorted(Long::compareTo).collect(Collectors.toList());
this.avgExecTime = this.sumExecTime / records.size();
this.successRate = ((float) this.successNum / (float) records.size()) * 100;
this.totalExecTime = ((float) (this.lastDoneTime - this.firstExecTime));
NumberFormat nf = NumberFormat.getNumberInstance();
nf.setMaximumFractionDigits(2);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("===========================================================");
System.out.println("并发数: " + threadNum);
System.out.println("总执行次数: " + clientNum);
System.out.println("开始时间: " + formatter.format(firstExecTime));
System.out.println("结束时间: " + formatter.format(lastDoneTime));
System.out.println("平均响应时间: " + nf.format(this.avgExecTime) + " ms");
System.out.println("最小响应时间: " + nf.format(rList.get(0)) + " ms");
System.out.println("最大响应时间: " + nf.format(rList.get(rList.size() - 1)) + " ms");
System.out.println("总用时: " + nf.format(this.totalExecTime) + " ms");
System.out.println("成功率: " + nf.format(this.successRate) + "%");
System.out.println("TPS: " + nf.format(this.clientNum / (this.totalExecTime / 1000)) + " /s");
System.out.println("===========================================================");
} else {
System.out.println("Request exception");
}
}
public Integer batchUserDeviceRequest(int age) {
HelloRequest.Builder re= HelloRequest.newBuilder();
re.setName("name");
re.setAge(age);
HelloReply responseObserver = stub.sayHello(re.build());
return responseObserver.getSuccess();
}
public static int getIndex() {
return ++count;
}
public static int getNum() {
return ++num;
}
public static void main(String[] args) {
new GreeteHelloGrpcPerfTest(100, 500).run();
System.out.println("finished!");
}
public static class ThreadRecord {
public long st;
public long et;
public boolean sflag;
public ThreadRecord(long st, long et, boolean sflag) {
this.st = st;
this.et = et;
this.sflag = sflag;
}
}
}
- apisix etcd route /apisix/routes/420140932562682597 {"id":"420140932562682597","create_time":1659953110,"update_time":1660112069,"uri":"/helloworld.Greeter/SayHello","name":"example","methods":["GET","POST","PUT","DELETE","PATCH","HEAD","OPTIONS","CONNECT","TRACE"],"upstream_id":"420384452577854181","status":1}
Environment
- APISIX version (run
apisix version
):2.14.1 - Operating system (run
uname -a
):# 101-Ubuntu SMP Fri - OpenResty / Nginx version (run
openresty -V
ornginx -V
):nginx version: openresty/1.19.9.1 - etcd version, if relevant (run
curl http://127.0.0.1:9090/v1/server_info
):3.4.0 - APISIX Dashboard version, if relevant: 2.11.0
- Plugin runner version, for issues related to plugin runners:
- LuaRocks version, for installation issues (run
luarocks --version
):luarocks 3.8.0
Are you sending theses requests concurrently? APISIX limits the concurrent http2 stream to 128 by default (http2_max_concurrent_stream).
Yes, the original GRPC client request GRPC server to share a channel, now the same channel request Apisix error, where can I modify the HTTP2_MAX_concurrent_stream parameter? And isn't the Apisix proxy GRPC Server also reusing HTTP2 long connections? You'll see GRPC Server threads keep spiking when you make concurrent requests
@Bigwen-1 The Nginx gRPC proxy module cannot reuse the HTTP/2 connections due to its connection pool design.
And you can set http2_max_concurrent_stream
via https://github.com/apache/apisix/blob/master/conf/config-default.yaml#L209.
ping @Bigwen-1 change the http2_max_concurrent_stream
solve your problem?
Thank you very much for your concern. Based on this, the problem has been solved @tzssangglass
I'm closing this issue now.