reactor-netty

Make it possible to instrument task execution

Open bsideup opened this issue 5 years ago • 8 comments

See https://github.com/spring-cloud/spring-cloud-sleuth/issues/1392

Context

Since most tracing solutions rely on ThreadLocals, they have to instrument task execution to handle thread context switches and repopulate the ThreadLocals on the new thread. For Reactor, they wrap the ScheduledExecutorService, but reactor-netty does not use one.

Requested change

It would help if reactor-netty provided a means to instrument every Runnable that is submitted for execution on another Thread.
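To make the request concrete, here is a minimal sketch of what "instrumenting every Runnable" could look like, using only the JDK. It is not a reactor-netty or Netty API: `InstrumentedExecutor`, its hook parameter, and `TRACE_ID` are hypothetical names introduced for illustration, and a plain single-thread executor stands in for the event loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

/** Hypothetical sketch: an Executor that passes every task through a hook before delegating. */
final class InstrumentedExecutor implements Executor {
    /** A ThreadLocal standing in for the state a tracing library would keep. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private final Executor delegate;
    private final UnaryOperator<Runnable> hook;

    InstrumentedExecutor(Executor delegate, UnaryOperator<Runnable> hook) {
        this.delegate = delegate;
        this.hook = hook;
    }

    @Override
    public void execute(Runnable task) {
        // The hook runs on the submitting thread, so it can capture ThreadLocal state there.
        delegate.execute(hook.apply(task));
    }

    /** Submits a task from a thread that has a trace id and returns what the task observed. */
    static String runDemo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Executor instrumented = new InstrumentedExecutor(pool, task -> {
                String captured = TRACE_ID.get(); // read on the submitting thread
                return () -> {
                    TRACE_ID.set(captured);       // restored on the executing thread
                    try {
                        task.run();
                    } finally {
                        TRACE_ID.remove();        // do not leak into later tasks
                    }
                };
            });
            TRACE_ID.set("trace-42");
            CompletableFuture<String> seen = new CompletableFuture<>();
            instrumented.execute(() -> seen.complete(TRACE_ID.get()));
            return seen.join();
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "trace-42"
    }
}
```

The point of the sketch is only that the hook sees the task on the submitting thread and its wrapper runs on the executing thread; how such a hook would be exposed by reactor-netty is exactly what this issue leaves open.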

bsideup Jul 05 '19 09:07

@bsideup I am not sure I fully understand your issue, but here are my two cents.

I figured out a way to combine publishOn, subscribeOn and Schedulers.single so that all the operators and the subscription run on the same thread for each call.

        Scheduler subscriberPool = Schedulers.newParallel("subscriberPool", 10);
        // Pick a dedicated worker from subscriberPool for the following code
        Scheduler single = Schedulers.single(subscriberPool);

        Scheduler.Worker worker = single.createWorker();
        // Stores the Thread used by the worker
        AtomicReference<Thread> workerThreadRef = new AtomicReference<>();
        worker.schedule(() -> workerThreadRef.set(Thread.currentThread()));

        Mono<String> responseSingle = HttpClient.create()
            .wiretap(true)
            .baseUrl("http://httpbin.org")
            .post()
            .uri("/status/500")
            .send((req, out) -> out.sendString(Mono.just("Test")))
            .responseSingle((resp, byteBuf) -> {
                // Fall back to the status line when the response has no body
                Mono<String> body = byteBuf.asString().switchIfEmpty(Mono.just(resp.status().toString()));
                if (!HttpResponseStatus.OK.equals(resp.status())) {
                    // RuntimeException does not expand "{}" placeholders, so format the message explicitly
                    return body.flatMap(
                        error -> Mono.error(new RuntimeException(String.format(
                            "Request failed! (status=%s, uri=%s)", resp.status(), resp.uri()))));
                } else {
                    return body;
                }
            })
            .publishOn(single);

        // Every callback below is expected to run on the worker thread recorded above
        String result = responseSingle.doOnError(throwable -> {
            throwable.printStackTrace();
            assertSame(Thread.currentThread(), workerThreadRef.get());
        }).onErrorReturn("error").doOnSubscribe(s -> {
            assertSame(Thread.currentThread(), workerThreadRef.get());
        }).subscriberContext(c -> {
            assertSame(Thread.currentThread(), workerThreadRef.get());
            return Context.empty();
        }).subscribeOn(single).block();
        assertEquals("error", result);

dantesun Jul 13 '19 08:07

Hello, is there any progress on this? We need this feature a lot!

copbint Apr 19 '22 15:04

@copbint Can you describe your use case? Is it related only to the event loop, to the reactive pipeline, or maybe to tracing?

violetagg Apr 19 '22 19:04

When working with WebFlux (which is built on Reactor), we want to stay compatible with ThreadLocal. Since Schedulers provides onScheduleHook, we write code like this to carry the MDC ThreadLocal over when execution switches to another thread:

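        Schedulers.onScheduleHook("test", runnable -> {
            // Captured on the thread that schedules the task
            Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
            return () -> {
                // The copy may be null when the scheduling thread had no MDC entries
                if (copyOfContextMap != null) {
                    MDC.setContextMap(copyOfContextMap);
                } else {
                    MDC.clear();
                }
                runnable.run();
            };
        });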

But reactor-netty's EventLoop does not have a mechanism to instrument a Runnable like this.

I think the pattern described in this doc (what-is-a-good-pattern-for-contextual-logging-mdc) is too complex.

Thanks for your reply. Have a nice day!

copbint Apr 20 '22 00:04

@copbint Would you like to take a look at this PR https://github.com/reactor/reactor-core/pull/2983 and give us feedback? That would be really helpful!

violetagg Apr 20 '22 04:04

@violetagg This PR is doing a great job! It would be really helpful if we could get the ContextView in onScheduleHook.

copbint Apr 21 '22 09:04

In more detail: we are developing a framework based on WebFlux at our company. The framework code is all written in reactive style and saves some information into the ContextView.

Because reactive programming is difficult, we let users of our framework write Controller code in sync style (by switching to a thread that is allowed to block). If a Controller is written in sync style, it cannot get the ContextView.

If the ContextView were available in Schedulers.onScheduleHook, we could save it to a ThreadLocal and users could read it from there. This would be really helpful.

Currently reactor-netty's EventLoop does not have a mechanism to instrument a Runnable the way Schedulers.onScheduleHook does, so it would be perfect if the EventLoop also supported this.

Thank you all!
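The "save the context to a ThreadLocal for sync-style code" idea can be sketched with the JDK alone. This is only an illustration under stated assumptions: an immutable `Map` stands in for Reactor's ContextView, and `SyncContextHolder`, `withContext`, and the `"user"` key are hypothetical names, not part of WebFlux, Reactor, or reactor-netty.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical holder; a plain immutable Map stands in for Reactor's ContextView. */
final class SyncContextHolder {
    private static final ThreadLocal<Map<String, String>> CURRENT = new ThreadLocal<>();

    /** Returns the value for the key, or null when no context is installed on this thread. */
    static String get(String key) {
        Map<String, String> context = CURRENT.get();
        return context == null ? null : context.get(key);
    }

    /** Wraps blocking code so that it sees the given context while it runs. */
    static <T> Supplier<T> withContext(Map<String, String> context, Supplier<T> blocking) {
        return () -> {
            Map<String, String> previous = CURRENT.get();
            CURRENT.set(context);
            try {
                return blocking.get();
            } finally {
                // Restore whatever the worker thread held before this call
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }

    /** Runs a sync-style "controller" on a blocking pool and returns its result. */
    static String runDemo() {
        ExecutorService blockingPool = Executors.newSingleThreadExecutor();
        try {
            Map<String, String> context = Map.of("user", "alice");
            Supplier<String> controller = () -> "hello " + get("user");
            return CompletableFuture
                .supplyAsync(withContext(context, controller), blockingPool)
                .join();
        } finally {
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "hello alice"
    }
}
```

In this sketch the framework has to pass the context explicitly at the point where it switches to the blocking thread; the request in this thread is for a hook that would do the equivalent automatically for every scheduled task.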

copbint Apr 21 '22 15:04

All, it would be very helpful if you could review the PRs listed below and provide feedback directly there. Thanks a lot! https://github.com/reactor/reactor-core/pull/2983 https://github.com/reactor/reactor-core/pull/3145 https://github.com/reactor/reactor-core/pull/3147

violetagg Aug 18 '22 08:08

This is superseded by the new features in Reactor Core. If we need more we can reopen the issue.

violetagg Oct 20 '22 11:10