elastic4s

Is there any way to stream the agg query?

Open dataoperandz opened this issue 10 months ago • 1 comments

Hello,

I need to get the aggregation result. Is there any way to stream an agg query using a stream publisher?

search(Indexes(s"$indexPrefix*")).size(0).aggregations(termsAgg("distinct_field_agg", field))

dataoperandz, Apr 26 '24 15:04

Not sure if I understood correctly, but here is an implementation I've written that streams search results using the fs2 library:

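import scala.reflect.ClassTag

import cats.effect.Async
import cats.syntax.functor.*
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.requests.searches.SearchRequest
import fs2.{Chunk, Stream}
import io.circe.Decoder

// Assumed to be in scope as well: ElasticDsl.* (search handler), the elastic4s circe
// integration (derives a HitReader from the Decoder), and an elastic4s Executor for F.
object StreamingSearch {

  def apply[F[_] : Async, A: ClassTag : Decoder](client: ElasticClient, query: SearchRequest): Stream[F, A] = {
    // search_after paginates relative to the sort values of the last hit, so a sort is mandatory.
    require(query.sorts.nonEmpty, "Search request must have at least one sort")

    // No cursor yet: the first request starts from the beginning of the result set.
    val emptySearchAfter: Option[Seq[Any]] = None

    // Each step fetches one page, emits it as a chunk, and carries the new cursor forward.
    Stream.unfoldChunkEval(emptySearchAfter) { sa =>
      client
        .execute(query.searchAfter(sa.getOrElse(Seq.empty)))
        .map { result =>
          val r = result.result

          if r.nonEmpty then
            val seq         = r.to[A]
            // Sort values of the last hit become the cursor for the next page.
            val searchAfter = r.hits.hits.last.sort

            Some((Chunk.from(seq), searchAfter))
          else None // An empty page ends the stream.
        }
    }
  }

}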

igor-vovk, Jul 02 '24 16:07
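
For anyone who wants to see the cursor logic from `StreamingSearch` on its own, here is a minimal sketch that uses only the Scala standard library. Everything in it is hypothetical: `Hit`, `fetchPage` and the in-memory `index` stand in for Elasticsearch, and `Iterator.unfold` stands in for `Stream.unfoldChunkEval`. Note that this pattern pages through hits, not aggregation buckets. For buckets, Elasticsearch's composite aggregation exposes a similar cursor (`after_key` in the response, `after` in the request), so the same unfold shape should carry over, but that is not shown here.

```scala
// Hypothetical stand-in for one search hit: a payload plus its sort value.
final case class Hit(value: String, sort: Int)

object CursorPagingSketch {
  // Fake "index", already ordered by the sort key (mirrors the require(...) on sorts).
  private val index: Vector[Hit] =
    Vector("a", "b", "c", "d", "e").zipWithIndex.map { case (v, i) => Hit(v, i) }

  // Hypothetical page fetch: up to pageSize hits whose sort value is strictly after the cursor.
  def fetchPage(after: Option[Int], pageSize: Int): Vector[Hit] =
    index.filter(h => after.forall(h.sort > _)).take(pageSize)

  // Unfold over the cursor: emit each page, carry the last sort value forward,
  // and stop as soon as a page comes back empty.
  def streamAll(pageSize: Int): Iterator[String] =
    Iterator
      .unfold(Option.empty[Int]) { cursor =>
        val page = fetchPage(cursor, pageSize)
        if (page.isEmpty) None
        else Some((page.map(_.value), Some(page.last.sort)))
      }
      .flatten

  def main(args: Array[String]): Unit =
    println(streamAll(pageSize = 2).toList)
}
```

With five items and a page size of 2, the sketch makes four fetches: three non-empty pages and a final empty one that ends the iteration.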