Suggestion: add a `chunkBy` function (to group and yield adjacent items sharing the same key) to the library
Description
I bumped into this at work the other day and thought it could be a nice addition to the library (unless it's already part of it under another name, which wouldn't be too surprising, since I'm pretty good at missing the very obvious).
Basically, the idea would be to add a chunkBy function to the AsyncSeq module that groups and yields adjacent items sharing the same key, in the spirit of what has been done for Seq.chunkBy in the F#+ library here, i.e.:
let chunkBy (projection: 'T -> 'Key) (source: _ seq) = seq {
    use e = source.GetEnumerator ()
    if e.MoveNext () then
        let mutable g = projection e.Current
        let mutable members = ResizeArray ()
        members.Add e.Current
        while e.MoveNext () do
            let key = projection e.Current
            if g = key then members.Add e.Current
            else
                yield g, members
                g <- key
                members <- ResizeArray ()
                members.Add e.Current
        yield g, members }
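For intuition (a toy example of mine, not from F#+ or this issue): unlike Seq.groupBy, which gathers equal keys across the whole sequence, chunkBy only merges adjacent runs, so the same key can appear more than once in the output:

    [ 1; 1; 2; 2; 1 ]
    |> Seq.chunkBy id
    |> Seq.map (fun (key, items) -> key, List.ofSeq items)
    |> List.ofSeq
    // [ (1, [1; 1]); (2, [2; 2]); (1, [1]) ]
    // Seq.groupBy id would give [ (1, [1; 1; 1]); (2, [2; 2]) ] instead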
A naive impl., ahem, translation of the above that I ended up with:
open System
open System.Threading
open FSharpPlus
open FSharp.Control

[<RequireQualifiedAccess>]
module AsyncSeq =
    /// Groups adjacent items of `source` that share the same `projection` key,
    /// yielding a (key, items) pair each time a run of equal keys ends.
    let chunkBy (projection: 'T -> 'Key) (source: AsyncSeq<'T>) = asyncSeq {
        use e = source.GetEnumerator()
        // Track the current element via a mutable option, since the result of
        // `let! ... = e.MoveNext()` can't be consulted directly in the while condition.
        let mutable currentMaybe = None
        let! firstCurrentMaybe = e.MoveNext()
        currentMaybe <- firstCurrentMaybe
        if currentMaybe.IsSome then
            let mutable g = projection currentMaybe.Value
            let mutable members = ResizeArray()
            members.Add currentMaybe.Value
            let! preWhileCurrentMaybe = e.MoveNext()
            currentMaybe <- preWhileCurrentMaybe
            while currentMaybe.IsSome do
                let key = projection currentMaybe.Value
                if g = key then
                    members.Add currentMaybe.Value
                else
                    // Key changed: emit the finished chunk and start a fresh one.
                    yield (g, members)
                    g <- key
                    members <- ResizeArray ()
                    members.Add currentMaybe.Value
                let! whileCurrentMaybe = e.MoveNext()
                currentMaybe <- whileCurrentMaybe
            // Emit the last, still-open chunk.
            yield (g, members)
    }
I've checked a bunch of more functional Seq approaches, which seem to require more allocations: https://stackoverflow.com/a/38495042/4636721
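For reference, here is roughly what that fold-based style looks like (my own sketch in the spirit of the linked answer, with a hypothetical chunkByFold name, not code from that post): it allocates an F# list node per element and is eager rather than lazy, whereas the enumerator version above reuses one growing ResizeArray per chunk:

    // Sketch only: builds reversed lists while folding, then fixes the order.
    let chunkByFold (projection: 'T -> 'Key) (source: seq<'T>) =
        source
        |> Seq.fold (fun chunks x ->
            let key = projection x
            match chunks with
            | (k, items) :: rest when k = key -> (k, x :: items) :: rest
            | _ -> (key, [ x ]) :: chunks) []
        |> List.rev
        |> List.map (fun (k, items) -> k, List.rev items)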
And here is a working example:
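(FakeCpiEntity and FakeGrpcEntity aren't defined anywhere in the issue; the following is a minimal guess at definitions that would make the example compile, assumed purely for illustration:)

    // Assumed definitions; the original issue omits them.
    type FakeCpiEntity =
        { Id: int; Items: char list }
        static member Of(id, items) = { Id = id; Items = items }
        static member ToPrimaryKey(entity: FakeCpiEntity) = entity.Id

    type FakeGrpcEntity =
        { Id: int; Items: char list }
        // #seq<_> keeps this usable whether chunkBy yields a ResizeArray or a plain seq
        static member OfCpiEntities(key: int, entities: #seq<FakeCpiEntity>) =
            { Id = key; Items = entities |> Seq.collect (fun e -> e.Items) |> List.ofSeq }

With something like that in place: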
[<EntryPoint>]
let main _ =
    let generateCpiEntities() =
        let alphabet = [| 'a' .. 'z' |]
        seq { 0 .. 13 }
        |> Seq.map (fun x -> Thread.Sleep 500; FakeCpiEntity.Of(x / 3, [ Array.get alphabet x ]))

    let pushGroupsToGrpc = String.join "," >> sprintf "Push %A" >> String.replace "\n" String.Empty >> printfn "%s"
    let grpcEntityToSendThreshold = 2

    generateCpiEntities()
    |> Seq.chunkBy FakeCpiEntity.ToPrimaryKey
    |> Seq.map FakeGrpcEntity.OfCpiEntities
    |> Seq.chunkBySize grpcEntityToSendThreshold
    |> Seq.iter pushGroupsToGrpc

    generateCpiEntities()
    |> AsyncSeq.ofSeq
    |> AsyncSeq.chunkBy FakeCpiEntity.ToPrimaryKey
    |> AsyncSeq.map FakeGrpcEntity.OfCpiEntities
    |> AsyncSeq.chunkBySize grpcEntityToSendThreshold
    |> AsyncSeq.iter pushGroupsToGrpc
    |> Async.RunSynchronously

    0
Sample output:
Push "{ Id = 0 Items = ['a'; 'b'; 'c'] },{ Id = 1 Items = ['d'; 'e'; 'f'] }"
Push "{ Id = 2 Items = ['g'; 'h'; 'i'] },{ Id = 3 Items = ['j'; 'k'; 'l'] }"
Push "{ Id = 4 Items = ['m'; 'n'] }"
Push "{ Id = 0 Items = ['a'; 'b'; 'c'] },{ Id = 1 Items = ['d'; 'e'; 'f'] }"
Push "{ Id = 2 Items = ['g'; 'h'; 'i'] },{ Id = 3 Items = ['j'; 'k'; 'l'] }"
Push "{ Id = 4 Items = ['m'; 'n'] }"
(Also, I know [random] vertical alignment is dumb, but I can't help it.)
Wdyt? Do you think this feature is worthy enough 🔨⚡ (or at least relevant 🤔) to be part of the library?
@natalie-o-perret Looks good, please consider making a PR, thanks
You may want to take a close look at what happens if multiple threads are iterating with the resulting sequence. I think the code can cause race conditions. There are functions in this library that specifically warn about that, so maybe it’s enough to just say so in the doc comments.
I’m currently considering adding the same to TaskSeq, then came across this.
This is a good point 🤔.
That said, it's also the case with the seq version then, innit?
What about the other functions of the AsyncSeq module?
Are they all safe from race conditions when multiple threads are iterating the resulting sequence?
@natalie-o-perret I’m not sure about seq, I’d have to check. But I think the premise there is that you get a new enumerator, and that you shouldn’t share this enumerator between threads.
My concern with AsyncSeq is that there’s a much larger chance it’s used in multi-threaded scenarios. I’m not entirely sure what the base premise is with sharing enumerators. While that’s generally a code smell, I think this lib ought to be safe in that regard, but I’d have to check.
@dsyme is there a general idea/consensus in the dotnet ecosystem that sequences should be thread safe, but enumerators of these sequences are not (or: aren’t guaranteed to be)?
It was my understanding that most collections are safe for reading but not for writing. The thing is, in functions like chunkBy or cache we’re doing both, but the user probably considers it a read operation.
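To make the concern concrete (a contrived sketch, not code from this thread): if two consumers share a single enumerator of the chunked sequence, both mutate the same cursor, so the chunks get split between them in an unpredictable interleaving:

    // Contrived sketch: sharing ONE enumerator between two concurrent consumers.
    // MoveNext mutates shared state, so each consumer only sees some of the chunks.
    let racyConsumers () =
        use shared =
            (AsyncSeq.ofSeq [ 1; 1; 2; 2; 3 ]
             |> AsyncSeq.chunkBy id).GetEnumerator()
        let consume name = async {
            let mutable go = true
            while go do
                match! shared.MoveNext() with
                | Some (key, items) -> printfn "%s: key %d (%d items)" name key items.Count
                | None -> go <- false
        }
        [ consume "A"; consume "B" ]
        |> Async.Parallel
        |> Async.Ignore
        |> Async.RunSynchronously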