FAQ: why does the client throw "org.http4s.InvalidBodyException: Received premature EOF"
We've gotten this question a few times. We should get an FAQ into the docs, and this should be in it.
For posterity, here's my answer on gitter:
The client needs to know when the response body has been fully read so that it can release the connection. With that signature, it only knows when the Task returned by the callback function completes. Where you're running into trouble is that it releases the connection once the Task[InputStream] has run, and that InputStream hasn't read all the bytes yet. I have only compiled this in my head, but io.toInputStream(client.streaming(request)(_.body)) would get you an InputStream. .streaming returns a Process instead of a Task. Then the burden falls on you to make sure that Process runs to completion, or else the connection can only be reclaimed by garbage collection. And once you adapt it to an InputStream, it becomes your responsibility to close it.
can you please share any code samples for that @rossabaker
private val timeoutConfig = BlazeClientConfig.defaultConfig.copy(
  responseHeaderTimeout = Duration.Inf, idleTimeout = Duration.Inf, requestTimeout = Duration.Inf)
private val httpClient = Http1Client[IO](config = timeoutConfig)
and then I'm doing
httpClient.flatMap(_.fetch(req)(res => IO(res)))
What are you trying to get out of the response? The function passed to fetch needs to get everything out of res that you need. Something like this is common:
_.fetch(req)(res => res.as[String])
Also, flatMapping httpClient will create and shut down a new client on every request. I generally express my entire application as a Stream with fs2.StreamApp, and then use Http1Client.stream[IO](config).flatMap { client => myLogic(client) } to provide a single client to the entire application.
@rossabaker Thanks for your immediate response. I'll give you a little background on what I'm trying to achieve, and maybe you can suggest the best way to do it :) In a microservice architecture, I'm trying to build a gateway. The client calls the gateway, and the gateway internally calls the microservice. The gateway doesn't need to get anything out of the response; it just needs to fetch the response and return it to the client.
The problem I'm facing is that if the response body is huge, around 14k lines, the fetch function gives an EOF error.
I also went through your conversation here
So my question is: is it still valid to use the httpclient.toHttpService.run() function?
If not, please suggest an alternative to fetch for when you don't want to modify the response and just want to return it as is.
toHttpService is intended exactly for this proxy scenario. If your service returns the response from toHttpService.run(req), then the server will consume the body to render the response, and the connection is released. This is exactly what you want.
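A minimal sketch of that proxy setup, assuming http4s 0.18.x (the version shown in the logs further down) is on the classpath. The `Gateway` object, the `rewrite` helper, and the upstream address `http://localhost:9000` are hypothetical names for illustration. The URI rewrite is included on the assumption that requests arriving at the server carry a relative URI; header handling (for example `Host`) is left out.

```scala
import cats.effect.IO
import org.http4s.{HttpService, Request, Uri}
import org.http4s.client.Client

object Gateway {
  // Hypothetical upstream address; replace with the real microservice.
  val upstream: Uri = Uri.uri("http://localhost:9000")

  // Point the incoming request at the upstream, keeping its path and query.
  def rewrite(req: Request[IO]): Request[IO] =
    req.withUri(upstream.withPath(req.uri.path).copy(query = req.uri.query))

  // The response is returned untouched. The connection is released when
  // the server finishes consuming the body, not when this function returns.
  def proxy(client: Client[IO]): HttpService[IO] =
    client.toHttpService.local[Request[IO]](rewrite)
}
```

The resulting service can be mounted like any other, e.g. `BlazeBuilder[IO].mountService(Gateway.proxy(client), "/")`, with a single client shared across the application.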
Thanks a lot @rossabaker
Just one more question: is there a way to achieve the same with fetch?
No. Fetch releases the connection at the end of its response callback function.
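To make that concrete, here is a toy model in plain Scala with no http4s involved. `FetchModel`, `Connection`, and this `fetch` are hypothetical stand-ins that only mimic the lifecycle described above: the connection is released the moment the callback returns, so a body handed out of the callback unread can no longer be read.

```scala
import java.io.{ByteArrayInputStream, IOException, InputStream}

object FetchModel {
  // Toy stand-in for a pooled connection: reads fail once it is released.
  final class Connection(bytes: Array[Byte]) {
    private val underlying = new ByteArrayInputStream(bytes)
    @volatile private var released = false

    def release(): Unit = { released = true }

    val body: InputStream = new InputStream {
      override def read(): Int = {
        if (released) throw new IOException("Received premature EOF")
        underlying.read()
      }
    }
  }

  // Mimics fetch: the connection is released as soon as the callback returns.
  def fetch[A](conn: Connection)(f: InputStream => A): A =
    try f(conn.body) finally conn.release()

  // Drains the stream fully and decodes it as UTF-8.
  def readAll(in: InputStream): String =
    new String(
      Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray,
      "UTF-8")
}
```

Calling `FetchModel.fetch(conn)(FetchModel.readAll)` consumes the body inside the callback and succeeds. Calling `FetchModel.fetch(conn)(in => in)` and reading the returned stream afterwards throws, which is the same shape of mistake as `fetch(req)(res => IO(res))`.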
@rossabaker I think I'm experiencing the same issue but can't figure out what I'm doing wrong. I'm new to http4s, so I'm sure it's just something I've missed.
def createStream[F[_]](args: List[String], shutdown: F[Unit])(
    implicit E: Effect[F]): Stream[F, ExitCode] =
  for {
    client <- Http1Client.stream[F]()
    loggedClient = Logger(true, true, _ => false)(client)
    wxStationUpstream = WeatherStationUpstreamInterpreter[F](loggedClient)
    wxStationService = WeatherStationService[F](wxStationUpstream)
    exitCode <- BlazeBuilder[F]
      .bindHttp(8080, "localhost")
      .mountService(WeatherStationEndpoints.endpoints[F](wxStationService), "/")
      .serve
  } yield exitCode
class WeatherStationUpstreamInterpreter[F[_]: Sync](client: Client[F]) (implicit M: Monad[F]) extends WeatherStationUpstreamAlgebra[F] {
  import cats.syntax.all._
  import io.circe.generic.auto._

  val uri = Uri.uri("http://wx.avalanche.ca/") // todo config

  def listStations(): F[Seq[Station]] = stations(None)
  def listMeasurements(): F[Seq[Measurement]] = measurements(None)

  private def stations(id: Option[Int]): F[Seq[Station]] = {
    val req: Request[F] = Request[F](method=Method.GET, uri=uri.withPath(s"/stations/${id.getOrElse("")}"))
    client.expect(req)(jsonOf[F, Seq[Station]])
  }

  private def measurements(id: Option[Int]): F[Seq[Measurement]] = {
    val req: Request[F] = Request[F](method=Method.GET, uri=uri.withPath(s"/measurements/${id.getOrElse("")}"))
    client.expect(req)(jsonOf[F, Seq[Measurement]])
  }
}

object WeatherStationUpstreamInterpreter {
  def apply[F[_]: Sync](client: Client[F]): WeatherStationUpstreamInterpreter[F] =
    new WeatherStationUpstreamInterpreter(client)
}
curl -v http://127.0.0.1:8080/stations
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 8080 (#0)
> GET /stations HTTP/1.1
> Host: 127.0.0.1:8080
> User-Agent: curl/7.58.0
> Accept: */*
>
< HTTP/1.1 422 Unprocessable Entity
< Content-Type: text/plain; charset=UTF-8
< Date: Wed, 31 Oct 2018 02:10:03 GMT
< Content-Length: 29
<
* Connection #0 to host 127.0.0.1 left intact
The request body was invalid.
Total time: 27 s, completed Oct 30, 2018 7:02:02 PM
[info] Packaging /home/ben/code/avid-data/target/scala-2.12/aviddataaggregator_2.12-0.0.1-SNAPSHOT.jar ...
[info] Done packaging.
[info] Running ca.avalanche.aviddata.Server
[run-main-0] INFO o.h.b.c.n.NIO1SocketServerGroup - Service bound to address /127.0.0.1:8080
[run-main-0] INFO o.h.s.b.BlazeBuilder -
_ _ _ _ _
| |_| |_| |_ _ __| | | ___
| ' \ _| _| '_ \_ _(_-<
|_||_\__|\__| .__/ |_|/__/
|_|
[run-main-0] INFO o.h.s.b.BlazeBuilder - http4s v0.18.19 on blaze v0.12.13 started at http://127.0.0.1:8080/
[blaze-nio1-acceptor] INFO o.h.b.c.ServerChannelGroup - Connection to /127.0.0.1:60326 accepted at Tue Oct 30 19:10:00 PDT 2018.
[scala-execution-context-global-156] INFO o.h.c.m.RequestLogger - HTTP/1.1 GET http://wx.avalanche.ca/stations/ Headers(Accept: application/json) body=""
[scala-execution-context-global-156] INFO o.h.c.m.ResponseLogger - HTTP/1.1 200 OK Headers(Access-Control-Allow-Headers: accept-encoding, cache-control, origin, accept-language, Access-Control-Allow-Origin: *, Allow: GET, OPTIONS, Content-Type: application/json, Date: Wed, 31 Oct 2018 02:10:02 GMT, Server: nginx/1.10.1, Vary: Accept, Cookie, X-Frame-Options: SAMEORIGIN, Content-Length: 16371, Connection: keep-alive) body=" ...... "
Ignore this; it was a JSON parsing issue.