Provide StreamWithContext variant

Open r-glyde opened this issue 2 years ago • 2 comments

Mostly just gauging whether there would be interest in this. We've found the approach useful before in Akka Streams, but I'm not sure whether others would find it useful here.

The idea would be to define something like a StreamWithContext[F[_], O, Ctx] where most operations only work on the O and the Ctx is propagated through automatically.

Rough example of how this could be used with something like fs2-kafka:

val consumerSettings: ConsumerSettings[IO, String, String] = ???

def process(id: Int): IO[Unit] = ???

// StreamWithContext
KafkaConsumer.stream(consumerSettings)
  .subscribeTo("topic")
  .records
  .asStreamWithContext(cr => (cr.record.value, cr.offset))
  .map(_.length)
  .mapAsync(10)(process)
  .asStream { case (_, ack) => ack }
  .through(commitBatchWithin(500, 5.seconds))
  .compile.drain

// Stream
KafkaConsumer.stream(consumerSettings)
  .subscribeTo("topic")
  .records
  .map(cr => (cr.record.value, cr.offset))
  .map { case (str, ctx) => (str.length, ctx) }
  .mapAsync(10) { case (i, ctx) => process(i).map(_ => ctx) }
  .through(commitBatchWithin(500, 5.seconds))
  .compile.drain

So there would be functions to convert between Stream and StreamWithContext, and operators like map and mapAsync would take just O => O2 or O => F[O2], with the Ctx carried along behind the scenes.

Happy to have a look if there's any interest, but no worries if not. This is probably the sort of thing that would be easy to do outside the core library as well.
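To make the idea concrete, here is a minimal sketch of what such a wrapper could look like. It is not an existing fs2 API: `StreamWithContext`, `fromStream`, `asStream` and the `Record` type are hypothetical names for illustration, and only `map` and `mapAsync` are covered. It assumes fs2 3.x on the classpath.

```scala
//> using dep "co.fs2::fs2-core:3.2.14"

import cats.effect.{Concurrent, IO}
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import fs2.Stream

// Hypothetical wrapper: every element is paired with a context value that
// the operators below carry along untouched.
final case class StreamWithContext[F[_], O, Ctx](underlying: Stream[F, (O, Ctx)]) {

  // Transform only the element; the context passes through unchanged.
  def map[O2](f: O => O2): StreamWithContext[F, O2, Ctx] =
    StreamWithContext(underlying.map { case (o, ctx) => (f(o), ctx) })

  // Effectful variant built on fs2's order-preserving parEvalMap.
  def mapAsync[O2](maxConcurrent: Int)(f: O => F[O2])(implicit
      F: Concurrent[F]
  ): StreamWithContext[F, O2, Ctx] =
    StreamWithContext(
      underlying.parEvalMap[F, (O2, Ctx)](maxConcurrent) { case (o, ctx) =>
        f(o).map(o2 => (o2, ctx))
      }
    )

  // Leave the wrapper, choosing what to keep from each (element, context) pair.
  def asStream[A](f: ((O, Ctx)) => A): Stream[F, A] =
    underlying.map(f)
}

object StreamWithContext {
  // Enter the wrapper by splitting each input into (element, context).
  def fromStream[F[_], A, O, Ctx](stream: Stream[F, A])(
      f: A => (O, Ctx)
  ): StreamWithContext[F, O, Ctx] =
    StreamWithContext(stream.map(f))
}

// Stand-in for a Kafka record: a payload plus an offset used as the context.
final case class Record(value: String, offset: Long)

val records: Stream[IO, Record] =
  Stream(Record("a", 0L), Record("bb", 1L), Record("ccc", 2L)).covary[IO]

val rendered: List[String] =
  StreamWithContext
    .fromStream(records)(r => (r.value, r.offset))
    .map(_.length)
    .mapAsync(2)(n => IO.pure(n * 10))
    .asStream { case (n, offset) => s"$offset:$n" }
    .compile
    .toList
    .unsafeRunSync()
```

Operators that drop, merge or reorder elements (filter, grouping, merging) would each need a decision about what happens to the context, which is where most of the design work for a real implementation would be.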

r-glyde avatar Sep 08 '22 16:09 r-glyde

So with cats.data.Nested you can already do this for .map at least. I wonder if there's a way we can extend this to work for other combinators such as .mapAsync.

//> using lib "co.fs2::fs2-core::3.2.14"
//> using options "-Ykind-projector"

import cats.data._
import cats.effect._
import cats.syntax.all._
import fs2._

trait Ctx
type WithCtx[+A] = (Ctx, A)

def intStream: Stream[IO, WithCtx[Int]] = ???

def strStream: Stream[IO, WithCtx[String]] =
  Nested(intStream).map(_ * 2).map(_.toString).value
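One possible way to extend this to effectful combinators is to lean on the `Traverse` instance that cats provides for tuples: traversing a `(Ctx, A)` with `A => F[B]` yields an `F[(Ctx, B)]`, which is the shape `parEvalMap` (aliased as `mapAsync`) expects. The sketch below assumes a `String` standing in for `Ctx` and a made-up `process` function, purely so that it can run.

```scala
//> using dep "co.fs2::fs2-core:3.2.14"

import cats.Traverse
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// Assumption: a String stands in for the opaque Ctx so the example can run.
type WithCtx[A] = (String, A)

// Hypothetical effectful step that only knows about the payload.
def process(i: Int): IO[String] = IO.pure(s"processed-$i")

val intStream: Stream[IO, WithCtx[Int]] =
  Stream(("ctx-1", 1), ("ctx-2", 2)).covary[IO]

// traverse runs the effect on the payload and rebuilds the pair around the
// result, so the context never appears in `process` itself.
val processed: Stream[IO, WithCtx[String]] =
  intStream.parEvalMap[IO, WithCtx[String]](10) { pair =>
    Traverse[WithCtx].traverse(pair)(process)
  }

val out: List[WithCtx[String]] =
  processed.compile.toList.unsafeRunSync()
```

The same trick should apply to `evalMap` and similar combinators, at the cost of writing the traverse call at each step rather than having it hidden by a wrapper.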

armanbilge avatar Sep 08 '22 17:09 armanbilge

You could also use Kleisli as your F:

  import cats.data.Kleisli
  import cats.effect.Async

  final case class Context(
    attribute: String
  )
  def myContextFunction[F[_]](implicit F: Async[F]) = {
    val lift = Kleisli.liftK[F, Context]

    val dataSource: fs2.Stream[F, String] = fs2.Stream.eval(F.delay("data"))

    def programThatNeedsContext(data: String, ctx: Context): F[Unit] = ???

    dataSource
      .translate(lift)
      .evalMap(data => Kleisli((ctx: Context) => programThatNeedsContext(data, ctx)))
      .compile
      .drain
      .run(Context("attribute"))
  }

If you're looking for a more thorough solution, consider using a tagless algebra instead (other libraries do this, such as https://github.com/tpolecat/natchez).

  import cats._
  import cats.data.Kleisli
  import cats.effect.Async
  import cats.syntax.all._
  final case class Context(
    attribute: String
  )
  trait ContextAlg[F[_]] {
    def context: F[Context]
  }

  def myContextFunction2[F[_]](implicit F: Async[F], C: ContextAlg[F]) = {
    val dataSource: fs2.Stream[F, String] = fs2.Stream.eval(F.delay("data"))

    def programThatNeedsContext(data: String, ctx: Context): F[Unit] = ???

    dataSource
      .evalMap(data => C.context.flatMap(ctx => programThatNeedsContext(data, ctx)))
      .compile
      .drain
  }

  def main[F[_]: Async] = {
    myContextFunction2[Kleisli[F, Context, *]]
  }

This uses the following algebra implementation for Kleisli:

  implicit def algForKleisli[F[_]: Applicative]: ContextAlg[Kleisli[F, Context, *]] =
    new ContextAlg[Kleisli[F, Context, *]] {
      def context: Kleisli[F, Context, Context] =
        Kleisli.ask[F, Context]
    }

Note: If you use IO, you could consider using IOLocal. https://github.com/typelevel/cats-effect/blob/79a5462a2fe5b13f91feeedf659a2e8c54577cc5/core/shared/src/main/scala/cats/effect/IOLocal.scala

Note 2: If you just need to ask for the context, cats-mtl provides an algebra named Ask for this.

Note 3: If you also need to update the context, consider the following:

  trait ContextAlg[F[_]] {
    def context: F[Context]

    def updated(f: Context => Context): F ~> F

    def modify[A](f: Context => Context)(fa: F[A]): F[A]
  }

  implicit def algForKleisli[F[_]: Applicative]: ContextAlg[Kleisli[F, Context, *]] =
    new ContextAlg[Kleisli[F, Context, *]] {
      def context: Kleisli[F, Context, Context] =
        Kleisli.ask[F, Context]

      def updated(f: Context => Context): Kleisli[F, Context, *] ~> Kleisli[F, Context, *] =
        new (Kleisli[F, Context, *] ~> Kleisli[F, Context, *]) {
          def apply[A](fa: Kleisli[F, Context, A]): Kleisli[F, Context, A] =
            Kleisli(ctx => fa.run(f(ctx)))
        }

      def modify[A](f: Context => Context)(fa: Kleisli[F, Context, A]): Kleisli[F, Context, A] =
        updated(f)(fa)
    }
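For the IOLocal route mentioned in the first note, a minimal sketch of the same algebra backed by an `IOLocal` might look like the following. The `ioLocalAlg` helper is a hypothetical name, the trait is trimmed to `context` and `modify`, and the example assumes cats-effect 3.x.

```scala
//> using dep "org.typelevel::cats-effect:3.3.14"

import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

final case class Context(attribute: String)

// Trimmed-down version of the algebra from the thread.
trait ContextAlg[F[_]] {
  def context: F[Context]
  def modify[A](f: Context => Context)(fa: F[A]): F[A]
}

// Hypothetical helper: allocates an IOLocal and exposes it through the algebra.
def ioLocalAlg(initial: Context): IO[ContextAlg[IO]] =
  IOLocal(initial).map { local =>
    new ContextAlg[IO] {
      def context: IO[Context] = local.get

      // Run `fa` with an updated context, then restore the previous value,
      // even if `fa` fails or is canceled.
      def modify[A](f: Context => Context)(fa: IO[A]): IO[A] =
        local.get.flatMap { previous =>
          local.set(f(previous)).bracket(_ => fa)(_ => local.set(previous))
        }
    }
  }

val program: IO[(Context, Context, Context)] =
  for {
    alg    <- ioLocalAlg(Context("root"))
    before <- alg.context
    inside <- alg.modify(c => c.copy(attribute = c.attribute + "/child"))(alg.context)
    after  <- alg.context
  } yield (before, inside, after)

val (before, inside, after) = program.unsafeRunSync()
```

One caveat with this approach: a forked fiber starts with a copy of its parent's locals, so updates made inside concurrently running stream stages are not seen by the parent. Whether that fits depends on how the context is meant to flow.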

ValdemarGr avatar Sep 12 '22 13:09 ValdemarGr

Thanks for the suggestions, I'll give them a try. Closing the issue, as it seems a built-in solution for this probably wouldn't be very useful to most people.

r-glyde avatar Oct 13 '22 00:10 r-glyde