Early thoughts on Java architecture
Customized Observable base class
Let's create an abstraction, EdgeChain, on top of RxJava3. It wraps an Observable and exposes customized versions of map, zip, and subscribe as transform, combine, and execute. We'll create two subclasses of EdgeChain, MRKLChain and ReactChain, which are meant to differ in how they parse results. We'll also create an EndPoint class to encapsulate the API endpoint details.
First, let's define the EdgeChain class:
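import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;

// Thin wrapper around an RxJava3 Observable; subclasses supply the concrete behavior.
public abstract class EdgeChain<T> {
    protected Observable<T> observable;

    protected EdgeChain(Observable<T> observable) {
        this.observable = observable;
    }

    // Customized counterparts of map, zip, and subscribe.
    public abstract <R> EdgeChain<R> transform(Function<T, R> mapper);
    public abstract <R> EdgeChain<R> combine(BiFunction<T, T, R> zipper);
    public abstract void execute(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
}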
Customized Endpoint base class
Now, let's create the EndPoint class:
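// Holds the details of one API endpoint.
// BackoffStrategy is not defined in these notes and is assumed to exist elsewhere.
public class EndPoint {
    private String url;
    private int maxRetries;
    private BackoffStrategy backoffStrategy;

    public EndPoint(String url, int maxRetries, BackoffStrategy backoffStrategy) {
        this.url = url;
        this.maxRetries = maxRetries;
        this.backoffStrategy = backoffStrategy;
    }

    // Getters used by the chains (setters omitted for brevity)
    public String getUrl() { return url; }
    public int getMaxRetries() { return maxRetries; }
    public BackoffStrategy getBackoffStrategy() { return backoffStrategy; }
}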
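The snippets in these notes call BackoffStrategy.exponential() and pass the result to EndPoint, but BackoffStrategy itself is never defined. The following is a minimal sketch of what such a type might look like. The delayFor method, the 100 ms base delay, the doubling factor, and the 10 s cap are all assumptions made for illustration, not part of the original design.

```java
import java.time.Duration;

// Hypothetical sketch: the notes use BackoffStrategy but never define it.
interface BackoffStrategy {

    // Delay to wait before retry number `attempt` (1-based).
    Duration delayFor(int attempt);

    // Assumed defaults: 100 ms base delay, doubling per attempt, capped at 10 s.
    static BackoffStrategy exponential() {
        return exponential(Duration.ofMillis(100), Duration.ofSeconds(10));
    }

    static BackoffStrategy exponential(Duration base, Duration cap) {
        return attempt -> {
            if (attempt < 1) {
                throw new IllegalArgumentException("attempt must be >= 1");
            }
            // Limit the shift so the multiplier cannot overflow a long.
            long multiplier = 1L << Math.min(attempt - 1, 30);
            Duration delay = base.multipliedBy(multiplier);
            return delay.compareTo(cap) > 0 ? cap : delay;
        };
    }
}
```

With these assumed defaults, the first three retries would wait 100 ms, 200 ms, and 400 ms, and any later retry whose computed delay exceeds the cap would wait 10 s.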
Example implementations
This design lets you build customized observable abstractions whose subclasses differ in how they parse results. The EndPoint class encapsulates the API endpoint details, and each chain reads its EndPoint to find the URL to call. From the chain's point of view the EndPoint is optional: a chain's behavior depends on whether an EndPoint is declared, is mandatory but left empty, or is not needed at all.
Now, you can create instances of the MRKLChain and ReactChain classes and use their transform, combine, and execute methods to perform the required operations. Remember to provide an EndPoint instance with the required API endpoint details, and the number of retries and backoff strategy.
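The three EndPoint cases mentioned above (declared, mandatory but empty, not necessary) are not modelled anywhere in these notes. One possible reading is sketched below. EndPointPolicy, EndPointSlot, and resolve are hypothetical names, a plain URL string stands in for the full EndPoint object, and treating "mandatory but empty" as an error is an assumption.

```java
import java.util.Optional;

// Hypothetical names throughout: the notes do not say how the three cases are modelled.
enum EndPointPolicy { REQUIRED, OPTIONAL, NOT_NEEDED }

final class EndPointSlot {
    private final EndPointPolicy policy;
    private final String url; // may be null; stands in for a full EndPoint object

    EndPointSlot(EndPointPolicy policy, String url) {
        this.policy = policy;
        this.url = url;
    }

    // Returns the URL to call, or empty when the chain should run without a remote call.
    Optional<String> resolve() {
        Optional<String> declared = Optional.ofNullable(url);
        switch (policy) {
            case REQUIRED:
                // Assumption: "mandatory but empty" is a configuration error.
                return Optional.of(declared.orElseThrow(
                        () -> new IllegalStateException("EndPoint is required but missing")));
            case OPTIONAL:
                return declared;
            default:
                // NOT_NEEDED: ignore any endpoint that happens to be set.
                return Optional.empty();
        }
    }
}
```

A chain built this way could check resolve() once in execute and skip the remote call when the result is empty.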
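Let's create the MRKLChain subclass:
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;

// Generic so that transform and combine can return a chain of a different element type.
public class MRKLChain<T> extends EdgeChain<T> {
    private final EndPoint endPoint;

    public MRKLChain(Observable<T> observable, EndPoint endPoint) {
        super(observable);
        this.endPoint = endPoint;
    }

    @Override
    public <R> MRKLChain<R> transform(Function<T, R> mapper) {
        return new MRKLChain<>(observable.map(mapper), endPoint);
    }

    @Override
    public <R> MRKLChain<R> combine(BiFunction<T, T, R> zipper) {
        // Zips the stream with itself, as in the original sketch.
        return new MRKLChain<>(observable.zipWith(observable, zipper), endPoint);
    }

    @Override
    public void execute(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        // Retry.backoff is a Project Reactor helper, not an RxJava3 one. retry(long)
        // resubscribes up to maxRetries times; applying the backoff delay as well
        // would need a retryWhen pipeline.
        observable
            .retry(endPoint.getMaxRetries())
            .subscribe(onNext, onError);
    }
}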
Now, let's create the ReactChain subclass:
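import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;

// Generic so that transform and combine can return a chain of a different element type.
public class ReactChain<T> extends EdgeChain<T> {
    private final EndPoint endPoint;

    public ReactChain(Observable<T> observable, EndPoint endPoint) {
        super(observable);
        this.endPoint = endPoint;
    }

    @Override
    public <R> ReactChain<R> transform(Function<T, R> mapper) {
        return new ReactChain<>(observable.map(mapper), endPoint);
    }

    @Override
    public <R> ReactChain<R> combine(BiFunction<T, T, R> zipper) {
        // Zips the stream with itself, as in the original sketch.
        return new ReactChain<>(observable.zipWith(observable, zipper), endPoint);
    }

    @Override
    public void execute(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        // Retry.backoff is a Project Reactor helper, not an RxJava3 one. retry(long)
        // resubscribes up to maxRetries times; applying the backoff delay as well
        // would need a retryWhen pipeline.
        observable
            .retry(endPoint.getMaxRetries())
            .subscribe(onNext, onError);
    }
}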
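The execute methods above are meant to retry a failing call a bounded number of times, waiting between attempts. The plain-JDK sketch below illustrates those semantics without any reactive library, so the control flow is easy to follow. RetrySketch, callWithRetry, and the delayMillis parameter are hypothetical names and are not part of EdgeChain; in RxJava3 the same behavior would be assembled from retry or retryWhen.

```java
import java.util.concurrent.Callable;
import java.util.function.IntToLongFunction;

// Plain-JDK illustration of "retry up to maxRetries times, waiting between attempts".
// RetrySketch and its parameters are hypothetical, not part of EdgeChain.
final class RetrySketch {

    static <T> T callWithRetry(Callable<T> task, int maxRetries, IntToLongFunction delayMillis)
            throws Exception {
        int retriesUsed = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (retriesUsed >= maxRetries) {
                    throw e; // retries exhausted: surface the last failure
                }
                retriesUsed++;
                // Wait before the next attempt; the delay depends on the retry number.
                Thread.sleep(delayMillis.applyAsLong(retriesUsed));
            }
        }
    }
}
```

A caller might pass `attempt -> 100L << (attempt - 1)` as delayMillis to get a doubling delay that starts at 100 ms.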
Combined Example
Let's create an example where two chains, MRKLChain and ReactChain, are used together with transform and execute. We'll use a simple transformation function to parse and reformat the text in each chain. We'll then combine the results of the two chains with RxJava3's combineLatest operator (RxJava has no forkJoin operator; that name comes from RxJS) before passing them to another instance of the chain.
First, let's create some sample API endpoints and transformation functions:
EndPoint endPoint1 = new EndPoint("https://api.example.com/data1", 3, BackoffStrategy.exponential());
EndPoint endPoint2 = new EndPoint("https://api.example.com/data2", 3, BackoffStrategy.exponential());
Function<String, List<String>> mrklParser = text -> Arrays.asList(text.split("\\s+"));
Function<String, List<String>> reactParser = text -> Arrays.asList(text.split("[\\r\\n]+"));
Next, let's create instances of MRKLChain and ReactChain:
MRKLChain<String> mrklChain = new MRKLChain<>(Observable.just("sample text for MRKLChain"), endPoint1);
ReactChain<String> reactChain = new ReactChain<>(Observable.just("sample text\nfor ReactChain"), endPoint2);
Now, let's transform the data, combine the results using combineLatest, and execute the combined chain:
Observable<List<String>> mrklTransformed = mrklChain.transform(mrklParser).observable;
Observable<List<String>> reactTransformed = reactChain.transform(reactParser).observable;
Observable<List<String>> combined = Observable.combineLatest(mrklTransformed, reactTransformed, (mrklData, reactData) -> {
List<String> result = new ArrayList<>(mrklData);
result.addAll(reactData);
return result;
});
MRKLChain<List<String>> combinedChain = new MRKLChain<>(combined, endPoint1);
combinedChain.execute(
result -> System.out.println("Combined result: " + result),
error -> System.err.println("Error: " + error)
);
In this example, the MRKLChain and ReactChain instances are created with sample text and API endpoints. The transformation functions, mrklParser and reactParser, split the text on whitespace and on line breaks respectively. The combineLatest operator merges the two parsed lists into one, which is passed to another chain instance (combinedChain). The execute method then either processes the combined result or handles any error.
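To make the parsing step concrete, the sketch below runs the same two lambdas on the same sample strings and concatenates the results. It uses java.util.function.Function rather than the RxJava3 interface so that it needs no external dependency; ParserDemo and its combine helper are names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustration only: ParserDemo is not part of the EdgeChain design.
final class ParserDemo {
    // Same lambdas as in the example, typed with the JDK Function interface.
    static final Function<String, List<String>> MRKL_PARSER =
            text -> Arrays.asList(text.split("\\s+"));
    static final Function<String, List<String>> REACT_PARSER =
            text -> Arrays.asList(text.split("[\\r\\n]+"));

    // Mirrors the combiner lambda: MRKL tokens first, then the React lines.
    static List<String> combine(String mrklInput, String reactInput) {
        List<String> result = new ArrayList<>(MRKL_PARSER.apply(mrklInput));
        result.addAll(REACT_PARSER.apply(reactInput));
        return result;
    }

    public static void main(String[] args) {
        System.out.println("Combined result: "
                + combine("sample text for MRKLChain", "sample text\nfor ReactChain"));
        // prints: Combined result: [sample, text, for, MRKLChain, sample text, for ReactChain]
    }
}
```

The whitespace split yields four tokens, while the line-break split yields two lines, so the combined list has six elements.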
This demonstrates how to use the customized observable abstractions to call external APIs, parse and reformat the result, and pass the reformatted data to another instance of the abstraction for further processing.
Putting it inside Spring Boot WebFlux
To integrate the EdgeChain abstractions into a Spring Boot WebFlux application, you can create a WebFlux controller that exposes each chain as a RESTful API endpoint. You can then call the chain endpoints from your own code or from any external client, without worrying about whether they execute on your local machine or on a remote host.
Let's continue using RxJava3 with Spring Boot WebFlux. You can use Observable directly in the controller and return a Mono. Because Observable is not a Reactive Streams Publisher, first convert it to a Flowable (for example via firstElement().toFlowable()) and pass that to Mono.from. This way, we can continue using RxJava3, and the extra configuration needed is minimal.
First, let's create a ChainController class that will handle the incoming requests and delegate the processing to the appropriate chain instances:
import io.reactivex.rxjava3.core.Observable;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/chains")
public class ChainController {
@PostMapping("/mrkl")
public Mono<ResponseEntity<String>> processMRKLChain(@RequestBody String input) {
// Create MRKLChain instance and process the input
// Return the result as a Mono<ResponseEntity<String>>
}
@PostMapping("/react")
public Mono<ResponseEntity<String>> processReactChain(@RequestBody String input) {
// Create ReactChain instance and process the input
// Return the result as a Mono<ResponseEntity<String>>
}
}
Now, let's implement the processMRKLChain and processReactChain methods in the ChainController. You can keep the execute method in the EdgeChain, MRKLChain, and ReactChain classes as they were before:
// mrklParser and reactParser are assumed here to be Function<String, String> fields of the controller.
@PostMapping("/mrkl")
public Mono<ResponseEntity<String>> processMRKLChain(@RequestBody String input) {
    EndPoint endPoint = new EndPoint("https://api.example.com/data1", 3, BackoffStrategy.exponential());
    MRKLChain<String> mrklChain = new MRKLChain<>(Observable.just(input), endPoint);
    // Observable is not a Reactive Streams Publisher, so bridge through Maybe/Flowable before Mono.from.
    return Mono.from(mrklChain.transform(mrklParser).observable.firstElement().toFlowable())
        .map(result -> ResponseEntity.ok().body(result))
        .onErrorResume(e -> Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error: " + e.getMessage())));
}

@PostMapping("/react")
public Mono<ResponseEntity<String>> processReactChain(@RequestBody String input) {
    EndPoint endPoint = new EndPoint("https://api.example.com/data2", 3, BackoffStrategy.exponential());
    ReactChain<String> reactChain = new ReactChain<>(Observable.just(input), endPoint);
    // Same bridge as above: take the first element and expose it as a Flowable.
    return Mono.from(reactChain.transform(reactParser).observable.firstElement().toFlowable())
        .map(result -> ResponseEntity.ok().body(result))
        .onErrorResume(e -> Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error: " + e.getMessage())));
}
In this implementation, we create MRKLChain and ReactChain instances and process the input using the transform method. The result is an Observable, which is bridged to a Flowable (a Reactive Streams Publisher) and then converted to a Mono with Mono.from. This approach lets you keep using RxJava3 in your Spring Boot WebFlux application while exposing the chains as RESTful API endpoints. When you call these endpoints from your code or from an external client, they process the input with the corresponding chain and return the result.
Cleaning up response classes, etc. Pure cosmetics
We can create a custom class, ArkResponse, that wraps the Mono<ResponseEntity<String>> and provides a factory method to build an ArkResponse from an Observable<String>.
Here's the ArkResponse class:
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
import io.reactivex.rxjava3.core.Observable;
public class ArkResponse {
private final Mono<ResponseEntity<String>> response;
private ArkResponse(Mono<ResponseEntity<String>> response) {
this.response = response;
}
public static ArkResponse fromObservable(Observable<String> observable) {
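// Observable is not a Reactive Streams Publisher, so bridge through Maybe/Flowable before Mono.from.
Mono<ResponseEntity<String>> mono = Mono.from(observable.firstElement().toFlowable())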
.map(result -> ResponseEntity.ok().body(result))
.onErrorResume(e -> Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error: " + e.getMessage())));
return new ArkResponse(mono);
}
public Mono<ResponseEntity<String>> getResponse() {
return response;
}
}
Now, let's update the ChainController to return ArkResponse instead of Mono<ResponseEntity<String>>:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/chains")
public class ChainController {
@PostMapping("/mrkl")
public ArkResponse processMRKLChain(@RequestBody String input) {
// Create MRKLChain instance and process the input
// Return the result as an ArkResponse
}
@PostMapping("/react")
public ArkResponse processReactChain(@RequestBody String input) {
// Create ReactChain instance and process the input
// Return the result as an ArkResponse
}
}
Update the processMRKLChain and processReactChain methods in the ChainController to use ArkResponse.fromObservable:
@PostMapping("/mrkl")
public ArkResponse processMRKLChain(@RequestBody String input) {
EndPoint endPoint = new EndPoint("https://api.example.com/data1", 3, BackoffStrategy.exponential());
MRKLChain<String> mrklChain = new MRKLChain<>(Observable.just(input), endPoint);
return ArkResponse.fromObservable(mrklChain.transform(mrklParser).observable);
}
@PostMapping("/react")
public ArkResponse processReactChain(@RequestBody String input) {
EndPoint endPoint = new EndPoint("https://api.example.com/data2", 3, BackoffStrategy.exponential());
ReactChain<String> reactChain = new ReactChain<>(Observable.just(input), endPoint);
return ArkResponse.fromObservable(reactChain.transform(reactParser).observable);
}
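With this implementation, the ArkResponse class wraps the Mono<ResponseEntity<String>> and the ChainController methods return ArkResponse instances, which reduces verbosity in the controller. To access the underlying Mono<ResponseEntity<String>>, use the getResponse() method of ArkResponse. WebFlux does not unwrap a custom wrapper type on its own, so a handler that returns ArkResponse directly relies on a custom result handler being registered; otherwise, return getResponse() from the controller method.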
Tying it all together
In this example, we'll use two chains, MRKLChain and ReactChain, to transform and reformat the input text using simple transformation functions. Then we'll combine the results of these two chains and pass the combined result to another instance of the chain. RxJava3 has no forkJoin operator (that name comes from RxJS); Observable.zip plays the same role for sources that emit a single item.
First, let's create simple transformation functions for MRKLChain and ReactChain. We'll use a lambda expression to define these functions:
Function<String, String> mrklParser = input -> "MRKL: " + input.toUpperCase();
Function<String, String> reactParser = input -> "React: " + input.toLowerCase();
Now, let's create instances of MRKLChain and ReactChain and apply the transformation functions using the transform method:
EndPoint endPoint1 = new EndPoint("https://api.example.com/data1", 3, BackoffStrategy.exponential());
MRKLChain<String> mrklChain = new MRKLChain<>(Observable.just("Hello MRKL"), endPoint1);
Observable<String> mrklTransformed = mrklChain.transform(mrklParser).observable;
EndPoint endPoint2 = new EndPoint("https://api.example.com/data2", 3, BackoffStrategy.exponential());
ReactChain<String> reactChain = new ReactChain<>(Observable.just("Hello React"), endPoint2);
Observable<String> reactTransformed = reactChain.transform(reactParser).observable;
Next, let's combine the results of the two chains. RxJava3 has no forkJoin operator, so we use Observable.zip, which pairs the single item emitted by each source:
import io.reactivex.rxjava3.core.Observable;
Observable<String> combined = Observable.zip(mrklTransformed, reactTransformed, (mrklResult, reactResult) -> mrklResult + " | " + reactResult);
Now, we'll create another instance of the chain (e.g., MRKLChain) and pass the combined result to it. We'll also apply a transformation function that concatenates a string to the result:
Function<String, String> combinedParser = input -> "Combined: " + input;
MRKLChain<String> combinedChain = new MRKLChain<>(combined, endPoint1);
Observable<String> combinedTransformed = combinedChain.transform(combinedParser).observable;
Finally, let's create a new endpoint in the ChainController that uses the combined chain:
@PostMapping("/combined")
public ArkResponse processCombinedChain() {
return ArkResponse.fromObservable(combinedTransformed);
}
In this example, we created instances of MRKLChain and ReactChain, applied simple transformation functions to reformat the input text, combined the two results into one string (RxJava3's zip operator fills the role that forkJoin plays in RxJS), and passed the combined result to another instance of the chain. We also added a new endpoint in the ChainController to handle the combined chain.
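The data flow of this last example can be traced with the JDK alone. The sketch below swaps RxJava3 for CompletableFuture, whose thenCombine waits for one value from each side and merges them, much like combining two single-item Observables. CombinedDemo and run are names introduced only for this illustration, and the sketch leaves out endpoints and retries entirely.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only analogue of the two-chain pipeline; not part of the EdgeChain design.
final class CombinedDemo {

    static String run(String mrklInput, String reactInput) {
        // Each future plays the role of one chain plus its transform step.
        CompletableFuture<String> mrkl = CompletableFuture
                .supplyAsync(() -> mrklInput)
                .thenApply(in -> "MRKL: " + in.toUpperCase());
        CompletableFuture<String> react = CompletableFuture
                .supplyAsync(() -> reactInput)
                .thenApply(in -> "React: " + in.toLowerCase());

        return mrkl
                // Wait for both results and merge them, as the combiner lambda does.
                .thenCombine(react, (m, r) -> m + " | " + r)
                // The final transform applied by the combined chain.
                .thenApply(in -> "Combined: " + in)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run("Hello MRKL", "Hello React"));
        // prints: Combined: MRKL: HELLO MRKL | React: hello react
    }
}
```

The printed string is what a client of the /combined endpoint would be expected to receive for the same two inputs, assuming the chains behave as described above.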
Hello,
This sounds interesting and a great learning opportunity. I'd love to contribute to it under GSSoC'23 and learn. Please assign this task to me.