Akkling icon indicating copy to clipboard operation
Akkling copied to clipboard

ClusterClient needs to client.Tell twice, and the second invocation by tell succeeded

Open ingted opened this issue 3 years ago • 4 comments

#r "nuget: Akka.Serialization.Hyperion"
#r "nuget: Akka.Cluster"
#r "nuget: Akka.Remote"
#r "nuget: Akka.Cluster.Tools"
#r "nuget: Akkling"
#r "nuget: Akkling.Cluster.Sharding"
#r "nuget: Microsoft.Extensions.Logging"
#r "nuget: Microsoft.Extensions.Logging.Abstractions"

open System
open Akka.Cluster
open Akkling
open Akkling.Cluster
open Akka.Actor


let configWithPort port ifSeeding ifCluster =
    let config = Configuration.parse ("""
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
                #akka-pubsub = "Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools"
                akka-cluster-client : "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools"
              }
              serialization-bindings {
                "System.Object" = hyperion
                #"Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, Akka.Cluster.Tools" = akka-pubsub
                #"Akka.Cluster.Tools.PublishSubscribe.Internal.SendToOneSubscriber, Akka.Cluster.Tools" = akka-pubsub
                "Akka.Cluster.Tools.Client.IClusterClientMessage, Akka.Cluster.Tools" : akka-cluster-client
              }
              serialization-identifiers {
                #"Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools" = 9
                "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools" : 15
              }
            }
          remote {
            dot-netty.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """ + port.ToString() + """
            }
          }""" + 
                    if ifCluster then """
          cluster {
            auto-down-unreachable-after = 5s""" + 
                                                    if ifSeeding then """
            seed-nodes = [ "akka.tcp://cluster-system@localhost:5000/" ]""" 
                                                    else "" + """
          }""" 
                    else "" + """
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }""" +
                    if ifCluster then """
          extensions = ["Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider,Akka.Cluster.Tools"]
          """       else "" + """
        }
        """)
    if ifSeeding then 
        config.WithFallback(Akka.Cluster.Sharding.ClusterSharding.DefaultConfig())
    else
        config
          //.WithFallback(Akka.Cluster.Tools.Singleton.ClusterSingletonManager.DefaultConfig())
          //.WithFallback(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSub.DefaultConfig())

let system0 = Akkling.System.create "cluster-system" <| (configWithPort 5000 true true)
let system1 = Akkling.System.create "cluster-system" <| (configWithPort 5001 true true)
let system2 = Akkling.System.create "cluster-system" <| (configWithPort 5002 false false)
let ttc1:IActorRef<obj> = spawn system1 ("ttc" + Guid.NewGuid().ToString()) (props (Behaviors.printf "%A"))
let ttc2:IActorRef<obj> = spawn system2 ("ttc" + Guid.NewGuid().ToString()) (props (Behaviors.printf "%A"))

let receptionist1 = Akkling.Cluster.ClusterClient.receptionist system0
let receptionist2 = Akkling.Cluster.ClusterClient.receptionist system1

receptionist1.RegisterService (untyped ttc1)
receptionist2.RegisterService (untyped ttc2)

let initialContacts = 
    Collections.Immutable.ImmutableHashSet.Create<ActorPath>(
        [|
            ActorPath.Parse(Cluster.Get(system0).SelfAddress.ToString() + "/system/receptionist")
            ActorPath.Parse(Cluster.Get(system1).SelfAddress.ToString() + "/system/receptionist")
        |]
        )
let contacts = initialContacts :> Collections.Immutable.IImmutableSet<ActorPath>

let settingClient = 
    Akka.Cluster.Tools.Client.ClusterClientSettings.Create(system2).WithInitialContacts(contacts)

let propsClient = Akka.Cluster.Tools.Client.ClusterClient.Props(settingClient)

let client : IActorRef<obj> = 
    spawn system2 ("client" + Guid.NewGuid().ToString()) (Akkling.Props.Props.From propsClient)

let path = "/" +  ttc1.Path.ToString().Replace(ttc1.Path.Root.ToString(), "")

let send = 
    new Akka.Cluster.Tools.Client.ClusterClient.Send(
        path, 
        box "printIt", 
        true)
client.Tell(send, Akka.Actor.ActorRefs.Nobody) //nothing printed
client.Tell(send, Akka.Actor.ActorRefs.Nobody)

With Akkling the first tell doesn't work.

==================================================

#r "nuget: Akka.Serialization.Hyperion"
#r "nuget: Hyperion"
#r "nuget: Akka.Cluster"
#r "nuget: Akka.Remote"
#r "nuget: Akka.Cluster.Tools"
#r "nuget: Akka.Cluster.Sharding"
#r "nuget: Akka.FSharp"
#r "nuget: Microsoft.Extensions.Logging"
#r "nuget: Microsoft.Extensions.Logging.Abstractions"
#r "nuget: System.Collections.Immutable"
#r "nuget: System.Reflection.TypeExtensions"

open System
open Akka.Cluster
open Akka.FSharp
open Akka.Cluster
open Akka.Actor


