Akkling
Akkling copied to clipboard
Streams example is broken
Hi, thank you for your effort with this project! I found that streams.fsx example is throwing following exception when I do
dotnet fsi ./streams.fsx
System.InvalidCastException: Unable to cast object of type 'Akka.NotUsed' to type 'Microsoft.FSharp.Core.Unit'.
at Akka.Streams.Implementation.Module.<>c__DisplayClass28_0`3.<Compose>b__0(Object x, Object y)
at Akka.Streams.Implementation.MaterializerSession.ResolveMaterialized(IMaterializedValueNode node, IDictionary`2 values, Int32 spaces)
at Akka.Streams.Implementation.MaterializerSession.ResolveMaterialized(IMaterializedValueNode node, IDictionary`2 values, Int32 spaces)
at Akka.Streams.Implementation.MaterializerSession.ResolveMaterialized(IMaterializedValueNode node, IDictionary`2 values, Int32 spaces)
at Akka.Streams.Implementation.MaterializerSession.MaterializeModule(IModule module, Attributes effectiveAttributes)
at Akka.Streams.Implementation.MaterializerSession.Materialize()
at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable, Func`2 subFlowFuser, Attributes initialAttributes)
at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable)
at Akka.Streams.Dsl.RunnableGraph`1.Run(IMaterializer materializer)
at [email protected](Unit unitVar) in /home/a/Akkling/examples/streams.fsx:line 105
at Microsoft.FSharp.Control.AsyncPrimitives.CallThenInvoke[T,TResult](AsyncActivation`1 ctxt, TResult result1, FSharpFunc`2 part2) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 517
at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 112
--- End of stack trace from previous location ---
at Microsoft.FSharp.Control.AsyncResult`1.Commit() in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 454
at Microsoft.FSharp.Control.AsyncPrimitives.QueueAsyncAndWaitForResultSynchronously[a](CancellationToken token, FSharpAsync`1 computation, FSharpOption`1 timeout) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 1140
at Microsoft.FSharp.Control.FSharpAsync.RunSynchronously[T](FSharpAsync`1 computation, FSharpOption`1 timeout, FSharpOption`1 cancellationToken) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 1511
at <StartupCode$FSI_0002>.$FSI_0002.main@() in /home/a/Akkling/examples/streams.fsx:line 104
at System.RuntimeMethodHandle.InvokeMethod(Object target, Void** arguments, Signature sig, Boolean isConstructor)
at System.Reflection.MethodInvoker.Invoke(Object obj, IntPtr* args, BindingFlags invokeAttr)
parser ported from the docs returns different signature and works
let replParser =
Flow.Create<string>().TakeWhile(fun c -> c <> "q")
.Concat(Source.Single("BYE"))
.Select(fun elem -> ByteString.FromString($"{elem}\n"));
let repl = Flow.Create<ByteString>()
.Via(Framing.Delimiter(
ByteString.FromString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.Select(fun c -> c.ToString())
.Select(fun text ->
printf $"Server: {text}"
text
)
.Select(fun text -> Console.ReadLine())
.Via(replParser);
The workaround is:
module Source =
let singleton s =
Source.Single(s).MapMaterializedValue(Func<Akka.NotUsed,Akka.NotUsed>(fun _ -> Akka.NotUsed.Instance))
Process A:

Process B:

my fork: (master) https://github.com/ingted/Akkling