RxJavaReactiveStreams
Examples with Interop
We need some examples for the README to demonstrate usage.
Perhaps we can add unit test examples with dependencies on other libraries to demonstrate interop?
/cc @smaldini
I’m currently looking into trying out some interop with Akka Streams, are artifacts published somewhere?
You’ll have to hold off. It’s not spec compliant yet. Once it is I’ll get some artefacts published.
Bummer; I guess I’ll look into ratpack and Reactor then (want to show something at conferences next week).
I’m actively working on this right now. Should have it working in the next day or two.
Cool, would you please ping this issue when I should try it out? Thanks!
What I've just pushed should be compliant, and I think it is good enough for an initial release. There are probably bugs and there are certainly performance problems, but I think it can go out.
@benjchristensen how do we get a binary out?
I'll push the buttons (since we haven't yet got the fully self-serve Travis-based release process working like RxScala).
0.2.0 is released on BinTray (https://bintray.com/reactivex/RxJava/RxJavaReactiveStreams/0.2.0/view) and is making its way to Maven Central.
@alkemist Thank you for your work on this. Would you mind providing a section on the README or in the Wiki with basic usage examples?
Looking forward to it for my next demos :D
Stéphane
It can be seen on Maven Central now: http://repo1.maven.org/maven2/io/reactivex/rxjava-reactive-streams/0.2.0/
Thanks a lot, @benjchristensen and @alkemist!
Pushed one example of interop with Ratpack: https://github.com/ReactiveX/RxJavaReactiveStreams/blob/0.x/examples/ratpack/src/test/java/rx/reactivestreams/example/ratpack/RatpackExamples.java#L46-46
Leaving this ticket open for more examples and some stuff in the README.
Cool, nice to see that code, that's very helpful.
@alkemist I’m trying to extract the sample code you linked to (since I cannot figure out how to run the tests you point to). The problem I am facing is that I cannot figure out how to publish ratpack 0.9.10-SNAPSHOT locally; what is the magic incantation? I tried adding the maven-publish plugin, but that does not do anything when I run ./gradlew publishToMavenLocal (and ratpack-rx:publishToMavenLocal does not exist).
@rkuhn can you use this repo: maven { url "http://oss.jfrog.org/repo" }? I can't remember for sure, but I think this is where all the Ratpack snapshots go. BTW, if you can share your example, I'd like to complete it with Reactor, as I keep pitching: Reactor for the backend access/data layer, Akka Streams to get into an Actor system and scale out, RxJava to bridge with some metrics and especially Hystrix right now, Ratpack to bridge with the HTTP client (WS/ESS) :dancer:
Got it working, thanks! You can find my sample project here. @benjchristensen this might be interesting for you as well.
@rkuhn I'm having a hard time configuring the sample build into an IDEA project :(
Thanks @rkuhn ... that code looks like a good example for my talks next week as well. Do you mind if I use it (and possibly tweak/enhance it)?
@benjchristensen By all means: use it!
@smaldini That is the reason why I created an sbt project, I cannot figure out this gradle thing ;-) You should be able to just add a gradle build if you know better how that works (I don’t use IDEA).
Beautiful
I am trying to convert the project to our own sample suite, as I am having issues with both. Do you know where I can find 0.9 snapshots for Akka Streams? My build seems to complain about that.
Ah, yes: I am about to publish a suitable version, should not take more than an hour.
I'll also propose we all use our own schedulers/dispatchers to make it more obvious that we talk to each other while handling async backpressure :dancers:
E.g. rxjava.observeOn(Schedulers.computation()), reactorStream.dispatchOn(new Environment())
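A rough sketch of that idea, using only the JDK so it runs without any of the libraries discussed here. It assumes JDK 9+, where java.util.concurrent.Flow mirrors the Reactive Streams interfaces, and it uses SubmissionPublisher with its own executor as a stand-in for a library-specific scheduler or dispatcher. The class name ThreadHopSketch, the "consumer" thread name, and the buffer size of 4 are made up for illustration; none of this is the RxJava or Reactor API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ThreadHopSketch {

    /** What the consumer saw, and on which thread it ran. */
    public static final class Result {
        public final List<Integer> items = new CopyOnWriteArrayList<>();
        public volatile String consumerThread;
    }

    /** Produces 1..count on the calling thread and consumes them on a separate "consumer" thread. */
    public static Result run(int count) {
        // Hypothetical stand-in for a scheduler/dispatcher: one dedicated consumer thread.
        ExecutorService consumerPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        Result result = new Result();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // Small buffer (4) so the producer has to wait for the consumer.
            try (SubmissionPublisher<Integer> publisher =
                         new SubmissionPublisher<>(consumerPool, 4)) {
                publisher.subscribe(new Flow.Subscriber<Integer>() {
                    private Flow.Subscription subscription;

                    @Override
                    public void onSubscribe(Flow.Subscription s) {
                        subscription = s;
                        s.request(1); // ask for one item at a time
                    }

                    @Override
                    public void onNext(Integer item) {
                        result.consumerThread = Thread.currentThread().getName();
                        result.items.add(item);
                        subscription.request(1); // signal demand for the next item
                    }

                    @Override
                    public void onError(Throwable t) {
                        done.countDown();
                    }

                    @Override
                    public void onComplete() {
                        done.countDown();
                    }
                });
                for (int i = 1; i <= count; i++) {
                    publisher.submit(i); // blocks while the subscriber's buffer is full
                }
            } // close() signals onComplete once buffered items have been delivered
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            consumerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        Result r = run(100);
        System.out.println("received " + r.items.size() + " items on thread " + r.consumerThread);
    }
}
```

The point of the dedicated executor is the same as with observeOn or dispatchOn: the producer and consumer run on different threads, and the only thing keeping the producer from running ahead is the demand signalled through request.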
Akka Streams 0.10-M1 is on its way to Maven Central; I’ll update my code as soon as it is there.
updated the build and it still works :-)
Using Akka Streams 0.10-M1 I was playing around and I've found an issue somewhere that the backpressure isn't propagating. Not sure yet where.
package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;
import akka.actor.ActorSystem;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class RxAkka {

    public static void main(String... args) {
        final ActorSystem system = ActorSystem.create("InteropTest");
        final FlowMaterializer mat = FlowMaterializer.create(system);
        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
            // schedule odd and even on different event loops
            Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation());

            /* using Akka Streams */
            // convert to Reactive Streams Publisher
            Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);
            // convert to Akka Streams Source
            Source<String> stringSource = Source.from(groupPublisher).map(i -> i + " " + group.getKey());
            // convert back from Akka to Rx Observable
            return RxReactiveStreams.toObservable(stringSource.take(2000).runWith(Sink.<String> fanoutPublisher(1, 1), mat));

            /* using only Rx */
            // return asyncGroup.take(2000).map(i -> i + " " + group.getKey());
        });

        strings.toBlocking().forEach(System.out::println);
        system.shutdown();
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}
This non-deterministically blows up with:
Exception in thread "main" java.lang.RuntimeException: rx.exceptions.MissingBackpressureException
at rx.observables.BlockingObservable.forEach(BlockingObservable.java:138)
at reactive_streams_interop.RxAkka.main(RxAkka.java:45)
Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:222)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$1$3.onNext(OperatorGroupBy.java:236)
at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:278)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:182)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
at rx.Subscriber.setProducer(Subscriber.java:143)
at rx.Subscriber.setProducer(Subscriber.java:137)
at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable.subscribe(Observable.java:7463)
at rx.observables.BlockingObservable.forEach(BlockingObservable.java:98)
... 1 more
[ERROR] [11/01/2014 22:45:52.646] [InteropTest-akka.actor.default-dispatcher-16] [akka://InteropTest/user/$a/flow-2-1-map] failure during processing
rx.exceptions.MissingBackpressureException
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:222)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$1$3.onNext(OperatorGroupBy.java:236)
at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:278)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:182)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
at rx.Subscriber.setProducer(Subscriber.java:143)
at rx.Subscriber.setProducer(Subscriber.java:137)
at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:1)
at rx.Observable.subscribe(Observable.java:7463)
at rx.observables.BlockingObservable.forEach(BlockingObservable.java:98)
at reactive_streams_interop.RxAkka.main(RxAkka.java:45)
This means the request flow isn't working.
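To make "request flow" concrete, here is a minimal single-threaded sketch of a source that only emits what has been requested. It assumes JDK 9+ and uses java.util.concurrent.Flow, which mirrors the Reactive Streams interfaces, instead of org.reactivestreams or RxJava, so it runs without extra dependencies. RequestFlowSketch, RangePublisher, and emittedWhenTaking are hypothetical names for illustration, and the publisher is a simplification, not a spec-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class RequestFlowSketch {

    /** Hypothetical range source: emits 1..count, but never more than the outstanding demand. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        final int count;
        final AtomicInteger emitted = new AtomicInteger();

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                int next = 1;
                long demand = 0;
                boolean emitting = false;
                boolean done = false;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be > 0"));
                        return;
                    }
                    demand += n;
                    if (demand < 0) {
                        demand = Long.MAX_VALUE; // cap on overflow
                    }
                    if (emitting) {
                        return; // re-entrant call from onNext: the outer loop will drain
                    }
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        emitted.incrementAndGet();
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests in batches of `batch`, cancels after `limit` items, returns how many the source emitted. */
    public static int emittedWhenTaking(int sourceSize, int batch, int limit) {
        RangePublisher source = new RangePublisher(sourceSize);
        List<Integer> received = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int inBatch = 0;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                if (received.size() == limit) {
                    subscription.cancel(); // like take(limit)
                    return;
                }
                if (++inBatch == batch) {
                    inBatch = 0;
                    subscription.request(batch); // replenish demand
                }
            }

            @Override
            public void onError(Throwable t) {
                throw new RuntimeException(t);
            }

            @Override
            public void onComplete() {
            }
        });
        return source.emitted.get();
    }

    public static void main(String[] args) {
        // prints "emitted = 2000"
        System.out.println("emitted = " + emittedWhenTaking(1_000_000, 128, 2000));
    }
}
```

In this sketch the source stops at exactly the number of items the subscriber asked for before cancelling. The MissingBackpressureException in the traces above is thrown from the observeOn buffer, which suggests that in the RxAkka program items arrived there faster than they were requested.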
It sometimes works, however:
3998 true
4000 true
Number emitted from source (should be < 6000): 7055
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-fanoutPublisher] Message [akka.stream.impl.Cancel] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-fanoutPublisher#-718931434] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-2-take#-1094054996] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-2-take#-1094054996] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-fanoutPublisher] Message [akka.stream.impl.Cancel] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-fanoutPublisher#51622231] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
It works deterministically if I use just Rx without the conversion to/from. I have not yet spent time to hunt down where the issue is occurring.
The above is done with:
compile 'io.reactivex:rxjava:1.0.0.rc.8'
compile 'io.reactivex:rxjava-reactive-streams:0.3.0'
compile 'com.typesafe.akka:akka-stream-experimental_2.11:0.10-M1'
Here is an example I'm considering using for a presentation on Tuesday. It's buggy right now (as shown above) but demonstrates the goals of interop while going through non-trivial operators (groupBy and flatMap) along with injected concurrency and thread-hopping.
Any recommendations on what to do differently that would be better? Can someone provide me a more realistic example of going from or to Akka Streams? I'd prefer to have something that is not so contrived if possible.
@smaldini I'll play more with yours next as shown in https://github.com/rkuhn/ReactiveStreamsInterop/blob/7124906fb50f9a91cee4e8d58c00853898eed239/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java