Add Flow#onBackpressureDrop() operator
Motivation:
I want to do some rate limiting and fail only the individual task that is affected, e.g. when combined with mapAsync.
Currently, it seems I can't observe backpressure.
The idea is that onBackpressure is called only when downstream applies backpressure:
queue = Source.<TranslateTask>queue(10240)
    .groupedWeightedWithin(
        dynamicDictionaryConfigs.getMaxBatchTokens(), // maximum number of characters, up to 1000
        dynamicDictionaryConfigs.getMaxBatchSize(), // maximum batch size, up to 50
        costFn, // computes the character count
        Duration.ofMillis(dynamicDictionaryConfigs.getMaxBatchIntervalInMillis())) // maximum aggregation time, e.g. 3 ms
    .onBackpressure(task -> task.fail(...)) // proposed operator, does not exist yet
    .buffer(dynamicDictionaryConfigs.getOvsQpsLimit() * 2, OverflowStrategy.backpressure())
    .toMat(Sink.foreach(this::batchTranslate), Keep.left())
    ....
    .run(actorSystem);
What do you think?
In reactor-core, there is:
- reactor.core.publisher.Flux#onBackpressureDrop(java.util.function.Consumer<? super T>)
This is what I would like to be able to use.
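To make the requested semantics concrete, here is a minimal plain-Java sketch of "drop and notify" behaviour. It is not Pekko or reactor-core code: `DroppingBuffer`, `offer`, `poll`, and `DroppingBufferDemo` are hypothetical names used only for illustration, and a fixed-capacity queue stands in for downstream demand, which is an assumption rather than how either library works internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration only: not part of Pekko or reactor-core.
final class DroppingBuffer<T> {
    private final int capacity;
    private final Consumer<? super T> onDrop;
    private final Queue<T> queue = new ArrayDeque<>();

    DroppingBuffer(int capacity, Consumer<? super T> onDrop) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.onDrop = onDrop;
    }

    // Accepts the element if there is room; otherwise hands it to onDrop
    // instead of blocking or backpressuring the producer.
    boolean offer(T elem) {
        if (queue.size() >= capacity) {
            onDrop.accept(elem);
            return false;
        }
        queue.add(elem);
        return true;
    }

    // Stands in for downstream demand; returns null when the buffer is empty.
    T poll() {
        return queue.poll();
    }
}

final class DroppingBufferDemo {
    // Offers 1..5 to a buffer of capacity 2 that is never drained
    // and returns the elements that were handed to the drop callback.
    static List<Integer> run() {
        List<Integer> dropped = new ArrayList<>();
        DroppingBuffer<Integer> buffer = new DroppingBuffer<>(2, dropped::add);
        for (int i = 1; i <= 5; i++) {
            buffer.offer(i);
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [3, 4, 5]
    }
}
```

In the use case above, the callback would be the place to fail the individual task, while elements 1 and 2 stay buffered until downstream pulls them.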
Update:
@InternalApi private[pekko] trait Buffer[T] {
  def capacity: Int
  def used: Int
  def isFull: Boolean
  def isEmpty: Boolean
  def nonEmpty: Boolean
  def enqueue(elem: T): Unit
  def dequeue(): T
  def peek(): T
  def clear(): Unit
  def dropHead(): Unit
  def dropTail(): Unit
}
The current Buffer's clear, dropHead, and dropTail just return Unit. Changing them to return Seq[T] or T would help, but that could involve: 1. a large amount of change across the codebase, and 2. a performance hit in the clear case.
So it seems better to just add a new dedicated implementation with onBackpressureDrop, as in reactor-core?
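The trade-off can be sketched in plain Java. This is a hypothetical analogue of the trait above, not Pekko code: `ReturningBuffer` and its `ArrayDeque` backing are assumptions made for illustration. It shows that returning the dropped element from dropHead and dropTail is cheap, while a clear that returns its contents has to allocate a copy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java analogue of the Buffer trait; not the Pekko implementation.
final class ReturningBuffer<T> {
    private final int capacity;
    private final ArrayDeque<T> elems = new ArrayDeque<>();

    ReturningBuffer(int capacity) {
        this.capacity = capacity;
    }

    boolean isFull() {
        return elems.size() >= capacity;
    }

    boolean isEmpty() {
        return elems.isEmpty();
    }

    void enqueue(T elem) {
        if (isFull()) {
            throw new IllegalStateException("buffer is full");
        }
        elems.addLast(elem);
    }

    // Removes and returns the oldest element, so the caller can fail or log it.
    // Throws NoSuchElementException if the buffer is empty.
    T dropHead() {
        return elems.removeFirst();
    }

    // Removes and returns the most recently enqueued element.
    // Throws NoSuchElementException if the buffer is empty.
    T dropTail() {
        return elems.removeLast();
    }

    // Returning the cleared elements requires copying them first,
    // which is the extra cost a Unit-returning clear avoids.
    List<T> clear() {
        List<T> removed = new ArrayList<>(elems);
        elems.clear();
        return removed;
    }
}
```

Every existing caller of these three methods would also need to handle or ignore the new return values, which is where the codebase-wide change comes from.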
This is kind of like Kafka's Request Purgatory, but not the same thing. I have no objection to this feature.