servicetalk icon indicating copy to clipboard operation
servicetalk copied to clipboard

Reactive Streams bridge and crash in MapPublisher

Open akarnokd opened this issue 6 years ago • 2 comments

In the MapPublisher operator, the comment indicates that the caller of onNext is responsible for handling crashes. However, the ReactiveStreamsAdapter calls the servicetalk's Publisher.Subscriber.onNext without protection.

If I understand correctly, there is a try-catch and further code needed in the adapter to protect against this case, right? ~~I'm happy to post a PR.~~

Edit: Apparently, the project won't build or even load properly on Windows+IntelliJ so I won't be able to post a PR after all.

akarnokd avatar Nov 08 '19 09:11 akarnokd

ReactiveStreams spec rule 2.13 [1] disallows throwing arbitrary exceptions from Subscriber methods.

Considering this rule we take liberty in our operator implementations to avoid the complexity of handling these unexpected exceptions in the intermediary layers (operators). The complexity here is to guarantee serial invocation of Subscription (spec rule 2.7[2]) when the intermediary layers do not “own” the Subscription. The liberty we take is that we assume if we bubble up all such spec violations up to the original source, the source is in a better position to handle the violation (e.g. cancel the Subscription, propagate an error to the Subscriber, cleanup state, etc.). This enables us to simplify our operator implementations, for example in this case, if we were to handle exceptions correctly in the RsToSTSubscriber[3] we would have to do the following:

  • Add a try-catch around subscriber.onNext()
  • If onNext() throws, — Cancel the Subscription — Propagate error to the subscriber — Since we can not guarantee that we would not receive any more onNext() (spec rule 2.8 [4]), make sure if we do get an onNext() (or a terminal signal), we ignore it and do not send any further signals after sending an onError() (spec rule 1.7 [5])

In order to do all the above and also make sure we do not concurrently invoke the Subscription (as we send the same Subscription down the operator chain and a Subscriber may invoke a Subscription method at the same time as us) we have to pessimistically wrap the Subscription and account for these out-of-band cancellations. Such wrapping and concurrent invocation protection causes overheads for each operator which we try to avoid when possible. Additionally such handling has to be done on the original source anyways.

We make sure all our sources exhibit the same behavior with respect to spec violations and extend the same assumption to external sources. However, there may be sources that do not follow these guidelines and are outside our control, for which it will be a good idea to provide utilities that can adapt a source to follow these guidelines. We do not provide these utilities today but will be a good addition to ServiceTalk.

Apparently, the project won't build or even load properly on Windows+IntelliJ so I won't be able to post a PR after all.

Sorry about that, it is a known issue (#575). Since we do not use windows in our environments, supporting windows is low in our priority list but you are welcome to contribute if interested!

[1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13 [2] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.7 [3] https://github.com/apple/servicetalk/blob/96e3e198685d497e27120b5a7f78593d43e26f2d/servicetalk-concurrent-reactivestreams/src/main/java/io/servicetalk/concurrent/reactivestreams/ReactiveStreamsAdapters.java#L171 [4] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.8 [5] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7

NiteshKant avatar Nov 08 '19 18:11 NiteshKant

Also @viktorklang may find this discussion interesting.

NiteshKant avatar Nov 08 '19 18:11 NiteshKant