
Exercises

Open mpilquist opened this issue 5 years ago • 6 comments

Suggested by @matthughes. Let's use this issue to catalog some fs2 (or cats-effect) exercises. We can then organize them and add them to the microsite/guide, along with sample solutions, hints, etc.

mpilquist avatar Dec 01 '20 22:12 mpilquist

Some simple ones to get started:

Q: Create an infinite stream that starts at 0 and increments by 1, wrapping around to Int.MinValue after Int.MaxValue.

A:

Stream.iterate(0)(_ + 1)
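
To check the wrap-around without pulling two billion elements, here is a minimal sketch that starts just below Int.MaxValue. The object name `WrapAroundDemo` is made up for illustration; it relies only on ordinary Int overflow.

```scala
import fs2.Stream

object WrapAroundDemo {
  // Start two steps before the overflow point and take three elements.
  val aroundMax: List[Int] =
    Stream.iterate(Int.MaxValue - 1)(_ + 1).take(3).toList
  // List(2147483646, 2147483647, -2147483648)
}
```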

Q: Without using the built-in Stream.iterate, define the following function so that the resulting stream emits the starting element, followed by the result of applying f to each previously emitted element:

def iterate[A](start: A)(f: A => A): Stream[Nothing, A]

A:

def iterate[A](start: A)(f: A => A): Stream[Nothing, A] =
  Stream.emit(start) ++ iterate(f(start))(f)

TODO: add some commentary on ++ being lazy in its right-hand side, and talk about recursion being a key technique for constructing streams/pulls.
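
As a starting point for that commentary, a small sketch of the laziness in question. The names `LazyAppendDemo`, `myIterate` and `tail` are hypothetical, and the effect type is written as `Pure` here (an assumption made for the sketch, not a correction of the signature above). The right-hand side of `++` is passed by name, so it is not evaluated when the stream is built, which is what lets the recursive definition terminate.

```scala
import fs2.{Pure, Stream}

object LazyAppendDemo {
  // Same shape as the recursive answer above; the recursive call sits on the
  // right-hand side of ++ and is only evaluated once the stream is pulled that far.
  def myIterate[A](start: A)(f: A => A): Stream[Pure, A] =
    Stream.emit(start) ++ myIterate(f(start))(f)

  val firstFive: List[Int] = myIterate(0)(_ + 1).take(5).toList

  // Count how often the right-hand side of ++ is evaluated.
  var rhsEvaluations = 0
  def tail: Stream[Pure, Int] = { rhsEvaluations += 1; Stream(2, 3) }

  val appended: Stream[Pure, Int] = Stream(1) ++ tail
  // Building the stream has not touched `tail` yet.
  val evaluationsBeforeRun: Int = rhsEvaluations

  // Running the stream forces the right-hand side.
  val out: List[Int] = appended.toList
}
```

If `++` were strict in its argument, `myIterate` would recurse forever before returning a stream at all.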

(Note: I think we can get a bunch of exercises from existing high-level operations that use lower-level primitives in interesting ways.)

mpilquist avatar Dec 02 '20 02:12 mpilquist

Scenario: I have a program in IOApp that needs to consume from 2 or more independent streams (maybe from Kafka or SQS or something). They are not coupled, and don't rely on each other. Maybe one does a transformation and emits a new stream. The other saves to a DB or something.

  • The difference between s1.concurrently(s2) and Stream(s1, s2).parJoinUnbounded, and when to use each.
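
A rough sketch of the difference, assuming fs2 3.x with cats-effect 3. `s1`, `s2` and the object name are hypothetical stand-ins for the real sources. With `concurrently`, only the left stream's output is emitted and the right stream runs in the background until the left one finishes; with `parJoinUnbounded`, the outputs of all inner streams are emitted, in nondeterministic order.

```scala
import cats.effect.IO
import fs2.Stream

object ConcurrencySketch {
  // Hypothetical stand-ins for two independent sources (e.g. Kafka and SQS consumers).
  val s1: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]
  val s2: Stream[IO, Int] = Stream(10, 20).covary[IO]

  // concurrently: the result carries s1's output only; s2 runs in the background
  // for its effects and is interrupted once s1 terminates.
  val foregroundOnly: IO[List[Int]] =
    s1.concurrently(s2).compile.toList

  // parJoinUnbounded: both streams run concurrently and both outputs are emitted;
  // the interleaving is nondeterministic, and the result ends when both have ended.
  val merged: IO[List[Int]] =
    Stream(s1, s2).parJoinUnbounded.compile.toList
}
```

As a rule of thumb under these assumptions: reach for `concurrently` when one stream is the main one and the other is a supporting background process, and for `parJoinUnbounded` when the streams are peers whose outputs (or lifetimes) all matter.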

drewboardman avatar Dec 03 '20 16:12 drewboardman

Notes:

areas that should be covered: parJoin, concurrently + state (and hold), metered, writing a take Pull, interruptWhen, unNone, stepLeg
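
For the "writing a take Pull" item, one possible sample solution is sketched below. It follows the usual uncons-based recursion; `takeN` and `TakePullSketch` are hypothetical names, and this is not necessarily how the built-in `take` is implemented.

```scala
import fs2.{Pipe, Pull, Stream}

object TakePullSketch {
  // Emits at most n elements of the input, then stops pulling.
  def takeN[F[_], O](n: Long): Pipe[F, O, O] = {
    def go(s: Stream[F, O], remaining: Long): Pull[F, O, Unit] =
      if (remaining <= 0) Pull.done
      else
        s.pull.uncons.flatMap {
          case Some((hd, tl)) =>
            // Whole chunk fits: emit it and recurse on the tail.
            if (hd.size <= remaining) Pull.output(hd) >> go(tl, remaining - hd.size)
            // Chunk is larger than what is left: emit a prefix and stop.
            else Pull.output(hd.take(remaining.toInt))
          case None => Pull.done
        }
    in => go(in, n).stream
  }

  val firstThree: List[Int] = Stream(1, 2, 3, 4, 5).through(takeN(3)).toList
}
```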

Port exercises from pipes-concurrency

SystemFw avatar Dec 03 '20 17:12 SystemFw

I think the topic of splitting a stream is worth covering. There are several solutions on Stack Overflow, and I can't say which one is best. E.g. "Splitting the fs2 stream output to two files" (note that I've just left a comment below the accepted answer saying that the .spawn is probably a mistake) or "How do I “split” a stream in fs2?"
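
One candidate approach, sketched here under the assumption of fs2 3.x with cats-effect 3, is `broadcastThrough`, which sends every element of the source to each of the given pipes and merges their outputs. The pipes `evens` and `odds` and the object name are hypothetical; in the two-files case each pipe would be a file sink instead. This is not a claim that it is the best of the Stack Overflow answers.

```scala
import cats.effect.IO
import fs2.{Pipe, Stream}

object SplitSketch {
  // Each pipe sees every element of the source and keeps the part it cares about.
  val evens: Pipe[IO, Int, String] = _.filter(_ % 2 == 0).map(n => s"even:$n")
  val odds: Pipe[IO, Int, String]  = _.filter(_ % 2 != 0).map(n => s"odd:$n")

  // The outputs of both pipes are merged; their relative order is nondeterministic.
  val split: IO[List[String]] =
    Stream(1, 2, 3, 4).covary[IO].broadcastThrough(evens, odds).compile.toList
}
```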

DLakomy avatar Dec 03 '20 19:12 DLakomy

Should we target adding these to https://www.scala-exercises.org/? Some of it is a little rote but I like the ability to have explanatory text as you build up the exercises.

Are we targeting the 2.x or the 3.0 API? It may not matter much for these exercises.

matthughes avatar Dec 05 '20 19:12 matthughes

I really enjoyed solving this one:

Q: Implement the merge step of merge sort: combine two ascending Int streams into a single ascending Int stream.

A:


import fs2.Chunk
import fs2.Pull
import fs2.Stream
import fs2.Stream.StepLeg
import cats.syntax.option._
import cats.syntax.apply._
import fs2.Pure

import scala.annotation.tailrec

final class fs2sample[F[_]] {

  def mergeSort(s1: Stream[F, Int], s2: Stream[F, Int]): Stream[F, Int] = {
    val pull =
      for {
        p1 <- s1.pull.stepLeg
        p2 <- s2.pull.stepLeg
        _ <- combine(p1, p2)
      } yield ()

    pull.stream
  }

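  // Merges the two sorted lists for as long as both are non-empty. Returns the merged
  // prefix plus whatever remains of each list (at least one remainder is empty).
  @tailrec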
  private def mergeLists(acc: List[Int], l1: List[Int], l2: List[Int]): (List[Int], List[Int], List[Int]) =
    (l1, l2) match {
      case (h1 :: tl1, h2 :: tl2) =>
        if (h1 < h2) mergeLists(h1 :: acc, tl1, h2 :: tl2)
        else mergeLists(h2 :: acc, h1 :: tl1, tl2)
      case _ => (acc.reverse, l1, l2)
    }

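  // Merges the head chunks of both legs, emits the merged prefix, then advances each leg
  // whose head was fully consumed; an unconsumed remainder is kept as that leg's new head.
  private def combine(legOpt1: Option[StepLeg[F, Int]], legOpt2: Option[StepLeg[F, Int]]): Pull[F, Int, Unit] =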
    (legOpt1, legOpt2) match {
      case (Some(leg1), Some(leg2)) =>
        val (merged, head1, head2) = mergeLists(List.empty, leg1.head.toList, leg2.head.toList)

        val nextLeg1 = if (head1.isEmpty) leg1.stepLeg else Pull.pure(leg1.setHead(Chunk.seq(head1)).some)
        val nextLeg2 = if (head2.isEmpty) leg2.stepLeg else Pull.pure(leg2.setHead(Chunk.seq(head2)).some)

        Pull.output[F, Int](Chunk.seq(merged)) *>
          (nextLeg1, nextLeg2).tupled.flatMap { case (l1, l2) => combine(l1, l2) }
      // Only one side is left: emit its pending head, then echo the rest of that stream.
      case (Some(leg1), None) => Pull.output[F, Int](leg1.head) *> leg1.stream.pull.echo
      case (None, Some(leg2)) => Pull.output[F, Int](leg2.head) *> leg2.stream.pull.echo
      case (None, None) => Pull.done
    }
}

object fs2sample {

  def test() = {
    val sample = new fs2sample[Pure]

    val s1 = Stream.chunk(Chunk(1)) ++ Stream.chunk(Chunk(3, 5)) ++ Stream.chunk(Chunk(7,9))
    val s2 = Stream.chunk(Chunk(2,4,6)) ++ Stream.chunk(Chunk(8, 10))

    sample.mergeSort(s1, s2).compile.toList
  }
}

nikiforo avatar Dec 07 '20 21:12 nikiforo