[SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry
What changes were proposed in this pull request?
This PR proposes to improve the concurrency performance of FunctionRegistry.
Why are the changes needed?
Currently, SimpleFunctionRegistryBase caches function infos in a mutable.Map (functionBuilders) and guards every access by synchronizing on `this` to ensure thread safety.
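To make the contention concrete, here is a minimal sketch of that locking pattern. `LockedRegistry` and its methods are hypothetical names used only for illustration, not Spark's actual `SimpleFunctionRegistryBase` API, and the generic value type `V` stands in for whatever the registry stores per function. Every method synchronizes on `this`, so concurrent lookups are serialized even though they never modify the map.

```scala
import scala.collection.mutable

// Hypothetical simplification of the pre-PR design (not Spark's real code):
// a mutable.Map guarded by the registry's own monitor, so readers and
// writers all contend for the same lock.
class LockedRegistry[V] {
  private val functionBuilders = mutable.HashMap.empty[String, V]

  def register(name: String, builder: V): Unit = synchronized {
    functionBuilders(name) = builder
  }

  // Even a pure read must acquire the lock on `this`.
  def lookup(name: String): Option[V] = synchronized {
    functionBuilders.get(name)
  }

  // Listing also holds the lock while it copies the keys.
  def listNames(): Seq[String] = synchronized {
    functionBuilders.keys.toSeq.sorted
  }
}
```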
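Because all of the mutable state lives in functionBuilders, we can delegate thread safety to a ConcurrentHashMap instead.
ConcurrentHashMap's retrieval operations generally do not block, so concurrent lookups no longer contend for a single registry-wide lock, which gives higher throughput and responsiveness under concurrent access.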
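A minimal sketch of the ConcurrentHashMap-based alternative, again using hypothetical names (`ConcurrentRegistry`, `register`, `lookup`, `listNames`) rather than Spark's real code, and assuming Scala 2.13 for `scala.jdk.CollectionConverters`. Lookups call `ConcurrentHashMap.get`, which takes no registry-wide lock, and listing iterates a weakly consistent view of the keys.

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.jdk.CollectionConverters._

// Hypothetical simplification of the post-PR design (not Spark's real code):
// thread safety is delegated to ConcurrentHashMap, so no lock on `this` is taken.
class ConcurrentRegistry[V] {
  private val functionBuilders = new ConcurrentHashMap[String, V]()

  def register(name: String, builder: V): Unit = {
    functionBuilders.put(name, builder)
  }

  // get() generally does not block, so lookups from many threads can overlap.
  // ConcurrentHashMap returns null for a missing key; Option(null) is None.
  def lookup(name: String): Option[V] = Option(functionBuilders.get(name))

  // Iteration is weakly consistent: it never throws
  // ConcurrentModificationException, but may miss concurrent updates.
  def listNames(): Seq[String] = functionBuilders.keySet().asScala.toSeq.sorted
}
```

One caveat of this design: each individual map operation is atomic, but a sequence such as "check whether a name exists, then register it" is not, unless it is expressed through an atomic method such as `putIfAbsent` or `compute`.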
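With this change, FunctionRegistry performs better under concurrent reads: in the read-only benchmark below (4 threads), the best time drops from 54858 ms to 20202 ms, roughly 2.7x faster.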
Does this PR introduce any user-facing change?
'No'.
How was this patch tested?
GitHub Actions (GA), plus the following benchmark:
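```scala
import java.util.concurrent.CyclicBarrier

import scala.util.Random

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.util.ThreadUtils

object FunctionRegistryBenchmark extends BenchmarkBase {
  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("FunctionRegistry") {
      val iters = 1000000
      val threadNum = 4
      val functionRegistry = FunctionRegistry.builtin
      val names = FunctionRegistry.expressions.keys.toSeq
      // threadNum reader threads plus the benchmark thread meet at the barrier.
      val barrier = new CyclicBarrier(threadNum + 1)
      val threadPool = ThreadUtils.newDaemonFixedThreadPool(threadNum, "test-function-registry")
      val benchmark = new Benchmark("SimpleFunctionRegistry", iters, output = output)
      benchmark.addCase("only read") { _ =>
        for (_ <- 1 to threadNum) {
          threadPool.execute(new Runnable {
            val random = new Random()
            override def run(): Unit = {
              barrier.await() // start all readers together
              for (_ <- 1 to iters) {
                val name = names(random.nextInt(names.size))
                val fun = functionRegistry.lookupFunction(new FunctionIdentifier(name))
                assert(fun.map(_.getName).get == name)
                functionRegistry.listFunction()
              }
              barrier.await() // signal that this reader has finished
            }
          })
        }
        barrier.await() // release the readers
        barrier.await() // wait until every reader has finished
      }
      benchmark.run()
      threadPool.shutdown()
    }
  }
}
```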
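The benchmark above relies on Spark's internal `BenchmarkBase` and `Benchmark` helpers. The sketch below reproduces only its two-barrier timing pattern without Spark, assuming a plain `String => Option[String]` lookup in place of `FunctionRegistry.lookupFunction`; `BarrierReadBenchmark` and `timeReads` are hypothetical names. Because the barrier has `threadNum + 1` parties, the calling thread's first `await()` returns only when every worker is ready, and its second `await()` returns only when every worker has finished its loop.

```scala
import java.util.concurrent.{CyclicBarrier, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

import scala.util.Random

// Hypothetical, Spark-free harness mirroring the benchmark's barrier structure.
object BarrierReadBenchmark {
  /** Returns (elapsed milliseconds, number of lookups that did not match). */
  def timeReads(
      lookup: String => Option[String],
      names: IndexedSeq[String],
      threadNum: Int,
      iters: Int): (Long, Long) = {
    // threadNum workers plus the calling thread are the barrier's parties.
    val barrier = new CyclicBarrier(threadNum + 1)
    val misses = new AtomicLong(0L)
    val pool = Executors.newFixedThreadPool(threadNum)
    try {
      for (_ <- 1 to threadNum) {
        pool.execute(new Runnable {
          private val random = new Random()
          override def run(): Unit = {
            barrier.await() // first trip: all workers start together
            var localMisses = 0L
            for (_ <- 1 to iters) {
              val name = names(random.nextInt(names.size))
              if (!lookup(name).contains(name)) localMisses += 1
            }
            misses.addAndGet(localMisses)
            barrier.await() // second trip: this worker is done
          }
        })
      }
      barrier.await() // returns once every worker is ready
      val start = System.nanoTime()
      barrier.await() // returns once every worker has finished
      (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), misses.get())
    } finally {
      pool.shutdown()
    }
  }
}
```

To compare two registry implementations, call `timeReads` once per implementation with the same `names`, `threadNum`, and `iters`. Failed lookups are counted rather than thrown: an exception in a worker would leave the second `await()` waiting forever, so a more robust harness would use the timed `await(timeout, unit)` overload.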
The benchmark output before this PR:
```
Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU @ 1.80GHz
SimpleFunctionRegistry:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
only read                                         54858          55043         261          0.0       54858.1       1.0X
```
The benchmark output after this PR:
```
Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU @ 1.80GHz
SimpleFunctionRegistry:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
only read                                         20202          20264          88          0.0       20202.1       1.0X
```
Was this patch authored or co-authored using generative AI tooling?
'No'.
ping @cloud-fan cc @MaxGekk @viirya
What's the level of concurrency you expect for function registration/lookup? Do you have perf numbers?
The perf data has been added to the PR description.
Could you resolve the conflicts, @beliefer ?
I resolved the conflicts for you~
Let me rebase again.
Thank you, @beliefer
Merged into master. Thank you @dongjoon-hyun @cloud-fan @viirya
Shall we revert this if https://github.com/apache/spark/pull/44976#discussion_r1630428579 is a real issue? I don't think this is a critical path for performance (how much parallelism do you expect for function lookups in a Spark session?), and synchronizing this seems simpler and good enough.
+1 for https://github.com/apache/spark/pull/44976#issuecomment-2153612894
I've sent out the revert PR: https://github.com/apache/spark/pull/46940