pulsar4s icon indicating copy to clipboard operation
pulsar4s copied to clipboard

Potentially Low Throughput in Sink Implementation

Open n3ziniuka5 opened this issue 4 years ago • 1 comments

I am currently investigating what could be causing low throughput to Pulsar in our application. I looked at PulsarSinkGraphStage.scala implementation and the following lines caught my eye:

override def preStart(): Unit = {
  producer = createFn()
  produceCallback = getAsyncCallback {
    case Success(_) =>
      pull(in)
    case Failure(e) =>
      logger.error("Failing pulsar sink stage", e)
      failStage(e)
  }
  pull(in)
}

override def onPush(): Unit = {
  try {
    val t = grab(in)
    logger.debug(s"Sending message $t")
    producer.sendAsync(t).onComplete(produceCallback.invoke)
  } catch {
    case e: Throwable =>
      logger.error("Failing pulsar sink stage", e)
      failStage(e)
  }
}

I haven't implemented any akka-streams Sinks myself, so my assumptions could be wrong here. But if you are doing producer.sendAsync(t).onComplete(produceCallback.invoke) doesn't that mean that a new message will be pulled only after a successful response from producer.sendAsync? Meaning that messages are effectively sent one-by-one to Pulsar and producer batching settings have no effect?

n3ziniuka5 avatar Jan 26 '21 17:01 n3ziniuka5

Here is the manually constructed sink that helped with throughput

Sink
  .foreachAsync[ProducerMessage[T]](300)(msg => producer.sendAsync[Task](msg).void.runToFuture)
  .mapMaterializedValue { completionFuture =>
    val flushAndClose = for {
      _ <- producer.flushAsync
      _ <- producer.closeAsync
    } yield ()
  
    Task.fromFuture(completionFuture).guarantee(flushAndClose).runToFuture
  }

This is very simple and it works, but not sure about potential issues of using this approach. I think a custom sink could be based on a MapAsync inlet with configurable parallelism and then PulsarSinkGraphStage sink would only be responsible for correctly closing the producer.

n3ziniuka5 avatar Jan 27 '21 09:01 n3ziniuka5