rsocket-java icon indicating copy to clipboard operation
rsocket-java copied to clipboard

Provide `FlatMappingProcessor` as an alternative to `UnboundedProcessor`

Open OlegDokuka opened this issue 5 years ago • 0 comments

The Problem

Right now in RSocket we use UnboundedProcessor which simply is an MultiProducerSingleConsumerArrayLinkedQueue.

It means that all frames from all possible interactions are enqueued into that queue with NO backpressure control and then underlying connection drains elements with the speed it can: image

Such a design may lead to many problems including memory leaks and general application instability

Proposal

In order to have fine-grained control on all the levels, I propose to provide a customizable processor and introduce FlatMappingProcessor instead.

In a few words, FlatMappingProcessor should behave identically to FlatMap operator in ProjectReactor / RxJava

image

Main Characteristic

  1. Prefetch Mode for Flux
  2. Concurrency Control (RSocket user can specify the number of active streams)
  3. Small SpScBoundedArrayQueue for every Flux (requestChannel / requestStream cases) and general scalar MpScUnboundedLinkedArrayQueue for Mono cases

OlegDokuka avatar Mar 11 '20 14:03 OlegDokuka