sttp
sttp copied to clipboard
[FEATURE] Stub for testing websocket stream
Original question/feature request was posted in gitter: https://gitter.im/softwaremill/sttp?at=605d9dbd563232374c459a79
Consider I have a function which sends a request via asWebSocketStream:
basicRequest
.get(uri)
.response(asWebSocketStream(Fs2Streams[F])(pipe))
.send(backend)
where backend is SttpBackend[F, Fs2Streams[F] with capabilities.WebSockets].
Given that web socket streams aren't supported by the stub, it is impossible to test such function.
A possible solution would be to provide a stub for testing web socket streams.
Did you try wrapping the stubbed web socket stream with RawStream? (https://sttp.softwaremill.com/en/latest/testing.html#testing-streams)
I've tried it just now with the following backend:
val testingBackend: SttpBackend[IO, Fs2Streams[IO] with WebSockets] = AsyncHttpClientFs2Backend
.stub[IO]
.whenAnyRequest
.thenRespond(
SttpBackendStub.RawStream(
WebSocketStub
.initialReceive(List(WebSocketFrame.text("Hello, World!")))
.thenRespond(_ => List(WebSocketFrame.text("Hello, World!")))
)
)
However it does not seem to be working (unless I've done something wrong).
Right, there's case ResponseAsWebSocketStream(_, _) => None in the stub :)
However, I don't think we can make this work, as it would require fusing the client-side pipe specified in the .response(asWebSocketStream(...)) with the server-side pipe returned by the stub response. And sttp doesn't have any knowledge of the underlying streaming implementation, meaning that it doesn't also know how to combine two pipes. So I think you'll have to test the pipes by hand.
I'll close this as won't fix as we can't really do much with a stream pipe - it's opaque for sttp. If you would have some ideas how to fix this, please reopen :)
I want to propose a solution to this one. I solved it by writing my own sttp backend stub.
import cats.effect.IO
import cats.effect.std.Queue
import sttp.{capabilities, monad}
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.*
import sttp.client3.impl.cats.implicits.*
import sttp.ws.WebSocketFrame
import sttp.model.StatusCode
class Fs2StreamsWebsocketBackend(
initialFrames: List[WebSocketFrame],
serverPipe: fs2.Pipe[IO, WebSocketFrame, WebSocketFrame],
serverQueue: Queue[IO, WebSocketFrame]
) extends SttpBackend[IO, Fs2Streams[IO] & capabilities.WebSockets] {
override def send[T, R >: Fs2Streams[IO] with capabilities.WebSockets with capabilities.Effect[IO]](request: Request[T, R]): IO[Response[T]] = {
respond(request.response).getOrElse(throw new RuntimeException("Not implemented"))
}
private def respond[T](
ra: ResponseAs[T, _],
): Option[IO[Response[T]]] = {
ra match {
case ResponseAsWebSocketStream(_, pipe1) =>
val clientSideWebsocketPipe = pipe1.asInstanceOf[fs2.Pipe[IO, WebSocketFrame, WebSocketFrame]]
val response: IO[Response[T]] = IO.never[Response[T]]
val source = fs2.Stream.fromQueueUnterminated(serverQueue)
val runStream = source.through(clientSideWebsocketPipe)
.through(serverPipe).flatMap {
frame => fs2.Stream.eval(serverQueue.offer(frame)) >> fs2.Stream.empty
}.compile.drain.void.start
(serverQueue.tryOfferN(initialFrames) >> runStream >> response).some
case MappedResponseAs(raw: ResponseAs[_, _], _, _) =>
respond(raw).map(_.map(_.asInstanceOf[Response[T]]))
case ResponseAsFromMetadata(conditions, _) =>
respond(conditions.head.responseAs)
case _ =>
throw new RuntimeException("Not implemented")
}
}
override def close(): IO[Unit] = IO.unit
override def responseMonad: monad.MonadError[IO] = monad.MonadError[IO]
}
import cats.effect.IO
import cats.effect.std.Queue
import com.typesafe.scalalogging.StrictLogging
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.*
import sttp.ws.WebSocketFrame
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration.DurationInt
class SttpStubSpec extends org.scalatest.freespec.AnyFreeSpec with StrictLogging {
"sttp should allow to mock streams" in {
val serverLogic: fs2.Pipe[IO, WebSocketFrame, WebSocketFrame] = {
_.flatMap {
case text: WebSocketFrame.Text if text.payload.startsWith("numbers") =>
fs2.Stream.eval(IO(logger.info("S: Received message: " + text.payload))) >>
fs2.Stream.emits(List(WebSocketFrame.text("1"), WebSocketFrame.text("2")))
case text: WebSocketFrame.Text =>
fs2.Stream.eval(IO(logger.info("Server - received message: " + text.payload))) >>
fs2.Stream.empty
case _ => fs2.Stream.empty
}
}
val backend: IO[Fs2StreamsWebsocketBackend] =
for {
queue <- Queue.unbounded[IO, WebSocketFrame]
} yield new Fs2StreamsWebsocketBackend(
List(WebSocketFrame.text("hello")),
serverLogic,
queue
)
backend.flatMap { backend =>
val pipe: fs2.Pipe[IO, WebSocketFrame, WebSocketFrame] = {
input =>
val readFromWebSocket: fs2.Stream[IO, WebSocketFrame] = input.flatMap {
case WebSocketFrame.Text(payload, _, _) =>
fs2.Stream.eval(IO(logger.info("Client - received message: " + payload))) >>
fs2.Stream.empty[IO]
case _ =>
fs2.Stream.emit(WebSocketFrame.text("error"))
}
val subscribeToWallets = fs2.Stream.emit[IO, WebSocketFrame](WebSocketFrame.text("numbers"))
subscribeToWallets.merge(readFromWebSocket)
}
val value = basicRequest
.response(asWebSocketStream(Fs2Streams[IO])(pipe))
.get(uri"wss://echo.websocket.org")
.send(backend)
(value >> IO.sleep(3.seconds))
}.unsafeRunSync()
}
}
It's mvp-ish implementation to satisfy my needs but with a few changes to SttpBackendStab it could be fairly easy to convert it into an extension of it. If tryAdjustResponseBody wouldn't be an object method, but a part of the class and have an implementation like:
private[client3] def tryAdjustResponseBody[F[_], T, U](
ra: ResponseAs[T, _],
b: U,
meta: ResponseMetadata
)(implicit monad: MonadError[F]): Option[F[T]] = {
customResponseHandling(ra,b,meta) orElse defaultImplementation(ra,b,meta)
}
then it would be enough to extend the stub for particular types and override customResponseHandling (which would be empty by default) to achieve the same.
@Zuchos yeah you're right, we could add customBodyAdjustments or sth like that as a parameter to the backend stub, just as we have matchers, or customEncodingHandler in regular backends. This should be possible in sttp4 since we don't have to keep bincompat there (yet :) ). I'll reopen this then.
Additional note for sttp3, today I've spent quite some time debugging why my streams are not consumed. In the end I found the magical case ResponseAsWebSocketStream(_, _) => None that was responsible.
Do you think we could raise an error instead of failing silently in this case?
@Krever yeah you're right, throwing errors might be better than returning None. We can try making this cahnge in sttp4, and we'll see if something somewhere breaks