let configWithPort port ifSeeding ifCluster =
    let config = Configuration.parse ("""
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
                #akka-pubsub = "Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools"
                akka-cluster-client : "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools"
              }
              serialization-bindings {
                "System.Object" = hyperion
                #"Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, Akka.Cluster.Tools" = akka-pubsub
                #"Akka.Cluster.Tools.PublishSubscribe.Internal.SendToOneSubscriber, Akka.Cluster.Tools" = akka-pubsub
                "Akka.Cluster.Tools.Client.IClusterClientMessage, Akka.Cluster.Tools" : akka-cluster-client
              }
              serialization-identifiers {
                #"Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools" = 9
                "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools" : 15
              }
            }
          remote {
            dot-netty.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """ + port.ToString() + """
            }
          }""" + 
                    if ifCluster then """
          cluster {
            auto-down-unreachable-after = 5s""" + 
                                                    if ifSeeding then """
            seed-nodes = [ "akka.tcp://cluster-system@localhost:5000/" ]""" 
                                                    else "" + """
          }""" 
                    else "" + """
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }""" +
                    if ifCluster then """
          extensions = ["Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider,Akka.Cluster.Tools"]
          """       else "" + """
        }
        """)
    if ifSeeding then 
        config.WithFallback(Akka.Cluster.Sharding.ClusterSharding.DefaultConfig())
    else
        config
          //.WithFallback(Akka.Cluster.Tools.Singleton.ClusterSingletonManager.DefaultConfig())
          //.WithFallback(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSub.DefaultConfig())

let system0 = System.create "cluster-system" <| (configWithPort 5000 true true)
let system1 = System.create "cluster-system" <| (configWithPort 5001 true true)
let system2 = System.create "cluster-system" <| (configWithPort 5002 false false)
let ttc1 = spawn system1 ("ttc-" + Guid.NewGuid().ToString()) (actorOf (fun m -> printfn "%A" m))
let ttc2 = spawn system1 ("ttc-" + Guid.NewGuid().ToString()) (actorOf (fun m -> printfn "%A" m))

Akka.Cluster.Tools.Client.ClusterClientReceptionist.Get(system0).RegisterService(ttc1)
Akka.Cluster.Tools.Client.ClusterClientReceptionist.Get(system1).RegisterService(ttc2)

let initialContacts = 
    Collections.Immutable.ImmutableHashSet.Create<ActorPath>(
        [|
            ActorPath.Parse(Cluster.Get(system0).SelfAddress.ToString() + "/system/receptionist")
            ActorPath.Parse(Cluster.Get(system1).SelfAddress.ToString() + "/system/receptionist")
        |]
        )
let contacts = initialContacts :> Collections.Immutable.IImmutableSet<ActorPath>

let settingClient = 
    Akka.Cluster.Tools.Client.ClusterClientSettings.Create(system2).WithInitialContacts(contacts)

let propsClient = Akka.Cluster.Tools.Client.ClusterClient.Props(settingClient)

let client : IActorRef = 
    system2.ActorOf(propsClient, ("client" + Guid.NewGuid().ToString()))

let path = "/" +  ttc1.Path.ToString().Replace(ttc1.Path.Root.ToString(), "")

let send = 
    new Akka.Cluster.Tools.Client.ClusterClient.Send(
        path, 
        box "printIt", 
        true)
client.Tell(send, Akka.Actor.ActorRefs.Nobody)

With Akka.FSharp, the first tell immediately works.

Not sure if there is anything I missed... but this is a littly weired...

ingted avatar Dec 28 '20 10:12 ingted

Is I select all and press 'Alt+Enter' sending the all script to fsi, it works.

image

But if I select from first to "let send =xxxx", 'Alt+Enter', there will be still nothing printed... Still need second tell.

image

And I take a look at the end of the [INFO], string "printIt" is not there...

image

ingted avatar Dec 28 '20 10:12 ingted

image

Akka.FSharp, first tell with string printed.

ingted avatar Dec 28 '20 10:12 ingted

I've tried your top-most code for Akkling and it seems to work:

[INFO][2020-12-28 12:55:58 PM][Thread 0018][akka.tcp://cluster-system@localhost:5002/user/client522d79a9-7d51-4698-8af4-b9887e1d2f0d] Connected to [akka.tcp://cluster-system@localhost:5000/system/recept
ionist]
[INFO][2020-12-28 12:55:59 PM][Thread 0018][Cluster (akka://cluster-system)] Cluster Node [akka.tcp://cluster-system@localhost:5000] - Leader is moving node [akka.tcp://cluster-system@localhost:5001] to
 [Up]
"printIt""printIt"[INFO][2020-12-28 12:55:59 PM][Thread 0018][Cluster (akka://cluster-system)] Cluster Node [akka.tcp://cluster-system@localhost:5001] - Welcome from [akka.tcp://cluster-system@localhost
:5000]

Horusiath avatar Dec 28 '20 12:12 Horusiath

Yes, me too. Execution of all script selected result correctly. But if I select only code from first line to

let send = 
    new Akka.Cluster.Tools.Client.ClusterClient.Send(
        path, 
        box "printIt", 
        true)

And [Alt+Enter] first time

Then execute ([Alt+Enter] second time)

client.Tell(send, Akka.Actor.ActorRefs.Nobody)

Fsi would not print Once I execute ([Alt+Enter] third time)

client.Tell(send, Akka.Actor.ActorRefs.Nobody)

again, it prints...

(just feel weird, would you mind try it once? not to execute all code in one time, but interactively execute the last "tell"...)

Do you think this code snippet could be part of the example? if so, please let me have the pleasure to make a merge request later...

Thank you! ^__^

ingted avatar Dec 28 '20 16:12 ingted