kantan.csv
FS2 streams integration
It would be nice to have integration with FS2 streams, or at least a low-level API to help implement it.
I was under the impression that the iterator-like CsvReader was basically all you needed, and that plugging it into a stream was essentially a one-liner. Was that incorrect?
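For reference, the reading side can be sketched roughly as follows. This is a minimal sketch rather than an official integration: it assumes fs2 3.x with cats-effect 3 (where `Stream.fromBlockingIterator` takes a chunk size), and `readCsv` is a hypothetical helper name, not part of kantan.csv.

```scala
import cats.effect.IO
import fs2.Stream
import kantan.csv._
import kantan.csv.ops._

// Hypothetical helper: opens a CsvReader, streams its rows, and closes it when the stream ends.
def readCsv[A: HeaderDecoder](file: java.io.File): Stream[IO, ReadResult[A]] =
  Stream
    .bracket(IO.blocking(file.asCsvReader[A](rfc)))(reader => IO.blocking(reader.close()))
    .flatMap { reader =>
      // Adapt the reader to a plain scala Iterator using only hasNext / next().
      val rows = new Iterator[ReadResult[A]] {
        def hasNext: Boolean      = reader.hasNext
        def next(): ReadResult[A] = reader.next()
      }
      // 64 is an arbitrary chunk size chosen for illustration.
      Stream.fromBlockingIterator[IO](rows, 64)
    }
```

Each element is still a `ReadResult[A]`, so decoding failures surface as `Left` values in the stream rather than as failed effects.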
I have a similar use case: I get an fs2 stream (or a ZIO stream, for that matter) and want to transform it into CSV. The iterator in CsvReader does not encode effects, so I cannot use it. For example:
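import cats.effect.IO
import fs2.{Pipe, Stream, text}
import kantan.csv._
import kantan.csv.ops._

// Placeholder for an effectful source, e.g. a database query.
def readFromDB[A]: Stream[IO, A] = ???

object csv {
  // Emits the header row (if the encoder defines one), then encodes each chunk of rows as CSV.
  // Note: the header cells are joined as-is, without CSV escaping.
  def streamCsv[F[_], A: HeaderEncoder]: Pipe[F, A, String] = s =>
    implicitly[HeaderEncoder[A]].header
      .fold[Stream[F, String]](Stream.empty)(h => Stream.emit(h.mkString(",") + "\r\n")) ++
      s.chunks.map(chunk => chunk.toList.asCsv(rfc))
}

// streamCsv yields Strings, so encode to UTF-8 to get bytes (text.utf8.encode is the fs2 3.x name).
def webServerResponse[A: HeaderEncoder]: Stream[IO, Byte] =
  readFromDB[A].through(csv.streamCsv[IO, A]).through(text.utf8.encode)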