reactor-addons
reactor-addons copied to clipboard
Usage of MathFlux.sumInt() with lift causing ClassCastException
An issue was found on a SpringBoot 3.4.4 reactive (reactor core 3.7.4) application using MathFlux. Issue seems to be related to https://github.com/reactor/reactor-core/issues/3762
Expected Behavior
No exception is thrown
Actual Behavior
java.lang.ClassCastException: class reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber cannot be cast to class reactor.core.Fuseable$QueueSubscription (reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber and reactor.core.Fuseable$QueueSubscription are in unnamed module of loader 'app')
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:264) ~[reactor-core-3.7.4.jar:3.7.4]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint Handler com.example.ReactorMathReproducer.DemoController#sum() [DispatcherHandler]
*__checkpoint HTTP GET "/sum" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:264) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(MonoContextWriteRestoringThreadLocals.java:95) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.math.MathSubscriber.onSubscribe(MathSubscriber.java:37) ~[reactor-extra-3.5.2.jar:3.5.2]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocalsFuseable$FuseableContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocalsFuseable.java:105) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:50) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.math.MonoSumInt.subscribe(MonoSumInt.java:42) ~[reactor-extra-3.5.2.jar:3.5.2]
at reactor.core.publisher.MonoContextWriteRestoringThreadLocals.subscribe(MonoContextWriteRestoringThreadLocals.java:44) ~[reactor-core-3.7.4.jar:3.7.4]
....
Steps to Reproduce
Reproducer: https://github.com/bergfexing/reactor_class_cast_exception_reproducer/
To trigger the issue the following conditions must be fulfilled:
- micrometer jar must be in classpath
- reactor-core 3.6+
Hooks.enableAutomaticContextPropagation()must be calledonEachOperatorhook is set up and provided a function that sets up tracing lift operator.- MathFlux.sumInt() must be called
private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
return Operators.lift((a, b) -> b);
}
Hooks.onEachOperator("testTracingLift", tracingLift());
Hooks.enableAutomaticContextPropagation();
Flux<Integer> numbers = Flux.just(1, 2);
MathFlux.sumInt(numbers);
Possible Solution
Your Environment
- Reactor version(s) used: 3.7.4
- Other relevant libraries versions (eg.
netty, ...): SpringBoot 3.4.4 - JVM version (
java -version): 21.0.1 - OS and version (eg
uname -a): Windows 11
Thank you for the report. I moved this to the appropriate repository. I will try to have a look when I have some time, but contributions are welcome if anyone would like to investigate this.