Akkling

Streams example is broken

Open anpin opened this issue 3 years ago • 4 comments

Hi, thank you for your effort on this project! I found that the streams.fsx example throws the following exception when I run dotnet fsi ./streams.fsx:

System.InvalidCastException: Unable to cast object of type 'Akka.NotUsed' to type 'Microsoft.FSharp.Core.Unit'.
   at Akka.Streams.Implementation.Module.<>c__DisplayClass28_0`3.<Compose>b__0(Object x, Object y)
   at Akka.Streams.Implementation.MaterializerSession.ResolveMaterialized(IMaterializedValueNode node, IDictionary`2 values, Int32 spaces)
   at Akka.Streams.Implementation.MaterializerSession.ResolveMaterialized(IMaterializedValueNode node, IDictionary`2 values, Int32 spaces)
   at Akka.Streams.Implementation.MaterializerSession.ResolveMaterialized(IMaterializedValueNode node, IDictionary`2 values, Int32 spaces)
   at Akka.Streams.Implementation.MaterializerSession.MaterializeModule(IModule module, Attributes effectiveAttributes)
   at Akka.Streams.Implementation.MaterializerSession.Materialize()
   at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable, Func`2 subFlowFuser, Attributes initialAttributes)
   at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable)
   at Akka.Streams.Dsl.RunnableGraph`1.Run(IMaterializer materializer)
   at [email protected](Unit unitVar) in /home/a/Akkling/examples/streams.fsx:line 105
   at Microsoft.FSharp.Control.AsyncPrimitives.CallThenInvoke[T,TResult](AsyncActivation`1 ctxt, TResult result1, FSharpFunc`2 part2) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 517
   at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 112
--- End of stack trace from previous location ---
   at Microsoft.FSharp.Control.AsyncResult`1.Commit() in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 454
   at Microsoft.FSharp.Control.AsyncPrimitives.QueueAsyncAndWaitForResultSynchronously[a](CancellationToken token, FSharpAsync`1 computation, FSharpOption`1 timeout) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 1140
   at Microsoft.FSharp.Control.FSharpAsync.RunSynchronously[T](FSharpAsync`1 computation, FSharpOption`1 timeout, FSharpOption`1 cancellationToken) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 1511
   at <StartupCode$FSI_0002>.$FSI_0002.main@() in /home/a/Akkling/examples/streams.fsx:line 104
   at System.RuntimeMethodHandle.InvokeMethod(Object target, Void** arguments, Signature sig, Boolean isConstructor)
   at System.Reflection.MethodInvoker.Invoke(Object obj, IntPtr* args, BindingFlags invokeAttr)

Dec 08 '22 16:12 anpin

A parser ported from the docs has a different signature and works:

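open System
open Akka.IO
open Akka.Streams.Dsl

// Passes input through until the user types "q", then appends "BYE"
// and encodes every element as a newline-terminated ByteString.
let replParser =
    Flow.Create<string>().TakeWhile(fun c -> c <> "q")
        .Concat(Source.Single("BYE"))
        .Select(fun elem -> ByteString.FromString($"{elem}\n"))

// Splits incoming bytes into newline-delimited frames, prints each one,
// then reads the next line from the console and feeds it to replParser.
let repl = Flow.Create<ByteString>()
                .Via(Framing.Delimiter(
                    ByteString.FromString("\n"),
                    maximumFrameLength = 256,
                    allowTruncation = true))
                .Select(fun c -> c.ToString())
                .Select(fun text ->
                            printf $"Server: {text}"
                            text
                )
                .Select(fun _ -> Console.ReadLine())
                .Via(replParser)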

Dec 08 '22 17:12 anpin

The workaround is:

module Source =
    // Explicitly re-map the materialized value so that it stays Akka.NotUsed.
    let singleton s =
        Source.Single(s).MapMaterializedValue(Func<Akka.NotUsed,Akka.NotUsed>(fun _ -> Akka.NotUsed.Instance))

Jan 02 '23 00:01 ingted

Process A: [image]

Process B: [image]

Jan 02 '23 00:01 ingted

My fork (master branch): https://github.com/ingted/Akkling

Jan 02 '23 00:01 ingted