Provide downstream operators access to Context defined upstream
Motivation
Current observations work only upstream, which makes it impossible to decorate, for example, reactor-kafka's KafkaReceiver, which returns elements from the Kafka consumer as an unbounded Flux publisher of ReceiverRecord<K, V>. In the current state, any publisher of n > 1 elements is forced either to wrap its elements into a Flux<Mono<T>> so that a per-element context is available further down, or to change its contract so the method accepts a callable that gets hooked into the processing of each single element, which also changes the contract.
So if by default we had a Kafka receiver with Flux<ReceiverRecord<K, V>> receive(), we would need to either wrap it as Flux<Mono<ReceiverRecord<K, V>>> receive() and handle the publishers emitted inside the outer Flux, or change it to Flux<ReceiverRecord<K, V>> receive(Function<Mono<T>, Publisher<R>> recordProcessingPipeline), which would process each record inside its own pipeline handled by the decorator. Still, this requires changing the original contract, and users need to understand that the actual pipeline must be contained in that recordProcessingPipeline, because the result of the receive method will be treated as a Flux publisher without a per-element context; all they should do with the result is subscribe, if they want their per-element telemetry intact.
Desired solution
A way to register a context per element in a Flux publisher, and the possibility to propagate that context downstream. It would be nice if it could be transparent to source publishers, so that we won't need to change contracts for every possible library just because of telemetry :)
Considered alternatives
Something similar to Spring Boot 2 + Sleuth's ScopePassingSpanSubscriber. It would be nice if that could be applied manually, so that if there's a need to create a Flux out of a custom Sink, we could still propagate the context downstream to the consumers of the publisher on a per-element basis without modifying the original contract.
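For context, a rough sketch of what a manually applied ScopePassingSpanSubscriber-style decoration could look like, using reactor-core's Operators.lift to wrap the downstream subscriber. This is not Sleuth's actual code; the scope handling is only indicated by comments, and the class and method names are made up for illustration:

```java
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

public class ScopePassingSketch {

    // Wraps every signal of the source so that a scope derived from the
    // downstream subscriber's Context could be opened around each element.
    static <T> Flux<T> scopePassing(Flux<T> source) {
        return source.transform(Operators.<T>lift((scannable, actual) ->
                new CoreSubscriber<T>() {
                    @Override public Context currentContext() {
                        // propagate the downstream Context unchanged
                        return actual.currentContext();
                    }
                    @Override public void onSubscribe(Subscription s) {
                        actual.onSubscribe(s);
                    }
                    @Override public void onNext(T t) {
                        // here one would open a scope based on currentContext(),
                        // e.g. restore a trace span, before passing the element on
                        actual.onNext(t);
                        // ...and close the scope afterwards
                    }
                    @Override public void onError(Throwable t) { actual.onError(t); }
                    @Override public void onComplete() { actual.onComplete(); }
                }));
    }

    public static void main(String[] args) {
        // element flow is unchanged; the lift only adds per-signal hook points
        System.out.println(scopePassing(Flux.just(1, 2, 3)).collectList().block());
    }
}
```

Sleuth installed this kind of lifter globally via Hooks.onEachOperator; the sketch above applies it to a single Flux instead, which is closer to the "manual" usage asked for here.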
I think at times you're confusing upstream with downstream, which makes it a bit difficult to envision the idea.
For attaching an Observation to each individual item, creating a Mono is the way to go for now. The way Context works in Reactor is that the write happens in the contextWrite() operator. Consider the example:
Mono.just("Hello")
    .map(s -> s + ", world")
    .contextWrite(ctx2) // <2>
    .map(s -> s + "!")
    .contextWrite(ctx1) // <1>
    .map(String::toUpperCase)
    .subscribe();
The ctx1 Context written in <1> is available to the operators upstream of it, unless one of them also alters the Context, as in <2>: operators upstream of <2> see the altered version ctx2, while operators between <2> and <1> see ctx1. Downstream of <1>, only the subscriber's Context applies, so an empty Context is visible in the map(String::toUpperCase) operator.
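The visibility rule described above can be demonstrated with a small self-contained snippet (the key name "traceId" is just an illustration); an operator placed upstream of the contextWrite() sees the written value, while the same operator placed downstream of it sees only the empty subscriber Context:

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextVisibilityDemo {

    // deferContextual reads the Context visible at its position in the chain;
    // here it sits upstream of the contextWrite, so the write is visible
    static String seenUpstream() {
        return Mono.just("hello")
                .flatMap(s -> Mono.deferContextual(ctx ->
                        Mono.just(s + ":" + ctx.getOrDefault("traceId", "none"))))
                .contextWrite(Context.of("traceId", "A"))
                .block();
    }

    // here the read sits downstream of the contextWrite, so it only sees
    // the (empty) Context provided by the subscriber
    static String seenDownstream() {
        return Mono.just("hello")
                .contextWrite(Context.of("traceId", "A"))
                .flatMap(s -> Mono.deferContextual(ctx ->
                        Mono.just(s + ":" + ctx.getOrDefault("traceId", "none"))))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(seenUpstream());   // hello:A
        System.out.println(seenDownstream()); // hello:none
    }
}
```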
Using this example and any existing or proposed operators, how would you like to interact with the somehow attached context/Observation?
Yes, sorry, it should be downstream (I got the naming mixed up midway; I've updated the issue description).
The problem with attaching an Observation to each single Mono is that we can't attach it transparently for subscribers of our Flux. Assuming we provide, for example, public Flux<SomeItem> endlessStreamOfSomeItemFromEther() and someone subscribes to it, then if I wanted to add tracing that is propagated to subscribers of this publisher per element, I would have to change the contract either to public Flux<Mono<SomeItem>> endlessStreamOfSomeItemFromEther() or to public CorePublisher<SomeItem> endlessStreamOfSomeItemFromEther(Function<Publisher<V>, Publisher<V>> methodForTransformOperator). This way, subscribers either subscribe to a sub-publisher that carries the context, or we execute their part of the pipeline via the method argument while they can only subscribe to the result of the method (unless they do the magic of subscribing using their own Flux.from, which will receive the elements, but the context won't be there anymore).
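To make the second contract alternative concrete, here is a minimal sketch of the Function-accepting variant. The SomeItem record, the "recordKey" key, and the inline two-element source are all made up for illustration; the point is that the caller's per-record pipeline ends up upstream of a per-record contextWrite, so the caller's operators can see the record's context:

```java
import java.util.List;
import java.util.function.Function;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class PerRecordContextSketch {

    // hypothetical item type standing in for ReceiverRecord<K, V>
    record SomeItem(String payload) {}

    // the changed contract: the caller hands us their per-record pipeline,
    // and we run it inside a per-record Mono that carries a Context
    static <R> Flux<R> receive(Function<Mono<SomeItem>, Publisher<R>> recordPipeline) {
        Flux<SomeItem> source = Flux.just(new SomeItem("a"), new SomeItem("b"));
        return source.concatMap(item -> Mono.just(item)
                .transform(recordPipeline)  // caller's operators land here, upstream...
                .contextWrite(Context.of("recordKey", item.payload()))); // ...of this write
    }

    public static void main(String[] args) {
        // the caller's pipeline can read the per-record Context
        List<String> out = receive(mono -> mono.flatMap(item ->
                Mono.deferContextual(ctx ->
                        Mono.just(item.payload() + ":" + ctx.get("recordKey")))))
                .collectList()
                .block();
        System.out.println(out); // [a:a, b:b]
    }
}
```

This illustrates the trade-off discussed above: the per-record context works, but only because the caller's entire processing is contained in the recordPipeline argument; anything chained onto the returned Flux is downstream of the per-record contextWrite and no longer sees it.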
I think the discussion in #3424 is around the same subject. I will close this issue in favour of that one.