XChange icon indicating copy to clipboard operation
XChange copied to clipboard

[Binance] Streams does not work on Android

Open gsoultos opened this issue 2 years ago • 2 comments

  • Xchange version: 5.0.14-SNAPSHOT
  • Kotlin JVM: 1.8

The following code is executed under Kotlin coroutine in ViewModelScope with IO dispatcher.

Kotlin code:

val binanceSpecification = BinanceStreamingExchange().defaultExchangeSpecification
binanceSpecification.apiKey = "MY_API_KEY"
binanceSpecification.secretKey = "MY_SECRET_KEY"
val binanceStream = StreamingExchangeFactory.INSTANCE.createExchange(binanceSpecification)
binanceStream.connect(ProductSubscription.create().addAll(CurrencyPair.ADA_BNB).build()).blockingAwait()

StackTrace:

E/AndroidRuntime: FATAL EXCEPTION: nioEventLoopGroup-3-1
    Process: com.gsoultos.myapp, PID: 9724
    io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | info.bitrich.xchangestream.service.exception.NotConnectedException
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
        at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
        at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
        at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
        at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
        at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onError(ObservableRefCount.java:234)
        at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onError(ObservablePublishAlt.java:187)
        at io.reactivex.internal.observers.DisposableLambdaObserver.onError(DisposableLambdaObserver.java:65)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.tryOnError(ObservableCreate.java:84)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:72)
        at info.bitrich.xchangestream.service.netty.NettyStreamingService.lambda$subscribeChannel$13$info-bitrich-xchangestream-service-netty-NettyStreamingService(NettyStreamingService.java:391)
        at info.bitrich.xchangestream.service.netty.NettyStreamingService$$ExternalSyntheticLambda14.subscribe(Unknown Source:8)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
        at io.reactivex.Observable.subscribe(Observable.java:12284)
        at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
        at io.reactivex.Observable.subscribe(Observable.java:12284)
        at io.reactivex.internal.operators.observable.ObservablePublishAlt.connect(ObservablePublishAlt.java:79)
        at io.reactivex.internal.operators.observable.ObservableRefCount.subscribeActual(ObservableRefCount.java:87)
        at io.reactivex.Observable.subscribe(Observable.java:12284)
        at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
        at io.reactivex.Observable.subscribe(Observable.java:12284)
        at io.reactivex.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
        at io.reactivex.Observable.subscribe(Observable.java:12284)
        at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
        at io.reactivex.Observable.subscribe(Observable.java:12284)
        at io.reactivex.Observable.subscribe(Observable.java:12270)
        at io.reactivex.Observable.subscribe(Observable.java:12172)
        at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.triggerObservableBody(BinanceStreamingMarketDataService.java:468)
        at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.initTickerSubscription(BinanceStreamingMarketDataService.java:279)
        at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.$r8$lambda$avl4gXZeUTR_JSyr59si_tPRs50(Unknown Source:0)
        at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService$$ExternalSyntheticLambda19.accept(Unknown Source:4)
        at java.util.ArrayList.forEach(ArrayList.java:1262)
        at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1127)
        at info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.openSubscriptions(BinanceStreamingMarketDataService.java:225)
E/AndroidRuntime:     at info.bitrich.xchangestream.binance.BinanceStreamingExchange.lambda$connect$0$info-bitrich-xchangestream-binance-BinanceStreamingExchange(BinanceStreamingExchange.java:123)
        at info.bitrich.xchangestream.binance.BinanceStreamingExchange$$ExternalSyntheticLambda5.run(Unknown Source:4)
        at io.reactivex.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onComplete(CompletablePeek.java:107)
        at io.reactivex.internal.operators.completable.CompletableConcatIterable$ConcatInnerObserver.next(CompletableConcatIterable.java:105)
        at io.reactivex.internal.operators.completable.CompletableConcatIterable$ConcatInnerObserver.onComplete(CompletableConcatIterable.java:77)
        at io.reactivex.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onComplete(CompletablePeek.java:115)
        at io.reactivex.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onComplete(CompletablePeek.java:115)
        at io.reactivex.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onComplete(CompletablePeek.java:115)
        at io.reactivex.internal.operators.completable.CompletableCreate$Emitter.onComplete(CompletableCreate.java:64)
        at info.bitrich.xchangestream.service.netty.NettyStreamingService.lambda$openConnection$1$info-bitrich-xchangestream-service-netty-NettyStreamingService(NettyStreamingService.java:244)
        at info.bitrich.xchangestream.service.netty.NettyStreamingService$$ExternalSyntheticLambda10.operationComplete(Unknown Source:4)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
        at io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96)
        at io.netty.channel.DefaultChannelPromise.setSuccess(DefaultChannelPromise.java:78)
        at io.netty.channel.DefaultChannelPromise.setSuccess(DefaultChannelPromise.java:73)
        at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:65)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
        at io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler.channelRead(WebSocketClientExtensionHandler.java:125)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
E/AndroidRuntime:     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:920)
     Caused by: info.bitrich.xchangestream.service.exception.NotConnectedException
        	... 94 more
I/Process: Sending signal. PID: 9724 SIG: 9

I've tested the previous code in Java on Android and it still produces the same exception. Although the same code is working on my PC.

gsoultos avatar Mar 22 '22 18:03 gsoultos

Uh oh. Can you pin point the code in XChange that needs to be reverted to non-Streams and submit a PR?

timmolter avatar Mar 25 '22 15:03 timmolter

@timmolter I am not quite sure which part of the xchange code is responsible for this bug. It's weird that the same piece of code is working without any issue on my PC but it throws an exception on Android. I'm still debugging the Android app to identify the issue.

gsoultos avatar Mar 26 '22 09:03 gsoultos