elastic4s
elastic4s copied to clipboard
Is there anyway to stream the agg query?
Hello,
I need to get the Aggregation result. is there any possibility to stream agg query using stream publisher?
search(Indexes(s"$indexPrefix*")).size(0).aggregations(termsAgg("distinct_field_agg", field))
Not sure if I understood correctly, but here is the implementation to stream results using fs2
library I've written:
object StreamingSearch {
def apply[F[_] : Async, A: ClassTag : Decoder](client: ElasticClient, query: SearchRequest): Stream[F, A] = {
require(query.sorts.nonEmpty, "Search request must have at least one sort")
val emptySearchAfter: Option[Seq[Any]] = None
Stream.unfoldChunkEval(emptySearchAfter) { sa =>
client
.execute(query.searchAfter(sa.getOrElse(Seq.empty)))
.map { result =>
val r = result.result
if r.nonEmpty then
val seq = r.to[A]
val searchAfter = r.hits.hits.last.sort
Some(Chunk.from(seq), searchAfter)
else None
}
}
}
}