postgres-async-driver
Consider not calling onNext(null)
Null values can be difficult for different reactive streams implementations to handle (e.g. https://github.com/monix/monix/issues/252). I understand it can cause compatibility concerns, but changing com.github.pgasync.Transaction::commit to return Observable<?> or Observable<Object> instead of Observable<Void>, and emitting something like Optional.empty(), may be cleaner (and the same could apply to other places where onNext(null) is invoked).
To add some context, the problem is that interfaces like java.util.Queue use null to signal an empty queue on methods such as poll(). See: https://docs.oracle.com/javase/7/docs/api/java/util/Queue.html#poll()
This is then implemented by all concurrent queue implementations, like Java's own ConcurrentLinkedQueue, or the ones in JCTools.org.
And reactive streams implementations, such as Monix, use concurrent queues to implement buffering on asynchronous boundaries, where needed. RxJava is also using the queue implementations in JCTools. Even if you've been allowed to push null values downstream and it worked until now, because of these queue implementations pushing nulls is basically a minefield.
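The queue contract described above can be checked with the JDK alone. This is a minimal sketch, not code from Monix, RxJava, or the driver; the class and method names are made up for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; the names here are illustrative only.
class NullQueueDemo {

    // Returns true if the queue refuses a null element with a NullPointerException.
    static boolean rejectsNull(Queue<Object> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // poll() returns null to mean "nothing here", so a stored null
    // would be indistinguishable from an empty queue.
    static boolean emptyPollIsNull(Queue<Object> queue) {
        return queue.poll() == null;
    }

    public static void main(String[] args) {
        Queue<Object> queue = new ConcurrentLinkedQueue<>();
        System.out.println("rejects null: " + rejectsNull(queue));
        System.out.println("empty poll is null: " + emptyPollIsNull(queue));
    }
}
```

Because null is reserved as the "empty" signal, a buffering operator built on such a queue has no way to carry a null item through it.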
Here's another, clearer reason why null is not allowed: null parameters are forbidden by the Reactive Streams specification (see rule 2.13). If the Monix implementation didn't throw a NullPointerException, it wouldn't be compliant with that spec.
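To make the rule concrete, here is a rough sketch of a subscriber that rejects null arguments. It uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams ones, rather than Monix or RxJava; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

// Hypothetical subscriber for illustration; not Monix, RxJava, or driver code.
class NullCheckingSubscriber<T> implements Flow.Subscriber<T> {

    // Items accepted so far, kept only so the behaviour can be observed.
    final List<T> received = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription, "subscription");
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        // A null parameter is answered with a NullPointerException.
        received.add(Objects.requireNonNull(item, "item"));
    }

    @Override
    public void onError(Throwable error) {
        Objects.requireNonNull(error, "error");
    }

    @Override
    public void onComplete() {
        // Nothing to clean up in this sketch.
    }

    public static void main(String[] args) {
        NullCheckingSubscriber<String> subscriber = new NullCheckingSubscriber<>();
        subscriber.onNext("ok");
        try {
            subscriber.onNext(null);
        } catch (NullPointerException e) {
            System.out.println("null rejected, received so far: " + subscriber.received);
        }
    }
}
```

An upstream that calls onNext(null) against a subscriber like this fails at the call site, which is the behaviour the Monix issue linked above ran into.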
Hi,
you're correct, but due to backward compatibility I'm hesitant to change observables emitting a single null to Observable.empty(). But, for example, returning something like Observable<Transaction.State> with onNext(Transaction.State.ACTIVE) from Transaction::begin would probably be a good compromise? The same goes for the other null-emitting observables.
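As a rough illustration of that compromise, the sketch below emits one non-null enum value and then completes. It is written against the JDK's java.util.concurrent.Flow (Java 9+) instead of RxJava's Observable so that it runs without dependencies. Everything in it is an assumption: only ACTIVE is named in this thread, and the other constants, the class, and the helper methods are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; none of these names are the driver's real API.
class TransactionStateSketch {

    // Only ACTIVE is mentioned in the thread; the other constants are assumed.
    enum State { ACTIVE, COMMITTED, ROLLED_BACK }

    // A publisher that emits exactly one non-null State and then completes.
    // Simplified: it ignores the spec's rules for invalid request(n) values.
    static Flow.Publisher<State> single(State state) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean done = new AtomicBoolean();

            @Override
            public void request(long n) {
                if (n > 0 && done.compareAndSet(false, true)) {
                    subscriber.onNext(state);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }

    // Subscribes synchronously and records every signal it sees.
    static List<String> signals(Flow.Publisher<State> publisher) {
        List<String> seen = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<State>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(State item) { seen.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { seen.add("onError"); }
            @Override public void onComplete() { seen.add("onComplete"); }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(signals(single(State.COMMITTED)));
    }
}
```

Subscribers that only wait for completion keep working, while those that inspect the item get a meaningful value instead of null.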
@alaisi if that's some kind of enum, then it sounds good. IMO the problem is that Observables aren't really meant for single values. Those are usually Future abstractions, though granted in Java those suck at least until ver 8. In @Monix we have Task and I believe RxJava has Single; you might want to check that out.
@alaisi - I'd say that's fine, just note you still have a compat issue if people saved off the Observable<Void>, but I agree that it is less of a compat concern than not emitting anything. I don't think you need to change begin, just commit and rollback. I agree with @alexandru that you should utilize Single if you can stand the compat break (and everywhere else where it's a single job with a single emitted value), but otherwise an observable that emits a single non-null value should be fine.
Ok, great. As for Single, that will probably be adopted when porting to RxJava 2.0.
For any others, note, this is causing me production issues at https://github.com/alaisi/postgres-async-driver/blob/71e761c677db1ce1c2b7a57b96a528ace18d61b6/src/main/java/com/github/pgasync/impl/netty/NettyPgProtocolStream.java#L304. Here is the stack trace from my Play app:
play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[MissingBackpressureException: null]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:255)
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:182)
at play.core.server.AkkaHttpServer$$anonfun$2.applyOrElse(AkkaHttpServer.scala:310)
at play.core.server.AkkaHttpServer$$anonfun$2.applyOrElse(AkkaHttpServer.scala:308)
at scala.concurrent.Future.$anonfun$recoverWith$1(Future.scala:414)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:37)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
Caused by: rx.exceptions.MissingBackpressureException: null
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:353)
at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:379)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:361)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
at com.github.pgasync.impl.PgConnection$1.onNext(PgConnection.java:130)
at com.github.pgasync.impl.PgConnection$1.onNext(PgConnection.java:121)
at com.github.pgasync.impl.netty.NettyPgProtocolStream$1.onNext(NettyPgProtocolStream.java:204)
at com.github.pgasync.impl.netty.NettyPgProtocolStream$5.channelRead(NettyPgProtocolStream.java:304)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
Not really sure what to do about it (working on moving to another lib) but figured I'd post the stack trace to help anyone else Googling.