tapir
tapir copied to clipboard
[BUG] Connection hangs with no useful logs when response zio stream fails
Tapir version: 1.9.10
Scala version: 3.3.1
Connection hangs with no useful logs when response zio stream fails
When using zio streams to serve http response using ZioHttpInterpreter, the server does not properly handle stream failures. The client that sent the request hangs, so apparently the server doesn't even close the connection. On top of that, server logs are full of Netty internal exceptions instead of the root cause exception that failed the stream. Additionally, elements from the stream preceding the failure are not sent in the response either.
How to reproduce?
Server code
This example spins up a server with one endpoint /test that returns a simple plain text stream that fails after two elements are emitted
// ThisBuild / scalaVersion := "3.3.1"
// libraryDependencies ++= Seq(
// "com.softwaremill.sttp.tapir" %% "tapir-zio-http-server" % "1.9.10"
// )
package tapirsample
import sttp.capabilities.zio.ZioStreams
import sttp.tapir.CodecFormat
import sttp.tapir.Schema
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import sttp.tapir.ztapir.*
import zio.*
import zio.http.HttpApp
import zio.http.Server
import zio.stream.ZStream
import java.nio.charset.StandardCharsets
/** Minimal reproducer: a tapir endpoint served through zio-http whose response
  * stream emits two elements and then fails.
  *
  * Starts a server on port 9000 exposing `GET /test`.
  */
object Sample extends ZIOAppDefault {

  /** `GET /test` returning a plain-text (UTF-8) byte stream.
    *
    * The server logic itself always succeeds; the failure is inside the returned
    * stream, which emits "foo" and "bar" and then fails with `RuntimeException("boom")`.
    */
  val badEndpoint: ZServerEndpoint[Any, ZioStreams] = endpoint
    .in("test")
    .get
    .out(
      streamBody(ZioStreams)(
        summon[Schema[Chunk[String]]],
        CodecFormat.TextPlain(),
        Some(StandardCharsets.UTF_8)
      )
    )
    .zServerLogic { _ =>
      val stream = ZStream("foo", "bar") ++ ZStream.fail(new RuntimeException("boom"))
      // Encode explicitly as UTF-8 so the bytes match the charset declared on the
      // stream body above; a bare `getBytes` would use the JVM's default charset.
      val byteStream = stream.mapConcat(_.getBytes(StandardCharsets.UTF_8))
      ZIO.succeed(byteStream)
    }

  /** The endpoint interpreted as a zio-http application. */
  val routes: HttpApp[Any] =
    ZioHttpInterpreter().toHttp(badEndpoint)

  /** Serves [[routes]] on port 9000. */
  override def run: ZIO[Any, Any, Any] =
    Server.serve(routes).provide(Server.defaultWithPort(9000))
}
Running the server
sbt run
[info] welcome to sbt 1.8.2 (Eclipse Adoptium Java 17.0.6)
[info] loading settings for project tapir-stream-issue-build-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project/project
[info] loading settings for project tapir-stream-issue-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project
[success] Generated .bloop/tapir-stream-issue-build.json
[success] Total time: 1 s, completed 28 Feb 2024, 11:27:59
[info] loading settings for project tapir-stream-issue from build.sbt ...
[info] set current project to tapir-stream-issue (in build file:/Users/mskripnik/projects/tapir-stream-issue/)
[info] running tapirsample.Sample
Sending the request
curl -v 'http://localhost:9000/test'
* Trying [::1]:9000...
* Connected to localhost (::1) port 9000
> GET /test HTTP/1.1
> Host: localhost:9000
> User-Agent: curl/8.4.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=UTF-8
< transfer-encoding: chunked
<
You can observe two issues here:
- curl just hangs forever after this (until timeout) waiting for response from the server
- not even the first two strings from the response stream (foo, bar) are received; the response body is just empty
Server logs
[info] running tapirsample.Sample
timestamp=2024-02-28T10:29:01.638818Z level=WARN thread=#zio-fiber-39 message="Fatal exception in Netty" cause="Exception in thread "zio-fiber-" io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Suppressed: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
at io.netty.handler.codec.http.HttpObjectEncoder.throwUnexpectedMessageTypeEx(HttpObjectEncoder.java:348)
at io.netty.handler.codec.http.HttpObjectEncoder.encodeFullHttpMessage(HttpObjectEncoder.java:305)
at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:162)
at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:97)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)"
Feb 28, 2024 11:29:01 AM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
at io.netty.handler.codec.http.HttpObjectEncoder.throwUnexpectedMessageTypeEx(HttpObjectEncoder.java:348)
at io.netty.handler.codec.http.HttpObjectEncoder.encodeFullHttpMessage(HttpObjectEncoder.java:305)
at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:162)
at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:97)
... 22 more
Meanwhile the server just spits out this large pile of Netty internals. The big issue here is the lack of information about the throwable that failed the response stream (no 'boom' string is seen anywhere in the logs)
Additional information
Expected behavior
The expected behavior in this case is that the server:
- sends all elements from the response stream prior to the failure
- drops the connection right after it encounters the exception in the stream
- logs the details on the exception that caused the stream to fail
Working example
As a side note, this example shows that such behavior did exist in tapir v1.2.3
// ThisBuild / scalaVersion := "3.3.1"
// libraryDependencies ++= Seq(
// "com.softwaremill.sttp.tapir" %% "tapir-zio-http-server" % "1.2.3"
// )
package tapirsample
import sttp.capabilities.zio.ZioStreams
import sttp.tapir.CodecFormat
import sttp.tapir.Schema
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import sttp.tapir.ztapir.*
import zio.*
import zio.http.HttpApp
import zio.http.Server
import zio.http.ServerConfig
import zio.stream.ZStream
import java.nio.charset.StandardCharsets
/** The same reproducer written against the older tapir 1.2.3 / zio-http API
  * (`HttpApp[Any, Throwable]`, `Server.live` plus a `ServerConfig` layer).
  *
  * Starts a server on port 9000 exposing `GET /test`.
  */
