akka-wamp
akka-wamp copied to clipboard
[error] a.w.s.SerializationFlows - Timeout on waiting for new data
I am attempting to use your actor example to subscribe to a production web socket feed. All is well except this exception (refer to trace below). I'm not sure how to reproduce this other than connecting/subscribing and then waiting for the appropriate condition to occur. Very frequently, I have encountered this exception after leaving my actor to run in the background. I can't seem to keep my connection to the production server alive. Thanks again for making this available!
[error] a.w.s.SerializationFlows - Timeout on waiting for new data
akka.wamp.serialization.DeserializeException: Timeout on waiting for new data
at akka.wamp.serialization.JsonSerialization.make$1(JsonSerialization.scala:47)
at akka.wamp.serialization.JsonSerialization.deserialize(JsonSerialization.scala:163)
at akka.wamp.serialization.JsonSerializationFlows$$anonfun$2.apply(JsonSerializationFlows.scala:58)
at akka.wamp.serialization.JsonSerializationFlows$$anonfun$2.apply(JsonSerializationFlows.scala:53)
at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:42)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
Caused by: java.io.IOException: Timeout on waiting for new data
at akka.stream.impl.io.InputStreamAdapter$$anonfun$read$1.apply$mcI$sp(InputStreamSinkStage.scala:147)
at akka.stream.impl.io.InputStreamAdapter$$anonfun$read$1.apply(InputStreamSinkStage.scala:132)
at akka.stream.impl.io.InputStreamAdapter$$anonfun$read$1.apply(InputStreamSinkStage.scala:132)
at akka.stream.impl.io.InputStreamAdapter.executeIfNotClosed(InputStreamSinkStage.scala:112)
at akka.stream.impl.io.InputStreamAdapter.read(InputStreamSinkStage.scala:132)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:207)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseNumber2(UTF8StreamJsonParser.java:1470)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1378)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
So after doing some digging around I modified the same code that I had in my original pull request.
// JsonSerializationFlows.scala
val deserialize: Flow[websocket.Message, wamp.Message, NotUsed] =
Flow[websocket.Message]
.mapAsync(1) {
case TextMessage.Strict(text) =>
Future.successful(serialization.deserialize(Source.single(text)))
case TextMessage.Streamed(source) =>
source.runReduce(_ + _).map ( str => serialization.deserialize(Source.single(str)))
//serialization.deserialize(source)
And the timeout waiting for new data miraculously goes away. I think the issue is with
// file JsonSerialization.scala line 39
val inputStream = source.
map(ByteString(_)).
runWith(StreamConverters.asInputStream())
Perhaps this is the same issue discussed here: https://github.com/akka/akka/issues/19392
Hi asciuu,
thanks very much for using akka-wamp Please send emails to [email protected] next time. That's the account I read the most of the time. Feel free to file a new issue on GitHub and a pull request. I'll be happy to consider your investigations and fixes.
Cheers Paolo
On Sun, Oct 30, 2016 at 1:11 AM, asciiu [email protected] wrote:
So after doing some digging around I modified the same code that I had in my original pull request.
val deserialize: Flow[websocket.Message, wamp.Message, NotUsed] = Flow[websocket.Message] .mapAsync(1) { case TextMessage.Strict(text) => Future.successful(serialization.deserialize(Source.single(text)))
case TextMessage.Streamed(source) => source.runReduce(_ + _).map ( str => serialization.deserialize(Source.single(str))) //serialization.deserialize(source)
And the timeout waiting for new data miraculously goes away. I think the issue is with
// file JsonSerialization.scala line 39 val inputStream = source. map(ByteString(_)). runWith(StreamConverters.asInputStream())
Perhaps this is the same issue discussed here: akka/akka#19392 https://github.com/akka/akka/issues/19392
— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/angiolep/akka-wamp/issues/43#issuecomment-257125617, or mute the thread https://github.com/notifications/unsubscribe-auth/ACIVuhJKAbZQPKYMB_n0IiDt_RvjPGAXks5q4-7UgaJpZM4KggQ8 .
I see this timeout in the Poloniex example application (https://github.com/angiolep/akka-wamp/blob/master/examples/poloniex/src/main/java/PoloniexJavaClient.java).
Change the subscription on line 12 from "BTC_XMR"
to "ticker"
, and run the example, like this:
import akka.actor.*;
import akka.wamp.client.japi.*;
public class PoloniexJavaClient {
public static void main(String[] aaa) {
ActorSystem actorSystem = ActorSystem.create();
Client client = Client.create(actorSystem);
client.connect("wss://api.poloniex.com", "json").thenAccept(conn -> {
conn.open("realm1").thenAccept(session -> {
session.subscribe("ticker", event -> {
System.out.printf("%s --> %s\n", event.kwargs(), event.args());
});
});
});
}
}
This always causes the akka.wamp.serialization.DeserializeException: Timeout on waiting for new data
within 2 minutes of running.