smallrye-mutiny icon indicating copy to clipboard operation
smallrye-mutiny copied to clipboard

MultiGroupBy Failure: Thread Blocked

Open sahak1an opened this issue 1 year ago • 5 comments

...
.group().by(key)
flatMap(group -> switch (group.key()) {
...
})

failed with io.vertx.core.VertxException: Thread blocked

Stack tracke
    
 WARN  [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2954 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.freezeAtSafepoint(Safepoint.java:299)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.slowPathSafepointCheck0(Safepoint.java:225)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.slowPathSafepointCheck(Safepoint.java:187)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.slowPathSafepointCheck(Safepoint.java:425)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.enterSlowPathSafepointCheck(Safepoint.java:412)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.genscavenge.ThreadLocalAllocation.slowPathNewInstanceWithoutAllocating(ThreadLocalAllocation.java:236)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.genscavenge.ThreadLocalAllocation.slowPathNewInstance(ThreadLocalAllocation.java:221)
        at [email protected]/java.util.concurrent.CopyOnWriteArrayList.getArray(CopyOnWriteArrayList.java:117)
        at [email protected]/java.util.concurrent.CopyOnWriteArrayList.add(CopyOnWriteArrayList.java:463)
        at io.smallrye.mutiny.operators.multi.MultiCacheOp.onNext(MultiCacheOp.java:92)
        at io.smallrye.mutiny.subscription.MultiSubscriberAdapter.onItem(MultiSubscriberAdapter.java:27)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.operators.multi.MultiGroupByOp$State.drain(MultiGroupByOp.java:400)
        at io.smallrye.mutiny.operators.multi.MultiGroupByOp$State.onItem(MultiGroupByOp.java:354)
        at io.smallrye.mutiny.operators.multi.MultiGroupByOp$GroupedUnicast.onItem(MultiGroupByOp.java:285)
        at io.smallrye.mutiny.operators.multi.MultiGroupByOp$MultiGroupByProcessor.onItem(MultiGroupByOp.java:127)
        at io.smallrye.mutiny.operators.multi.MultiCacheOp$CacheSubscription.replay(MultiCacheOp.java:183)
        at io.smallrye.mutiny.operators.multi.MultiCacheOp.onNext(MultiCacheOp.java:95)
        at io.smallrye.mutiny.subscription.MultiSubscriberAdapter.onItem(MultiSubscriberAdapter.java:27)
        at io.smallrye.mutiny.operators.multi.MultiSelectWhereOp$MultiSelectWhereProcessor.onItem(MultiSelectWhereOp.java:57)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:219)
        at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:554)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onItem(UniToMultiPublisher.java:94)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:60)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:60)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher$PublisherSubscriber.onComplete(UniCreateFromPublisher.java:86)
        at io.smallrye.mutiny.subscription.MultiSubscriberAdapter.onCompletion(MultiSubscriberAdapter.java:37)
        at io.smallrye.mutiny.operators.multi.MultiEmitOnOp$MultiEmitOnProcessor.isDoneOrCancelled(MultiEmitOnOp.java:248)
        at io.smallrye.mutiny.operators.multi.MultiEmitOnOp$MultiEmitOnProcessor.run(MultiEmitOnOp.java:188)
        at io.quarkus.mongodb.impl.Wrappers.lambda$toMulti$2(Wrappers.java:32)
        at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
        at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
        at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at [email protected]/java.lang.Thread.runWith(Thread.java:1583)
        at [email protected]/java.lang.Thread.run(Thread.java:1570)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:853)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:829)
   
 

Env

OS: Linux Fedora 40 Java 21 Graal reactive mutiny 2.6.0

sahak1an avatar Jul 02 '24 14:07 sahak1an

You will have to give us more details. It seems that you are blocking in the "switch" part.

cescoffier avatar Jul 02 '24 14:07 cescoffier

(BTW, the best is to provide a minimal reproducer; without that, it's hard to say - as the faulty code is most probably on your side)

cescoffier avatar Jul 02 '24 14:07 cescoffier


 return  ...
            .group().by(Metadata::getOperationType) //enum value
            .flatMap(group -> switch (group.key()) {
                case MODIFIED -> {
                // logging, simple operations   
                }
                case NEW -> persist(group);
            })
    }

    private static Multi<?> persist(GroupedMulti<OperationType, Metadata> metadataGroup) {
        return metadataGroup
            .group().intoLists().of(4096)
            .onItem().transformToUniAndMerge(batchMetadata -> {
                
                // reactive mongo
                return Metadata.persist(batchMetadata);
            });
    }

sahak1an avatar Jul 02 '24 14:07 sahak1an

There are occurrences of MultiCacheOp in the stacktrace, this seems suspicious to me.

jponge avatar Jul 02 '24 15:07 jponge

@sahak1an are you using a cache operator somewhere?

jponge avatar Jul 02 '24 17:07 jponge