object Sample extends ZIOAppDefault {

  /** `GET /test` returning a plain-text (UTF-8) byte stream.
    *
    * The server logic itself always succeeds; the failure is inside the returned
    * stream, which emits "foo" and "bar" and then fails with `RuntimeException("boom")`.
    */
  val badEndpoint: ZServerEndpoint[Any, ZioStreams] = endpoint
    .in("test")
    .get
    .out(
      streamBody(ZioStreams)(
        summon[Schema[Chunk[String]]],
        CodecFormat.TextPlain(),
        Some(StandardCharsets.UTF_8)
      )
    )
    .zServerLogic { _ =>
      val stream = ZStream("foo", "bar") ++ ZStream.fail(new RuntimeException("boom"))
      // Encode explicitly as UTF-8 so the bytes match the charset declared on the
      // stream body above; a bare `getBytes` would use the JVM's default charset.
      val byteStream = stream.mapConcat(_.getBytes(StandardCharsets.UTF_8))
      ZIO.succeed(byteStream)
    }

  /** The endpoint interpreted as a zio-http application. */
  val routes: HttpApp[Any, Throwable] =
    ZioHttpInterpreter().toHttp(badEndpoint)

  /** Serves [[routes]] on port 9000. */
  override def run: ZIO[Any, Any, Any] =
    Server.serve(routes).provide(Server.live, ZLayer.succeed(ServerConfig.default.port(9000)))
}
sbt run
[info] welcome to sbt 1.8.2 (Eclipse Adoptium Java 17.0.6)
[info] loading settings for project tapir-stream-issue-build-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project/project
[info] loading settings for project tapir-stream-issue-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project
[success] Generated .bloop/tapir-stream-issue-build.json
[success] Total time: 1 s, completed 28 Feb 2024, 11:40:27
[info] loading settings for project tapir-stream-issue from build.sbt ...
[info] set current project to tapir-stream-issue (in build file:/Users/mskripnik/projects/tapir-stream-issue/)
[info] compiling 1 Scala source to /Users/mskripnik/projects/tapir-stream-issue/target/scala-3.3.1/classes ...
[info] running tapirsample.Sample
curl -v 'http://localhost:9000/test'
* Trying [::1]:9000...
* Connected to localhost (::1) port 9000
> GET /test HTTP/1.1
> Host: localhost:9000
> User-Agent: curl/8.4.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=UTF-8
< transfer-encoding: chunked
<
* transfer closed with outstanding read data remaining
* Closing connection
curl: (18) transfer closed with outstanding read data remaining
foobar%
- foobar is seen in the response
- the connection is closed so curl ends the process with error details
[info] running tapirsample.Sample
[ERROR] KQueueEventLoopGroup-2-2 NettyRuntime HttpRuntimeException:Exception in thread "zio-fiber-" java.lang.RuntimeException: java.lang.RuntimeException: boom
Feb 28, 2024 11:40:36 AM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.RuntimeException: boom
at tapirsample.Sample$.$anonfun$1$$anonfun$1(Sample.scala:29)
at zio.ZIO$.fail$$anonfun$1(ZIO.scala:3083)
at zio.ZIO$.failCause$$anonfun$1(ZIO.scala:3089)
at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1126)
at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:384)
at zio.internal.FiberRuntime.start(FiberRuntime.scala:1380)
at zio.Runtime$UnsafeAPIV1.fork(Runtime.scala:155)
at zio.Runtime$UnsafeAPIV1.fork(Runtime.scala:138)
at zio.http.netty.NettyRuntime.run(NettyRuntime.scala:48)
at zio.http.netty.NettyRuntime.run$(NettyRuntime.scala:11)
at zio.http.netty.NettyRuntime$$anon$4.run(NettyRuntime.scala:112)
at zio.http.netty.server.ServerInboundHandler.channelRead0(ServerInboundHandler.scala:200)
at zio.http.netty.server.ServerInboundHandler.channelRead0(ServerInboundHandler.scala:32)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
at io.netty.handler.codec.http.HttpServerKeepAliveHandler.channelRead(HttpServerKeepAliveHandler.java:64)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:383)
at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:213)
at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:291)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
The logs above contain the information about the root cause exception, even though there is also a not very useful warning from Netty
It seems like the issue is with zio-http, caused by this: https://github.com/zio/zio-http/issues/2584
It's already fixed with https://github.com/zio/zio-http/pull/2599, so I guess we just need to wait for the next zio-http release and update it in tapir
zio-http was released https://github.com/zio/zio-http/releases/tag/v3.0.0-RC5
@TrustNoOne yes, but it contains snapshot transitive dependencies, so we cannot use it
See https://github.com/softwaremill/tapir/pull/3596
I've just checked: the hang issues I had previously, which came from the ZIO side (https://github.com/zio/zio-http/issues/2584), are fixed (zio 2.0.22, tapir 1.10.5, zhttp-3.0.0-RC6), but now something goes wrong when tapir tries to process a stream which returns an error.
I get 200 OK with no content :
< HTTP/1.1 200 OK
< Content-Type: application/json-seq
< transfer-encoding: chunked
<
* transfer closed with outstanding read data remaining
* Closing connection 0
instead of getting (with the same inputs) :
< HTTP/1.1 400 Bad Request
< content-length: 305
< Content-Type: application/json
<
* Connection #0 to host 127.0.0.1 left intact
{"querySyntaxError"...
I had two similar endpoints with the same entries, the first one returns a stream and the second one the result of a backend consumed stream (the same one).
If needed I will be able to write some snippets to reproduce the issue but not before 3 weeks unfortunately.
@dacr please do, I'm not aware of any similar issues
Consider the stream logic failingGreetingStream3 bound to helloEndPoint: it summarizes the issue I have, and once written like this it also gives me some hints on what's going on. I try to build a ZIO Stream from a java stream which can fail, and while reading this I realized the issue is coming from my side and the way I'm building the stream :(
// Server logic whose outer effect SUCCEEDS (ZIO.succeed) with a stream value;
// the failure sits inside the stream, because the effect handed to
// fromJavaStreamZIO fails with Exception("Can not build the stream").
// The String error channel of the outer ZIO is therefore never used here.
val failingGreetingStream3: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
ZStream.fromJavaStreamZIO(
ZIO.fail(Exception("Can not build the stream"))
)
)
// Endpoint description: GET /hello.
// Success output: a streamed body (schema derived from Greeting, custom
// JsonSeqCodecFormat) plus a fixed Ok status code.
// Error output: a plain-text String body under the InternalServerError variant.
val helloEndPoint =
endpoint
.description("Returns greeting")
.get
.in("hello")
.out(streamBody(ZioStreams)(Schema.derived[Greeting], JsonSeqCodecFormat()))
.out(statusCode(StatusCode.Ok).description("query success"))
.errorOutVariantPrepend(oneOfVariant(StatusCode.InternalServerError, plainBody[String]))
// Binds the endpoint to the failing-stream logic; the request input is ignored.
val helloRoute = helloEndPoint.zServerLogic[Any](_ => failingGreetingStream3)
The full script :
// ---------------------
//> using scala "3.4.1"
//> using dep "com.softwaremill.sttp.tapir::tapir-zio:1.10.7"
//> using dep "com.softwaremill.sttp.tapir::tapir-zio-http-server:1.10.7"
//> using dep "com.softwaremill.sttp.tapir::tapir-json-zio:1.10.7"
// ---------------------
import sttp.tapir.ztapir.*
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import zio.*
import zio.stream.*
import zio.json.*
import zio.http.Server
import sttp.capabilities.zio.ZioStreams
import sttp.model.{MediaType, StatusCode}
import sttp.tapir.{CodecFormat, Schema}
import sttp.tapir.generic.auto.*
import sttp.tapir.json.zio.*
import sttp.tapir.ztapir.*
/** Payload streamed by the endpoint; a zio-json `JsonCodec` is derived for it. */
case class Greeting(message:String) derives JsonCodec
/** Custom tapir codec format that reports the media type `application/json-seq`. */
case class JsonSeqCodecFormat() extends CodecFormat {
// unsafeApply builds the media type directly from the main type and subtype strings.
override val mediaType: MediaType = MediaType.unsafeApply("application", "json-seq")
}
/** Reproducer script: a tapir streaming endpoint (`GET /hello`) served through
  * zio-http, with several alternative server logics that fail at different stages.
  *
  * Each logic has type `ZIO[Any, String, ZStream[Any, Throwable, Byte]]`, so a failure
  * can sit either in the outer effect (as a `String`) or inside the returned stream
  * (as a `Throwable`).
  */
object WebApp extends ZIOAppDefault {
  // --------------------------------------------------

  /** Healthy logic: repeats one greeting, spaced one second apart, each encoded as a
    * JSON line.
    */
  val greetingStream: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream
      .repeat(Greeting("Hello world"))
      .schedule(Schedule.spaced(1.second))
      .flatMap { greeting =>
        // Encode explicitly as UTF-8 rather than with the JVM's default charset,
        // so the emitted bytes do not depend on the host platform.
        val line = greeting.toJson + "\n"
        ZStream.fromIterable(line.getBytes(java.nio.charset.StandardCharsets.UTF_8))
      }
  )

  /** Fails in the outer effect with a `String`, before any stream is produced. */
  val failingGreetingStream1: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.fail(
    "Can not build the stream"
  )

  /** The outer effect succeeds; the returned stream itself is a failed stream. */
  val failingGreetingStream2: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream.fail(Exception("Can not build the stream"))
  )

  /** The outer effect succeeds; the returned stream is built from an effect that fails,
    * so the error is in the stream's `Throwable` channel rather than the outer `String` one.
    */
  val failingGreetingStream3: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream.fromJavaStreamZIO(
      ZIO.fail(Exception("Can not build the stream"))
    )
  )

  /** `GET /hello`: streamed body (Greeting schema, json-seq format) with an Ok status
    * on success, and a plain-text String body under InternalServerError on error.
    */
  val helloEndPoint =
    endpoint
      .description("Returns greeting")
      .get
      .in("hello")
      .out(streamBody(ZioStreams)(Schema.derived[Greeting], JsonSeqCodecFormat()))
      .out(statusCode(StatusCode.Ok).description("query success"))
      .errorOutVariantPrepend(oneOfVariant(StatusCode.InternalServerError, plainBody[String]))

  //val helloRoute = helloEndPoint.zServerLogic[Any](_ => greetingStream)
  // Currently wired to the variant whose failure is inside the stream.
  val helloRoute = helloEndPoint.zServerLogic[Any](_ => failingGreetingStream3)

  /** The server endpoints interpreted as a zio-http application. */
  val routes = ZioHttpInterpreter().toHttp(List(helloRoute))

  /** Serves [[routes]] using the default zio-http server layer. */
  override def run = Server.serve(routes).provide(Server.default)
}
// Top-level statement: starts the app with no command-line arguments
// (presumably run as a scala-cli script, given the `//> using` directives above).
WebApp.main(Array.empty)
@dacr does this work when using zio-http directly, without tapir?