XChange
XChange copied to clipboard
[Binance] Streams does not work on Android
- Xchange version: 5.0.14-SNAPSHOT
- Kotlin JVM: 1.8
The following code is executed under Kotlin coroutine in ViewModelScope with IO dispatcher.
Kotlin code:
val binanceSpecification = BinanceStreamingExchange().defaultExchangeSpecification
binanceSpecification.apiKey = "MY_API_KEY"
binanceSpecification.secretKey = "MY_SECRET_KEY"
val binanceStream = StreamingExchangeFactory.INSTANCE.createExchange(binanceSpecification)
binanceStream.connect(ProductSubscription.create().addAll(CurrencyPair.ADA_BNB).build()).blockingAwait()
StackTrace:
E/AndroidRuntime: FATAL EXCEPTION: nioEventLoopGroup-3-1
Process: com.gsoultos.myapp, PID: 9724
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | info.bitrich.xchangestream.service.exception.NotConnectedException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onError(ObservableRefCount.java:234)
at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onError(ObservablePublishAlt.java:187)
at io.reactivex.internal.observers.DisposableLambdaObserver.onError(DisposableLambdaObserver.java:65)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.tryOnError(ObservableCreate.java:84)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:72)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.lambda$subscribeChannel$13$info-bitrich-xchangestream-service-netty-NettyStreamingService(NettyStreamingService.java:391)
at info.bitrich.xchangestream.service.netty.NettyStreamingService$$ExternalSyntheticLambda14.subscribe(Unknown Source:8)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservablePublishAlt.connect(ObservablePublishAlt.java:79)
at io.reactivex.internal.operators.observable.ObservableRefCount.subscribeActual(ObservableRefCount.java:87)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.Observable.subscribe(Observable.java:12270)
at io.reactivex.Observable.subscribe(Observable.java:12172)
at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.triggerObservableBody(BinanceStreamingMarketDataService.java:468)
at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.initTickerSubscription(BinanceStreamingMarketDataService.java:279)
at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.$r8$lambda$avl4gXZeUTR_JSyr59si_tPRs50(Unknown Source:0)
at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService$$ExternalSyntheticLambda19.accept(Unknown Source:4)
at java.util.ArrayList.forEach(ArrayList.java:1262)
at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1127)
at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.openSubscriptions(BinanceStreamingMarketDataService.java:225)
E/AndroidRuntime: at info.bitrich.xchangestream.binance.BinanceStreamingExchange.lambda$connect$0$info-bitrich-xchangestream-binance-BinanceStreamingExchange(BinanceStreamingExchange.java:123)
at info.bitrich.xchangestream.binance.BinanceStreamingExchange$$ExternalSyntheticLambda5.run(Unknown Source:4)
at io.reactivex.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onComplete(CompletablePeek.java:107)
at io.reactivex.internal.operators.completable.CompletableConcatIterable$ConcatInnerObserver.next(CompletableConcatIterable.java:105)
at io.reactivex.internal.operators.completable.CompletableConcatIterable$ConcatInnerObserver.onComplete(CompletableConcatIterable.java:77)
at io.reactivex.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onComplete(CompletablePeek.java:115)
at io.reactivex.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onComplete(CompletablePeek.java:115)
at io.reactivex.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onComplete(CompletablePeek.java:115)
at io.reactivex.internal.operators.completable.CompletableCreate$Emitter.onComplete(CompletableCreate.java:64)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.lambda$openConnection$1$info-bitrich-xchangestream-service-netty-NettyStreamingService(NettyStreamingService.java:244)
at info.bitrich.xchangestream.service.netty.NettyStreamingService$$ExternalSyntheticLambda10.operationComplete(Unknown Source:4)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96)
at io.netty.channel.DefaultChannelPromise.setSuccess(DefaultChannelPromise.java:78)
at io.netty.channel.DefaultChannelPromise.setSuccess(DefaultChannelPromise.java:73)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
at io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler.channelRead(WebSocketClientExtensionHandler.java:125)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
E/AndroidRuntime: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:920)
Caused by: info.bitrich.xchangestream.service.exception.NotConnectedException
... 94 more
I/Process: Sending signal. PID: 9724 SIG: 9
I've tested the previous code in Java on Android and it still produces the same exception. Although the same code is working on my PC.
Uh oh. Can you pin point the code in XChange that needs to be reverted to non-Streams and submit a PR?
@timmolter I am not quite sure which part of the xchange code is responsible for this bug. It's weird that the same piece of code is working without any issue on my PC but it throws an exception on Android. I'm still debugging the Android app to identify the issue.