FSharp.Control.AsyncSeq icon indicating copy to clipboard operation
FSharp.Control.AsyncSeq copied to clipboard

AsyncSeq.mapAsyncParallel alternatives

Open Xyncgas opened this issue 3 years ago • 3 comments

AsyncSeq.mapAsyncParallel returns a sequence that preserves the order of its input. However, we often want to read whichever result becomes available first and throw away the rest.

I propose a generic version, AsyncSeq.mapAsyncUnorderedParallel, that returns results in the order in which they become available.

Xyncgas avatar Sep 06 '22 09:09 Xyncgas

@dsyme @martinmoec @eulerfx

Xyncgas avatar Sep 06 '22 09:09 Xyncgas

Here's the code

module ChannelExtention
    open System.Threading
    open System.Threading.Channels
    open FSharp.Control
    
    /// Creates an unbounded channel whose writer is completed before it is returned,
    /// so the channel never accepts an item.
    let CreateCompleted () =
        let completedChannel = Channel.CreateUnbounded()
        // Complete right away: no item is ever written to this channel.
        completedChannel.Writer.Complete()
        completedChannel

    open LanguagePrimitives
    /// Runs `producer` for every index of `range` concurrently and streams each result
    /// through `dataChannel` in completion order (not index order).
    ///
    /// - `dataChannel`: channel the results are written to; its reader backs the returned sequence.
    /// - `producer`: function from an index to an `Async` yielding the value for that index.
    /// - `range`: matched with `AbsPosRangeByLength (pos, length)`; any other shape yields the
    ///   reader of an already-completed channel.
    ///
    /// Returns the channel's reader converted with `toAsyncSeq`.
    /// NOTE: a failing `producer` still counts as finished, so the sequence terminates, but the
    /// exception is not forwarded to the channel.
    let inline private AsyncRangeProduce (dataChannel:Channel<'a>) producer range =
        match range with
        | AbsPosRangeByLength (pos, length) ->
            let last = pos + length - GenericOne
            if not (pos <= last) then
                // Nothing will be scheduled (e.g. a zero length — TODO confirm whether the
                // pattern can produce one), so complete now or the reader would wait forever.
                dataChannel.Writer.TryComplete() |> ignore
            else
                let gate = obj()
                // Ref cells instead of `let mutable`: the counters are captured by async closures.
                let finished = ref GenericZero
                async{
                    let i = ref pos
                    while i.Value <= last do
                        let current = i.Value
                        async{
                            try
                                let! value = producer(current)
                                do! dataChannel.Writer.WriteAsync(value).AsTask() |> Async.AwaitTask
                            finally
                                // Increment and compare under one lock so exactly one task
                                // observes "I was the last one".
                                let isLast =
                                    lock gate (fun () ->
                                        finished.Value <- finished.Value + GenericOne
                                        finished.Value = length)
                                // TryComplete is idempotent, unlike Complete, which throws on a second call.
                                if isLast then dataChannel.Writer.TryComplete() |> ignore
                        } |> Async.Start
                        i.Value <- i.Value + GenericOne
                } |> Async.Start
            dataChannel.Reader.ReadAllAsync() |> toAsyncSeq
        | _ -> CreateCompleted().Reader.ReadAllAsync() |> toAsyncSeq

    /// Starts `producer` for every element of `source` as soon as the element is read and
    /// streams each result through `dataChannel` in completion order (not source order).
    ///
    /// - `dataChannel`: channel the results are written to; its reader backs the returned sequence.
    /// - `producer`: function from a source element to an `Async` yielding the mapped value.
    /// - `source`: the `AsyncSeq` whose elements are fed to `producer`.
    ///
    /// Returns the channel's reader converted with `toAsyncSeq`. The channel is completed once
    /// the source has been fully read (or has failed) and every started producer has finished.
    /// NOTE: exceptions from `producer` or `source` are not forwarded to the channel.
    let inline private AsyncSequenceProduce (dataChannel:Channel<'a>) producer source =
        let gate = obj()
        // Ref cells instead of `let mutable`: the state is captured by async closures.
        let scheduled = ref 0L
        let completed = ref 0L
        let sourceDone = ref false

        // Complete only when no further work can be scheduled AND all scheduled work is done.
        // Comparing the counters alone is not enough: they are also equal whenever the
        // producers started so far finish before the next element has been read.
        let completeIfDrained () =
            let drained =
                lock gate (fun () -> sourceDone.Value && completed.Value = scheduled.Value)
            // TryComplete is idempotent, so a repeated call is harmless.
            if drained then dataChannel.Writer.TryComplete() |> ignore

        let startWorker item =
            lock gate (fun () -> scheduled.Value <- scheduled.Value + 1L)
            async{
                try
                    let! value = producer(item)
                    do! dataChannel.Writer.WriteAsync(value).AsTask() |> Async.AwaitTask
                finally
                    lock gate (fun () -> completed.Value <- completed.Value + 1L)
                    completeIfDrained ()
            } |> Async.Start

        let pump = source |> AsyncSeq.iter startWorker

        async{
            try
                // Await the iteration instead of blocking a pool thread with RunSynchronously.
                do! pump
            finally
                // Runs for an empty, finished or failed source, so the reader always terminates.
                lock gate (fun () -> sourceDone.Value <- true)
                completeIfDrained ()
        } |> Async.Start
        dataChannel.Reader.ReadAllAsync() |> toAsyncSeq

    /// Range variant backed by a channel from `CreateChannelHelper.CreateBounded(capacity)`
    /// (helper defined elsewhere — presumably a bounded channel; confirm).
    /// Returns `AsyncRangeProduce` partially applied to that channel; the caller still
    /// supplies `producer` and `range`.
    /// NOTE(review): the channel is created when `capacity` is applied, so reusing the
    /// partially applied result would route every call through the same channel.
    let inline writeRangeToBounded capacity = AsyncRangeProduce (CreateChannelHelper.CreateBounded(capacity)) 
    
    /// Range variant backed by a channel from `CreateChannelHelper.CreateUnbounded()`
    /// (helper defined elsewhere). The caller still supplies `producer` and `range`.
    let inline writeRangeToUnbounded() = AsyncRangeProduce (CreateChannelHelper.CreateUnbounded())

    /// Sequence variant backed by a channel from `CreateChannelHelper.CreateBounded(capacity)`
    /// (helper defined elsewhere). Returns `AsyncSequenceProduce` partially applied to that
    /// channel; the caller still supplies `producer` and `source`.
    let inline writeSequenceToBounded capacity = AsyncSequenceProduce (CreateChannelHelper.CreateBounded(capacity)) 

    /// Sequence variant backed by a channel from `CreateChannelHelper.CreateUnbounded()`
    /// (helper defined elsewhere). The caller still supplies `producer` and `source`.
    let inline writeSequenceToUnbounded () = AsyncSequenceProduce (CreateChannelHelper.CreateUnbounded())
// The writeSequence* functions take a producer function and an AsyncSeq<'a>, and return an AsyncSeq<'b> whose elements appear in the order in which they become available.

Xyncgas avatar Sep 06 '22 10:09 Xyncgas

@Xyncgas Seems reasonable!

dsyme avatar Sep 07 '22 11:09 dsyme