Context propagation lost using parallel streams
Hi, it seems that the context propagation is not done when using parallel streams, as only one of the child threads are getting it. I'm testing this using version 2.6.0 of the opentelemetry-instrumentation-api artifact.
This is the test:
public class StreamsTest {
private static final ContextKey<Double> ANY_DOUBLE_KEY = ContextKey.named("any-double-key");
private static final ContextKey<String> ANY_STRING_KEY = ContextKey.named("any-string-key");
private static final Double ANY_DOUBLE_VALUE = 17.0d;
private static final String ANY_STRING_VALUE = "any-string-value";
@Test
public void contextTest() throws Exception {
// The scope is auto closable
try (Scope scope = Context.current() // Get the current context to add values
.with(ANY_DOUBLE_KEY, ANY_DOUBLE_VALUE) // Add any double value
.makeCurrent()) { // A scope is created when we set the new context as current
// If we want to add another key/value we need to create a new scope
try (Scope innerScope = Context.current()
.with(ANY_STRING_KEY, ANY_STRING_VALUE) // Add any string value
.makeCurrent()) {
executeSomethingWithStreams();
}
}
}
private void executeSomethingWithStreams() {
Integer sum = IntStream.rangeClosed(0, 3)
.parallel()
.peek(this::assertContext)
.reduce(0, Integer::sum);
assertEquals(sum, Integer.valueOf(6));
}
private void assertContext(int i) {
System.out.printf("I'm process %d and my double value from context is %s%n", i, Context.current().get(ANY_DOUBLE_KEY));
}
}
And the output:
I'm process 2 and my double value from context is 17.0
I'm process 0 and my double value from context is null
I'm process 1 and my double value from context is null
I'm process 3 and my double value from context is null
can you try this:
try (Scope ignored = Context.current().makeCurrent()) { // **makeCurrent()**
assert Context.current() == ctx;
...
}
I'm testing this using version 2.6.0 of the opentelemetry-instrumentation-api artifact.
are you also using the Java agent?
Yes, we have the java agent too
can you try this:
try (Scope ignored = Context.current().makeCurrent()) { // **makeCurrent()** assert Context.current() == ctx; ... }
Hi @heyams, I don't know what is "ctx" to create the assert Sorry
@ajaparicio don't worry about that assert statement, can you try this instead:
private void executeSomethingWithStreams() {
Context context = Context.current();
Integer sum = IntStream.rangeClosed(0, 3)
.parallel()
.peek(i -> {
try (Scope scope = context.makeCurrent()) {
assertContext(i);
}
})
.reduce(0, Integer::sum);
assertEquals(sum, Integer.valueOf(6));
}
please let me know if that works.
@heyams yes, setting the scope and then call the method works, thanks! We can use this workaround. Do you thing in the future we will have the propagation automatically?
Context propagation is done automatically in these instrumentations
It is not handled automatically in this case. I don't know if it can be implemented in the future. It will require some investigation.
Thank you for the response, @heyams. In that case, we should keep this issue open until we find an implementation solution.
Since there has been no activity on this enhancement for the past year we are closing it to help maintain our backlog. Anyone who would like to work on it is still welcome to do so, and we can re-open it at that time.