grails-data-mapping icon indicating copy to clipboard operation
grails-data-mapping copied to clipboard

samples of back pressure which can easily happen

Open musketyr opened this issue 8 years ago • 5 comments

just for example. no need to merge if you don't want to implement back pressure handling. the test can run very long time or break the build with OOME.

cc @graemerocher

musketyr avatar Jun 06 '16 10:06 musketyr

Nice example @musketyr .

I tried using RxJava's support for reactive pull backpressure:

    void "example of using reactive pull"() {
        when:"All Freds are listed"
        AtomicInteger count = new AtomicInteger(0)
        CountDownLatch latch = new CountDownLatch(1)
        Simple.where {
            name ==~ ~/Fred.+/
        }
        .toObservable()
        .doOnNext { println "Now we have $it.name on the stage..."}
        .subscribe(new Subscriber<Simple>() {
            int inflight

            @Override
            void onStart() {
                inflight = 10
                request(10)
            }

            @Override
            void onCompleted() {
                latch.countDown()
            }

            @Override
            void onError(Throwable e) {

            }

            @Override
            void onNext(Simple simple) {
                // emulates slow processing
                Thread.sleep(1)
                count.incrementAndGet()
                if(--inflight == 0) {
                    inflight = 10
                    request(10)
                }
            }
        })

        then:"The results are correct"
        latch.await(60, TimeUnit.SECONDS)
        count.get() == 25000
    }

I think that the "reactive pull backpressure" pattern is meant to be used to prevent the backpressure problems you were describing. It would be nice to make sure that it's fully supported in RxGorm.

lhotari avatar Jun 07 '16 03:06 lhotari

Yeah, I can see you have the same problem as me when I run it. The problem with the request(n) is that it must be supported by producer. e.g. see

https://github.com/MetadataRegistry/ModelCataloguePlugin/blob/2.x/ModelCatalogueCorePlugin/src/groovy/org/modelcatalogue/core/rx/DetachedCriteriaOnSubscribe.java#L42

I've tried the batch operator doing the same reactive pull approach as you've did

https://github.com/MetadataRegistry/ModelCataloguePlugin/blob/2.x/ModelCatalogueCorePlugin/src/groovy/org/modelcatalogue/core/rx/BatchOperator.java#L99

but it doesn't work very well either as actually the batches were read one after another in high rate so I get into back pressure problem again. I know the solution lies somewhere here

https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java#L110

or here

https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/observables/AsyncOnSubscribe.java

but it's way beyond my Rx knowledge.

musketyr avatar Jun 07 '16 06:06 musketyr

We support this in cases that we have control over, in the case of the MongoDB implementation support would have to be written in the mongodb-rx drivers.

graemerocher avatar Jun 07 '16 07:06 graemerocher

@musketyr @graemerocher It looks like the mongodb-rx drivers support the "reactive pull" backpressure (flow control) since the producer implements the request method. I assume it's a matter of making sure (=writing tests and fixing problems) that "reactive pull" is supported for all operators in the full chain up to the mongodb rx driver. One way to test it would be to inject some wrapper/proxy in the test that intercepts the interaction with the mongodb rx driver to see if demand is signaled correctly via request method calls. I guess it's easy to break "reactive pull" backpressure since supporting reactive backpressure is optional in RxJava v1.

lhotari avatar Jun 07 '16 17:06 lhotari

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

CLAassistant avatar Nov 03 '21 14:11 CLAassistant