clickhouse-scala-client icon indicating copy to clipboard operation
clickhouse-scala-client copied to clipboard

Possible memory leak in queryWithProgress

Open sjoerdmulder opened this issue 5 years ago • 0 comments

Running the following example, with "-Xmx512m" will cause a java.lang.OutOfMemoryError: Java heap space within a minute:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.crobox.clickhouse.ClickhouseClient
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.duration._

object MemoryLeakApp extends App {

  implicit val system = ActorSystem("memory")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher
  val size = 1024 * 1024
  val clickhouseClient = new ClickhouseClient(ConfigFactory.parseString(
    """
      |crobox.clickhouse.client {
      |    connection {
      |        type = "single-host",
      |        host = "localhost",
      |        port = 8123
      |    }
      |}
    """.stripMargin).withFallback(ConfigFactory.defaultReference()))

  def execute(): Unit = {
    val source = Source.fromFutureSource(Future {
      s"SELECT * FROM system.numbers LIMIT $size"
    }.map(query =>
        clickhouseClient.queryWithProgress(query)
    )).mapMaterializedValue(_.flatten)
    Sink.ignore.runWith(source).foreach(response =>
      println("Received response with", response.length)
    )
  }
  system.scheduler.schedule(0.seconds, 1.seconds)({execute()})

}

Somehow using the Source.fromFutureSource with .mapMaterializedValue(_.flatten) causes the issue to appear...

sjoerdmulder avatar Mar 22 '19 15:03 sjoerdmulder