[FEATURE] Stub for testing websocket stream

Open Kirill5k opened this issue 3 years ago • 8 comments

Original question/feature request was posted in gitter: https://gitter.im/softwaremill/sttp?at=605d9dbd563232374c459a79

Consider I have a function which sends a request via asWebSocketStream:

basicRequest
      .get(uri)
      .response(asWebSocketStream(Fs2Streams[F])(pipe))
      .send(backend)

where backend is SttpBackend[F, Fs2Streams[F] with capabilities.WebSockets].

Given that web socket streams aren't supported by the stub, it is impossible to test such a function.

A possible solution would be to provide a stub for testing web socket streams.

Kirill5k avatar Jul 11 '22 06:07 Kirill5k

Did you try wrapping the stubbed web socket stream with RawStream? (https://sttp.softwaremill.com/en/latest/testing.html#testing-streams)

adamw avatar Jul 15 '22 15:07 adamw

I've tried it just now with the following backend:

  val testingBackend: SttpBackend[IO, Fs2Streams[IO] with WebSockets] = AsyncHttpClientFs2Backend
    .stub[IO]
    .whenAnyRequest
    .thenRespond(
      SttpBackendStub.RawStream(
        WebSocketStub
          .initialReceive(List(WebSocketFrame.text("Hello, World!")))
          .thenRespond(_ => List(WebSocketFrame.text("Hello, World!")))
      )
    )

However, it does not seem to be working (unless I've done something wrong).

Kirill5k avatar Jul 15 '22 15:07 Kirill5k

Right, there's case ResponseAsWebSocketStream(_, _) => None in the stub :)

However, I don't think we can make this work, as it would require fusing the client-side pipe specified in the .response(asWebSocketStream(...)) with the server-side pipe returned by the stub response. And sttp doesn't have any knowledge of the underlying streaming implementation, meaning that it also doesn't know how to combine two pipes. So I think you'll have to test the pipes by hand.
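As a rough illustration of testing a pipe by hand, the sketch below feeds a fixed list of "server" frames straight into a client-side fs2 pipe and inspects what the pipe emits, without going through any sttp backend. The object name PipeByHandSketch, the ping/pong behaviour of clientPipe, and the runPipe helper are assumptions made up for this example; only fs2, cats-effect and sttp.ws.WebSocketFrame are real dependencies.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Pipe, Stream}
import sttp.ws.WebSocketFrame

object PipeByHandSketch {
  // Hypothetical client-side pipe: answers every "ping" text frame with "pong"
  // and ignores all other frames.
  val clientPipe: Pipe[IO, WebSocketFrame, WebSocketFrame] =
    _.collect { case WebSocketFrame.Text("ping", _, _) => WebSocketFrame.text("pong") }

  // Plays the role of the server: emits the given frames into the pipe and
  // collects everything the pipe would have sent back.
  def runPipe(serverFrames: List[WebSocketFrame]): List[WebSocketFrame] =
    Stream
      .emits(serverFrames)
      .covary[IO]
      .through(clientPipe)
      .compile
      .toList
      .unsafeRunSync()
}
```

The same pipe value can then be passed to asWebSocketStream(Fs2Streams[IO])(...) in production code, so the logic under test is identical even though the stub is bypassed.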

adamw avatar Jul 20 '22 19:07 adamw

I'll close this as won't fix, as we can't really do much with a stream pipe - it's opaque for sttp. If you have any ideas on how to fix this, please reopen :)

adamw avatar Jul 20 '22 19:07 adamw

I want to propose a solution to this one. I solved it by writing my own sttp backend stub.

import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.option.* // provides the `.some` syntax used below
import sttp.{capabilities, monad}
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.*
import sttp.client3.impl.cats.implicits.*
import sttp.ws.WebSocketFrame

import sttp.model.StatusCode

class Fs2StreamsWebsocketBackend(
    initialFrames: List[WebSocketFrame],
    serverPipe: fs2.Pipe[IO, WebSocketFrame, WebSocketFrame],
    serverQueue: Queue[IO, WebSocketFrame]
) extends SttpBackend[IO, Fs2Streams[IO] & capabilities.WebSockets] {

  override def send[T, R >: Fs2Streams[IO] with capabilities.WebSockets with capabilities.Effect[IO]](request: Request[T, R]): IO[Response[T]] = {
    respond(request.response).getOrElse(throw new RuntimeException("Not implemented"))
  }

  private def respond[T](
      ra: ResponseAs[T, _],
  ): Option[IO[Response[T]]] = {
    ra match {
      case ResponseAsWebSocketStream(_, pipe1) =>
        val clientSideWebsocketPipe   = pipe1.asInstanceOf[fs2.Pipe[IO, WebSocketFrame, WebSocketFrame]]
        val response: IO[Response[T]] = IO.never[Response[T]]
        val source                    = fs2.Stream.fromQueueUnterminated(serverQueue)
        val runStream = source.through(clientSideWebsocketPipe)
          .through(serverPipe).flatMap {
            frame => fs2.Stream.eval(serverQueue.offer(frame)) >> fs2.Stream.empty
          }.compile.drain.void.start
        (serverQueue.tryOfferN(initialFrames) >> runStream >> response).some
      case MappedResponseAs(raw: ResponseAs[_, _], _, _) =>
        respond(raw).map(_.map(_.asInstanceOf[Response[T]]))
      case ResponseAsFromMetadata(conditions, _) =>
        respond(conditions.head.responseAs)
      case _ =>
        throw new RuntimeException("Not implemented")
    }
  }

  override def close(): IO[Unit] = IO.unit

  override def responseMonad: monad.MonadError[IO] = monad.MonadError[IO]
}

import cats.effect.IO
import cats.effect.std.Queue
import com.typesafe.scalalogging.StrictLogging
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.*
import sttp.ws.WebSocketFrame
import cats.effect.unsafe.implicits.global

import scala.concurrent.duration.DurationInt

class SttpStubSpec extends org.scalatest.freespec.AnyFreeSpec with StrictLogging {

  "sttp should allow to mock streams" in {
    val serverLogic: fs2.Pipe[IO, WebSocketFrame, WebSocketFrame] = {
      _.flatMap {
        case text: WebSocketFrame.Text if text.payload.startsWith("numbers") =>
          fs2.Stream.eval(IO(logger.info("S: Received message: " + text.payload))) >>
            fs2.Stream.emits(List(WebSocketFrame.text("1"), WebSocketFrame.text("2")))
        case text: WebSocketFrame.Text =>
          fs2.Stream.eval(IO(logger.info("Server - received message: " + text.payload))) >>
            fs2.Stream.empty
        case _ => fs2.Stream.empty
      }
    }

    val backend: IO[Fs2StreamsWebsocketBackend] =
      for {
        queue <- Queue.unbounded[IO, WebSocketFrame]
      } yield new Fs2StreamsWebsocketBackend(
        List(WebSocketFrame.text("hello")),
        serverLogic,
        queue
      )

    backend.flatMap { backend =>
      val pipe: fs2.Pipe[IO, WebSocketFrame, WebSocketFrame] = {
        input =>
          val readFromWebSocket: fs2.Stream[IO, WebSocketFrame] = input.flatMap {
            case WebSocketFrame.Text(payload, _, _) =>
              fs2.Stream.eval(IO(logger.info("Client - received message: " + payload))) >>
                fs2.Stream.empty[IO]
            case _ =>
              fs2.Stream.emit(WebSocketFrame.text("error"))
          }
          val subscribeToWallets = fs2.Stream.emit[IO, WebSocketFrame](WebSocketFrame.text("numbers"))
          subscribeToWallets.merge(readFromWebSocket)
      }

      val value = basicRequest
        .response(asWebSocketStream(Fs2Streams[IO])(pipe))
        .get(uri"wss://echo.websocket.org")
        .send(backend)
      (value >> IO.sleep(3.seconds))
    }.unsafeRunSync()
  }

}

It's an MVP-ish implementation that satisfies my needs, but with a few changes to SttpBackendStub it could be fairly easy to convert it into an extension of it. If tryAdjustResponseBody weren't an object method but a part of the class, with an implementation like:

private[client3] def tryAdjustResponseBody[F[_], T, U](
     ra: ResponseAs[T, _],
     b: U,
     meta: ResponseMetadata
 )(implicit monad: MonadError[F]): Option[F[T]] = {
   customResponseHandling(ra,b,meta) orElse defaultImplementation(ra,b,meta)
}

then it would be enough to extend the stub for particular types and override customResponseHandling (which would be empty by default) to achieve the same.
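To make the proposed extension point concrete, here is a minimal sketch of the "custom handler first, default second" pattern using only the Scala standard library. ResponseSpec, AsText, AsWebSocketStream, StubSketch and WebSocketAwareStub are simplified, hypothetical stand-ins and not sttp's real types or API; the real tryAdjustResponseBody also takes metadata and a MonadError.

```scala
// Simplified stand-ins for the response descriptions (hypothetical, not sttp types).
sealed trait ResponseSpec
case object AsText extends ResponseSpec
case object AsWebSocketStream extends ResponseSpec

class StubSketch {
  // Extension hook: empty by default, so the built-in logic is used.
  protected def customResponseHandling(spec: ResponseSpec, body: Any): Option[String] = None

  // Built-in logic: knows how to handle text, but not web socket streams.
  private def defaultImplementation(spec: ResponseSpec, body: Any): Option[String] =
    spec match {
      case AsText            => Some(body.toString)
      case AsWebSocketStream => None
    }

  // Custom handling wins; the default is only consulted when the hook returns None.
  final def tryAdjustResponseBody(spec: ResponseSpec, body: Any): Option[String] =
    customResponseHandling(spec, body).orElse(defaultImplementation(spec, body))
}

// A subclass only overrides the hook for the cases it cares about.
class WebSocketAwareStub extends StubSketch {
  override protected def customResponseHandling(spec: ResponseSpec, body: Any): Option[String] =
    spec match {
      case AsWebSocketStream => Some(s"handled stream stub: $body")
      case _                 => None
    }
}
```

Because the hook returns an Option, a subclass can stay silent for everything it does not handle and the default behaviour is preserved for those cases.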

Zuchos avatar Apr 05 '24 06:04 Zuchos

@Zuchos yeah you're right, we could add customBodyAdjustments or sth like that as a parameter to the backend stub, just as we have matchers, or customEncodingHandler in regular backends. This should be possible in sttp4 since we don't have to keep bincompat there (yet :) ). I'll reopen this then.

adamw avatar Apr 05 '24 07:04 adamw

Additional note for sttp3, today I've spent quite some time debugging why my streams are not consumed. In the end I found the magical case ResponseAsWebSocketStream(_, _) => None that was responsible.

Do you think we could raise an error instead of failing silently in this case?
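A small sketch of the difference being asked about, again with made-up stand-in types rather than sttp's real ones: Spec, TextSpec, WebSocketStreamSpec, SilentStub and LoudStub are all hypothetical names. The silent variant returns None for the unsupported case, which the caller can easily overlook; the loud variant fails immediately with a message that names the unsupported response type.

```scala
// Hypothetical stand-ins for response descriptions (not sttp types).
sealed trait Spec
case object TextSpec extends Spec
case object WebSocketStreamSpec extends Spec

// Current behaviour as described in the thread: the unsupported case yields None.
object SilentStub {
  def adjust(spec: Spec, body: String): Option[String] = spec match {
    case TextSpec            => Some(body)
    case WebSocketStreamSpec => None
  }
}

// Proposed behaviour: fail fast with an explicit error for the unsupported case.
object LoudStub {
  def adjust(spec: Spec, body: String): Option[String] = spec match {
    case TextSpec => Some(body)
    case WebSocketStreamSpec =>
      throw new UnsupportedOperationException(
        "asWebSocketStream responses are not supported by this stub"
      )
  }
}
```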

Krever avatar Jul 02 '24 13:07 Krever

@Krever yeah you're right, throwing errors might be better than returning None. We can try making this change in sttp4, and we'll see if something somewhere breaks.

adamw avatar Jul 08 '24 19:07 adamw