pekko icon indicating copy to clipboard operation
pekko copied to clipboard

Add Flow#bufferUntilChanged operator

Open He-Pin opened this issue 7 months ago • 0 comments

Maybe implemented with aggregateWithBoundary?

eg:

		this.ollamaApi.pullModel(new PullModelRequest(modelName))
				.bufferUntilChanged(OllamaApi.ProgressResponse::status)
				.doOnEach(signal -> {
					var progressResponses = signal.get();
					if (!CollectionUtils.isEmpty(progressResponses) && progressResponses.get(progressResponses.size() - 1) != null) {
						logger.info("Pulling the '{}' model - Status: {}", modelName, progressResponses.get(progressResponses.size() - 1).status());
					}
				})
				.takeUntil(progressResponses ->
					progressResponses.get(0) != null && "success".equals(progressResponses.get(0).status()))
				.timeout(this.options.timeout())
				.retryWhen(Retry.backoff(this.options.maxRetries(), Duration.ofSeconds(5)))
				.blockLast();

He-Pin avatar Apr 19 '25 14:04 He-Pin