Exercises
Suggested by @matthughes. Let's use this issue to catalog some fs2 (or cats-effect) exercises. We can then organize them and add them to the microsite/guide, along with sample solutions, hints, etc.
Some simple ones to get started:
Q: Create an infinite stream that starts at 0 and increments by 1, wrapping around at Int.MaxValue.
A:
Stream.iterate(0)(_ + 1)
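A quick way to check the wrap-around is to start just below the overflow point. This is only a sketch; the object and value names are made up for illustration. Int addition overflows silently on the JVM, so the element after Int.MaxValue is Int.MinValue rather than 0.

```scala
import fs2.Stream

object WrapAround {
  // Start one step before the overflow point so the wrap-around shows up
  // within the first three elements.
  val aroundOverflow: List[Int] =
    Stream.iterate(Int.MaxValue - 1)(_ + 1).take(3).toList
  // The third element is Int.MinValue: Int addition wraps instead of throwing.
}
```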
Q: Without using the built-in Stream.iterate, define the following function such that the resulting stream consists of the starting element followed by the results of repeatedly applying f to the previously emitted element:
def iterate[A](start: A)(f: A => A): Stream[Nothing, A]
A:
def iterate[A](start: A)(f: A => A): Stream[Nothing, A] =
  Stream.emit(start) ++ iterate(f(start))(f)
Add some commentary on ++ being lazy on RHS and talk about recursion being a key technique for constructing streams/pulls.
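As a starting point for that commentary, here is a small sketch of why the recursive definition terminates when only a prefix is requested. The right-hand side of ++ is a by-name parameter, so the recursive call is not evaluated until the left-hand stream is exhausted and more elements are demanded. The return type is written as Stream[Pure, A] here, which is an assumption that suits recent fs2 versions; the object name is hypothetical.

```scala
import fs2.{Pure, Stream}

object IterateSketch {
  // Same shape as the answer above. The recursive call sits on the lazy
  // right-hand side of ++, so building the stream does not loop forever.
  def iterate[A](start: A)(f: A => A): Stream[Pure, A] =
    Stream.emit(start) ++ iterate(f(start))(f)

  // Only five elements are demanded, so only five steps of the recursion run.
  val firstFive: List[Int] = iterate(1)(_ * 2).take(5).toList // List(1, 2, 4, 8, 16)
}
```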
(note: I think we can get a bunch of exercises from existing high level operations that use lower level stuff in interesting ways)
Scenario: I have a program in IOApp that needs to consume from 2 or more independent streams (maybe from Kafka or SQS or something). They are not coupled, and don't rely on each other. Maybe one does a transformation and emits a new stream. The other saves to a DB or something.
- The difference between s1.concurrently(s2) and Stream(s1, s2).parJoinUnbounded, and when to use each.
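A minimal sketch of the two options, assuming fs2 3.x on cats-effect 3. The two ticking streams are hypothetical stand-ins for the real sources (Kafka, SQS, and so on). With concurrently, only s1's output is emitted and s2 runs in the background until s1 finishes; with parJoinUnbounded, the outputs of both are emitted and the result finishes once both inner streams have finished.

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object TwoStreams extends IOApp.Simple {
  // Hypothetical stand-ins for two independent sources.
  val s1: Stream[IO, Unit] =
    Stream.awakeEvery[IO](100.millis).take(3).evalMap(_ => IO.println("s1 tick"))
  val s2: Stream[IO, Unit] =
    Stream.awakeEvery[IO](150.millis).take(5).evalMap(_ => IO.println("s2 tick"))

  // Emits only s1's elements; s2 runs in the background for its effects
  // and is interrupted when s1 terminates.
  val foreground: Stream[IO, Unit] = s1.concurrently(s2)

  // Emits the elements of both streams; terminates when both are done.
  val both: Stream[IO, Unit] = Stream(s1, s2).parJoinUnbounded

  def run: IO[Unit] = both.compile.drain
}
```

As a rule of thumb, concurrently fits when one stream is the "main" one and the other is a helper whose output is not needed, while parJoinUnbounded fits when the streams are peers and all of them should run to completion.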
Notes:
areas that should be covered: parJoin, concurrently + state (and hold), metered, writing a take Pull, interruptWhen, unNone, stepLeg
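For the "writing a take Pull" item, a possible sample solution could look like the sketch below. The name takeN is made up for illustration so that it does not clash with the built-in take. It pulls one chunk at a time with uncons, outputs whole chunks while they fit in the remaining budget, and truncates the last chunk.

```scala
import fs2.{Pipe, Pull, Stream}

object TakeSketch {
  // Hypothetical helper: emits at most n elements of the input stream.
  def takeN[F[_], O](n: Long): Pipe[F, O, O] = {
    def go(s: Stream[F, O], remaining: Long): Pull[F, O, Unit] =
      s.pull.uncons.flatMap {
        case Some((hd, tl)) =>
          hd.size match {
            // The whole chunk fits: output it and recurse on the tail.
            case m if m <= remaining => Pull.output(hd) >> go(tl, remaining - m)
            // The chunk is larger than the budget: output a prefix and stop.
            case _ => Pull.output(hd.take(remaining.toInt))
          }
        case None => Pull.done
      }
    in => go(in, n).stream
  }

  val firstTwo: List[Int] = Stream(1, 2, 3, 4).through(takeN(2)).toList // List(1, 2)
}
```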
Port exercises from pipes-concurrency
I think the topic of splitting a stream is worth covering. There are several solutions on Stack Overflow, and I'm unable to say which one is the best. E.g. "Splitting the fs2 stream output to two files" (note that I've just left a comment below the accepted answer, saying that the .spawn is probably a mistake) or "How do I “split” a stream in fs2?"
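One candidate to compare against those answers is broadcastThrough, which sends every element of the source to each of the given pipes and merges their outputs. This is only a sketch, assuming fs2 3.x on cats-effect 3; it is not necessarily the approach taken in the linked answers. The two pipes are hypothetical consumers standing in for the file writers.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Pipe, Stream}

object SplitSketch {
  // Hypothetical consumers; each one sees every element of the source.
  val evens: Pipe[IO, Int, String] = _.filter(_ % 2 == 0).map(i => s"even $i")
  val odds: Pipe[IO, Int, String]  = _.filter(_ % 2 != 0).map(i => s"odd $i")

  // The outputs of the pipes are merged nondeterministically, so the result
  // is sorted before it is inspected.
  val result: List[String] =
    Stream.range(0, 6).covary[IO]
      .broadcastThrough(evens, odds)
      .compile.toList
      .unsafeRunSync()
      .sorted
}
```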
Should we target adding these to https://www.scala-exercises.org/? Some of it is a little rote but I like the ability to have explanatory text as you build up the exercises.
Are we targeting 2.x or 3.0 API? Maybe doesn't matter much for these exercises.
I really enjoyed solving this one:
Q: Implement the merge step of merge sort: combine two ascending Int streams into a single ascending Int stream.
A:
import fs2.Chunk
import fs2.Pull
import fs2.Stream
import fs2.Stream.StepLeg
import cats.syntax.option._
import cats.syntax.apply._
import fs2.Pure
import scala.annotation.tailrec

final class fs2sample[F[_]] {

  // Merges two ascending streams into one ascending stream, chunk by chunk.
  def mergeSort(s1: Stream[F, Int], s2: Stream[F, Int]): Stream[F, Int] = {
    val pull =
      for {
        p1 <- s1.pull.stepLeg
        p2 <- s2.pull.stepLeg
        _ <- combine(p1, p2)
      } yield ()
    pull.stream
  }

  // Merges l1 and l2 until one of them is exhausted.
  // Returns (merged prefix, remainder of l1, remainder of l2).
  @tailrec
  private def mergeLists(acc: List[Int], l1: List[Int], l2: List[Int]): (List[Int], List[Int], List[Int]) =
    (l1, l2) match {
      case (h1 :: tl1, h2 :: tl2) =>
        if (h1 < h2) mergeLists(h1 :: acc, tl1, h2 :: tl2)
        else mergeLists(h2 :: acc, h1 :: tl1, tl2)
      case _ => (acc.reverse, l1, l2)
    }

  private def combine(legOpt1: Option[StepLeg[F, Int]], legOpt2: Option[StepLeg[F, Int]]): Pull[F, Int, Unit] =
    (legOpt1, legOpt2) match {
      case (Some(leg1), Some(leg2)) =>
        val (merged, head1, head2) = mergeLists(List.empty, leg1.head.toList, leg2.head.toList)
        // Step the leg whose chunk was fully consumed; keep the leftover of the other as its new head.
        val nextLeg1 = if (head1.isEmpty) leg1.stepLeg else Pull.pure(leg1.setHead(Chunk.seq(head1)).some)
        val nextLeg2 = if (head2.isEmpty) leg2.stepLeg else Pull.pure(leg2.setHead(Chunk.seq(head2)).some)
        Pull.output[F, Int](Chunk.seq(merged)) *>
          (nextLeg1, nextLeg2).tupled.flatMap { case (l1, l2) => combine(l1, l2) }
      // One side is finished: output the pending head of the other, then echo its remainder.
      case (Some(leg1), None) => Pull.output[F, Int](leg1.head) *> leg1.stream.pull.echo
      case (None, Some(leg2)) => Pull.output[F, Int](leg2.head) *> leg2.stream.pull.echo
      case (None, None) => Pull.done
    }
}

object fs2sample {
  def test() = {
    val sample = new fs2sample[Pure]
    val s1 = Stream.chunk(Chunk(1)) ++ Stream.chunk(Chunk(3, 5)) ++ Stream.chunk(Chunk(7, 9))
    val s2 = Stream.chunk(Chunk(2, 4, 6)) ++ Stream.chunk(Chunk(8, 10))
    // Expected: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    sample.mergeSort(s1, s2).compile.toList
  }
}