eventmesh
eventmesh copied to clipboard
[Bug] Stream is already completed, no further calls are allowed
Search before asking
- [X] I had searched in the issues and found no similar issues.
Environment
Linux
EventMesh version
1.10.0
What happened
我这应用系统集成1.10的sdk,对接eventmesh服务端,用grpc协议接收消息,rocketmq是双机集群,发现有时能收到消息有时收不到, eventmesh 服务端总是报如下错误,
2024-04-11 19:15:24,290 ERROR [eventMesh-grpc-pushMsg-2] StreamPushRequest(StreamPushRequest.java:91) - message|eventMesh2client|exception=Stream is already completed, no further calls are allowed |emitter|topic=TEST-TOPIC-GRPC-SYNC|bizSeqNo=949462082066922981761046640402|uniqueId=988003831724711254497105493010|cost=0 java.lang.IllegalStateException: Stream is already completed, no further calls are allowed at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.0.1-jre.jar:?] at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:375) ~[grpc-stub-1.43.2.jar:1.43.2] at org.apache.eventmesh.runtime.core.protocol.grpc.push.StreamPushRequest.tryPushRequest(StreamPushRequest.java:81) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release] at org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler.lambda$handle$1(MessageHandler.java:75) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
我把EventEmitter 这类改成如下方式,客户端肯定能收到消息,没有消息会丢或者收不到。 但是clustering模式失效了,变成广播模式接收了,即所有节点能收到所有的消息。 不知道作者们,能不能处理一下这个问题。
package org.apache.eventmesh.runtime.core.protocol.grpc.service;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
@Slf4j public class EventEmitter<T> { private boolean inError = false;
private final StreamObserver<T> emitter;
public EventEmitter(StreamObserver<T> emitter) {
this.emitter = emitter;
}
public synchronized void onNext(T event) {
if (inError) {
return;
}
try {
emitter.onNext(event);
} catch (final Exception e) {
log.warn("StreamObserver Error onNext. {}", e.getMessage());
onError(e);
}
// try { // emitter.onNext(event); // } catch (Exception e) { // log.warn("StreamObserver Error onNext. {}", e.getMessage()); // } }
public synchronized void onCompleted() {
if (inError) {
return;
}
try {
emitter.onCompleted();
} catch (final Exception e) {
log.warn("StreamObserver Error onCompleted. {}", e.getMessage());
onError(e);
}
// try { // emitter.onCompleted(); // } catch (Exception e) { // log.warn("StreamObserver Error onCompleted. {}", e.getMessage()); // } }
public synchronized void onError(Throwable t) {
emitter.onError(t);
inError = true;
log.warn("StreamObserver Error onError. {}", t.getMessage());
// try { // emitter.onError(t); // } catch (Exception e) { // log.warn("StreamObserver Error onError. {}", e.getMessage()); // } }
public StreamObserver<T> getEmitter() {
return emitter;
}
}
How to reproduce
我这应用系统集成1.10的sdk,对接eventmesh服务端,用grpc协议接收消息,发现有时能收到消息有时收不到, eventmesh 服务端总是报错。
Debug logs
2024-04-11 19:15:24,290 ERROR [eventMesh-grpc-pushMsg-2] StreamPushRequest(StreamPushRequest.java:91) - message|eventMesh2client|exception=Stream is already completed, no further calls are allowed |emitter|topic=TEST-TOPIC-GRPC-SYNC|bizSeqNo=949462082066922981761046640402|uniqueId=988003831724711254497105493010|cost=0
java.lang.IllegalStateException: Stream is already completed, no further calls are allowed
at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.0.1-jre.jar:?]
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:375) ~[grpc-stub-1.43.2.jar:1.43.2]
at org.apache.eventmesh.runtime.core.protocol.grpc.push.StreamPushRequest.tryPushRequest(StreamPushRequest.java:81) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
at org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler.lambda$handle$1(MessageHandler.java:75) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [ ] I agree to follow this project's Code of Conduct *
Please correct markdown code block, otherwise it is un-readable.
You may submit a PR and it will be easier to see changes before and after under diff view.
It has been 90 days since the last activity on this issue. Apache EventMesh values the voices of the community. Please don't hesitate to share your latest insights on this matter at any time, as the community is more than willing to engage in discussions regarding the development and optimization directions of this feature.
If you feel that your issue has been resolved, please feel free to close it. Should you have any additional information to share, you are welcome to reopen this issue.