grails-data-mapping
Samples of back pressure problems that can easily happen
Just an example. No need to merge if you don't want to implement back pressure handling. The test can run for a very long time or break the build with an OOME (OutOfMemoryError).
cc @graemerocher
Nice example, @musketyr.
I tried using RxJava's support for reactive pull backpressure:
void "example of using reactive pull"() {
    when:"All Freds are listed"
    AtomicInteger count = new AtomicInteger(0)
    CountDownLatch latch = new CountDownLatch(1)
    Simple.where {
        name ==~ ~/Fred.+/
    }
    .toObservable()
    .doOnNext { println "Now we have $it.name on the stage..."}
    .subscribe(new Subscriber<Simple>() {
        int inflight
        @Override
        void onStart() {
            // ask for the first batch only; nothing more should arrive until we request it
            inflight = 10
            request(10)
        }
        @Override
        void onCompleted() {
            latch.countDown()
        }
        @Override
        void onError(Throwable e) {
            // release the latch so a failure does not stall the test for the full timeout
            latch.countDown()
        }
        @Override
        void onNext(Simple simple) {
            // emulates slow processing
            Thread.sleep(1)
            count.incrementAndGet()
            // once the current batch is consumed, signal demand for the next one
            if(--inflight == 0) {
                inflight = 10
                request(10)
            }
        }
    })
    then:"The results are correct"
    latch.await(60, TimeUnit.SECONDS)
    count.get() == 25000
}
I think that the "reactive pull backpressure" pattern is meant to be used to prevent the backpressure problems you were describing. It would be nice to make sure that it's fully supported in RxGorm.
Yeah, I can see you have the same problem I had when I ran it. The problem with request(n) is that it must be supported by the producer, e.g. see
https://github.com/MetadataRegistry/ModelCataloguePlugin/blob/2.x/ModelCatalogueCorePlugin/src/groovy/org/modelcatalogue/core/rx/DetachedCriteriaOnSubscribe.java#L42
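To illustrate what "supported by the producer" means, here is a minimal sketch of a producer that only emits as much as has been requested. It is not the code behind the link above and it is not RxJava: it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) as a stand-in for RxJava 1's `Producer`/`Subscriber`, so it runs without extra dependencies. The class names `DemandDrivenPublisher` and `Pull` and the `demo` helper are made up for this example.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical producer: hands out items from an Iterable only as demand arrives via request(n). */
final class DemandDrivenPublisher<T> implements Flow.Publisher<T> {
    private final Iterable<T> source;

    DemandDrivenPublisher(Iterable<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Pull<>(source.iterator(), subscriber));
    }

    private static final class Pull<T> implements Flow.Subscription {
        private final Iterator<T> items;
        private final Flow.Subscriber<? super T> downstream;
        private final AtomicLong requested = new AtomicLong(); // outstanding demand
        private volatile boolean finished;

        Pull(Iterator<T> items, Flow.Subscriber<? super T> downstream) {
            this.items = items;
            this.downstream = downstream;
        }

        @Override
        public void request(long n) {
            if (n <= 0 || finished) {
                return; // sketch only: no onError for n <= 0 and no overflow capping
            }
            // Only the caller that raises demand from zero drains; a request(n) made
            // from inside onNext just tops up the counter instead of recursing.
            if (requested.getAndAdd(n) == 0) {
                drain();
            }
        }

        @Override
        public void cancel() {
            finished = true;
        }

        private void drain() {
            long demand = requested.get();
            while (demand != 0) {
                long emitted = 0;
                while (emitted != demand && !finished && items.hasNext()) {
                    downstream.onNext(items.next());
                    emitted++;
                }
                if (finished) {
                    return;
                }
                if (!items.hasNext()) {
                    finished = true;
                    downstream.onComplete();
                    return;
                }
                // Demand may have grown while we were emitting; keep going if so.
                demand = requested.addAndGet(-emitted);
            }
        }
    }

    /** Demo: pull `total` integers in batches of `batch`; returns {received, requested, completed}. */
    static long[] demo(int total, int batch) {
        List<Integer> source = IntStream.range(0, total).boxed().collect(Collectors.toList());
        long[] stats = new long[3];
        new DemandDrivenPublisher<>(source).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int inflight;

            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; ask(); }
            @Override public void onNext(Integer item) {
                stats[0]++;
                if (stats[0] > stats[1]) throw new IllegalStateException("producer ignored demand");
                if (--inflight == 0) ask();
            }
            @Override public void onError(Throwable t) { throw new AssertionError(t); }
            @Override public void onComplete() { stats[2] = 1; }

            private void ask() {
                inflight = batch;
                stats[1] += batch;
                subscription.request(batch);
            }
        });
        return stats;
    }
}
```

The subscriber in `demo` follows the same batch-of-N pattern as the test above. The point of the sketch is the `drain` loop: if the producer does not keep a counter like `requested` and stop when it hits zero, calling `request(n)` on the consumer side has no effect.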
I've tried a batch operator that takes the same reactive pull approach as you did
https://github.com/MetadataRegistry/ModelCataloguePlugin/blob/2.x/ModelCatalogueCorePlugin/src/groovy/org/modelcatalogue/core/rx/BatchOperator.java#L99
but it doesn't work very well either, because the batches were read one after another at a high rate, so I ran into the back pressure problem again. I know the solution lies somewhere here
https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java#L110
or here
https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/observables/AsyncOnSubscribe.java
but it's way beyond my Rx knowledge.
We support this in the cases we have control over. In the case of the MongoDB implementation, support would have to be written in the mongodb-rx drivers.
@musketyr @graemerocher It looks like the mongodb-rx drivers support "reactive pull" backpressure (flow control), since the producer implements the request method. I assume it's a matter of making sure, i.e. writing tests and fixing problems, that "reactive pull" is supported by every operator in the chain up to the mongodb-rx driver. One way to test it would be to inject a wrapper/proxy in the test that intercepts the interaction with the mongodb-rx driver and checks whether demand is signaled correctly via request method calls. I guess it's easy to break "reactive pull" backpressure, since supporting it is optional in RxJava v1.
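A rough sketch of the wrapper/proxy idea, under some loud assumptions: it uses the JDK's `java.util.concurrent.Flow` API rather than RxJava 1, and a `SubmissionPublisher` stands in for the mongodb-rx driver, which is not touched here. `RequestRecorder` and `recordedDemand` are hypothetical names. The proxy sits between the source and the subscriber and records every `request(n)` that reaches the source, so a test can assert on the demand that was actually signaled.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: forwards all signals and logs every request(n) sent upstream. */
final class RequestRecorder<T> implements Flow.Publisher<T> {
    private final Flow.Publisher<T> upstream;
    final List<Long> requests = new CopyOnWriteArrayList<>();

    RequestRecorder(Flow.Publisher<T> upstream) {
        this.upstream = upstream;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> downstream) {
        upstream.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                // Give the subscriber a proxy subscription that records demand before forwarding it.
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { requests.add(n); s.request(n); }
                    @Override public void cancel() { s.cancel(); }
                });
            }
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Demo: consume `total` items in batches of `batch` and return the demand the source saw. */
    static List<Long> recordedDemand(int total, int batch) {
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>(); // stand-in for the driver
        RequestRecorder<Integer> recorder = new RequestRecorder<>(source);
        recorder.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int inflight;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                inflight = batch;
                s.request(batch);
            }
            @Override public void onNext(Integer item) {
                if (--inflight == 0) {
                    inflight = batch;
                    subscription.request(batch);
                }
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        for (int i = 0; i < total; i++) {
            source.submit(i);
        }
        source.close();
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return recorder.requests;
    }
}
```

For example, `RequestRecorder.recordedDemand(25, 10)` should report three requests of 10. If an operator somewhere in the chain ignored backpressure, a recorder placed next to the source would typically show a single unbounded request (such as `Long.MAX_VALUE`) instead of the small batches the subscriber asked for.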