LazyConnectionDataSourceProxy support in R2DBC
I would like to route requests to either the master or slave database based on the readOnly attribute of the @Transactional annotation, similar to how it's done in Spring MVC.
However, in R2DBC, there's no component equivalent to LazyConnectionDataSourceProxy.
Because of this, the transaction is not yet initialized at the time TransactionSynchronizationManager is queried, so we cannot determine whether the transaction is read-only.
Is there a specific reason why a LazyConnectionDataSourceProxy-like mechanism hasn't been defined for R2DBC?
I'd love to hear your thoughts on this. Thanks for all the great work on the project!
The approach you mentioned in https://github.com/spring-projects/spring-data-relational/issues/1261#issuecomment-1621167552 doesn't fit our use case. This is because the same read query could be executed against both the master and the slave, which we > want to avoid.
MVC example
core component: AbstractRoutingDataSource, LazyConnectionDataSourceProxy
@Bean
public DataSource dataSource(
@Qualifier("writableDataSource") DataSource writableDataSource,
@Qualifier("readonlyDataSource") DataSource readonlyDataSource
) {
final ReadWriteRoutingDataSource routingDataSource = new ReadWriteRoutingDataSource();
Map<Object, Object> dataSourceMap = new HashMap<>();
dataSourceMap.put(DataSourceType.WRITABLE, writableDataSource);
dataSourceMap.put(DataSourceType.READONLY, readonlyDataSource);
routingDataSource.setTargetDataSources(dataSourceMap);
routingDataSource.setDefaultTargetDataSource(writableDataSource);
routingDataSource.afterPropertiesSet();
return new LazyConnectionDataSourceProxy(routingDataSource);
}
public class ReadWriteRoutingDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return TransactionSynchronizationManager.isCurrentTransactionReadOnly()
? DataSourceType.READONLY
: DataSourceType.WRITABLE;
}
}
Have you been able to take a look at AbstractRoutingConnectionFactory that has routing functionality towards different ConnectionFactorys?
@mp911de
Sure, I’ve considered AbstractRoutingConnectionFactory, as you suggested in this comment.
While AbstractRoutingConnectionFactory can certainly serve as a replacement for AbstractRoutingDataSource, it doesn’t replace the role of LazyConnectionDataSourceProxy.
As I mentioned earlier, the key issue is that AbstractRoutingConnectionFactory has no way of accessing the readOnly attribute of the transaction, since the transaction hasn’t been initialized yet at that point.
If I’ve misunderstood anything or missed an important point, I’d really appreciate your guidance.
Here's how I’ve been using it on the Spring MVC side:
plugins {
kotlin("jvm") version "1.9.25"
kotlin("plugin.spring") version "1.9.25"
id("org.springframework.boot") version "3.4.4"
id("io.spring.dependency-management") version "1.1.7"
}
group = "juro"
version = "0.0.1-SNAPSHOT"
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("mysql:mysql-connector-java:8.0.33")
testImplementation("org.testcontainers:mysql:1.19.8")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.boot:spring-boot-testcontainers")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}
kotlin {
compilerOptions {
freeCompilerArgs.addAll("-Xjsr305=strict")
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import org.assertj.core.api.Assertions.assertThat
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.jdbc.datasource.DataSourceTransactionManager
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource
import org.springframework.stereotype.Repository
import org.springframework.stereotype.Service
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.springframework.transaction.PlatformTransactionManager
import org.springframework.transaction.annotation.Transactional
import org.springframework.transaction.support.TransactionSynchronizationManager
import org.testcontainers.containers.MySQLContainer
import java.sql.DriverManager
import javax.sql.DataSource
import kotlin.test.Test
@SpringBootTest
@Import(value = [DbConfig::class, RoutingService::class, RoutingRepository::class])
class RoutingDBTest {
@Autowired
private lateinit var routingService: RoutingService
@Test
fun shouldReturnWritableValueFromMasterDatabase() {
// given
executeInitSql(master, "CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(255));")
executeInitSql(master, "INSERT INTO test_table VALUES (1, 'Master');")
// when
val writeOnly = routingService.writable()
// then
assertThat(writeOnly).isNotNull()
assertThat(writeOnly).isEqualTo("Master")
}
@Test
fun shouldReturnReadonlyValueFromSlaveDatabase() {
// given
executeInitSql(slave, "CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(255));")
executeInitSql(slave, "INSERT INTO test_table VALUES (1, 'Slave');")
// when
val readOnly = routingService.readOnly()
// then
assertThat(readOnly).isNotNull()
assertThat(readOnly).isEqualTo("Slave")
}
private fun executeInitSql(container: MySQLContainer<*>, sql: String) {
DriverManager.getConnection(container.jdbcUrl, container.username, container.password).use { conn ->
conn.createStatement().use { stmt ->
stmt.execute(sql)
}
}
}
companion object {
const val DB_NAME = "test_db"
const val USER_NAME = "root"
const val PASSWORD = "test_password"
val master: MySQLContainer<*> = MySQLContainer("mysql:8.0.33").apply {
withDatabaseName(DB_NAME)
withUsername(USER_NAME)
withPassword(PASSWORD)
.run { start() }
}
val slave: MySQLContainer<*> = MySQLContainer("mysql:8.0.33").apply {
withDatabaseName(DB_NAME)
withUsername(USER_NAME)
withPassword(PASSWORD)
.run { start() }
}
@JvmStatic
@DynamicPropertySource
fun registerProperties(registry: DynamicPropertyRegistry) {
registry.add("mysql.master.jdbc-url") { master.jdbcUrl }
registry.add("mysql.master.username") { master.username }
registry.add("mysql.master.password") { master.password }
registry.add("mysql.master.driver-class-name") { "com.mysql.cj.jdbc.Driver" }
registry.add("mysql.slave.jdbc-url") { slave.jdbcUrl }
registry.add("mysql.slave.username") { slave.username }
registry.add("mysql.slave.password") { slave.password }
registry.add("mysql.slave.driver-class-name") { "com.mysql.cj.jdbc.Driver" }
}
}
}
@Service
class RoutingService(
private val routingRepository: RoutingRepository
) {
@Transactional(readOnly = true)
fun readOnly(): String? {
return routingRepository.getName()
}
@Transactional
fun writable(): String? {
return routingRepository.getName()
}
}
@Repository
class RoutingRepository(
private val routingJdbcTemplate: JdbcTemplate,
) {
fun getName(): String? {
return routingJdbcTemplate.queryForObject(
"SELECT name FROM test_table WHERE id = ?",
String::class.java,
1
)
}
}
@Configuration
class DbConfig {
enum class DataSourceType {
WRITABLE, READONLY
}
@Bean
fun transactionManager(routingDataSource: DataSource): PlatformTransactionManager {
return DataSourceTransactionManager(routingDataSource)
}
@Bean
fun routingJdbcTemplate(routingDataSource: DataSource): JdbcTemplate {
return JdbcTemplate(routingDataSource)
}
@Bean
fun routingDataSource(
@Qualifier("writableDataSource") writableDataSource: DataSource,
@Qualifier("readonlyDataSource") readonlyDataSource: DataSource
): DataSource {
val dataSourceMap: MutableMap<Any, Any> = HashMap()
dataSourceMap[DataSourceType.WRITABLE] = writableDataSource
dataSourceMap[DataSourceType.READONLY] = readonlyDataSource
val routingDataSource = ReadWriteRoutingDataSource()
routingDataSource.setTargetDataSources(dataSourceMap)
routingDataSource.setDefaultTargetDataSource(writableDataSource)
routingDataSource.afterPropertiesSet()
return LazyConnectionDataSourceProxy(routingDataSource)
}
@Bean
fun readonlyDataSource(hikariSlaveConfig: HikariConfig): DataSource {
return HikariDataSource(hikariSlaveConfig)
}
@Bean
@ConfigurationProperties("mysql.slave")
fun hikariSlaveConfig(): HikariConfig {
return HikariConfig()
}
@Bean
fun writableDataSource(hikariMasterConfig: HikariConfig): DataSource {
return HikariDataSource(hikariMasterConfig)
}
@Bean
@ConfigurationProperties("mysql.master")
fun hikariMasterConfig(): HikariConfig {
return HikariConfig()
}
class ReadWriteRoutingDataSource : AbstractRoutingDataSource() {
override fun determineCurrentLookupKey(): Any {
return if (TransactionSynchronizationManager.isCurrentTransactionReadOnly())
DataSourceType.READONLY
else
DataSourceType.WRITABLE
}
}
}