Fumble icon indicating copy to clipboard operation
Fumble copied to clipboard

Stream results using AsyncSeq / TaskSeq?

Open njlr opened this issue 2 months ago • 0 comments

It would be useful to receive query results as an async pull sequence.

My attempt:

#r "nuget: FSharp.Control.TaskSeq, 0.4.0"
#r "nuget: Fumble, 0.8.3"

open FSharp.Control
open Fumble

[<RequireQualifiedAccess>]
module Sqlite =

  open System.Threading.Channels
  open System.Threading.Tasks

  let streamAsync (read : SqliteRowReader -> 'a) (db : Sqlite.SqlProps) : TaskSeq<'a> =
    taskSeq {
      let channel =
        let opts = BoundedChannelOptions(1024)
        opts.SingleWriter <- true
        opts.SingleReader <- true
        Channel.CreateBounded(opts)

      let fillChannel =
        let writer = channel.Writer
        Task.Run<unit>(
          fun () ->
            task {
              try
                let! outcome =
                  db
                  |> Sqlite.iterAsync
                    (fun r ->
                      let a = read r
                      if not (writer.TryWrite(a)) then
                        writer.WriteAsync(a).AsTask()
                        |> Async.AwaitTask
                        |> Async.RunSynchronously) // Is there a better way?

                match outcome with
                | Ok () ->
                  writer.Complete()
                | Error exn ->
                  writer.Complete(exn)
              with exn ->
                writer.Complete(exn)

                return raise exn
            })

      yield! channel.Reader.ReadAllAsync()

      do! fillChannel
    }

Is this a better way?

Perhaps this should be added?

njlr avatar Oct 24 '25 15:10 njlr