
Remote terminates when connection is lost

Open orjan opened this issue 7 years ago • 13 comments

I've been experimenting with how to create a set of cross-language acceptance tests. The first attempt was to port https://github.com/AsynkronIT/protoactor-go/blob/dev/examples/remoteactivate/node2/main.go to Kotlin and call it from https://github.com/AsynkronIT/protoactor-go/blob/dev/examples/remoteactivate/node1/main.go

Node 1 receives the response, but when it exits, the Kotlin remote fails hard.

fun main(args: Array<String>) {
    Serialization.registerFileDescriptor(Messages.getDescriptor())
    Remote.start("127.0.0.1", 8080)
    val props: Props = fromProducer { HelloActor() }
    Remote.registerKnownKind("hello", props)
    readLine()
}

class HelloActor : Actor {
    override suspend fun Context.receive(msg: Any) {
        when (msg) {
            is actor.proto.acceptance.Messages.HelloRequest -> {
                respond(Messages.HelloResponse.newBuilder()
                        .setMessage("Hello from kotlin")
                        .build())
            }
            else -> {
                println("Another message: " + msg)
            }
        }
    }
}
Connecting to address 127.0.0.1:8081
Connected to address 127.0.0.1:8081
io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
Dec 30, 2017 11:22:58 PM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed@79157d48
java.lang.NullPointerException
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:418)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:41)
	at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:663)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:41)
	at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:392)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:443)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:525)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:446)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:557)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

orjan avatar Dec 30 '17 22:12 orjan

The Go implementation survives node 1 exiting. It looks like the issue is that we're not passing an observer here: https://github.com/AsynkronIT/protoactor-kotlin/blob/master/proto-remote/src/main/kotlin/actor/proto/remote/EndpointWriter.kt#L81

orjan avatar Dec 30 '17 22:12 orjan

Is that related to never getting an EndpointTerminatedEvent on the Event Stream when a remote dies, and consequently never receiving any Terminated messages for watched actors on that remote? Given the commented-out lines just below the link above, it doesn't appear that EndpointTerminatedEvent is ever published anywhere in the code, and I think generating Terminated messages for watchers of the related actors relies on this event.

james-cobb avatar Jan 13 '18 21:01 james-cobb

So this seems to work:

streamWriter = client.receive(object : StreamObserver<RemoteProtos.Unit> {
    override fun onNext(value: RemoteProtos.Unit?) {
        // never called
    }

    override fun onCompleted() {
        // never called
    }

    override fun onError(t: Throwable?) {
        when (t) {
            is StatusRuntimeException -> {
                // Compare the status code enum directly instead of its name string.
                if (t.status.code == io.grpc.Status.Code.UNAVAILABLE) {
                    // Connection lost: notify Event Stream subscribers.
                    EventStream.publish(EndpointTerminatedEvent(address))
                    println("Lost connection to address $address")
                } else {
                    println("Stream Status Runtime Exception $t")
                }
            }
            else -> println("Other exception $t")
        }
    }
})

The EndpointTerminatedEvent is now received by Event Stream subscribers. And watchers of actors on a terminated remote are sent a Terminated message.

Haven't yet tested:

  • What happens if the remote endpoint quickly recovers? I don't think there's currently any retry.
  • What state does the termination leave the remote in? That is, what happens if a message is sent to an actor on the terminated remote after the termination? Is the expected behaviour that this causes a DeadLetter event, or is another connection attempt expected?

james-cobb avatar Jan 14 '18 11:01 james-cobb

@james-cobb nice drill down of the problem! @rogeralsing would you mind having a look at the questions?

orjan avatar Jan 14 '18 11:01 orjan

What state the termination leaves the remote. So what happens if after the termination a message is sent to an actor on the terminated remote. Is the expected behaviour that this would cause a DeadLetter event? Or is another connection attempt expected?

In the Go and C# implementations, the EndpointWatcher actor detects the termination and triggers a Terminated event that is sent to all subscribers of a remote actor's lifecycle.

That is, if a local actor has used context.Watch(remotePID) that local actor will get a Terminated message with the remote PID if the endpoint breaks.
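
The watch behaviour described above can be illustrated with a small, self-contained sketch. It is not the protoactor-kotlin, Go, or C# implementation: `Pid`, `Terminated`, and `EndpointWatchRegistry` are hypothetical stand-ins, and message delivery is reduced to a callback. It only shows the bookkeeping: remember which local actors watch which remote PIDs, and when an endpoint address is lost, send each watcher a Terminated message carrying the remote PID.

```kotlin
// Hypothetical stand-ins for illustration; these are NOT the Proto.Actor types.
data class Pid(val address: String, val id: String)
data class Terminated(val who: Pid, val addressTerminated: Boolean)

class EndpointWatchRegistry(
    // Assumed delivery hook: how a Terminated message reaches a local watcher.
    private val deliver: (watcher: Pid, msg: Terminated) -> Unit
) {
    // remote address -> (watched remote PID -> local watchers)
    private val watches = mutableMapOf<String, MutableMap<Pid, MutableSet<Pid>>>()

    // Record that a local actor watches a remote actor.
    fun watch(watcher: Pid, watchee: Pid) {
        watches.getOrPut(watchee.address) { mutableMapOf() }
            .getOrPut(watchee) { mutableSetOf() }
            .add(watcher)
    }

    // Called when the connection to `address` is lost.
    // Every watcher of an actor on that address gets one Terminated message.
    fun endpointTerminated(address: String) {
        val lost = watches.remove(address) ?: return
        for ((watchee, watchers) in lost) {
            for (watcher in watchers) {
                deliver(watcher, Terminated(watchee, addressTerminated = true))
            }
        }
    }
}
```

Removing the address entry when the endpoint terminates means a repeated termination signal for the same address does not notify watchers twice.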

rogeralsing avatar Jan 14 '18 11:01 rogeralsing

https://github.com/AsynkronIT/protoactor-dotnet/blob/dev/src/Proto.Remote/EndpointWatcher.cs#L59

I'm not sure what state the Kotlin implementation is in here. @PotterDai and I have done some work on both Go and C# in this specific area, including retrying connections.

rogeralsing avatar Jan 14 '18 11:01 rogeralsing

Thanks for the responses. While digging into this I've found a couple of other issues that were causing problems for the EndpointWriter. I'll report them as separate issues on GitHub, then circle back to this one.

james-cobb avatar Jan 15 '18 19:01 james-cobb

@james-cobb great initiative, feel free to submit pull requests if you want to!

// Örjan

orjan avatar Jan 16 '18 11:01 orjan

@orjan thanks! I think I'll be comfortable to submit some pull requests soon. But need some guidance on the required behaviour in some of the cases I've been unpicking - see #29 #30 #31

james-cobb avatar Jan 16 '18 18:01 james-cobb

@orjan I'm readying a pull request. In addition to the StreamObserver that catches exceptions from gRPC, I made some of the gRPC options, such as keepalives, configurable. When a node vanished, it took gRPC 15 minutes to notice, which means 15 minutes of messages sent to remote actors were silently lost. Using keepAliveTime and keepAliveTimeout we can reduce this to only a few seconds, but I think this needs to be configurable for each application.

Can I check if you're open to the way I've done this, using the RemoteConfig class before I make a PR?

https://github.com/crowdconnected/protoactor-kotlin/blob/4a9565735213b43d26998dc05677d3d2b4b8ca82/proto-remote/src/main/kotlin/actor/proto/remote/RemoteConfig.kt

james-cobb avatar Jan 18 '18 11:01 james-cobb

I would say go ahead and submit a PR!

Two small concerns:

  • Do all fields really need to be nullable?
  • Can we apply sensible default values?

orjan avatar Jan 19 '18 07:01 orjan

My thinking was not to apply any defaults within the code: if a parameter is not explicitly set in an application using Remote.Config, then it's not set at all when instantiating the gRPC server / client. That means gRPC's own defaults are used, and if the gRPC library changes them in future releases, we don't need to update default values that have become embedded in proto.actor. That was the reason for everything being nullable, except usePlainText, which defaults to true simply to make sure that the change is backward compatible. If no settings are specified, the connection falls back to exactly the current setup.

I'll go ahead with a PR
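
As an illustration of the "only set what was explicitly configured" idea, here is a minimal sketch. It is not the RemoteConfig class from the linked commit: the field names, the millisecond units, and the `ChannelOptions` interface are assumptions made for the example, with `ChannelOptions` standing in for whatever gRPC builder the remote layer actually configures.

```kotlin
// Hypothetical stand-in for a gRPC channel/server builder (assumption, not a real API).
interface ChannelOptions {
    fun keepAliveTimeMillis(value: Long)
    fun keepAliveTimeoutMillis(value: Long)
    fun usePlainText()
}

// Null means "not set by the application", so the underlying library default applies.
data class RemoteConfig(
    val keepAliveTimeMillis: Long? = null,
    val keepAliveTimeoutMillis: Long? = null,
    // Non-null and true by default so existing setups behave exactly as before.
    val usePlainText: Boolean = true
)

// Forward only the options the application set explicitly.
fun applyConfig(config: RemoteConfig, options: ChannelOptions) {
    config.keepAliveTimeMillis?.let { options.keepAliveTimeMillis(it) }
    config.keepAliveTimeoutMillis?.let { options.keepAliveTimeoutMillis(it) }
    if (config.usePlainText) options.usePlainText()
}
```

With this shape, `applyConfig(RemoteConfig(), options)` touches nothing but the plain-text setting, while `RemoteConfig(keepAliveTimeMillis = 10_000L, keepAliveTimeoutMillis = 5_000L)` would forward the two keepalive values as well. The numbers are placeholders, not recommendations.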

james-cobb avatar Jan 19 '18 08:01 james-cobb

Sorry, my bad! You're right, it's better to use the defaults from gRPC!

orjan avatar Jan 19 '18 13:01 orjan