spring-data-mongodb
`NoSuchTransaction` in reactive mongodb client when working with transactions
Setup
Spring Boot 3.3.4 with org.springframework.boot:spring-boot-starter-webflux and org.springframework.boot:spring-boot-starter-data-mongodb-reactive. MongoDB version is 6.0.18.
MongoDB Config
The reactive Mongo client is configured with a ReactiveMongoTransactionManager so that the Mongo templates participate in transactions:
@Configuration
@EnableConfigurationProperties(MongoProperties::class)
class MongoConfiguration(
    private val mongoProperties: MongoProperties,
) : AbstractReactiveMongoConfiguration() {
    ...
    @Bean
    fun transactionManager(
        factory: ReactiveMongoDatabaseFactory,
        properties: MongoProperties,
    ): ReactiveMongoTransactionManager {
        return ReactiveMongoTransactionManager(
            factory,
            TransactionOptions.builder().readPreference(ReadPreference.valueOf(properties.readPreference)).build(),
        )
    }
    ...
Docker Setup
For local and integration testing, MongoDB runs via Docker Compose as a single-node replica set. The mounted init script just runs rs.initiate(...) to register the replica set. The application properties set the matching connection string, mongodb://localhost:27017/?replicaSet=rs0.
services:
  mongo:
    image: mongo:6.0.18
    ports:
      - "27017:27017"
    volumes:
      - ./bin/mongodb-init-replica-set.sh:/docker-entrypoint-initdb.d/mongodb-init-replica-set.sh:ro
    command: ["mongod", "--replSet", "rs0", "--bind_ip_all"]
  ...
Application Code
I have an endpoint, PUT /foo, that updates multiple entries in a single collection, and the update must be transactional. Before the update, all entries are fetched by ID and validated:
// FooController.kt
@RestController
class FooController(private val fooUseCase: FooUseCase) {
    ...
    @Transactional(label = ["mongo:readPreference=PRIMARY"])
    @PutMapping(
        value = ["/foo"],
        consumes = [MediaType.APPLICATION_JSON_VALUE],
    )
    fun foo(@RequestBody request: RequestDto): Mono<Void> {
        return fooUseCase
            .process(request)
            .doOnError { error ->
                logger.error("Failed", error)
            }
    }
}
// FooUseCase.kt
@Service
class FooUseCase(private val repo: FooRepository, private val factory: FooFactory) {
    fun process(request: RequestDto): Mono<Void> {
        return repo
            .findAllById(request.ids)
            .collectList()
            .flatMap { entries ->
                // Do some checks
                repo
                    .saveAll(entries.map { factory.from(request, it) })
                    .then()
            }
    }
}
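As an alternative to @Transactional on the controller, the same flow can be made transactional programmatically with Spring's TransactionalOperator. The following is a minimal sketch, not the code from this thread. It assumes the FooUseCase and RequestDto types shown above and a ReactiveTransactionManager bean such as the one in MongoConfiguration. TransactionalFooUseCase is a hypothetical name, and the sketch uses transactional(Mono) rather than execute { ... }.

```kotlin
import org.springframework.stereotype.Service
import org.springframework.transaction.ReactiveTransactionManager
import org.springframework.transaction.reactive.TransactionalOperator
import reactor.core.publisher.Mono

// Hypothetical wrapper: makes FooUseCase.process transactional without
// @Transactional. FooUseCase and RequestDto are the types shown above.
@Service
class TransactionalFooUseCase(
    private val fooUseCase: FooUseCase,
    transactionManager: ReactiveTransactionManager,
) {
    // Uses the default transaction definition (propagation REQUIRED).
    private val transactionalOperator = TransactionalOperator.create(transactionManager)

    fun process(request: RequestDto): Mono<Void> =
        // Runs the whole chain (find, checks, saveAll) inside one transaction:
        // committed when the Mono completes, rolled back when it errors.
        transactionalOperator.transactional(fooUseCase.process(request))
}
```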
Integration Tests
To test the transactional behavior, I wrote a Spring Boot integration test. It uses coroutines and WebTestClient to fire 100 concurrent requests at the endpoint, to make sure there are no side effects.
@SpringBootTest(
    webEnvironment = RANDOM_PORT,
    properties = ["server.error.include-stacktrace=always"]
)
class FooIntegrationTest {
    @Autowired
    lateinit var webTestClient: WebTestClient

    @Autowired
    lateinit var fooUsecase: FooUseCase

    @Autowired
    lateinit var repo: FooRepository

    // Clean-up in @BeforeEach and @AfterEach

    @Test
    fun `should rollback`() = runTest {
        // 1. Store entries which should be updated in db
        // 2. Assert entries are there
        // 3. Run db command to set validator rule for specific id to enforce exception on db request without the need to mock something
        // 4. Run test using web client:
        val responseSpecs = (1..100).map {
            async {
                webTestClient
                    .put()
                    .uri {
                        // uri(Function<UriBuilder, URI>) expects a URI, so build() is required
                        it.path("/foo").build()
                    }
                    .body(Mono.just(request), RequestDto::class.java)
                    .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                    .exchange()
                    .expectStatus().is5xxServerError
                    .expectBody()
                    .jsonPath("$.trace").value<String> { stackTrace ->
                        stackTrace.shouldContain("DataIntegrityViolationException")
                        stackTrace.shouldNotContain("NoSuchTransaction")
                    }
            }
        }
        responseSpecs.awaitAll()
        // 5. Validate original entries in db are not altered
    }
}
Unfortunately, the transactional behavior shows side effects: at random, a MongoTransactionException with NoSuchTransaction is thrown instead of the expected DataIntegrityViolationException. The test therefore fails, and I cannot explain why. Can anybody help?
Some things I forgot to mention:
Originally, I had the @Transactional annotation on the service method. That is when I first noticed the side effects, so I moved it to the controller.
Besides @Transactional, I also tried the programmatic TransactionalOperator, wrapping the relevant chains in transactionOperator.execute { ... }, but saw no difference.
I see one of two errors. Either the transaction has already been aborted, which in my opinion means that requests are sharing a transaction:
org.springframework.data.mongodb.MongoTransactionException: Command failed with error 251 (NoSuchTransaction): 'Transaction with { txnNumber: 67 } has been aborted.' on server localhost:27017. The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Transaction with { txnNumber: 67 } has been aborted.", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1728556211, "i": 10}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1728556211, "i": 10}}}
Or I see NoSuchTransaction with a message that the txnNumber does not match, which suggests the correct transaction is not being found in the Reactor context.
The error disappears when I set the maximum connection pool size in the Mongo config to 1.
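For reference, a pool size of 1 can be configured roughly as follows. This is a minimal sketch, assuming the configuration extends AbstractReactiveMongoConfiguration and uses its configureClientSettings hook. The class name, database name and hard-coded connection string are placeholders. The same limit can also be set with the maxPoolSize=1 connection string option. With a single connection, only one command can be in flight at a time, so this is useful as a diagnostic rather than as a fix.

```kotlin
import com.mongodb.ConnectionString
import com.mongodb.MongoClientSettings
import org.springframework.context.annotation.Configuration
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration

// Hypothetical diagnostic configuration: caps the driver's connection pool at one connection.
@Configuration
class SingleConnectionMongoConfiguration : AbstractReactiveMongoConfiguration() {

    override fun getDatabaseName(): String = "anyname"

    override fun configureClientSettings(builder: MongoClientSettings.Builder) {
        builder
            // Placeholder URI; in the real setup this comes from the application properties.
            .applyConnectionString(ConnectionString("mongodb://localhost:27017/?replicaSet=rs0"))
            // Applied after the connection string so it overrides any pool size set there.
            .applyToConnectionPoolSettings { it.maxSize(1) }
    }
}
```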
Thank you @fremarti for getting in touch. That's a hard problem to spot; we'll look into it. Thanks already for the pool size hint.
@fremarti can you please package things up and share a complete minimal sample (something that we can unzip or git clone, build, and deploy) that reproduces the problem. Thank you!
@christophstrobl we have created a minimal sample for you. The tests in the IntegrationTest class are flaky due to the issue mentioned above.
- The test named 'should save AnyDocuments transactionally' is green when run separately, but sometimes red when run together with the other tests.
- The transaction rollback test always emits NoSuchTransaction messages in the test execution log.
If you have further questions, please don't hesitate to ask. We look forward to hearing from you.
We have changed the sample a little to simplify the demo even further: there is now a single test that updates different documents in separate requests and transactions. We still get NoSuchTransaction errors without any custom configuration.
Kind Regards David
To analyze the error, David, Frederik and I also replaced the repository pattern with direct usage of the reactive Mongo template. Please see the modified minimal sample attached. We were not able to reproduce the NoSuchTransaction exception when injecting only the reactive Mongo template. However, we are not sure whether this is only because using the template bypasses the spring-data-mongodb error handling, or because the error really does not occur.
Kind regards Lennard (colleague of Frederik and David)
👍 Thank you both, it's on this week's to-do list.
From a Spring Data point of view it looks like things are working as expected.
Judging from the log output, it seems as if the driver's internal transaction number counter gets out of sync at some point. This leads to the failure where the given transaction number (txNumber in the logs) does not match the one within the ServerSession.
o.s.d.m.ReactiveMongoTransactionManager : About to start transaction for session
[
ClientSessionPublisherImpl@b55eb88 id = {"id": {"$binary": {"base64": "UYdiB/LBQW+xdCuRI/XQOw==", "subType": "04"}}},
causallyConsistent = true,
txActive = false,
txNumber = 32,
closed = false,
clusterTime = null
]
o.s.d.m.ReactiveMongoTransactionManager : Started transaction for session
[
ClientSessionPublisherImpl@b55eb88 id = {"id": {"$binary": {"base64": "UYdiB/LBQW+xdCuRI/XQOw==", "subType": "04"}}},
causallyConsistent = true,
txActive = true,
txNumber = 33,
closed = false,
clusterTime = null
].
Command failed with error 251 (NoSuchTransaction): 'Given transaction number 33 does not match any in-progress transactions.
The active transaction number is 32' on server 127.0.0.1:27017.
The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Given transaction number 33 does not match any in-progress transactions. The active transaction number is 32", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730184660, "i": 11}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730184660, "i": 11}}}
o.s.d.m.ReactiveMongoTransactionManager : Initiating transaction rollback
o.s.d.m.ReactiveMongoTransactionManager : About to abort transaction for session
[
ClientSessionPublisherImpl@b55eb88 id = {"id": {"$binary": {"base64": "UYdiB/LBQW+xdCuRI/XQOw==", "subType": "04"}}},
causallyConsistent = true,
txActive = true,
txNumber = 33,
closed = false,
clusterTime = {"clusterTime": {"$timestamp": {"t": 1730184660, "i": 11}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}
]
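To make the numbers in this log concrete, here is a hypothetical toy model of the server-side rule behind error 251. It is not MongoDB's implementation. It assumes only what the log shows: the driver has already moved the session to transaction 33, while the server has last seen 32 and only learns about 33 from the command that carries startTransaction. If a follow-up command for 33 reaches the server first, it is rejected with the message seen above.

```kotlin
// Hypothetical toy model of the server-side check behind NoSuchTransaction (error 251).
// Not MongoDB's implementation; it only mirrors the rule visible in the log.
class ToySession(private var activeTxnNumber: Long) {

    // Returns "ok" or a NoSuchTransaction message for one command of this session.
    fun handle(txnNumber: Long, startTransaction: Boolean): String {
        if (startTransaction) {
            // The command carrying startTransaction opens transaction txnNumber.
            activeTxnNumber = txnNumber
            return "ok"
        }
        // Any other command must refer to the transaction already in progress.
        return if (txnNumber == activeTxnNumber) {
            "ok"
        } else {
            "NoSuchTransaction: Given transaction number $txnNumber does not match " +
                "any in-progress transactions. The active transaction number is $activeTxnNumber"
        }
    }
}

fun main() {
    // Server has last seen 32; the driver has already moved the session to 33.
    val inOrder = ToySession(activeTxnNumber = 32)
    println(inOrder.handle(33, startTransaction = true))  // ok
    println(inOrder.handle(33, startTransaction = false)) // ok

    // Same two commands, but the follow-up reaches the server first.
    val outOfOrder = ToySession(activeTxnNumber = 32)
    println(outOfOrder.handle(33, startTransaction = false))
    // NoSuchTransaction: Given transaction number 33 does not match any in-progress transactions. The active transaction number is 32
}
```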
@jyemin could you have a look/share some thoughts from the drivers perspective?
@christophstrobl will have a look, but wanted to get your thoughts on this comment:
We were not able to reproduce the NoSuchTransaction exception when using only reactive mongo template for injection. However we were not quite sure, if this is just the case due to avoiding spring-data-mongodb error handling by usage of the template or if the error does not occur.
Does it suggest anything to you? I'm not clear how the two scenarios differ in practice.
@jyemin For the ReactiveMongoTransactionManager the two scenarios should not really differ.
We've seen similar issues in other projects that only manifest in larger or more complex chains. This might be caused by Reactor switching threads combined with ServerSessionImpl#transactionNumber not being volatile.
@DavidFischer1010 I tried reproducing the error using https://github.com/user-attachments/files/17540925/minimal-sample.zip and was unsuccessful. Here are the steps I followed:
- Downloaded MongoDB 7.0.1
- Started a single node replica set on port 27017
- Downloaded the project and loaded it into IntelliJ
- Configured IntelliJ to use Java 17 for both Gradle and the project itself
- Ran the com.example.demo.IntegrationTest#should process multiple save AnyDocument requests test 5 times from IntelliJ. All runs completed successfully.
Can you shed any light on what you are doing differently?
@jyemin we used MongoDB 6.0.18 and Java 21, but the problem also occurs with your version setup; I just tried it. When I run the tests, roughly every second run fails, which is weird. Can you run the test more often to verify that it stays green every time?
I just tried it 10 times in a row and every run is green. Just to confirm, I'm using the zip file that contains this configuration:
@Configuration
@EnableReactiveMongoRepositories
class MongoConfiguration : AbstractReactiveMongoConfiguration() {
    override fun getDatabaseName(): String {
        return "anyname"
    }

    @Bean
    fun transactionManager(factory: ReactiveMongoDatabaseFactory): ReactiveMongoTransactionManager {
        return ReactiveMongoTransactionManager(factory)
    }
}
Is that the right one?
Yes, that is correct. I did exactly the same. I downloaded the https://github.com/user-attachments/files/17540925/minimal-sample.zip demo project and ran that with the exact same versions as you in Intellij. When I replace @Test with @RepeatedTest(50) I get the following result:
We also confirmed that on a second machine. Did you use the docker-compose setup from the minimal example with the given init script for the replica set?
I used temurin-17.0.13+11 to run the test. A failed test's stack trace looks like this:
2024-10-30T16:15:28.409+01:00 ERROR 92039 --- [ Thread-10] a.w.r.e.AbstractErrorWebExceptionHandler : [a6c7130e-5] 500 Server Error for HTTP POST "/foo"
org.springframework.data.mongodb.MongoTransactionException: Command failed with error 251 (NoSuchTransaction): 'Given transaction number 5 does not match any in-progress transactions. The active transaction number is 4' on server 127.0.0.1:27017. The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Given transaction number 5 does not match any in-progress transactions. The active transaction number is 4", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730301327, "i": 10}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730301327, "i": 9}}}
at org.springframework.data.mongodb.core.MongoExceptionTranslator.translateExceptionIfPossible(MongoExceptionTranslator.java:130) ~[spring-data-mongodb-4.3.4.jar:4.3.4]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Handler com.example.demo.FooController#foo(AnyDocumentRequest) [DispatcherHandler]
*__checkpoint ⇢ HTTP POST "/foo" [ExceptionHandlingWebHandler]
Original Stack Trace:
at org.springframework.data.mongodb.core.MongoExceptionTranslator.translateExceptionIfPossible(MongoExceptionTranslator.java:130) ~[spring-data-mongodb-4.3.4.jar:4.3.4]
at org.springframework.data.mongodb.core.ReactiveMongoTemplate.potentiallyConvertRuntimeException(ReactiveMongoTemplate.java:2768) ~[spring-data-mongodb-4.3.4.jar:4.3.4]
at org.springframework.data.mongodb.core.ReactiveMongoTemplate.lambda$translateException$99(ReactiveMongoTemplate.java:2751) ~[spring-data-mongodb-4.3.4.jar:4.3.4]
at reactor.core.publisher.Mono.lambda$onErrorMap$29(Mono.java:3862) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4576) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.6.10.jar:3.6.10]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:205) ~[reactor-core-3.6.10.jar:3.6.10]
at com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.lambda$sinkToCallback$33(MongoOperationPublisher.java:520) ~[mongodb-driver-reactivestreams-5.0.1.jar:na]
at com.mongodb.reactivestreams.client.internal.OperationExecutorImpl.lambda$execute$9(OperationExecutorImpl.java:126) ~[mongodb-driver-reactivestreams-5.0.1.jar:na]
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.operation.AsyncOperationHelper.lambda$exceptionTransformingCallback$17(AsyncOperationHelper.java:330) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:97) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier$RetryingCallback.onResult(RetryingAsyncCallbackSupplier.java:118) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:97) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:97) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$executeBulkWriteBatchAsync$9(MixedBulkWriteOperation.java:339) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.function.AsyncCallbackLoop$LoopingCallback.onResult(AsyncCallbackLoop.java:73) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.function.AsyncCallbackLoop$LoopingCallback.onResult(AsyncCallbackLoop.java:62) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$executeBulkWriteBatchAsync$7(MixedBulkWriteOperation.java:333) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.lambda$executeAsync$0(DefaultServer.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.CommandProtocolImpl.lambda$executeAsync$0(CommandProtocolImpl.java:86) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.lambda$sendAndReceiveAsync$1(DefaultConnectionPool.java:774) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.UsageTrackingInternalConnection.lambda$sendAndReceiveAsync$1(UsageTrackingInternalConnection.java:150) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection.lambda$sendCommandMessageAsync$0(InternalStreamConnection.java:534) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:826) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:789) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:648) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:228) ~[mongodb-driver-core-5.0.1.jar:na]
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129) ~[na:na]
at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:160) ~[na:na]
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:573) ~[na:na]
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276) ~[na:na]
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297) ~[na:na]
at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:122) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:111) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection.access$600(InternalStreamConnection.java:93) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:779) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:763) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:648) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:228) ~[mongodb-driver-core-5.0.1.jar:na]
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129) ~[na:na]
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:447) ~[na:na]
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:195) ~[na:na]
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:217) ~[na:na]
at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312) ~[na:na]
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Given transaction number 5 does not match any in-progress transactions. The active transaction number is 4' on server 127.0.0.1:27017. The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Given transaction number 5 does not match any in-progress transactions. The active transaction number is 4", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730301327, "i": 10}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730301327, "i": 9}}}
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection.lambda$sendCommandMessageAsync$0(InternalStreamConnection.java:523) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:826) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:789) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:648) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:228) ~[mongodb-driver-core-5.0.1.jar:na]
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129) ~[na:na]
at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:160) ~[na:na]
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:573) ~[na:na]
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276) ~[na:na]
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297) ~[na:na]
at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:122) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:111) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection.access$600(InternalStreamConnection.java:93) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:779) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:763) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:648) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:645) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:245) ~[mongodb-driver-core-5.0.1.jar:na]
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:228) ~[mongodb-driver-core-5.0.1.jar:na]
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129) ~[na:na]
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:447) ~[na:na]
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:195) ~[na:na]
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:217) ~[na:na]
at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312) ~[na:na]
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
2024-10-30T16:15:28.415+01:00 ERROR 92039 --- [ Test worker] o.s.t.w.reactive.server.ExchangeResult : Request details for assertion failure:
> POST http://localhost:62932/foo
> accept-encoding: [gzip]
> user-agent: [ReactorNetty/1.1.22]
> host: [localhost:62932]
> WebTestClient-Request-Id: [5]
> Accept: [application/json]
> Content-Type: [application/json]
> Content-Length: [32]
{"list":[{"id":"3"},{"id":"4"}]}
< 500 INTERNAL_SERVER_ERROR Internal Server Error
< Content-Type: [application/json]
< Content-Length: [129]
{"timestamp":"2024-10-30T15:15:28.409+00:00","path":"/foo","status":500,"error":"Internal Server Error","requestId":"a6c7130e-5"}
Status
Expected :200 OK
Actual :500 INTERNAL_SERVER_ERROR
java.lang.AssertionError: Status expected:<200 OK> but was:<500 INTERNAL_SERVER_ERROR>
at org.springframework.test.util.AssertionErrors.fail(AssertionErrors.java:59)
at org.springframework.test.util.AssertionErrors.assertEquals(AssertionErrors.java:122)
at org.springframework.test.web.reactive.server.StatusAssertions.lambda$assertStatusAndReturn$4(StatusAssertions.java:236)
at org.springframework.test.web.reactive.server.ExchangeResult.assertWithDiagnostics(ExchangeResult.java:232)
at org.springframework.test.web.reactive.server.StatusAssertions.assertStatusAndReturn(StatusAssertions.java:236)
at org.springframework.test.web.reactive.server.StatusAssertions.isOk(StatusAssertions.java:68)
at com.example.demo.IntegrationTest.should process multiple save AnyDocument requests(IntegrationTest.kt:88)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:711)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Thanks. Using @RepeatedTest(50) I can reproduce the issue. It fails in 9 of the 50 executions.
@christophstrobl I also tried with a locally modified version of mongodb-driver-core where I changed com.mongodb.internal.session.ServerSessionPool.ServerSessionImpl#transactionNumber to volatile, but it had no effect. So something else must be going on. Will continue to investigate.
I enabled debug logging for org.mongodb.driver.protocol.command and I think I can now explain what's going on.
The application is calling the method SimpleReactiveMongoRepository#saveAll(java.lang.Iterable<S>) to save the two entities. Because EntityInformation#isNew returns false, that method in turn executes this line:
Flux.fromIterable(entities).flatMap(this::save)
The interesting part here is that the save operations are all executed concurrently, and because the execution has been made transactional, they all execute in the same transaction context. This ultimately results in the following two update commands being sent to the server concurrently:
{"update": "any-document", "ordered": true, "$db": "anyname", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730310127, "i": 8}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "588FpS8YSsuL3Ev0Oizuww==", "subType": "04"}}}, "txnNumber": 50, "startTransaction": true, "autocommit": false, "updates": [{"q": {"_id": "1"}, "u": {"_id": "1", "_class": "com.example.demo.AnyDocument"}, "upsert": true}]}
{"update": "any-document", "ordered": true, "$db": "anyname", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730310127, "i": 8}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "588FpS8YSsuL3Ev0Oizuww==", "subType": "04"}}}, "txnNumber": 50, "autocommit": false, "updates": [{"q": {"_id": "2"}, "u": {"_id": "2", "_class": "com.example.demo.AnyDocument"}, "upsert": true}]}
Because the commands are both part of the same transaction, they have the same lsid and txnNumber, but only the first has startTransaction: true, which is required by the server for the first command in any transaction. But because these commands are executed concurrently, occasionally the second one will arrive before the first, and since the second one does not include startTransaction: true, the server will, correctly, reply with a NoSuchTransaction error.
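The server-side rule described above can be sketched as a toy Java model. It is not mongod's actual implementation: `ToyTxnServer`, its `Command` record and the reply strings are hypothetical names chosen for illustration. One instance stands for the state of a single `lsid`. A command without `startTransaction: true` is rejected unless its `txnNumber` is already the active one, so replaying the same two commands in a different arrival order changes the outcome.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the ordering rule; NOT the real mongod logic.
// One ToyTxnServer instance stands for the server-side state of a single lsid.
final class ToyTxnServer {
    // Only the fields of an update command that matter for this rule.
    record Command(long txnNumber, boolean startTransaction, String id) {}

    private long activeTxnNumber;

    ToyTxnServer(long activeTxnNumber) {
        this.activeTxnNumber = activeTxnNumber;
    }

    String handle(Command cmd) {
        if (cmd.startTransaction()) {
            // The first command of a transaction opens it and makes its number active.
            activeTxnNumber = cmd.txnNumber();
            return "ok";
        }
        if (cmd.txnNumber() != activeTxnNumber) {
            // Follow-up command for a transaction the server has not seen started yet.
            return "NoSuchTransaction(given=" + cmd.txnNumber() + ", active=" + activeTxnNumber + ")";
        }
        return "ok";
    }

    // Replays commands in the order they ARRIVE at the server, not the order they were sent.
    static List<String> replay(long activeTxnNumber, List<Command> arrivalOrder) {
        ToyTxnServer server = new ToyTxnServer(activeTxnNumber);
        List<String> replies = new ArrayList<>();
        for (Command cmd : arrivalOrder) {
            replies.add(server.handle(cmd));
        }
        return replies;
    }

    public static void main(String[] args) {
        Command first = new Command(50, true, "1");   // carries startTransaction: true
        Command second = new Command(50, false, "2"); // same txnNumber, no startTransaction
        System.out.println(replay(49, List.of(first, second))); // [ok, ok]
        System.out.println(replay(49, List.of(second, first))); // [NoSuchTransaction(given=50, active=49), ok]
    }
}
```

The second `replay` call mirrors the failing runs: the follow-up update for `_id` 2 reaches the server while 49 is still the active transaction number.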
That's why the documentation for com.mongodb.reactivestreams.client.MongoCluster#startSession() has the following admonition:
Note: A ClientSession instance can not be used concurrently in multiple operations.
@christophstrobl, given this, I think there's a good argument that the bug is in that saveAll method, because it's executing operations concurrently using the same session. Instead of using flatMap to update each document independently, could it perhaps use ReactiveBulkOperations to do all the updates at once, similar to what it does for inserts when EntityInformation#isNew returns true (by calling ReactiveMongoOperations#insert(Collection<? extends T>, String))?
Thank you @jyemin for your findings. I'll update the save operation and get back to you.
My bad: there was indeed a flatMap in that particular code path where it should have been flatMapSequential.
Thank you all for your time ❤️ !
#4824 will fix the issue. Snapshot builds for 4.4.x-GH-4804-SNAPSHOT should be available soon if you want to give it a try.
Thanks @christophstrobl. flatMapSequential seems correct, but the performance will be poor compared to the insert, which is done in bulk. Is there any way the code could take advantage of MongoCollection#bulkWrite here, so that there is just one database round trip for all the updates?
There are differences in our event publication & callback handling. If one wants to do a true batched insert, it needs to be done via the template and ReactiveBulkOperations.
Thanks for your help with this and the quick solution!
I just tried to apply the fix and still get the same error:
// FooController.kt
@PostMapping(value = ["/foo"], consumes = [MediaType.APPLICATION_JSON_VALUE])
@Transactional
fun foo(@RequestBody request: AnyDocumentRequest): Mono<Void> {
return Flux.fromIterable(request.list)
.flatMapSequential { anyDocumentRepository.save(AnyDocument(it.id)) }
.then()
}
I also enabled debug logging for org.mongodb.driver.protocol.command.
It seems that race conditions can still occur. Here, for example, the commands are logged in the correct order, but according to the timestamps both were started at the same moment (10:39:35.317), on two different connections (driver-generated IDs 6 and 5).
2024-11-01T10:39:35.317+01:00 DEBUG 26060 --- [tter-3-thread-1] org.mongodb.driver.protocol.command : Command "update" started on database "anyname" using a connection with driver-generated ID 6 and server-generated ID 60 to 127.0.0.1:27017. The request ID is 56 and the operation ID is 54. Command: {"update": "any-document", "ordered": true, "$db": "anyname", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730453975, "i": 48}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "Ux/pvBjaRxK7kXC0EqqCNA==", "subType": "04"}}}, "txnNumber": 13, "startTransaction": true, "autocommit": false, "updates": [{"q": {"_id": "1"}, "u": {"_id": "1", "_class": "com.example.demo.AnyDocument"}, "upsert": true}]}
2024-11-01T10:39:35.317+01:00 DEBUG 26060 --- [tter-3-thread-1] org.mongodb.driver.protocol.command : Command "update" started on database "anyname" using a connection with driver-generated ID 5 and server-generated ID 59 to 127.0.0.1:27017. The request ID is 57 and the operation ID is 55. Command: {"update": "any-document", "ordered": true, "$db": "anyname", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730453975, "i": 48}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "Ux/pvBjaRxK7kXC0EqqCNA==", "subType": "04"}}}, "txnNumber": 13, "autocommit": false, "updates": [{"q": {"_id": "2"}, "u": {"_id": "2", "_class": "com.example.demo.AnyDocument"}, "upsert": true}]}
2024-11-01T10:39:35.319+01:00 DEBUG 26060 --- [ Thread-5] org.mongodb.driver.protocol.command : Command "update" succeeded on database "anyname" in 1.959125 ms using a connection with driver-generated ID 6 and server-generated ID 60 to 127.0.0.1:27017. The request ID is 56 and the operation ID is 54. Command reply: {"n": 1, "electionId": {"$oid": "7fffffff0000000000000002"}, "opTime": {"ts": {"$timestamp": {"t": 1730453975, "i": 41}}, "t": 2}, "upserted": [{"index": 0, "_id": "1"}], "nModified": 0, "ok": 1.0, "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730453975, "i": 48}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730453975, "i": 41}}}
2024-11-01T10:39:35.320+01:00 DEBUG 26060 --- [ Thread-14] org.mongodb.driver.protocol.command : Command "update" failed on database "anyname" in 2.354958 ms using a connection with driver-generated ID 5 and server-generated ID 59 to 127.0.0.1:27017. The request ID is 57 and the operation ID is 55.
com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Given transaction number 13 does not match any in-progress transactions. The active transaction number is 12' on server 127.0.0.1:27017. The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Given transaction number 13 does not match any in-progress transactions. The active transaction number is 12", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1730453975, "i": 48}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1730453975, "i": 41}}}
What do you think?
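The overlap can be checked directly from the log lines above. This is a small Java sketch using `java.time`; the class name `LogOverlap` is hypothetical, and the three timestamps are copied from the two `started` lines (request IDs 56 and 57) and the `succeeded` line for request ID 56. These are client-side log times with millisecond resolution, so they show that both commands were in flight at once, not the order in which the server received them.

```java
import java.time.Duration;
import java.time.OffsetDateTime;

// Checks the overlap using timestamps copied from the debug log above.
final class LogOverlap {
    // "started" lines for request IDs 56 and 57, and the "succeeded" line for request ID 56.
    static final OffsetDateTime START_56 = OffsetDateTime.parse("2024-11-01T10:39:35.317+01:00");
    static final OffsetDateTime START_57 = OffsetDateTime.parse("2024-11-01T10:39:35.317+01:00");
    static final OffsetDateTime SUCCESS_56 = OffsetDateTime.parse("2024-11-01T10:39:35.319+01:00");

    // The second command overlaps the first if it was started no earlier than the first
    // and before the first one's reply was logged.
    static boolean overlapped(OffsetDateTime firstStart, OffsetDateTime firstEnd, OffsetDateTime secondStart) {
        return !secondStart.isBefore(firstStart) && secondStart.isBefore(firstEnd);
    }

    public static void main(String[] args) {
        System.out.println("gap between starts (ms): " + Duration.between(START_56, START_57).toMillis()); // 0
        System.out.println("overlapped: " + overlapped(START_56, SUCCESS_56, START_57));                   // true
    }
}
```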
I can get it fixed if I use concatMap instead of flatMapSequential. As I understand it, this ensures that not only the subscription is sequential but also the result emission, so the result order is guaranteed:
@PostMapping(value = ["/foo"], consumes = [MediaType.APPLICATION_JSON_VALUE])
@Transactional
fun foo(@RequestBody request: AnyDocumentRequest): Mono<Void> {
return Flux.fromIterable(request.list)
.concatMap { anyDocumentRepository.save(AnyDocument(it.id)) }
.then()
}
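Why `flatMapSequential` was not enough but `concatMap` is: in Reactor, `flatMapSequential` subscribes to all inner publishers eagerly (like `flatMap`) and only reorders the emitted results, while `concatMap` subscribes to the next inner publisher only after the previous one has completed. The toy discrete-time model below is plain Java with no Reactor; `DispatchModel` and the tick delays are hypothetical. It shows the consequence for arrival order when the command carrying `startTransaction: true` happens to travel more slowly.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy discrete-time model in integer "ticks"; it does NOT use Reactor.
// eager      ~ flatMap / flatMapSequential: every save is subscribed, and therefore sent, at once.
// sequential ~ concatMap: the next save is sent only after the previous reply has come back.
final class DispatchModel {
    record Arrival(int index, int arrivalTick) {}

    // oneWayDelays[i] = ticks for request i to reach the server; its reply takes as long again.
    // Returns request indexes in the order they reach the server.
    static List<Integer> arrivalOrder(int[] oneWayDelays, boolean sequential) {
        List<Arrival> arrivals = new ArrayList<>();
        int sendTick = 0;
        for (int i = 0; i < oneWayDelays.length; i++) {
            int arrival = sendTick + oneWayDelays[i];
            arrivals.add(new Arrival(i, arrival));
            if (sequential) {
                // concatMap: wait for this reply before subscribing to the next save.
                sendTick = arrival + oneWayDelays[i];
            }
        }
        // Ties are broken by index only to keep the model deterministic.
        arrivals.sort(Comparator.comparingInt(Arrival::arrivalTick).thenComparingInt(Arrival::index));
        List<Integer> order = new ArrayList<>();
        for (Arrival a : arrivals) {
            order.add(a.index());
        }
        return order;
    }

    public static void main(String[] args) {
        // Request 0 carries startTransaction: true but travels slower than request 1.
        int[] delays = {3, 1};
        System.out.println("eager:      " + arrivalOrder(delays, false)); // [1, 0]
        System.out.println("sequential: " + arrivalOrder(delays, true));  // [0, 1]
    }
}
```

With eager dispatch the arrival order depends entirely on the delays (with `{1, 3}` it would happen to be correct), which is consistent with the intermittent failures seen under `@RepeatedTest(50)`. With sequential dispatch, request 1 cannot be sent before request 0's reply is back, at the cost of one round trip per document.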
There are differences in our event publication & callback handling. If one wants to do a true batched insert, it needs to be done via the template and ReactiveBulkOperations.
I'm not sure I understand this, @christophstrobl. The current code looks like this:
return source.stream().allMatch(entityInformation::isNew) ? //
mongoOperations.insert(source.stream().collect(Collectors.toList()), entityInformation.getCollectionName()) : //
Flux.fromIterable(entities).flatMap(this::save);
IIUC the insert is done in bulk, so couldn't the save be done in bulk as well? Is the issue that there is no method in ReactiveMongoOperations like:
<T> Flux<T> save(Collection<? extends T> batchToSave, String collectionName);
that translates to MongoCollection#bulkWrite?
I'm just concerned because it might surprise users of saveAll that inserts execute in bulk while upserts execute sequentially.
I did a bit more digging, and now I see that the implementation of
<T> Flux<T> insert(Collection<? extends T> batchToSave, String collectionName);
in ReactiveMongoTemplate also inserts each document individually, and it uses flatMapSequential as well. I suspect those calls also need to use concatMap in order to be correct when executed within a transaction.
But this change could have a significant effect on performance, which is worrying.
@jyemin for save operations of versioned entities, we need to make sure a replace can only happen if the db version still matches, which is something we cannot do in bulk. Generally speaking, the Spring Data repository abstraction does not really fit bulk operations.
Maybe @violetagg & @chemicL can give some advice on flatMapSequential vs. concatMap for the given scenario.
for save operations of versioned entities, we need to make sure a replace can only happen if the db version still matches, which is something we cannot do in bulk.
Why can't this be done in bulk? Is the issue that you need to report, separately for each document, whether there actually was a version mismatch?
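One way to read this question is sketched below as a toy in-memory model of optimistic locking: a replace succeeds only if the stored version still equals the version the caller read. `VersionedStore`, `Doc` and both save methods are hypothetical, not Spring Data's or the driver's API, and `saveBatchCountOnly` assumes, purely for illustration, a batch result that reports only how many documents matched. Per-document results tell the caller which entity lost the race; an aggregate count only says that one did.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of optimistic locking; NOT Spring Data's or the driver's API.
final class VersionedStore {
    record Doc(String id, long version, String payload) {}

    private final Map<String, Doc> docs = new HashMap<>();

    void put(Doc doc) {
        docs.put(doc.id(), doc);
    }

    // Compare-and-replace: replace only if the stored version equals the version
    // the caller read earlier, then bump the version.
    boolean replaceIfVersionMatches(Doc candidate) {
        Doc current = docs.get(candidate.id());
        if (current == null || current.version() != candidate.version()) {
            return false; // changed or deleted by someone else in the meantime
        }
        docs.put(candidate.id(), new Doc(candidate.id(), candidate.version() + 1, candidate.payload()));
        return true;
    }

    // Per-document outcome: the caller knows exactly which entity failed.
    List<Boolean> saveEach(List<Doc> batch) {
        List<Boolean> results = new ArrayList<>();
        for (Doc doc : batch) {
            results.add(replaceIfVersionMatches(doc));
        }
        return results;
    }

    // Hypothetical aggregate outcome: only the number of matched documents is reported.
    int saveBatchCountOnly(List<Doc> batch) {
        int matched = 0;
        for (Doc doc : batch) {
            if (replaceIfVersionMatches(doc)) {
                matched++;
            }
        }
        return matched;
    }

    // Document "2" was already bumped to version 5 by a concurrent writer.
    static VersionedStore seeded() {
        VersionedStore store = new VersionedStore();
        store.put(new Doc("1", 0, "a"));
        store.put(new Doc("2", 5, "b"));
        return store;
    }

    public static void main(String[] args) {
        // The caller still holds version 4 of document "2".
        List<Doc> batch = List.of(new Doc("1", 0, "a2"), new Doc("2", 4, "b2"));
        System.out.println(seeded().saveEach(batch));           // [true, false]
        System.out.println(seeded().saveBatchCountOnly(batch)); // 1
    }
}
```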
@christophstrobl Hey, I only just found the time to look into this, so it looks like I'm late to the party. However, from the vague understanding I have after reading the lengthy discussion, I feel this is what was settled upon:
- flatMap ran concurrently, and the transaction start operation could have been issued after another operation that referred to its ID -> error.
- flatMapSequential seemed to improve things, but in the end, after more testing, it had the same issue.
- concatMap issues requests sequentially, so everything runs in the appropriate order.
What I was wondering is whether only the request containing "startTransaction": true needs to run first, while the rest can be parallelized. If that's the case, it's worth considering a sequence similar to the one below:
Flux.just(firstItem)
.flatMap(this::save)
.concatWith(Flux.fromIterable(remainderOfItems)
.flatMapSequential(this::save)
)
This would ensure that the command starting the transaction arrives at the target server before the others, while the rest could still run in parallel, potentially speeding things up a bit. It's also worth noting that flatMapSequential subscribes eagerly, just like flatMap; the difference is that it produces results in the order of the upstream chain's emissions and requires a buffer for out-of-order arrivals.
Let me know if this helps, or whether concurrent execution is undesired; if so, please disregard the above :)
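The ordering argument behind the first-item-then-the-rest proposal relies on `concatWith` subscribing to the second `Flux` only after the first one completes, that is, after the reply to the `startTransaction: true` command has come back. A toy discrete-time sketch in plain Java (no Reactor; `FirstThenParallel` and the tick delays are hypothetical) shows that, under that assumption, every remaining command reaches the server after the first one, whatever their individual delays. It deliberately models only arrival order: it does not address the driver's note that a `ClientSession` cannot be used concurrently in multiple operations, which still applies to the parallel tail.

```java
import java.util.Arrays;

// Toy discrete-time model of "send the first save, wait for its reply, then send the rest at once".
// It only checks that startTransaction arrives first; it ignores the driver rule that a
// ClientSession must not be used by concurrent operations.
final class FirstThenParallel {
    // oneWayDelays[i]: ticks (> 0) for request i to reach the server; its reply takes as long again.
    // Returns the tick at which each request reaches the server.
    static int[] arrivalTicks(int[] oneWayDelays) {
        int[] arrivals = new int[oneWayDelays.length];
        if (oneWayDelays.length == 0) {
            return arrivals;
        }
        arrivals[0] = oneWayDelays[0];                    // first save: startTransaction: true
        int restSendTick = arrivals[0] + oneWayDelays[0]; // its reply is back: subscribe to the rest
        for (int i = 1; i < oneWayDelays.length; i++) {
            arrivals[i] = restSendTick + oneWayDelays[i]; // the rest are sent together, as with flatMapSequential
        }
        return arrivals;
    }

    static boolean startArrivesFirst(int[] arrivals) {
        for (int i = 1; i < arrivals.length; i++) {
            if (arrivals[i] <= arrivals[0]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] arrivals = arrivalTicks(new int[]{5, 1, 3, 2});
        System.out.println(Arrays.toString(arrivals));  // [5, 11, 13, 12]
        System.out.println(startArrivesFirst(arrivals)); // true
    }
}
```

The tail commands may still reach the server in any order among themselves (here 11, 13, 12), which is fine for the `startTransaction` rule but is exactly the kind of concurrent session use the driver documentation warns against.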