finagle-websocket
finagle-websocket copied to clipboard
Websocket server replying Futures
I'm making some experiments with finagle-websocket module. I would like the server to reply frames that it currently does not have, represented by Futures.
The code above illustrates my question:
package me.zup.tsung.cli
import java.net.URI
import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.time._
import com.twitter.finagle.util.DefaultTimer
import com.twitter.finagle.websocket.{Frame, Request, Response}
import com.twitter.finagle.{Service, Websocket}
import com.twitter.util.{Await, Future}
object ClientTest {
val client = Websocket.client.newService(":14000")
val req = Request(new URI("/"), Map.empty, null, AsyncStream.empty[Frame])
def start() = client(req).map(_.messages.foreach {
case Frame.Text(m) => println(m)
case _ => println("unknown")
})
}
object Server extends App {
implicit val timer = DefaultTimer.twitter
def handler(): AsyncStream[Frame] = {
Frame.Text("hello") +:: Frame.Text("world") +:: AsyncStream.fromFuture(Future.sleep(1.second).map((_) => Frame.Text("again")))
}
val server = Websocket.serve(":14000", new Service[Request, Response] {
def apply(req: Request): Future[Response] = {
Future.value(Response(handler()))
}
})
ClientTest.start()
Await.ready(server)
}
The expected output is:
hello
world
again # after 1 second
But all I get is:
hello
world
<hanging>
Am I missing something?
Yeah, I would expect that too. I'll have a look at this on the weekend.
Hi @diegossilveira – OK, this is slightly tricky (and sorry to have taken so long to respond). The explanation for this (odd) result is that the stream is closing before the server can respond. It's closing because the end of the request stream is interpreted by the server as a close initiated by the client.
If you change your request to
val p = new Promise[AsyncStream[Frame]]
val frames = AsyncStream.flatten(AsyncStream.fromFuture(p))
val req = Request(new URI("/"), Map.empty, null, frames)
you will see the "again". However, this presents another problem: the client request needs to close eventually. Something must call p.setValue(AsyncStream.empty)
.