Shared singleton for use with Spark

Open aleris opened this issue 3 years ago • 2 comments

When using Spark with external resources such as a database, a fairly common pattern is to share the database client between tasks so that they share one connection pool. Otherwise, with a large number of tasks/threads, the database connections are exhausted, which causes problems when scaling. This raises some complications, because the client must act as a kind of singleton shared between threads that each receive their own serialized copy of the object. Any idea how to do this with MacWire? Is there a pattern that can be used?

A simple example:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.must.Matchers.{be, convertToAnyMustWrapper}

class ModuleWithSparkSpec extends AnyFunSpec {
  it("runs module with spark") {
    val parallelism = 4
    val module = new Module {
      override lazy val connectionString: String = ""
      override lazy val sparkConf: SparkConf = new SparkConf().setAppName("Test").setMaster(s"local[$parallelism]")
    }

    module.run(parallelism * 3) must be(parallelism * 3) // prints 4 distinct thread ids and 4 distinct hash codes, each 3 times
  }
}

class Runner(val sparkConf: SparkConf, val database: Database) extends Serializable {
  def run(count: Int): Long = {
    val database = this.database
    val sparkConf = this.sparkConf
    SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
      .sparkContext
      .parallelize(0 until count)
      .map { n => database.insert(n) }
      .count()
  }
}

trait Module extends Serializable {
  def run(count: Int): Long = runner.run(count)

  import com.softwaremill.macwire._
  protected lazy val connectionString: String = ""
  protected lazy val sparkConf: SparkConf = new SparkConf().setAppName("").setMaster("")
  protected lazy val database: Database = wire[Database] // this will be serialized and duplicated 4 times
  protected lazy val runner: Runner = wire[Runner]
}

class Database(connectionString: String) extends Serializable with AutoCloseable {
  def insert(n: Int): Unit = {
    println(s"Insert $n on thread id = ${Thread.currentThread().getId}, instance hash code = ${hashCode()}")
  }
  override def close(): Unit = {}
}

So the idea would be to have something instead of wire, or alongside it, that makes it use a single instance. I was thinking of implementing a shared singleton Scope that picks the instance from a concurrent collection. Would this be the best way to do it?

  protected lazy val database: Database = sharedSingleton(wire[Database])

aleris avatar Sep 03 '22 05:09 aleris

I think if you want to share some state between deserialised objects, you'll need some kind of global state, or deserialise a Database => Module function.

If you go with global state, I think it's exactly as you write: you need some kind of cache. Something like Cache.get("db", wire[Database]), where the second argument is lazily evaluated and provides the default value, is the way to go. But that would be outside the scope of MacWire.
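A minimal sketch of such a cache, assuming a JVM-wide object is acceptable. The name Cache and the signature of get mirror the call above but are hypothetical; nothing like this ships with MacWire. The second argument is by-name, so the default is built only when the key is missing.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper, not part of MacWire: one instance per key per JVM.
object Cache {
  private val instances = new ConcurrentHashMap[String, AnyRef]()

  // `default` is a by-name parameter: it is evaluated only if `key` is absent.
  // computeIfAbsent runs the mapping function at most once per key, so
  // concurrent callers all receive the same instance. The default must not
  // call back into Cache.get itself.
  def get[T <: AnyRef](key: String, default: => T): T =
    instances.computeIfAbsent(key, _ => default).asInstanceOf[T]
}
```

For this to help on Spark, the lookup has to run inside the task on the executor (for example behind a @transient lazy val), not on the driver; otherwise the resolved instance is serialized with the closure and copied again.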

But maybe serialising the function that creates a Module, given a Database, would work.

adamw avatar Sep 08 '22 08:09 adamw

Thanks for the response. I played around a little and implemented a sharedSingleton that keeps the instances in a TrieMap. However, to work with Spark it needs a change to the way the proxy is created: a proxyFactory.setUseWriteReplace(false) call in ProxyCreator.createProxy.

https://github.com/softwaremill/macwire/blob/master/proxy/src/main/scala/com/softwaremill/macwire/proxy/ProxyCreator.scala#L11

The rest is quite similar to how the ProxyingScope is implemented.
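For comparison, a proxy-free way to get one instance per JVM is to let Java serialization resolve every deserialized copy back to a shared instance through readResolve. This is only a sketch under stated assumptions and not the sharedSingleton described above: PooledDatabase and SerializationCheck are hypothetical names, and it relies on Java serialization being used (as Spark does for closures by default); other serializers such as Kryo do not call readResolve.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the Database class from the example.
final class PooledDatabase private (val connectionString: String) extends Serializable {
  def insert(n: Int): Unit = () // a real client would use its connection pool here

  // Invoked by Java serialization after deserialization: the fresh copy is
  // discarded and the per-JVM shared instance is returned instead.
  private def readResolve(): AnyRef = PooledDatabase(connectionString)
}

object PooledDatabase {
  private val instances = new ConcurrentHashMap[String, PooledDatabase]()

  // One instance per connection string per JVM.
  def apply(connectionString: String): PooledDatabase =
    instances.computeIfAbsent(connectionString, cs => new PooledDatabase(cs))
}

// Small helper that serializes and deserializes a value within one JVM.
object SerializationCheck {
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

With this shape, each task that deserializes a closure referencing a PooledDatabase ends up with the same object as every other task in that executor JVM, at the cost of keeping global state in the companion object.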

aleris avatar Sep 08 '22 09:09 aleris