pekko icon indicating copy to clipboard operation
pekko copied to clipboard

Add Flow#onBackpressureDrop() operator

Open He-Pin opened this issue 1 year ago • 2 comments

Motivation:

I want to do some rate limiting and just fail the single task, eg when combined with mapAsync. Currently seems I can't observe the backpressure.

onBackpressure is only been called once downstream backpressure.

        queue = Source.<TranslateTask>queue(10240)
            .groupedWeightedWithin(
                dynamicDictionaryConfigs.getMaxBatchTokens(), //最大的字符数,1000 个以内
                dynamicDictionaryConfigs.getMaxBatchSize(), //最大的批量大小,50 个以内
                costFn, //字符数计算
                Duration.ofMillis(dynamicDictionaryConfigs.getMaxBatchIntervalInMillis())) //最大的聚合时间,比如 3ms
            .onBackpressure(task -> task.fail(...)) //
            .buffer(dynamicDictionaryConfigs.getOvsQpsLimit() * 2, OverflowStrategy.backpressure())
            .toMat(Sink.foreach(this::batchTranslate), Keep.left())
            ....
            .run(actorSystem);

What do you think?

In reactor-core, there is:

  • reactor.core.publisher.Flux#onBackpressureDrop(java.util.function.Consumer<? super T>)

This is what I would like to make use.

He-Pin avatar Jun 14 '24 04:06 He-Pin

Update:

@InternalApi private[pekko] trait Buffer[T] {
  def capacity: Int
  def used: Int
  def isFull: Boolean
  def isEmpty: Boolean
  def nonEmpty: Boolean

  def enqueue(elem: T): Unit
  def dequeue(): T

  def peek(): T
  def clear(): Unit
  def dropHead(): Unit
  def dropTail(): Unit
}

as the current Buffer's clear, dropHead and dropTail just returns Unit, change it to returns Seq[T] or T will help , but that can involve: 1. large amount of change across the codebase, 2. hurt performance for clear case.

So seems better to just add a new dedicated implementation with onBackPressureDrop as reactor-core?

image

He-Pin avatar Jun 23 '24 09:06 He-Pin

kind of like Kafka Request Purgatory but not the same thing, i am not objection to this feature.

Roiocam avatar Jul 24 '24 13:07 Roiocam