opentelemetry-java-instrumentation

Context propagation lost using parallel streams

Open claudio-canellada-edo opened this issue 1 year ago • 8 comments

Hi, it seems that context propagation does not happen when using parallel streams: only one of the threads processing the stream gets the context. I'm testing this using version 2.6.0 of the opentelemetry-instrumentation-api artifact.

This is the test:

public class StreamsTest {

    private static final ContextKey<Double> ANY_DOUBLE_KEY = ContextKey.named("any-double-key");
    private static final ContextKey<String> ANY_STRING_KEY = ContextKey.named("any-string-key");
    private static final Double ANY_DOUBLE_VALUE = 17.0d;
    private static final String ANY_STRING_VALUE = "any-string-value";
    
    @Test
    public void contextTest() throws Exception {
        // The scope is auto closable
        try (Scope scope = Context.current() // Get the current context to add values
                .with(ANY_DOUBLE_KEY, ANY_DOUBLE_VALUE) // Add any double value
                .makeCurrent()) {  // A scope is created when we set the new context as current

            // If we want to add another key/value we need to create a new scope
            try (Scope innerScope = Context.current()
                    .with(ANY_STRING_KEY, ANY_STRING_VALUE) // Add any string value
                    .makeCurrent()) {

                executeSomethingWithStreams();
            }
        }
    }

    private void executeSomethingWithStreams() {
        Integer sum = IntStream.rangeClosed(0, 3)
                .parallel()
                .peek(this::assertContext)
                .reduce(0, Integer::sum);

        assertEquals(Integer.valueOf(6), sum); // expected value first, then actual
    }

    private void assertContext(int i) {
        System.out.printf("I'm process %d and my double value from context is %s%n", i, Context.current().get(ANY_DOUBLE_KEY));
    }
}

And the output:

I'm process 2 and my double value from context is 17.0
I'm process 0 and my double value from context is null
I'm process 1 and my double value from context is null
I'm process 3 and my double value from context is null

claudio-canellada-edo • Aug 12 '24 15:08
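A note on the output above: by default OpenTelemetry keeps the current context in thread-local storage, and a parallel stream processes some elements on the calling thread and the rest on `ForkJoinPool` common-pool worker threads. The sketch below reproduces the same pattern with a plain `ThreadLocal` as a stand-in. It is only an illustration of the likely mechanism: `ThreadLocalLossDemo` and `CURRENT` are hypothetical names, and the code uses neither the OpenTelemetry API nor the Java agent.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Stand-in for a thread-bound context: a plain ThreadLocal, NOT the OpenTelemetry API.
final class ThreadLocalLossDemo {

    private static final ThreadLocal<Double> CURRENT = new ThreadLocal<>();

    // For each element, records whether what it observed matches the rule
    // "the value is visible only on the thread that set it".
    static Map<Integer, Boolean> run() {
        Thread caller = Thread.currentThread();
        Map<Integer, Boolean> consistent = new ConcurrentHashMap<>();
        CURRENT.set(17.0d);
        try {
            IntStream.rangeClosed(0, 3)
                    .parallel()
                    .forEach(i -> {
                        boolean onCaller = Thread.currentThread() == caller;
                        Double value = CURRENT.get();
                        System.out.printf("element %d on %s sees %s%n",
                                i, Thread.currentThread().getName(), value);
                        consistent.put(i, onCaller == (value != null));
                    });
        } finally {
            CURRENT.remove(); // do not leak the value into later work on this thread
        }
        return consistent;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Which elements land on the calling thread varies from run to run, so the printed lines differ, but elements handled by worker threads never see the value.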

can you try this:

 try (Scope ignored = Context.current().makeCurrent()) { // **makeCurrent()**
   assert Context.current() == ctx;
   ...
 }

heyams • Aug 16 '24 18:08

I'm testing this using version 2.6.0 of the opentelemetry-instrumentation-api artifact.

are you also using the Java agent?

trask • Aug 19 '24 22:08

Yes, we have the java agent too

ajaparicio • Aug 20 '24 15:08

can you try this:

 try (Scope ignored = Context.current().makeCurrent()) { // **makeCurrent()**
   assert Context.current() == ctx;
   ...
 }

Hi @heyams, I don't know what "ctx" refers to, so I can't write the assert. Sorry.

ajaparicio • Aug 20 '24 15:08

@ajaparicio don't worry about that assert statement, can you try this instead:

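    private void executeSomethingWithStreams() {
        // Capture the context on the calling thread, before the stream fans out
        Context context = Context.current();
        Integer sum = IntStream.rangeClosed(0, 3)
                .parallel()
                .peek(i -> {
                    // Make the captured context current on whichever thread runs this element
                    try (Scope scope = context.makeCurrent()) {
                        assertContext(i);
                    }
                })
                .reduce(0, Integer::sum);
        assertEquals(Integer.valueOf(6), sum);
    }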

please let me know if that works.

heyams • Aug 20 '24 19:08
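If the capture-then-`makeCurrent()` pattern is needed in several stream stages, it could be factored into a small wrapper. The sketch below shows the idea with a plain `ThreadLocal` standing in for the context so that it runs without any dependencies. `CapturedContextDemo`, `CURRENT` and `withCapturedContext` are hypothetical names, not part of OpenTelemetry; with the real API the body of the wrapper would be the `try (Scope scope = context.makeCurrent())` block from the comment above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

final class CapturedContextDemo {

    // Hypothetical stand-in for a thread-bound context (not the OpenTelemetry API).
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Captures the caller's value when the wrapper is created. The returned consumer
    // installs that value on whichever thread runs it and restores the previous
    // value afterwards, similar to opening and closing a scope.
    static IntConsumer withCapturedContext(IntConsumer action) {
        String captured = CURRENT.get();
        return i -> {
            String previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                action.accept(i);
            } finally {
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }

    // Runs the parallel stream from the issue and returns the elements
    // that observed the captured value.
    static Set<Integer> elementsThatSawContext() {
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        CURRENT.set("any-string-value");
        try {
            IntStream.rangeClosed(0, 3)
                    .parallel()
                    .peek(withCapturedContext(i -> {
                        if ("any-string-value".equals(CURRENT.get())) {
                            seen.add(i);
                        }
                    }))
                    .reduce(0, Integer::sum);
        } finally {
            CURRENT.remove();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(elementsThatSawContext());
    }
}
```

The wrapper has to be created on the thread that owns the context, before the stream starts, because that is the only point where the value to propagate is visible.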

@heyams yes, setting the scope and then calling the method works, thanks! We can use this workaround. Do you think the propagation will happen automatically in the future?

ajaparicio • Aug 21 '24 06:08

Context propagation is done automatically in these instrumentations

It is not handled automatically in this case. I don't know if it can be implemented in the future. It will require some investigation.

heyams • Aug 21 '24 17:08

Thank you for the response, @heyams. In that case, we should keep this issue open until we find an implementation solution.

ajaparicio • Aug 22 '24 07:08

Since there has been no activity on this enhancement for the past year we are closing it to help maintain our backlog. Anyone who would like to work on it is still welcome to do so, and we can re-open it at that time.

trask • Oct 03 '25 00:10