Streaming does not work for MySQL
I wrote the following code:
val xs = Transactor.fromDriverManager[IO](
  "com.mysql.cj.jdbc.Driver",
  "jdbc:mysql://myserver:3306/mydb?useLegacyDatetimeCode=false&nullNamePatternMatchesAll=true&serverTimezone=America/Chicago&useSSL=false",
  "user",
  "pass")

sql"select id from foo"
  .query[Long]
  .stream
  .take(10)
  .compile
  .toList
  .transact(xs)
  .unsafeRunSync
  .foreach(println)
Here the table foo has 100 million records. When I run this code, I get an error:
java.lang.OutOfMemoryError: Java heap space
So even though I am using streaming, the program tries to load all the IDs into memory, and that is why it fails with an out-of-memory error.
In the past, I tried streaming query results with Slick and ran into a similar issue there as well. It turned out that enabling streaming on MySQL requires a special setup. This was documented here
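For reference, my understanding of that special setup in plain JDBC is roughly the following. This is only a sketch based on the MySQL Connector/J documentation: the statement has to be forward-only and read-only, and the fetch size has to be exactly Integer.MIN_VALUE. The names MySqlJdbcStreaming and foreachId are made up for illustration, and the URL and credentials are the placeholders from my code above.

```scala
import java.sql.{Connection, DriverManager, ResultSet}

object MySqlJdbcStreaming {
  // Connector/J treats this exact fetch size as the signal to stream rows one at a time.
  val StreamingFetchSize: Int = Int.MinValue

  // Hypothetical helper: applies `f` to at most `limit` ids as they arrive,
  // instead of buffering the whole result set in memory first.
  def foreachId(conn: Connection, limit: Int)(f: Long => Unit): Unit = {
    val stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    try {
      stmt.setFetchSize(StreamingFetchSize)
      val rs = stmt.executeQuery("select id from foo")
      try {
        var n = 0
        while (n < limit && rs.next()) {
          f(rs.getLong(1))
          n += 1
        }
      } finally rs.close()
    } finally stmt.close()
  }

  def main(args: Array[String]): Unit = {
    // Placeholder URL and credentials; requires the MySQL driver on the classpath.
    val conn = DriverManager.getConnection(
      "jdbc:mysql://myserver:3306/mydb?useSSL=false&serverTimezone=America/Chicago",
      "user",
      "pass")
    try foreachId(conn, 10)(println)
    finally conn.close()
  }
}
```

One caveat from the same documentation: with a streaming result set, all rows must be read (or the result set closed) before the connection can issue another query, so stopping after a few rows is not necessarily free.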
I want to know the right syntax in Doobie for streaming results from MySQL, as opposed to loading the entire result set into memory.
I have also posted this question on Stack Overflow: https://stackoverflow.com/questions/50008051/doobie-streaming-not-working-for-mysql
Give this a try. It's faster but still pretty slow, and it's sensitive to the LIMIT clause so I don't think it's quite working. I followed all the hints I could find.
import cats.effect.{ Async, IO }
import doobie._, doobie.implicits._
import java.sql.ResultSet
import scala.Predef._

object MySqlStreaming {

  // an interpreter that sets up queries for MySQL streaming
  object Interp extends KleisliInterpreter[IO] {
    val M = implicitly[Async[IO]]

    override lazy val ConnectionInterpreter =
      new ConnectionInterpreter {
        override def prepareStatement(sql: String) = primitive { c =>
          println("*** prepareStatement")
          val ps = c.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
          ps.unwrap(classOf[com.mysql.cj.jdbc.StatementImpl]).enableStreamingResults()
          ps
        }
      }

    override lazy val PreparedStatementInterpreter =
      new PreparedStatementInterpreter {
        override def setFetchSize(n: Int) = primitive { ps =>
          println("*** setFetchSize")
          ps.setFetchSize(Int.MinValue) // ignore what was requested
        }
      }
  }

  val baseXa = Transactor.fromDriverManager[IO](
    "com.mysql.cj.jdbc.Driver",
    "jdbc:mysql://localhost:3306/world?useSSL=false&serverTimezone=America/Chicago",
    "root", ""
  )

  // A transactor that uses our interpreter above
  val xa: Transactor[IO] =
    Transactor.interpret.set(baseXa, Interp.ConnectionInterpreter)

  def prog: IO[List[(String, String)]] =
    sql"SELECT c1.xname, c2.xname, c3.xname FROM city c1, city c2, city c3 limit 10000000"
      .query[(String, String)]
      .stream
      .take(10)
      .compile
      .toList
      .transact(xa)

  def main(args: Array[String]): Unit = {
    import System.{ currentTimeMillis => now }
    val t0 = now
    prog.unsafeRunSync.foreach(println)
    println(s"Elapsed: ${now - t0} ms.")
  }
}
WIP at https://github.com/tpolecat/doobie/tree/issue/715
Hi Rob, has anything changed since May? It seems I have run into the same issue. Maybe I can help in some way?
WIP at https://github.com/tpolecat/doobie/tree/issue/715
Curious, this branch doesn't exist. Anything I can help with? A client is semi-blocked (HA!) on this.
Hello, I'm wondering if somebody is looking into this?
Hi, I'm also interested to know whether this will be supported.
PRs welcome if you know how to fix it, but I'm not working on it.