RxJava
RxJava copied to clipboard
Feature Request: Add `Observable.usingWhen()` for resources generated by a `Publisher`
Version: 3.1.6
In Reactor, there is Mono.usingWhen()
, which works similarly to Mono.using()
, but supports resources that are generated and cleaned up with a Publisher
.
For example, here's a use case in r2dbc-pool
:
Mono<Object> result = Mono.usingWhen(
connectionFactory.create(), // Publisher<Connection>
connection -> executeQuery(connection),
Connection::close // Publisher<Void>
);
In RxJava, we have Single.using()
, which works the same way as Mono.using()
, so there's no support for reactive resource generation/cleanup. I was thinking of doing something like this:
Single<Object> result = Single
.fromPublisher(connectionFactory.create())
.flatMap(connection -> executeQuery(connection)
.flatMap(result -> Completable
.fromPublisher(connection.close())
.toSingleDefault(result)));
However, this does not handle cases like connection cleanup on error, dispose or terminate. The .doOnXyz()
methods would not suffice, so I'm guessing this would require a custom Observable
implementation, unless I'm missing something obvious. I've also checked RxJavaExtensions
for anything similar to what I'm trying to achieve, but I was not able to find anything, nor could I find anything relevant on Stack Overflow.
I currently have the option of just using Mono.usingWhen()
and then convert it to a Single
, but it would be nice to have this natively available in RxJava. Would you be willing to add support for usingWhen()
?
One way to handle resource cleanup in RxJava is to use the using operator to create a Disposable object that handles the cleanup logic. You can combine this with the Single.defer operator to create a single that generates the resource and executes your query, with the Disposable taking care of the cleanup.
Here's an exmp: Single<Object> result = Single.defer(() -> { Connection connection = connectionFactory.create().blockingGet(); return executeQuery(connection) .doFinally(() -> connection.close()); });
In this example, the defer operator is used to delay the creation of the connection until the Single is subscribed to. The blockingGet method is used to block and wait for the connection to be created. Then, executeQuery is called with the connection, and the doFinally operator is used to clean up the connection when the Single completes or errors out.
You could also create a custom operator that wraps the using operator and converts the resulting Mono to a Single. However, this would require more code than the previous example and may not be necessary for your use case.
I hope this helps! :)
@KarboniteKream Thanks for the feedback. I'm still thinking about the implementation details and support considerations. Stay tuned.
@Mahammadnajaf Suggesting blocking operations is 99.99% wrong.
well Single result = Single.defer(() -> { Connection connection = connectionFactory.create().blockingGet(); return executeQuery(connection) .doFinally(() -> connection.close()); });
In this code, the blockingGet method is only used once to retrieve the Connection object, which should be fine as long as it is done in a non-blocking context (such as during application startup or initialization).
However, if you need to avoid blocking operations entirely, you could modify the code to use reactive streams instead of blocking operations. Here's an example:
Single result = Single.defer(() -> { return connectionFactory.create() .flatMap(connection -> executeQuery(connection) .doFinally(() -> connection.close())); });
In this example, the create method returns a Mono that emits a single Connection object when it is available. The flatMap operator is used to execute the executeQuery method with the Connection object, and the doFinally operator is used to close the Connection when the Single completes or errors out.
This code should not block, as all operations are done in a reactive, non-blocking manner.