spring-cqrs-arch

Evaluate using rxjava to replace the asynchronous code.

Open sleroy opened this issue 5 years ago • 3 comments

Evaluate using rxjava to replace the asynchronous code.

https://github.com/ReactiveX/RxJava

sleroy, Jul 13 '19 19:07

Futures

Futures were introduced in Java 5 (2004). They're basically placeholders for a result of an operation that hasn't finished yet. Once the operation finishes, the Future will contain that result. For example, an operation can be a Runnable or Callable instance that is submitted to an ExecutorService. The submitter of the operation can use the Future object to check whether the operation isDone(), or wait for it to finish using the blocking get() method.

Example:

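    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class FutureExample {

        /** A task that sleeps for a second, then returns 1. */
        public static class MyCallable implements Callable<Integer> {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(1000);
                return 1;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutorService exec = Executors.newSingleThreadExecutor();
            Future<Integer> f = exec.submit(new MyCallable());

            System.out.println(f.isDone()); // false: the task is still sleeping
            System.out.println(f.get());    // waits until the task is done, then prints 1

            exec.shutdown(); // the worker thread is non-daemon, so release it to let the JVM exit
        }
    }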

CompletableFutures

CompletableFutures were introduced in Java 8 (2014). They are in fact an evolution of regular Futures, inspired by Google's ListenableFuture, part of the Guava library. They are Futures that also allow you to string tasks together in a chain. You can use them to tell some worker thread to "go do some task X, and when you're done, go do this other thing using the result of X". Using CompletableFutures, you can do something with the result of the operation without actually blocking a thread to wait for the result. Here's a simple example:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Function;
    import java.util.function.Supplier;

    public class CompletableFutureExample {

        /** A supplier that sleeps for a second, and then returns one. */
        public static class MySupplier implements Supplier<Integer> {
            @Override
            public Integer get() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag set
                }
                return 1;
            }
        }

        /** A (pure) function that adds one to a given Integer. */
        public static class PlusOne implements Function<Integer, Integer> {
            @Override
            public Integer apply(Integer x) {
                return x + 1;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutorService exec = Executors.newSingleThreadExecutor();
            CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
            System.out.println(f.isDone()); // false
            CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
            System.out.println(f2.get()); // waits until the "calculation" is done, then prints 2
            exec.shutdown(); // release the non-daemon worker thread so the JVM can exit
        }
    }
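The example above still ends with a blocking get(). As a rough sketch of the "do this other thing using the result of X" idea without waiting on the result, the stages below are attached as callbacks with thenApply, thenCombine and thenAccept. The class and method names (ChainingSketch, pipeline, combined) are made up for illustration and are not part of the project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical demo class; all names here are illustrative only. */
class ChainingSketch {

    /** Supplies 1 asynchronously, adds one, then formats the result. */
    static CompletableFuture<String> pipeline(ExecutorService exec) {
        return CompletableFuture
                .supplyAsync(() -> 1, exec)     // task X runs on the executor
                .thenApply(x -> x + 1)          // runs once X has produced its value
                .thenApply(x -> "result=" + x); // further stage, still no blocking call
    }

    /** Combines two independent asynchronous results into one. */
    static CompletableFuture<Integer> combined(ExecutorService exec) {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 20, exec);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 22, exec);
        return a.thenCombine(b, Integer::sum);
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            // thenAccept registers a callback; nothing blocks at this point
            CompletableFuture<Void> first = pipeline(exec).thenAccept(System.out::println);
            CompletableFuture<Void> second = combined(exec).thenAccept(System.out::println);
            // join() is here only so the demo does not shut down before the callbacks run
            CompletableFuture.allOf(first, second).join();
        } finally {
            exec.shutdown();
        }
    }
}
```

In a long-running service the final join() would normally not be needed, since the callbacks simply run whenever the results arrive.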

RxJava

RxJava is a whole library for reactive programming created at Netflix. At a glance, it will appear to be similar to Java 8's streams. It is, except it's much more powerful.

Similarly to Futures, RxJava can be used to string together a bunch of synchronous or asynchronous actions to create a processing pipeline. Unlike Futures, which are single-use, RxJava works on streams of zero or more items, including never-ending streams with an infinite number of items. It's also much more flexible and powerful thanks to an unbelievably rich set of operators.

Unlike Java 8's streams, RxJava also has a backpressure mechanism, which allows it to handle cases in which different parts of your processing pipeline operate in different threads, at different rates.
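To make the backpressure idea concrete without pulling in RxJava itself, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow and SubmissionPublisher (Java 9+) as a stand-in. It is not RxJava code. The subscriber asks for one item at a time, and the publisher gets a deliberately small buffer, so the producer has to wait when the consumer falls behind. The class names (OneAtATimeSubscriber, BackpressureSketch) and the buffer size of 2 are assumptions chosen for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/** Hypothetical subscriber that signals demand for exactly one item at a time. */
class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // initial demand: a single item
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
        subscription.request(1); // ask for the next item only after handling this one
    }

    @Override
    public void onError(Throwable t) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }
}

/** Hypothetical demo class; names and sizes are illustrative only. */
class BackpressureSketch {

    /** Publishes 1..count on one thread while the subscriber consumes on another. */
    static List<Integer> publish(int count) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        // Small per-subscriber buffer (the JDK may round the capacity up to a power of two)
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(exec, 2);
        try {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
            publisher.close();       // onComplete is delivered after the buffered items
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publish(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Blocking the producer is only one possible strategy. SubmissionPublisher also has an offer method for dropping or retrying items, and RxJava exposes comparable choices through its own operators.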

The downside of RxJava is that, despite the solid documentation, it is a challenging library to learn due to the paradigm shift involved. Rx code can also be a nightmare to debug, especially if multiple threads are involved, and even more so if backpressure is needed.

If you want to get into it, there's a whole page of various tutorials on the official website, plus the official documentation and Javadoc. You can also take a look at some of the videos such as this one which gives a brief intro into Rx and also talks about the differences between Rx and Futures.

Bonus: Java 9 Reactive Streams

Java 9's Reactive Streams, aka the Flow API, are a set of interfaces (java.util.concurrent.Flow) that mirror the Reactive Streams specification implemented by various reactive streams libraries such as RxJava 2, Akka Streams, and Vert.x. They allow these reactive libraries to interconnect while preserving the all-important backpressure.
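As a small illustration of what "interconnect" can look like, the sketch below wires a publisher, a processor and a consumer together using only the JDK's Flow interfaces. The MapProcessor follows the pattern of the transform processor shown in the SubmissionPublisher Javadoc. MapProcessor and FlowInteropSketch are hypothetical names, and a real bridge to RxJava or Akka Streams would go through those libraries' own adapters, which are not shown here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

/** Hypothetical processor: a Subscriber upstream and a Publisher downstream. */
class MapProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MapProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        s.request(1); // pull the first item
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item)); // forward the transformed item downstream
        upstream.request(1);        // then pull the next one
    }

    @Override
    public void onError(Throwable t) {
        closeExceptionally(t);
    }

    @Override
    public void onComplete() {
        close();
    }
}

/** Hypothetical demo class; names are illustrative only. */
class FlowInteropSketch {

    /** Sends the input through source -> MapProcessor -> consumer and returns what arrived. */
    static List<String> run(List<Integer> input) {
        List<String> out = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        MapProcessor<Integer, String> toText = new MapProcessor<>(i -> "item-" + i);

        source.subscribe(toText);                                // publisher -> processor
        CompletableFuture<Void> done = toText.consume(out::add); // processor -> consumer
        input.forEach(source::submit);
        source.close(); // completion travels downstream through the processor
        done.join();    // wait here only so the demo can return the collected items
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3))); // prints [item-1, item-2, item-3]
    }
}
```

Each stage only depends on Flow.Publisher, Flow.Subscriber and Flow.Subscription, which is the property that lets components from different libraries be connected.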

sleroy, Jul 16 '19 21:07

https://stackoverflow.com/questions/35329845/difference-between-completablefuture-future-and-rxjavas-observable/46572647

sleroy, Jul 16 '19 21:07

Version 3.x (Javadoc)

single dependency: Reactive-Streams
continued support for Java 6+ & Android 2.3+
fixed API mistakes and many limits of RxJava 2
intended to be a replacement for RxJava 2 with relatively few binary incompatible changes
Java 8 lambda-friendly API
non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc.)
async or synchronous execution
virtual time and schedulers for parameterized concurrency
test and diagnostic support via test schedulers, test consumers and plugin hooks

Learn more about RxJava in general on the Wiki Home.

sleroy, Jul 16 '19 21:07