roboquant icon indicating copy to clipboard operation
roboquant copied to clipboard

ParallelStrategy fails to run within Optimizer with a large SearchSpace

Open vstoyanov opened this issue 9 months ago • 1 comments

There seems to be a slight issue when running a ParallelStrategy within optimiser. Of course with CombinedStrategy everything is fine. I have a hinch the reason might be runBlocking {}, but have yet to verify.

package eu.vstoyanov.example

import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout
import org.roboquant.Roboquant
import org.roboquant.backtest.CAGR
import org.roboquant.backtest.Optimizer
import org.roboquant.backtest.RandomSearch
import org.roboquant.binance.BinanceHistoricFeed
import org.roboquant.brokers.sim.SimBroker
import org.roboquant.common.Timeframe
import org.roboquant.common.USDT
import org.roboquant.common.Wallet
import org.roboquant.loggers.SilentLogger
import org.roboquant.metrics.PNLMetric
import org.roboquant.strategies.CombinedStrategy
import org.roboquant.strategies.ParallelStrategy
import org.roboquant.strategies.Strategy
import org.roboquant.ta.TaLibStrategy
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.Duration.Companion.seconds

/**
 * This example demonstrates the deadlock that can occur when using ParallelStrategy 
 * with BinanceHistoricFeed in an Optimizer context.
 */
object ParallelStrategyDeadlockDemo {

    /**
     * Run the demonstration with both strategy types
     */
    fun run() {
        println("=== ParallelStrategy vs CombinedStrategy Deadlock Demonstration ===")
        
        // Test with CombinedStrategy (should work fine)
        testWithBinanceFeed(useParallelStrategy = false)
        
        // Test with ParallelStrategy (may deadlock)
        testWithBinanceFeed(useParallelStrategy = true)
    }
    
    /**
     * Test with BinanceHistoricFeed using either CombinedStrategy or ParallelStrategy
     */
    private fun testWithBinanceFeed(useParallelStrategy: Boolean, numRuns: Int = 1500, timeoutSeconds: Long = 30) {
        println("Testing with ${if (useParallelStrategy) "ParallelStrategy" else "CombinedStrategy"}, $numRuns runs")
        
        // Create BinanceHistoricFeed with 3 months of data for BTC and ETH
        val symbols = listOf("BTCUSDT", "ETHUSDT")
        val end = Instant.now()
        val start = end.minus(90, ChronoUnit.DAYS)
        val timeframe = Timeframe(start, end)
        
        println("Creating BinanceHistoricFeed for ${symbols.size} symbols")
        val feed = BinanceHistoricFeed()
        feed.retrieve(*symbols.toTypedArray(), timeframe = timeframe)
        
        // Define parameter space
        val space = RandomSearch(numRuns)
        space.add("fastPeriod", 5..50 step 5)
        space.add("slowPeriod", 10..100 step 10)
        space.add("bbPeriod", 10..30 step 5)
        
        // Track runs for diagnostic
        val runCounter = AtomicInteger(0)
        val completedCounter = AtomicInteger(0)
        
        // Create optimizer
        val opt = Optimizer(space, CAGR()) { params ->
            val runId = runCounter.incrementAndGet()
            val fastPeriod = params.getInt("fastPeriod")
            val slowPeriod = params.getInt("slowPeriod")
            val bbPeriod = params.getInt("bbPeriod")
            
            // Create two simple strategies
            val emaStrategy = TaLibStrategy().apply {
                buy {
                    val fast = ema(it.close, fastPeriod)
                    val slow = ema(it.close, slowPeriod)
                    fast > slow
                }
                
                sell {
                    val fast = ema(it.close, fastPeriod)
                    val slow = ema(it.close, slowPeriod)
                    fast < slow
                }
            }
            
            val bbStrategy = TaLibStrategy().apply {
                buy {
                    val (upper, mid, lower) = bbands(it.close, bbPeriod, 2.0)
                    it.close.last() > lower && it.close.size > 1 && it.close[it.close.size - 2] <= lower
                }
                
                sell {
                    val (upper, mid, lower) = bbands(it.close, bbPeriod, 2.0)
                    it.close.last() < upper && it.close.size > 1 && it.close[it.close.size - 2] >= upper
                }
            }
            
            // Use either parallel or combined strategy based on parameter
            val strategy: Strategy = if (useParallelStrategy) {
                ParallelStrategy(emaStrategy, bbStrategy)
            } else {
                CombinedStrategy(emaStrategy, bbStrategy)
            }

            Roboquant(
                strategy = strategy,
                metrics = listOf(PNLMetric()),
                broker = SimBroker(initialDeposit = Wallet(100_000.0.USDT)),
                logger = SilentLogger()
            ).also {
                completedCounter.incrementAndGet()
            }
        }
        
        // Run with timeout to prevent infinite wait if deadlock occurs
        try {
            println("Starting optimization with timeout of $timeoutSeconds seconds...")
            
                val result = opt.train(feed, timeframe)
                println("Optimization completed successfully with ${result.size} results!")
            
            println("Test completed successfully. All ${completedCounter.get()} runs completed.")
        } catch (e: TimeoutCancellationException) {
            println("TIMEOUT REACHED after $timeoutSeconds seconds!")
            println("Only ${completedCounter.get()} of ${runCounter.get()} runs completed")
            println("This indicates a deadlock when using ParallelStrategy with Optimizer")
        } catch (e: Exception) {
            println("Exception during optimization: ${e.message}")
        }
        
        println("Test finished. ${if (useParallelStrategy) "ParallelStrategy" else "CombinedStrategy"} " +
                "completed ${completedCounter.get()}/${runCounter.get()} runs")
        println("---------------------------------------")
    }
}

/**
 * Run the demonstration
 */
fun main() {
    ParallelStrategyDeadlockDemo.run()
}

vstoyanov avatar Apr 16 '25 20:04 vstoyanov

In the 3.0 snapshot releases, the way parallel works has changed. When I checked similar code, things are not failing when I checked it (although for now the Binance Feed has not been ported to 3.0 so had to use another feed).

jbaron avatar May 02 '25 17:05 jbaron