Close opened Scope in the child thread

Open sondemar opened this issue 7 months ago • 0 comments

Hi, I would like to wrap a method that returns a CompletableFuture step/task with the Observation API so that it is fully observable (the observation scope should cover the whole method).

The CompletableFuture returned by the method contains asynchronous operations. The Observation therefore has to be propagated to child threads, but it also has to be stopped (and its scope closed) in the child thread, without propagating further to the next steps in the chain outside this method. I need control over closing the opened scope, because a scope that is left open may lead to thread pollution and memory leaks.

The only solution that comes to mind is to open the observation scope manually, make a snapshot of it, and then set thread locals in every CompletableFuture sub-step inside the method.
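A minimal JDK-only sketch of that idea, assuming a plain ThreadLocal as a stand-in for the observation scope (the names ScopedStepSketch, CONTEXT and scoped are hypothetical and not part of Micrometer). Each wrapped step sets the captured value on whatever thread runs it and restores the previous value on exit, so a later step sees nothing:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ScopedStepSketch {
    // Assumption: stands in for the thread-local state behind an observation scope.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Hypothetical helper: runs one step with the captured value set,
    // then restores whatever the thread held before.
    static <T, R> Function<T, R> scoped(String captured, Function<T, R> step) {
        return input -> {
            String previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                return step.apply(input);
            } finally {
                if (previous == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Function<String, String> insideStep =
                    scoped("obs", s -> s + " inside=" + CONTEXT.get());
            return CompletableFuture.supplyAsync(() -> "world", pool)
                    .thenApply(insideStep)                          // context visible here
                    .thenApply(s -> s + " after=" + CONTEXT.get())  // already restored here
                    .get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "world inside=obs after=null"
    }
}
```

In the real code the same role is played by ContextSnapshot.setThreadLocals() inside a try-with-resources block.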

I can illustrate the scenario with the following example:

    @Test
    void testObservableAwareCompletableFuture() {
        CompletableFuture<Void> cf = CompletableFuture.completedFuture("hello ").thenApplyAsync(s -> {
                    log("dummy logging in a separate thread");
                    return s;
                }).thenCombine(observableExternalApi(), (s, s2) -> s + s2)
                .thenAccept(s -> log("result: " + s));
        cf.join();
        log("after completion the whole chain of steps");
    }

    private void log(String message) {
        if (registry.getCurrentObservation() == null) {
            message += " not";
        }
        message += " observable";
        log.info(message);
    }

    private CompletableFuture<String> observableExternalApi() {
        Observation observation = Observation.createNotStarted("test", registry).start();
        log("before opening scope");
        ContextSnapshot snapshot;
        try (Observation.Scope __ = observation.openScope()) {
            snapshot = ContextSnapshot.captureAll();
            log("after creating a snapshot");
        }

        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    //    threadPoolTaskExecutor.setTaskDecorator(snapshot::wrap);
        threadPoolTaskExecutor.afterPropertiesSet();


        CompletableFuture<String> cf = CompletableFuture.completedFuture("world").thenApply(s -> {
                    try (ContextSnapshot.Scope __ = snapshot.setThreadLocals()) {
                        log("before async send");
                        return s;
                    }
                }).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
                            try (ContextSnapshot.Scope __ = snapshot.setThreadLocals()) {
                                try {
                                    TimeUnit.SECONDS.sleep(1);
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                                log("after async send");
                                return s;
                            }
                        }, threadPoolTaskExecutor))
                .whenComplete((s, throwable) -> {
                    try (ContextSnapshot.Scope __ = snapshot.setThreadLocals()) {
                        log("before stopping observation - ");
                    }
                    observation.stop();
                    log("after stopping observation - ");

                });
        return cf;
    }

This produces the expected output:

[ForkJoinPool.commonPool-worker-1] INFO  [] [] w.api.functional.ApplicationFlowTest - dummy logging in a separate thread not observable
[main] INFO  [] [] w.api.functional.ApplicationFlowTest - before opening scope not observable
[main] INFO  [e7e4306f9f0d9810] [e7e4306f9f0d9810] w.api.functional.ApplicationFlowTest - after creating a snapshot observable
[main] INFO  [e7e4306f9f0d9810] [e7e4306f9f0d9810] w.api.functional.ApplicationFlowTest - before async send observable
[ThreadPoolTaskExecutor-1] INFO  [e7e4306f9f0d9810] [e7e4306f9f0d9810] w.api.functional.ApplicationFlowTest - after async send observable
[ThreadPoolTaskExecutor-1] INFO  [e7e4306f9f0d9810] [e7e4306f9f0d9810] w.api.functional.ApplicationFlowTest - before stopping observation -  observable
[ThreadPoolTaskExecutor-1] INFO  [] [] w.api.functional.ApplicationFlowTest - after stopping observation -  not observable
[ThreadPoolTaskExecutor-1] INFO  [] [] w.api.functional.ApplicationFlowTest - result: hello world not observable
[main] INFO  [] [] w.api.functional.ApplicationFlowTest - after completion the whole chain of steps not observable

Please note that I have not decorated the executor's tasks with the snapshot, as reflected by the commented-out line:

   //    threadPoolTaskExecutor.setTaskDecorator(snapshot::wrap);

If I uncomment this line, I get undesired behavior (the scope is also opened outside the invoked method and propagates down the chain):

[ForkJoinPool.commonPool-worker-1] INFO  [] [] w.api.functional.ApplicationFlowTest - dummy logging in a separate thread not observable
[main] INFO  [] [] w.api.functional.ApplicationFlowTest - before opening scope not observable
[main] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - after creating a snapshot observable
[main] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - before async send observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - after async send observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - before stopping observation -  observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - after stopping observation -  observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - result: hello world observable
[main] INFO  [] [] w.api.functional.ApplicationFlowTest - after completion the whole chain of steps not observable

These two invocations should not be observable:

[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - after stopping observation -  observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - result: hello world observable

but they are, because every task run by the executor is automatically wrapped with the snapshot.
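The effect can be reproduced without Micrometer. The sketch below is only an illustration under assumptions: a plain ThreadLocal stands in for the observation scope, and wrap is a hypothetical stand-in for snapshot::wrap. A dependent stage that is registered before the async task completes runs on the executor thread as part of that task, so it still runs inside the decorator:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DecoratedExecutorLeakSketch {
    // Assumption: stands in for the thread-local state behind an observation scope.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Hypothetical stand-in for snapshot::wrap: sets the captured value
    // around the whole task and restores the previous value afterwards.
    static Runnable wrap(String captured, Runnable task) {
        return () -> {
            String previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                if (previous == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    static String describe(String message) {
        return message + (CONTEXT.get() == null ? " not observable" : " observable");
    }

    static String run(boolean decorate) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Executor executor = pool;
        if (decorate) {
            executor = task -> pool.execute(wrap("obs", task));
        }
        // Keeps the async task from finishing before the dependent stage is registered.
        CountDownLatch registered = new CountDownLatch(1);
        try {
            CompletableFuture<String> inner = CompletableFuture.supplyAsync(() -> {
                try {
                    registered.await();
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                return "world";
            }, executor);
            // This stage is "outside the method", yet it runs on the pool thread
            // while the decorated task is still on the stack.
            CompletableFuture<String> outer =
                    inner.thenApply(s -> describe("result: hello " + s));
            registered.countDown();
            return outer.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(false)); // prints "result: hello world not observable"
        System.out.println(run(true));  // prints "result: hello world observable"
    }
}
```

With the decorator, the only way for a downstream stage to lose the context is to run after the wrapped task has returned, which the decorator itself cannot guarantee.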

Could you please provide guidance on how to achieve this?

sondemar · Jul 05 '24 05:07