FSharp.Control.AsyncSeq
FSharp.Control.AsyncSeq copied to clipboard
AsyncSeq.mapAsyncParallel alternatives
AsyncSeq.mapAsyncParallel returns a sequence with the input order preserved. However, we often want to read whichever result becomes available first and throw away the rest.
I propose a generic version, AsyncSeq.mapAsyncUnorderedParallel, that returns results in the order in which they become available.
@dsyme @martinmoec @eulerfx
Here's the code
module ChannelExtention
open System.Threading
open System.Threading.Channels
open FSharp.Control
/// Creates an unbounded channel and immediately marks its writer as complete,
/// then returns the channel. No item is ever written to it here.
let CreateCompleted () =
    let completedChannel = Channel.CreateUnbounded()
    // Complete the writer before handing the channel out.
    completedChannel.Writer.Complete()
    completedChannel
open LanguagePrimitives
/// Starts `producer` for every index of `range` and writes each result to
/// `dataChannel` as soon as it is available, so the returned sequence yields
/// values in completion order rather than index order.
///
/// dataChannel: channel used to hand results from the workers to the reader.
/// producer:    function from an index to an async value; one call per index.
/// range:       matched with `AbsPosRangeByLength (pos, length)`; any other
///              shape yields the reader of an already-completed channel.
///
/// Returns the channel's reader converted with `toAsyncSeq`.
///
/// The channel is completed exactly once, after the scheduling loop has
/// finished AND every started worker has finished. A range that schedules no
/// work therefore completes immediately instead of leaving the reader waiting.
/// An exception raised by `producer` is not forwarded to the reader; it
/// escapes the worker computation started with `Async.Start`.
let inline private AsyncRangeProduce (dataChannel:Channel<'a>) producer range =
    match range with
    | AbsPosRangeByLength (pos, length) ->
        // Outstanding-work counter. It starts at 1: that slot belongs to the
        // scheduling loop, so the count cannot reach zero while indices are
        // still being handed out.
        let pending = ref 1L
        // Whoever drops the counter to zero closes the channel. The decrement
        // is atomic, so only one caller can observe zero; TryComplete keeps
        // the call harmless even if the channel was closed elsewhere.
        let release () =
            if Interlocked.Decrement(pending) = 0L then
                dataChannel.Writer.TryComplete() |> ignore
        async {
            try
                let last = pos + length - GenericOne
                // Heap-allocated cell: the loop body is compiled into closures.
                let index = ref pos
                while index.Value <= last do
                    let current = index.Value
                    // Register the worker before it starts so its slot is
                    // counted even if it finishes instantly.
                    Interlocked.Increment(pending) |> ignore
                    async {
                        try
                            let! value = producer current
                            do! dataChannel.Writer.WriteAsync(value).AsTask() |> Async.AwaitTask
                        finally
                            release ()
                    } |> Async.Start
                    index.Value <- current + GenericOne
            finally
                // Give back the scheduler's own slot.
                release ()
        } |> Async.Start
        dataChannel.Reader.ReadAllAsync() |> toAsyncSeq
    | _ -> CreateCompleted().Reader.ReadAllAsync() |> toAsyncSeq
/// Starts `producer` for every element of `source` and writes each result to
/// `dataChannel` as soon as it is available, so the returned sequence yields
/// values in completion order rather than source order.
///
/// dataChannel: channel used to hand results from the workers to the reader.
/// producer:    function from a source element to an async value.
/// source:      the input sequence, consumed with `AsyncSeq.iter`.
///
/// Returns the channel's reader converted with `toAsyncSeq`.
///
/// The channel is completed exactly once, after `source` has been fully
/// enumerated AND every started worker has finished. Comparing a "completed"
/// count with a "scheduled" count alone is not enough: the two can be equal
/// for a moment while more elements are still to come, which would close the
/// channel early. An empty source completes the channel as soon as the
/// enumeration ends. An exception raised by `producer` is not forwarded to the
/// reader; it escapes the worker computation started with `Async.Start`.
let inline private AsyncSequenceProduce (dataChannel:Channel<'a>) producer source =
    // Outstanding-work counter. It starts at 1: that slot belongs to the
    // enumeration of `source`, so the count cannot reach zero while elements
    // are still being scheduled.
    let pending = ref 1L
    // Whoever drops the counter to zero closes the channel. The decrement is
    // atomic, so only one caller can observe zero; TryComplete keeps the call
    // harmless even if the channel was closed elsewhere.
    let release () =
        if Interlocked.Decrement(pending) = 0L then
            dataChannel.Writer.TryComplete() |> ignore
    // Fire-and-forget one worker per element.
    let scheduleAll =
        source
        |> AsyncSeq.iter (fun item ->
            // Register the worker before it starts so its slot is counted
            // even if it finishes instantly.
            Interlocked.Increment(pending) |> ignore
            async {
                try
                    let! value = producer item
                    do! dataChannel.Writer.WriteAsync(value).AsTask() |> Async.AwaitTask
                finally
                    release ()
            } |> Async.Start)
    async {
        try
            // Awaited asynchronously; no pool thread is blocked while the
            // source is being enumerated.
            do! scheduleAll
        finally
            // Give back the enumeration's own slot.
            release ()
    } |> Async.Start
    dataChannel.Reader.ReadAllAsync() |> toAsyncSeq
/// Feeds the channel returned by `CreateChannelHelper.CreateBounded(capacity)`
/// to `AsyncRangeProduce`; the result still expects the producer and the range.
/// NOTE(review): presumably a bounded channel of size `capacity`;
/// `CreateChannelHelper` is defined outside this section, confirm there.
let inline writeRangeToBounded capacity =
    CreateChannelHelper.CreateBounded(capacity) |> AsyncRangeProduce
/// Feeds the channel returned by `CreateChannelHelper.CreateUnbounded()` to
/// `AsyncRangeProduce`; the result still expects the producer and the range.
/// NOTE(review): presumably an unbounded channel; `CreateChannelHelper` is
/// defined outside this section, confirm there.
let inline writeRangeToUnbounded () =
    CreateChannelHelper.CreateUnbounded() |> AsyncRangeProduce
/// Feeds the channel returned by `CreateChannelHelper.CreateBounded(capacity)`
/// to `AsyncSequenceProduce`; the result still expects the producer and the
/// source sequence.
/// NOTE(review): presumably a bounded channel of size `capacity`;
/// `CreateChannelHelper` is defined outside this section, confirm there.
let inline writeSequenceToBounded capacity =
    CreateChannelHelper.CreateBounded(capacity) |> AsyncSequenceProduce
/// Feeds the channel returned by `CreateChannelHelper.CreateUnbounded()` to
/// `AsyncSequenceProduce`; the result still expects the producer and the
/// source sequence.
/// NOTE(review): presumably an unbounded channel; `CreateChannelHelper` is
/// defined outside this section, confirm there.
let inline writeSequenceToUnbounded () =
    CreateChannelHelper.CreateUnbounded() |> AsyncSequenceProduce
// Takes a function and an AsyncSeq<'a>; returns an AsyncSeq<'b> whose elements arrive in order of first availability.
@Xyncgas Seems reasonable!