reactor-netty
Make it possible to instrument task execution
See https://github.com/spring-cloud/spring-cloud-sleuth/issues/1392
Context
Since most tracing solutions rely on ThreadLocals, they have to instrument task execution to handle thread context switches and repopulate the ThreadLocals. For Reactor, they wrap ScheduledExecutorService, but reactor-netty does not use it.
Requested change
It would help if reactor-netty provided a means to instrument every Runnable that is submitted for execution on another Thread.
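As a rough illustration of the kind of instrumentation point being requested, here is a minimal JDK-only sketch. It is not reactor-netty API: the class name InstrumentedExecutor and its hook parameter are hypothetical, and how such a wrapper would be plugged into reactor-netty's event loops is exactly what this issue leaves open.

```java
import java.util.concurrent.Executor;
import java.util.function.UnaryOperator;

/** Hypothetical decorator: every submitted Runnable passes through a hook before it is handed to the delegate. */
final class InstrumentedExecutor implements Executor {
    private final Executor delegate;
    private final UnaryOperator<Runnable> hook;

    InstrumentedExecutor(Executor delegate, UnaryOperator<Runnable> hook) {
        this.delegate = delegate;
        this.hook = hook;
    }

    @Override
    public void execute(Runnable task) {
        // The hook is applied on the submitting thread, so it can capture
        // ThreadLocal state there and return a Runnable that restores it later.
        delegate.execute(hook.apply(task));
    }
}
```

A tracing library would pass a hook that snapshots its ThreadLocals when the task is submitted and reinstalls them inside the returned Runnable.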
@bsideup I am not sure I fully understand your issue, but here are my two cents.
I figured out a way to combine publishOn, subscribeOn and Schedulers.single to make sure that all operators and the subscription execute on the same thread for each call.
import java.util.concurrent.atomic.AtomicReference;
import io.netty.handler.codec.http.HttpResponseStatus;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.util.context.Context;
// assertSame and assertEquals are JUnit static imports

Scheduler subscriberPool = Schedulers.newParallel("subscriberPool", 10);
// Pick a dedicated worker from subscriberPool for the following code
Scheduler single = Schedulers.single(subscriberPool);
Scheduler.Worker worker = single.createWorker();
// Store the Thread used by the worker
AtomicReference<Thread> workerThreadRef = new AtomicReference<>();
worker.schedule(() -> workerThreadRef.set(Thread.currentThread()));
Mono<String> responseSingle = HttpClient.create()
        .wiretap(true)
        .baseUrl("http://httpbin.org")
        .post()
        .uri("/status/500")
        .send((req, out) -> out.sendString(Mono.just("Test")))
        .responseSingle((resp, byteBuf) -> {
            Mono<String> body = byteBuf.asString().switchIfEmpty(Mono.just(resp.status().toString()));
            if (!HttpResponseStatus.OK.equals(resp.status())) {
                // RuntimeException has no format-args constructor, so build the message first
                return body.flatMap(error -> Mono.error(new RuntimeException(
                        String.format("Request failed!(status=%s, uri=%s)", resp.status(), resp.uri()))));
            } else {
                return body;
            }
        })
        .publishOn(single);
String result = responseSingle
        .doOnError(throwable -> {
            throwable.printStackTrace();
            assertSame(Thread.currentThread(), workerThreadRef.get());
        })
        .onErrorReturn("error")
        .doOnSubscribe(s -> assertSame(Thread.currentThread(), workerThreadRef.get()))
        .subscriberContext(c -> {
            assertSame(Thread.currentThread(), workerThreadRef.get());
            return Context.empty();
        })
        .subscribeOn(single)
        .block();
assertEquals("error", result);
Hello, is there any progress on this? We need this feature a lot!
@copbint Can you describe your use case? Is it related only to the event loop, to the reactive pipeline, or maybe to tracing?
When working with WebFlux (which is built on Reactor), we want to stay compatible with ThreadLocal. Since Schedulers provides onScheduleHook, we write code like this to carry the MDC ThreadLocal over when switching to another thread:
Schedulers.onScheduleHook("test", runnable -> {
    // Captured on the thread that schedules the task
    Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
    return () -> {
        // Installed on the thread that runs the task
        MDC.setContextMap(copyOfContextMap);
        runnable.run();
    };
});
But reactor-netty's event loop does not have a mechanism to instrument a Runnable like this.
I think the pattern described in the docs (what-is-a-good-pattern-for-contextual-logging-mdc) is too complex.
Thanks for your reply. Have a nice day!
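The capture-and-restore pattern behind such a hook can be sketched without any Reactor or SLF4J dependency. In this sketch DiagnosticContext is a hypothetical stand-in for a ThreadLocal-backed store such as MDC, and all of its method names are assumptions made for illustration. Unlike the snippet above, it also puts the executing thread's previous state back once the task finishes, which matters when threads are reused.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a ThreadLocal-backed diagnostic context such as MDC. */
final class DiagnosticContext {
    private static final ThreadLocal<Map<String, String>> CURRENT =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) { CURRENT.get().put(key, value); }

    static String get(String key) { return CURRENT.get().get(key); }

    static Map<String, String> snapshot() { return new HashMap<>(CURRENT.get()); }

    static void restore(Map<String, String> values) { CURRENT.set(new HashMap<>(values)); }

    /** Captures the caller's context now, installs it around the task, then restores the previous one. */
    static Runnable propagating(Runnable task) {
        Map<String, String> captured = snapshot();      // taken on the submitting thread
        return () -> {
            Map<String, String> previous = snapshot();  // taken on the executing thread
            restore(captured);
            try {
                task.run();
            } finally {
                restore(previous);                      // avoid leaking state into later tasks
            }
        };
    }
}
```

A method reference such as DiagnosticContext::propagating has the Runnable-to-Runnable shape that Schedulers.onScheduleHook accepts, so the same wrapper could serve any other instrumentation point that takes such a function.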
@copbint Would you like to take a look at this PR https://github.com/reactor/reactor-core/pull/2983 and provide us feedback? This would be really helpful!
@violetagg This PR is doing a great job! It would be really helpful if we could get the ContextView in onScheduleHook.
In detail:
We are developing a framework based on WebFlux at our company. The framework code is all written in reactive style and saves some information into the ContextView.
Because reactive programming is difficult, we let our framework users write Controller code in sync style (by switching to a thread that is allowed to block). If a Controller is written in sync style, it cannot get the ContextView.
If the ContextView were available in Schedulers.onScheduleHook, we could save it to a ThreadLocal, and users could then read it from the ThreadLocal. This would be really helpful.
Currently reactor-netty's event loop does not have a mechanism to instrument a Runnable like Schedulers.onScheduleHook; it would be perfect if the event loop also supported this.
Thank you all!
All, it would be very helpful if you could review the PRs listed below and provide feedback directly there. Thanks a lot! https://github.com/reactor/reactor-core/pull/2983 https://github.com/reactor/reactor-core/pull/3145 https://github.com/reactor/reactor-core/pull/3147
This is superseded by the new features in Reactor Core. If we need more we can reopen the issue.