Usage of MathFlux.sumInt() with lift causing ClassCastException

Open • bergfexing opened this issue 9 months ago • 1 comment

The issue occurs in a reactive Spring Boot 3.4.4 application (reactor-core 3.7.4) that uses MathFlux. It seems to be related to https://github.com/reactor/reactor-core/issues/3762

Expected Behavior

No exception is thrown

Actual Behavior

 java.lang.ClassCastException: class reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber cannot be cast to class reactor.core.Fuseable$QueueSubscription (reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber and reactor.core.Fuseable$QueueSubscription are in unnamed module of loader 'app')
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:264) ~[reactor-core-3.7.4.jar:3.7.4]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint  Handler com.example.ReactorMathReproducer.DemoController#sum() [DispatcherHandler]
	*__checkpoint  HTTP GET "/sum" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:264) ~[reactor-core-3.7.4.jar:3.7.4]
		at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.7.4.jar:3.7.4]
		at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122) ~[reactor-core-3.7.4.jar:3.7.4]
		at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.7.4.jar:3.7.4]
		at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(MonoContextWriteRestoringThreadLocals.java:95) ~[reactor-core-3.7.4.jar:3.7.4]
		at reactor.math.MathSubscriber.onSubscribe(MathSubscriber.java:37) ~[reactor-extra-3.5.2.jar:3.5.2]
		at reactor.core.publisher.FluxContextWriteRestoringThreadLocalsFuseable$FuseableContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocalsFuseable.java:105) ~[reactor-core-3.7.4.jar:3.7.4]
		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:50) ~[reactor-core-3.7.4.jar:3.7.4]
		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59) ~[reactor-core-3.7.4.jar:3.7.4]
		at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68) ~[reactor-core-3.7.4.jar:3.7.4]
		at reactor.math.MonoSumInt.subscribe(MonoSumInt.java:42) ~[reactor-extra-3.5.2.jar:3.5.2]
		at reactor.core.publisher.MonoContextWriteRestoringThreadLocals.subscribe(MonoContextWriteRestoringThreadLocals.java:44) ~[reactor-core-3.7.4.jar:3.7.4]
....
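
To make the top frames of the trace easier to read, here is a minimal, stdlib-only sketch of the failure pattern they suggest: a subscriber that unconditionally casts its upstream subscription to a fusion-capable type, and a wrapper in between that does not implement that type. All types below (Sub, QueueSub, ArraySub, PlainWrapper, FusedSubscriber) are hypothetical stand-ins invented for illustration. They are not Reactor classes, and this is not a claim about how Reactor is implemented beyond what the exception message itself states.

```java
// Simplified model of the failure; every type here is a hypothetical stand-in, not a Reactor class.
public class FusionCastSketch {

    /** Stand-in for a plain subscription. */
    interface Sub {
        void request(long n);
    }

    /** Stand-in for a fusion-capable subscription (the role Fuseable.QueueSubscription plays in the trace). */
    interface QueueSub extends Sub {
        Integer poll();
    }

    /** A fusion-capable source subscription backed by an array. */
    static final class ArraySub implements QueueSub {
        private final int[] values;
        private int index;

        ArraySub(int... values) {
            this.values = values;
        }

        @Override
        public void request(long n) {
            // no-op in this sketch
        }

        @Override
        public Integer poll() {
            return index < values.length ? values[index++] : null;
        }
    }

    /** A wrapper that forwards request() but does not implement QueueSub, so the fusion capability is hidden. */
    static final class PlainWrapper implements Sub {
        private final Sub delegate;

        PlainWrapper(Sub delegate) {
            this.delegate = delegate;
        }

        @Override
        public void request(long n) {
            delegate.request(n);
        }
    }

    /** A downstream subscriber that assumes its upstream is always fusion-capable. */
    static final class FusedSubscriber {
        QueueSub upstream;

        void onSubscribe(Sub s) {
            // Unconditional cast: throws ClassCastException when s is a PlainWrapper.
            this.upstream = (QueueSub) s;
        }
    }

    /** Returns true if handing the subscription to FusedSubscriber fails with a ClassCastException. */
    static boolean failsWithClassCast(Sub s) {
        try {
            new FusedSubscriber().onSubscribe(s);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Sub source = new ArraySub(1, 2);
        System.out.println("direct:  " + failsWithClassCast(source));
        System.out.println("wrapped: " + failsWithClassCast(new PlainWrapper(source)));
    }
}
```

In this model the direct case succeeds and the wrapped case fails, which mirrors the message above: the context-restoring subscriber is handed to FluxMapFuseable as the subscription, but it is not a Fuseable.QueueSubscription.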

Steps to Reproduce

Reproducer: https://github.com/bergfexing/reactor_class_cast_exception_reproducer/

To trigger the issue the following conditions must be fulfilled:

  • the micrometer jar must be on the classpath
  • reactor-core 3.6+ must be used
  • Hooks.enableAutomaticContextPropagation() must be called
  • an onEachOperator hook must be registered with a function that installs a tracing lift operator
  • MathFlux.sumInt() must be called

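	// Pass-through lift: returns the actual subscriber unchanged (stands in for a tracing lift).
	private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
		return Operators.lift((a, b) -> b);
	}

	// Register the hook and enable automatic context propagation before assembling the pipeline.
	Hooks.onEachOperator("testTracingLift", tracingLift());
	Hooks.enableAutomaticContextPropagation();

	// The ClassCastException surfaces when the resulting Mono is subscribed
	// (see MonoSumInt.subscribe in the trace above).
	Flux<Integer> numbers = Flux.just(1, 2);
	MathFlux.sumInt(numbers);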

Possible Solution

Your Environment

  • Reactor version(s) used: 3.7.4
  • Other relevant libraries versions (eg. netty, ...): SpringBoot 3.4.4
  • JVM version (java -version): 21.0.1
  • OS and version (eg uname -a): Windows 11

Apr 02 '25 09:04 bergfexing

Thank you for the report. I moved this to the appropriate repository. I will try to have a look when I have some time, but contributions are welcome if anyone would like to investigate this.

Apr 03 '25 12:04 chemicL