pekko
pekko copied to clipboard
Add Flow#bufferUntilChanged operator
Maybe implemented with aggregateWithBoundary?
eg:
this.ollamaApi.pullModel(new PullModelRequest(modelName))
.bufferUntilChanged(OllamaApi.ProgressResponse::status)
.doOnEach(signal -> {
var progressResponses = signal.get();
if (!CollectionUtils.isEmpty(progressResponses) && progressResponses.get(progressResponses.size() - 1) != null) {
logger.info("Pulling the '{}' model - Status: {}", modelName, progressResponses.get(progressResponses.size() - 1).status());
}
})
.takeUntil(progressResponses ->
progressResponses.get(0) != null && "success".equals(progressResponses.get(0).status()))
.timeout(this.options.timeout())
.retryWhen(Retry.backoff(this.options.maxRetries(), Duration.ofSeconds(5)))
.blockLast();