elastic4s icon indicating copy to clipboard operation
elastic4s copied to clipboard

May I propose a more pure FP design for `ElasticClient`, `HttpClient` and `Executor`?

Open kailuowang opened this issue 4 years ago • 1 comments

The current design tries to abstract the effect type F[_] into Executor[F], but in reality, the effect type is usually determined by the implementation in HttpClient. For example, SttpRequestHttpClient is actually using a Future... May I propose simplifing this design by using a barebone tagless final design and rid of the need for Exceutor. Bascially

trait HttpClient[F[_]] {
  def send(request: ElasticRequest): F[HttpResponse] 
}

object Elastic4s {

    def execute[T, U, F[_]](t: T)(implicit
                                    client: HttpClient[F],
                                    functor: Functor[F],
                                    handler: Handler[T, U],
                                    options: CommonRequestOptions): F[Response[U]] {
          val request = handler.build(t)         
          //omitting the part that add `timeout` and `master_timeout` parameter
          functor.map(client(request3)) { resp =>
              handler.responseHandler.handle(resp)
                .fold(
                  error =>
                    RequestFailure(resp.statusCode, resp.entity.map(_.content), resp.headers, error),
                  u =>
                    RequestSuccess(resp.statusCode, resp.entity.map(_.content), resp.headers, u)
                )
     }
}

With this design it's quite easy to add more HttpClient implementations, specifically, it made it a lot easier for each HttpClient implementation to control their effect type system. The current ElasticClient is a simple wrapper of HttpClient, I feel it's unnecessary for it to try to manage the effect system for HttpClient

Here is an example of a Http4s based HttpClient impl.


class Http4sElastic4sClient[F[_]: ContextShift](
    baseUri: Uri,
    blocker: Blocker,
    hClient: Client[F]
  )(implicit
    F: Effect[F]) {

  private def processResponse(resp: Response[F]): F[HttpResponse] = {
    resp.as[String].map { body =>
      val entity =
        Some(HttpEntity.StringEntity(body, resp.contentType.map(_.renderString)))
      HttpResponse(
        resp.status.code,
        entity,
        resp.headers.toList.map(h => (h.name.value, h.value)).toMap
      )
    }
  }

  implicit val fileEncoder = EntityEncoder.fileEncoder[F](blocker)
  implicit val isEncoder = EntityEncoder.inputStreamEncoder[F, InputStream](blocker)

  private def send(
      request: ElasticRequest
    ): F[HttpResponse] = {
    import request._
    val uri =
      baseUri
        .addPath(endpoint)
        .withQueryParams(params)

    val http4sReq = Method
      .fromString(method.toUpperCase())
      .map { m =>
        def withBody[A](
            b: A,
            ct: Option[String]
          )(implicit
            w: EntityEncoder[F, A]
          ): Request[F] = {
          val h = w.headers
          val entity = w.toEntity(b)
          val withContentLength = entity.length
            .map { l =>
              `Content-Length`.fromLong(l).fold(_ => h, c => h.put(c))
            }
            .getOrElse(h)

          val newHeaders = ct
            .flatMap { cts =>
              `Content-Type`.parse(cts).toOption
            }
            .map { h =>
              withContentLength.filterNot(_.is(`Content-Type`)).put(h)
            }
            .getOrElse(withContentLength)

          Request(method = m, uri = uri, headers = newHeaders, body = entity.body)
        }

        entity.fold(Request[F](m, uri)) {
          case StringEntity(content, _)    => withBody(content)
          case ByteArrayEntity(content, _) => withBody(content)
          case InputStreamEntity(in, _)    => withBody(F.pure(in))
          case FileEntity(file: File, _)   => withBody(file)
        }
      }
      .liftTo[F]

    http4sReq.flatMap(req => hClient.run(req).use(processResponse))
  }

UPDATE: fixed the example HttpClient with correct content type

kailuowang avatar Apr 05 '21 18:04 kailuowang

So just push the F[_] down into the client trait. We could do this as part of the scala 3 work, because that's going to need a new client trait anyway to use givens.

On Mon, 5 Apr 2021 at 13:21, Kai(luo) Wang @.***> wrote:

The current design tries to abstract the effect type F[_] into Executor[F], but in reality, the effect type is usually determined by the implementation in HttpClient. For example, SttpRequestHttpClient is actually using a Future... May I propose simplifing this design by using a barebone tagless final design and rid of the need for Exceutor. Bascially

trait HttpClient[F[_]] { def send(request: ElasticRequest): F[HttpResponse] } object Elastic4s {

def execute[T, U, F[_]](t: T)(implicit
                                client: HttpClient[F],
                                functor: Functor[F],
                                handler: Handler[T, U],
                                options: CommonRequestOptions): F[Response[U]] {
      val request = handler.build(t)
      //omitting the part that add `timeout` and `master_timeout` parameter
      functor.map(client(request3)) { resp =>
          handler.responseHandler.handle(resp)
            .fold(
              error =>
                RequestFailure(resp.statusCode, resp.entity.map(_.content), resp.headers, error),
              u =>
                RequestSuccess(resp.statusCode, resp.entity.map(_.content), resp.headers, u)
            )
 }

}

With this design it's quite easy to add more HttpClient implementations, specifically, it made it a lot easier for each HttpClient implementation to control their effect type system. The current ElasticClient is a simple wrapper of HttpClient, I feel it's unnecessary for it to try to manage the effect system for HttpClient

Here is an example of a Http4s based HttpClient impl.

class Http4sElastic4sClient[F[_]: ContextShift]( baseUri: Uri, blocker: Blocker, hClient: Client[F] )(implicit F: Effect[F]) {

private def processResponse(resp: Response[F]): F[HttpResponse] = { resp.as[String].map { body => val entity = Some(HttpEntity.StringEntity(body, resp.contentType.map(_.renderString))) HttpResponse( resp.status.code, entity, resp.headers.toList.map(h => (h.name.value, h.value)).toMap ) } }

implicit val fileEncoder = EntityEncoder.fileEncoderF implicit val isEncoder = EntityEncoder.inputStreamEncoderF, InputStream

private def send( request: ElasticRequest ): F[HttpResponse] = { import request._ val uri = baseUri .addPath(endpoint) .withQueryParams(params)

val http4sReq = Method
  .fromString(method.toUpperCase())
  .map { m =>
    def withBody[A](
        b: A
      )(implicit
        w: EntityEncoder[F, A]
      ): Request[F] = {
      val h = w.headers
      val entity = w.toEntity(b)
      val newHeaders = entity.length
        .map { l =>
          `Content-Length`.fromLong(l).fold(_ => h, c => h.put(c))
        }
        .getOrElse(h)
      Request(method = m, uri = uri, headers = newHeaders, body = entity.body)
    }

    entity.fold(Request[F](m, uri)) {
      case StringEntity(content, _)    => withBody(content)
      case ByteArrayEntity(content, _) => withBody(content)
      case InputStreamEntity(in, _)    => withBody(F.pure(in))
      case FileEntity(file: File, _)   => withBody(file)
    }
  }
  .liftTo[F]

http4sReq.flatMap(req => hClient.run(req).use(processResponse))

}

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/sksamuel/elastic4s/issues/2379, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAFVSGW7JD57JMCGL5MNOPTTHH5R3ANCNFSM42NHLSVA .

sksamuel avatar Apr 05 '21 20:04 sksamuel