scalaz-stream

parallel multi sink

Open dmgcodevil opened this issue 6 years ago • 0 comments

I just started using the scalaz-stream library and I'd like to implement the following scenario. I have a component that reads events from a source stream (a scalaz-stream queue), mutates its state, and sends messages to external components. The component should be able to send outgoing messages in parallel: it shouldn't have to wait for component A to receive a message before it starts sending to the next component. My code is below. It works, but I'm skeptical and feel there should be a more standard, idiomatic scalaz-stream way of doing this.

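  import scalaz.concurrent.Task
  import scalaz.stream._

  object Component {

    val queue = async.boundedQueue[Event](10)

    // updates the internal state and creates new event (placeholder, implementation omitted)
    def process(event: Event): Event = ???

    // external components (placeholders, implementations omitted)
    val componentA: Sink[Task, Event] = ???
    val componentB: Sink[Task, Event] = ???
    val componentC: Sink[Task, Event] = ???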

    val processorAsync: Channel[Task, Event, Event] = {
      val pf: Event => Task[Event] = { inEvent =>
        Task.now(process(inEvent)).flatMap { outEvent =>
          val sinks = merge.mergeN(10)(Process(
            send(outEvent, componentA),
            send(outEvent, componentB),
            send(outEvent, componentC)
          )).run
          sinks.map(_ => inEvent)
        }
      }
      channel.lift(pf)
    }

    def send(event: Event, out: Sink[Task, Event]): Process[Task, Unit] = {
      Process.eval(Task.now(event)) to out
    }

    def run(): Process[Task, Unit] = {
      queue.dequeue through processorAsync to emptySink // toSource isn't defined for Process, only for Process0
    }

    val emptySink: Sink[Task, Event] = sink.lift[Task, Event](event => Task.now(event))

    trait Event

  }
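
For comparison, here is a minimal sketch of one possible alternative, not necessarily the idiomatic one. It assumes the external components can be modeled as plain functions `Event => Task[Unit]` rather than `Sink`s. The names `ParallelSendSketch`, `Send`, `broadcast`, and `pipeline` are hypothetical and exist only for illustration. Each send is wrapped in `Task.fork` so it runs on the default executor, and `Task.gatherUnordered` waits for all of them before the next event is pulled.

```scala
import scalaz.concurrent.Task
import scalaz.stream._

object ParallelSendSketch {

  trait Event

  // Assumption: each external component is a plain effectful function, not a Sink
  type Send = Event => Task[Unit]

  // Start all sends for one event concurrently and complete once every send has finished
  def broadcast(outs: List[Send])(event: Event): Task[Unit] =
    Task.gatherUnordered(outs.map(send => Task.fork(send(event)))).map(_ => ())

  // Process events one at a time; only the sends for a single event overlap
  def pipeline(source: Process[Task, Event],
               process: Event => Event,
               outs: List[Send]): Process[Task, Unit] =
    source.evalMap(in => broadcast(outs)(process(in)))
}
```

With this shape, `pipeline(queue.dequeue, process, List(a, b, c)).run` would play the role of `run()` above, where `a`, `b`, and `c` are the hypothetical send functions.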

PS: Is it thread-safe to update local state within a scalaz-stream Channel without any synchronization (locks, atomics, etc.)?
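
One way to sidestep the question, sketched here under assumptions, is to avoid mutable state altogether and thread it through the stream with `scan`. The `State` type, the `step` function, and the use of `String` as the event type are hypothetical placeholders. This does not settle whether unsynchronized mutation inside a `Channel` is safe; it only shows a shape in which there is nothing left to synchronize.

```scala
import scalaz.concurrent.Task
import scalaz.stream._

object StateSketch {

  // Hypothetical immutable state: counts how many events have been seen
  final case class State(seen: Int)

  // Pure transition function: previous state plus an event yields the next state
  def step(state: State, event: String): State =
    state.copy(seen = state.seen + 1)

  // Emits the initial state, then the state after each event
  def states(source: Process[Task, String]): Process[Task, State] =
    source.scan(State(0))(step)
}
```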

https://stackoverflow.com/questions/49991517/scalaz-stream-parallel-multi-sink

dmgcodevil, Apr 25 '18 14:04