FAQ: why does the client throw "org.http4s.InvalidBodyException: Received premature EOF"

Open rossabaker opened this issue 9 years ago • 8 comments

We've gotten this question a few times. We should get an FAQ into the docs, and this should be in it.

For posterity, here's my answer on gitter:

The client needs to know when the response body has been fully read so that it can release the connection. With that signature, it only knows when the Task returned by the callback function completes. Where you're running into trouble is that it releases the connection as soon as the Task[InputStream] has run, and that InputStream hasn't read all the bytes yet.

I have only compiled this in my head, but io.toInputStream(client.streaming(request)(_.body)) would get you an InputStream. .streaming returns a Process instead of a Task. The burden then falls on you to make sure that Process runs to completion, or else the connection can only be reclaimed by garbage collection. Once you adapt it to an InputStream, it becomes your responsibility to close it.
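To make the failure mode concrete, here is a library-free sketch of the lifecycle described above. It is not http4s code: `Connection`, `fetch`, and `readAll` are hypothetical stand-ins, and the error message is invented for illustration. It only models the rule that the connection is released when the callback returns.

```scala
import java.io.{ByteArrayInputStream, IOException, InputStream}

object ConnectionModel {
  // Hypothetical stand-in for a pooled connection: reads fail once it is released.
  final class Connection(payload: Array[Byte]) {
    private var released = false
    private val in = new ByteArrayInputStream(payload)

    val body: InputStream = new InputStream {
      override def read(): Int = {
        if (released) throw new IOException("premature EOF: connection already released")
        in.read()
      }
    }

    def release(): Unit = released = true
  }

  // Models a fetch-style contract: the connection is released as soon as `f` returns.
  def fetch[A](payload: Array[Byte])(f: InputStream => A): A = {
    val conn = new Connection(payload)
    try f(conn.body)
    finally conn.release()
  }

  // Reads every remaining byte of the stream into a String (ASCII only, for brevity).
  def readAll(in: InputStream): String = {
    val sb = new StringBuilder
    var b = in.read()
    while (b != -1) { sb.append(b.toChar); b = in.read() }
    sb.toString
  }

  def main(args: Array[String]): Unit = {
    val data = "hello".getBytes("UTF-8")

    // Safe: the body is fully consumed inside the callback.
    println(fetch(data)(readAll))

    // Unsafe: the stream escapes the callback and is read after release.
    val leaked = fetch(data)(in => in)
    try println(readAll(leaked))
    catch { case e: IOException => println(s"failed: ${e.getMessage}") }
  }
}
```

The second call fails for the same reason the real client does: the body escaped the callback, so nothing guaranteed it was read before the connection went away.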

rossabaker avatar Mar 16 '16 17:03 rossabaker

Can you please share a code sample for that, @rossabaker?

private val timeoutConfig = BlazeClientConfig.defaultConfig.copy(
responseHeaderTimeout = Duration.Inf, idleTimeout = Duration.Inf, requestTimeout = Duration.Inf)
private val httpClient = Http1Client[IO](config = timeoutConfig)

and then I'm doing httpClient.flatMap(_.fetch(req)(res => IO(res)))

i328254 avatar Jun 19 '18 02:06 i328254

What are you trying to get out of the response? The function passed to fetch needs to get everything out of res that you need. Something like this is common:

_.fetch(req)(res => res.as[String])

Also, flatMapping httpClient will create and shut down a new client on every request. I generally express my entire application as a Stream with fs2.StreamApp, and then use Http1Client.stream[IO](config).flatMap { client => myLogic(client) } to provide a single client to the entire application.
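A minimal sketch of both points, assuming http4s 0.18 with the blaze client on the classpath. `SharedClient`, `fetchBody`, and the example URI are hypothetical names chosen for illustration; the body is decoded inside the `fetch` callback, and one client is shared through the stream.

```scala
import cats.effect.IO
import fs2.Stream
import org.http4s.{Method, Request, Uri}
import org.http4s.client.Client
import org.http4s.client.blaze.Http1Client

object SharedClient {
  // Decode the whole body inside the callback, so nothing lazy escapes it
  // before the connection is released.
  def fetchBody(client: Client[IO], uri: Uri): IO[String] =
    client.fetch(Request[IO](Method.GET, uri))(_.as[String])

  // One client for the whole program; it is shut down when the stream terminates.
  // The URI is a placeholder, not taken from the thread.
  val program: Stream[IO, String] =
    Http1Client.stream[IO]().evalMap { client =>
      fetchBody(client, Uri.uri("http://example.com/"))
    }
}
```

In a real application the `evalMap` body would be replaced by whatever logic needs the client, such as building and serving the routes.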

rossabaker avatar Jun 19 '18 03:06 rossabaker

@rossabaker Thanks for your immediate response. I'll give you a little background on what I'm trying to achieve, and maybe you can suggest the best way to do it :) In a microservice architecture, I'm trying to build a gateway. The client calls the gateway, and the gateway internally calls the microservice. The gateway doesn't need to get anything out of the response; it just needs to fetch the response and return it to the client.

The problem I'm facing is that if the response body is huge, around 14k lines, the fetch function gives an EOF error. I also went through your conversation here

So my question is: is it still valid to use the httpclient.toHttpService.run() function? If not, please suggest an alternative to fetch for when you don't want to modify the response and just want to return it as is.

i328254 avatar Jun 19 '18 06:06 i328254

toHttpService is intended exactly for this proxy scenario. If your service returns the response from toHttpService.run(req), then the server will consume the body to render the response, and the connection is released. This is exactly what you want.
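A rough sketch of such a gateway, assuming http4s 0.18. `Gateway`, `proxy`, and `upstream` are hypothetical names, and headers are forwarded unchanged here, which a real gateway would probably want to revisit.

```scala
import cats.effect.IO
import org.http4s.{HttpService, Request, Uri}
import org.http4s.client.Client

object Gateway {
  // Point each incoming request at the upstream host, keeping its path and query,
  // then hand it to the client. The response body is streamed by the server,
  // and the client connection is released once that body has been consumed.
  def proxy(client: Client[IO], upstream: Uri): HttpService[IO] =
    client.toHttpService.local { (req: Request[IO]) =>
      req.withUri(req.uri.copy(scheme = upstream.scheme, authority = upstream.authority))
    }
}
```

The resulting `HttpService[IO]` can be mounted on the server like any other service.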

rossabaker avatar Jun 19 '18 15:06 rossabaker

Thanks a lot, @rossabaker. Just one more question: is there a way to achieve the same with fetch?

i328254 avatar Jun 20 '18 02:06 i328254

No. Fetch releases the connection at the end of its response callback function.

rossabaker avatar Jun 20 '18 03:06 rossabaker

@rossabaker I think I'm experiencing the same issue but can't figure out what I'm doing wrong. I'm new to http4s, so I'm sure it's just something I've missed.

 def createStream[F[_]](args: List[String], shutdown: F[Unit])(
      implicit E: Effect[F]): Stream[F, ExitCode] =
    for {
      client <- Http1Client.stream[F]()
      loggedClient = Logger(true, true, _ => false)(client)
      wxStationUpstream = WeatherStationUpstreamInterpreter[F](loggedClient)
      wxStationService = WeatherStationService[F](wxStationUpstream)
      exitCode <- BlazeBuilder[F]
        .bindHttp(8080, "localhost")
        .mountService(WeatherStationEndpoints.endpoints[F](wxStationService), "/")
        .serve
    } yield exitCode

 class WeatherStationUpstreamInterpreter[F[_]: Sync](client: Client[F]) (implicit M: Monad[F]) extends WeatherStationUpstreamAlgebra[F] {

  import cats.syntax.all._
  import io.circe.generic.auto._

  val uri = Uri.uri("http://wx.avalanche.ca/")//todo config

  def listStations(): F[Seq[Station]] = stations(None)

  def listMeasurements(): F[Seq[Measurement]] = measurements(None)

  private def stations(id: Option[Int]): F[Seq[Station]] = {
    val req: Request[F] = Request[F](method=Method.GET, uri=uri.withPath(s"/stations/${id.getOrElse("")}"))
    client.expect(req)(jsonOf[F, Seq[Station]])
  }

  private def measurements(id: Option[Int]): F[Seq[Measurement]] = {
    val req: Request[F] = Request[F](method=Method.GET, uri=uri.withPath(s"/measurements/${id.getOrElse("")}"))
    client.expect(req)(jsonOf[F, Seq[Measurement]])
  }
}

object WeatherStationUpstreamInterpreter {
  def apply[F[_]: Sync](client: Client[F]): WeatherStationUpstreamInterpreter[F] =
    new WeatherStationUpstreamInterpreter(client)
}

curl -v http://127.0.0.1:8080/stations
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 8080 (#0)
> GET /stations HTTP/1.1
> Host: 127.0.0.1:8080
> User-Agent: curl/7.58.0
> Accept: */*
> 
< HTTP/1.1 422 Unprocessable Entity
< Content-Type: text/plain; charset=UTF-8
< Date: Wed, 31 Oct 2018 02:10:03 GMT
< Content-Length: 29
< 
* Connection #0 to host 127.0.0.1 left intact
The request body was invalid.
Total time: 27 s, completed Oct 30, 2018 7:02:02 PM
[info] Packaging /home/ben/code/avid-data/target/scala-2.12/aviddataaggregator_2.12-0.0.1-SNAPSHOT.jar ...
[info] Done packaging.
[info] Running ca.avalanche.aviddata.Server 
[run-main-0] INFO  o.h.b.c.n.NIO1SocketServerGroup - Service bound to address /127.0.0.1:8080 
[run-main-0] INFO  o.h.s.b.BlazeBuilder - 
  _   _   _        _ _     
 | |_| |_| |_ _ __| | | ___
 | ' \  _|  _| '_ \_  _(_-<
 |_||_\__|\__| .__/ |_|/__/
             |_| 
[run-main-0] INFO  o.h.s.b.BlazeBuilder - http4s v0.18.19 on blaze v0.12.13 started at http://127.0.0.1:8080/ 
[blaze-nio1-acceptor] INFO  o.h.b.c.ServerChannelGroup - Connection to /127.0.0.1:60326 accepted at Tue Oct 30 19:10:00 PDT 2018. 
[scala-execution-context-global-156] INFO  o.h.c.m.RequestLogger - HTTP/1.1 GET http://wx.avalanche.ca/stations/ Headers(Accept: application/json) body="" 
[scala-execution-context-global-156] INFO  o.h.c.m.ResponseLogger - HTTP/1.1 200 OK Headers(Access-Control-Allow-Headers: accept-encoding, cache-control, origin, accept-language, Access-Control-Allow-Origin: *, Allow: GET, OPTIONS, Content-Type: application/json, Date: Wed, 31 Oct 2018 02:10:02 GMT, Server: nginx/1.10.1, Vary: Accept, Cookie, X-Frame-Options: SAMEORIGIN, Content-Length: 16371, Connection: keep-alive) body=" ...... "

benshaw avatar Oct 31 '18 03:10 benshaw

Ignore this; it was a JSON parsing issue.

benshaw avatar Oct 31 '18 07:10 benshaw