fs2
Provide StreamWithContext variant
I'm mostly gauging whether there would be interest in this. We've found the approach useful before in Akka Streams, so I'm not sure whether others would find it useful here.
The idea would be to define something like a StreamWithContext[F[_], O, Ctx], where most operations only work on the O and the Ctx is propagated through automatically.
Rough example of how this could be used with something like fs2-kafka:
val consumerSettings: ConsumerSettings[IO, String, String] = ???
def process(id: Int): IO[Unit] = ???
// StreamWithContext
KafkaConsumer.stream(consumerSettings)
  .subscribeTo("topic")
  .records
  .asStreamWithContext(cr => (cr.record.value, cr.offset))
  .map(_.length)
  .mapAsync(10)(process)
  .asStream { case (_, ack) => ack }
  .through(commitBatchWithin(500, 5.seconds))
  .compile.drain
// Stream
KafkaConsumer.stream(consumerSettings)
  .subscribeTo("topic")
  .records
  .map(cr => (cr.record.value, cr.offset))
  .map { case (str, ctx) => (str.length, ctx) }
  .mapAsync(10) { case (i, ctx) => process(i).map(_ => ctx) }
  .through(commitBatchWithin(500, 5.seconds))
  .compile.drain
So there would be functions to convert between Stream and StreamWithContext, and operators like map and mapAsync would take just O => O2 or O => F[O2], leaving the Ctx untouched.
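A rough sketch of what such a wrapper might look like, assuming it is simply a Stream of (O, Ctx) pairs underneath. StreamWithContext, asStreamWithContext and asStream are hypothetical names taken from the example above, not existing fs2 API; map and parEvalMap are the real fs2 operators being delegated to.

```scala
//> using lib "co.fs2::fs2-core::3.2.14"
import cats.effect.{Concurrent, IO}
import cats.syntax.all._
import fs2.Stream

// Hypothetical wrapper (not part of fs2): every element travels with its context.
final case class StreamWithContext[F[_], O, Ctx](underlying: Stream[F, (O, Ctx)]) {

  // Transform the element only; the context is carried along unchanged.
  def map[O2](f: O => O2): StreamWithContext[F, O2, Ctx] =
    StreamWithContext(underlying.map { case (o, ctx) => (f(o), ctx) })

  // Order-preserving parallel effectful map over the element only.
  def mapAsync[O2](maxConcurrent: Int)(f: O => F[O2])(
      implicit F: Concurrent[F]): StreamWithContext[F, O2, Ctx] =
    StreamWithContext(
      underlying.parEvalMap(maxConcurrent) { case (o, ctx) => f(o).map(o2 => (o2, ctx)) }
    )

  // Leave the wrapper, choosing what to keep from each (element, context) pair.
  def asStream[A](f: ((O, Ctx)) => A): Stream[F, A] =
    underlying.map(f)
}

object StreamWithContext {
  // Hypothetical extension method to enter the wrapper from a plain Stream.
  implicit class StreamOps[F[_], A](self: Stream[F, A]) {
    def asStreamWithContext[O, Ctx](split: A => (O, Ctx)): StreamWithContext[F, O, Ctx] =
      StreamWithContext(self.map(split))
  }
}

import StreamWithContext._

// Usage: the index plays the role of the Kafka offset from the example above.
val lengthsWithOffsets: IO[List[(Long, Int)]] =
  Stream("a", "bb", "ccc").zipWithIndex.covary[IO]
    .asStreamWithContext { case (s, offset) => (s, offset) }
    .map(_.length)
    .mapAsync(2)(n => IO.pure(n * 10))
    .asStream { case (n, offset) => (offset, n) }
    .compile.toList
```

Operators that filter or reorder elements would need more thought, since dropping an element also drops its context (for example, an offset that still has to be committed).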
Happy to have a look if there's any interest, but no worries if not. It's probably the sort of thing that would be easy to do outside the core library as well.
So with cats.data.Nested you can already do this for .map at least. I wonder if there's a way we can extend this to work for other combinators such as .mapAsync.
//> using lib "co.fs2::fs2-core::3.2.14"
//> using options "-Ykind-projector"
import cats.data._
import cats.effect._
import cats.syntax.all._
import fs2._
trait Ctx
type WithCtx[+A] = (Ctx, A)
def intStream: Stream[IO, WithCtx[Int]] = ???
def strStream: Stream[IO, WithCtx[String]] =
  Nested(intStream).map(_ * 2).map(_.toString).value
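One possible way to cover .mapAsync for the same WithCtx shape, sketched under the assumption that the context sits in the first tuple slot: cats provides a Traverse instance for tuples that acts on the second component, so traversing each pair inside parEvalMap runs the effect on the value and keeps the Ctx. mapAsyncWithCtx and RootCtx are hypothetical names used only for illustration.

```scala
//> using lib "co.fs2::fs2-core::3.2.14"
import cats.effect._
import cats.syntax.all._
import fs2._

trait Ctx
type WithCtx[+A] = (Ctx, A)

// Hypothetical helper (not part of fs2 or cats): effectful parallel map over
// the value of each pair, with the Ctx passed through unchanged.
def mapAsyncWithCtx[A, B](maxConcurrent: Int)(f: A => IO[B]): Pipe[IO, WithCtx[A], WithCtx[B]] =
  _.parEvalMap(maxConcurrent)(pair => pair.traverse(f))

// Usage with a placeholder context value.
object RootCtx extends Ctx

val strings: IO[List[WithCtx[String]]] =
  Stream[IO, WithCtx[Int]]((RootCtx, 1), (RootCtx, 2))
    .through(mapAsyncWithCtx(4)(i => IO.pure((i * 2).toString)))
    .compile.toList
```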
You could also use Kleisli as your F:
final case class Context(
  attribute: String
)
def myContextFunction[F[_]](implicit F: Async[F]) = {
  val lift = Kleisli.liftK[F, Context]
  val dataSource: fs2.Stream[F, String] = fs2.Stream.eval(F.delay("data"))
  def programThatNeedsContext(data: String, ctx: Context): F[Unit] = ???
  dataSource
    .translate(lift)
    .evalMap(data => Kleisli(ctx => programThatNeedsContext(data, ctx)))
    .compile
    .drain
    .run(Context("attribute"))
}
If you're looking for a more thorough solution, consider using a tagless algebra instead (other libraries do this, such as https://github.com/tpolecat/natchez).
import cats._
final case class Context(
  attribute: String
)
trait ContextAlg[F[_]] {
  def context: F[Context]
}
def myContextFunction2[F[_]](implicit F: Async[F], C: ContextAlg[F]) = {
  val dataSource: fs2.Stream[F, String] = fs2.Stream.eval(F.delay("data"))
  def programThatNeedsContext(data: String, ctx: Context): F[Unit] = ???
  dataSource
    // flatMap rather than map: map would yield F[F[Unit]] and the inner effect would never run
    .evalMap(data => C.context.flatMap(ctx => programThatNeedsContext(data, ctx)))
    .compile
    .drain
}
def main[F[_]: Async] = {
  myContextFunction2[Kleisli[F, Context, *]]
}
This relies on the following algebra implementation for Kleisli:
implicit def algForKleisli[F[_]: Applicative]: ContextAlg[Kleisli[F, Context, *]] =
  new ContextAlg[Kleisli[F, Context, *]] {
    def context: Kleisli[F, Context, Context] =
      Kleisli.ask[F, Context]
  }
Note: If you use IO, you could consider using IOLocal.
https://github.com/typelevel/cats-effect/blob/79a5462a2fe5b13f91feeedf659a2e8c54577cc5/core/shared/src/main/scala/cats/effect/IOLocal.scala
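A minimal sketch of the IOLocal idea in plain IO, reusing the Context shape from the snippets above. programThatNeedsContext here is an illustrative stand-in that takes the IOLocal handle rather than the context itself.

```scala
//> using lib "org.typelevel::cats-effect::3.3.14"
import cats.effect.{IO, IOLocal}

final case class Context(attribute: String)

// Illustrative stand-in: reads the context from the fiber-local value.
def programThatNeedsContext(data: String, local: IOLocal[Context]): IO[String] =
  local.get.map(ctx => s"$data processed with ${ctx.attribute}")

val program: IO[String] =
  for {
    local  <- IOLocal(Context("default"))      // allocate with a default value
    _      <- local.set(Context("attribute"))  // set it for the current fiber
    result <- programThatNeedsContext("data", local)
  } yield result
```

One caveat worth checking before relying on this inside a stream: a forked fiber starts with a copy of its parent's locals, and changes made in the child are not visible to the parent, so concurrent combinators may not see or propagate updates the way a single fiber does.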
Note 2: If you just need to ask for the context, cats-mtl provides an algebra named Ask for this.
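A small sketch of the Ask variant, assuming cats-mtl 1.x is on the classpath (the version in the directive is an assumption) and relying on the Ask instance that cats-mtl provides for Kleisli. describe and WithContext are hypothetical names for illustration.

```scala
//> using lib "org.typelevel::cats-mtl::1.3.0"
//> using lib "org.typelevel::cats-effect::3.3.14"
import cats.Monad
import cats.data.Kleisli
import cats.effect.IO
import cats.mtl.Ask
import cats.syntax.all._

final case class Context(attribute: String)

// Hypothetical function: only requires the ability to read a Context in F.
def describe[F[_]: Monad](data: String)(implicit A: Ask[F, Context]): F[String] =
  A.ask[Context].map(ctx => s"$data processed with ${ctx.attribute}")

// Kleisli is one effect that can supply the context; the alias avoids
// needing kind-projector syntax here.
type WithContext[A] = Kleisli[IO, Context, A]

val described: IO[String] =
  describe[WithContext]("data").run(Context("attribute"))
```

This plays the same role as the hand-written ContextAlg with only a context method, with the instance for Kleisli coming from the library instead of being defined locally.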
Note 3: If you also need to update the context, consider the following:
trait ContextAlg[F[_]] {
  def context: F[Context]
  def updated(f: Context => Context): F ~> F
  def modify[A](f: Context => Context)(fa: F[A]): F[A]
}
implicit def algForKleisli[F[_]: Applicative]: ContextAlg[Kleisli[F, Context, *]] =
  new ContextAlg[Kleisli[F, Context, *]] {
    def context: Kleisli[F, Context, Context] =
      Kleisli.ask[F, Context]
    def updated(f: Context => Context): Kleisli[F, Context, *] ~> Kleisli[F, Context, *] =
      new (Kleisli[F, Context, *] ~> Kleisli[F, Context, *]) {
        def apply[A](fa: Kleisli[F, Context, A]): Kleisli[F, Context, A] =
          Kleisli(ctx => fa.run(f(ctx)))
      }
    def modify[A](f: Context => Context)(fa: Kleisli[F, Context, A]): Kleisli[F, Context, A] =
      updated(f)(fa)
  }
Thanks for the suggestions, I'll give them a try. Closing the issue, as it seems a built-in solution for this probably wouldn't be very useful to most people.