Potentially Low Throughput in Sink Implementation
I am currently investigating what could be causing low throughput to Pulsar in our application. While looking at the `PulsarSinkGraphStage.scala` implementation, the following lines caught my eye:
```scala
override def preStart(): Unit = {
  producer = createFn()
  produceCallback = getAsyncCallback {
    case Success(_) =>
      pull(in)
    case Failure(e) =>
      logger.error("Failing pulsar sink stage", e)
      failStage(e)
  }
  pull(in)
}

override def onPush(): Unit = {
  try {
    val t = grab(in)
    logger.debug(s"Sending message $t")
    producer.sendAsync(t).onComplete(produceCallback.invoke)
  } catch {
    case e: Throwable =>
      logger.error("Failing pulsar sink stage", e)
      failStage(e)
  }
}
```
I haven't implemented any akka-streams Sinks myself, so my assumptions could be wrong here. But if you do `producer.sendAsync(t).onComplete(produceCallback.invoke)` and only `pull(in)` from the callback, doesn't that mean the next message is pulled only after `producer.sendAsync` completes successfully? If so, messages are effectively sent to Pulsar one at a time, and the producer's batching settings have no effect.
Here is a manually constructed sink that helped with throughput:
```scala
Sink
  .foreachAsync[ProducerMessage[T]](300)(msg => producer.sendAsync[Task](msg).void.runToFuture)
  .mapMaterializedValue { completionFuture =>
    val flushAndClose = for {
      _ <- producer.flushAsync
      _ <- producer.closeAsync
    } yield ()
    Task.fromFuture(completionFuture).guarantee(flushAndClose).runToFuture
  }
```
This is very simple and it works, but I am not sure about the potential issues of this approach. I think a custom sink could instead be based on a `mapAsync` stage with configurable parallelism, leaving `PulsarSinkGraphStage` responsible only for correctly flushing and closing the producer.
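To make the suggestion concrete, the `mapAsync`-based variant I have in mind could look roughly like this. This is a sketch only: I'm assuming a `Producer[T]` with `Future`-returning `sendAsync`, `flushAsync`, and `closeAsync` methods, which may not match the actual pulsar4s signatures, and `parallelism` is a hypothetical configuration knob.

```scala
import akka.stream.scaladsl.{Flow, Keep, Sink}
import scala.concurrent.{ExecutionContext, Future}

// Sketch: keep up to `parallelism` sends in flight so the producer's
// batching settings can actually take effect, then flush and close the
// producer once the stream terminates (successfully or not).
def pulsarSink[T](producer: Producer[T], parallelism: Int)(
    implicit ec: ExecutionContext): Sink[T, Future[Unit]] =
  Flow[T]
    .mapAsync(parallelism)(t => producer.sendAsync(t)) // concurrent, order-preserving sends
    .toMat(Sink.ignore)(Keep.right)
    .mapMaterializedValue { done =>
      // Run flush + close regardless of how the stream ended, then
      // surface the original stream result.
      done.transformWith { result =>
        producer.flushAsync
          .flatMap(_ => producer.closeAsync)
          .transform(_ => result.map(_ => ()))
      }
    }
```

With that shape, backpressure is still preserved (the stage pulls only while fewer than `parallelism` sends are outstanding), but the sink no longer serializes every message behind the previous acknowledgement.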