
Kinesis push throughput (maxBytesPerSecond) throttling ineffective

mberchon opened this issue on May 30, 2022

Versions used

  • "com.lightbend.akka" %% "akka-stream" % 2.6.19,
  • "com.lightbend.akka" %% "akka-stream-alpakka-kinesis" % 3.0.4,
  • "software.amazon.awssdk" % "sdk-core" % 2.17.172.

Expected Behavior

KinesisFlow should be throttled to 1 MB per second per shard, regardless of which factory method was used to create the ByteBuffer payload.
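
For reference, the flow settings in the workaround below are derived from the shard count. A minimal sketch, assuming byNumberOfShards scales the per-second limits (including maxBytesPerSecond) with the number of shards:

import akka.stream.alpakka.kinesis.KinesisFlowSettings

val nbShard = 4 // hypothetical shard count
// byNumberOfShards is assumed to derive the per-second limits (including
// maxBytesPerSecond) from the shard count; withMaxBatchSize(1) matches the
// workaround shown further down.
val settings: KinesisFlowSettings =
  KinesisFlowSettings.byNumberOfShards(nbShard).withMaxBatchSize(1)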

Actual Behavior

Flow throughput is effectively unlimited; the maxBytesPerSecond limit is never enforced.

Relevant logs

akka.stream.alpakka.kinesis.scaladsl.KinesisFlow line 72

private def getPayloadByteSize[T](record: (PutRecordsRequestEntry, T)): Int = record match {
    case (request, _) => request.partitionKey.length + request.data.asByteBuffer.position()
}

request.data.asByteBuffer.position() returns 0 when the SdkBytes was created from a wrapped byte array: asByteBuffer() wraps the internal array in a fresh read-only buffer whose position is always 0, so every record is counted as little more than its partition-key length and the byte-rate throttle never takes effect.

software.amazon.awssdk.core.BytesWrapper line 55

public final ByteBuffer asByteBuffer() {
    return ByteBuffer.wrap(bytes).asReadOnlyBuffer();
}
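
Because asByteBuffer() always returns a freshly wrapped buffer positioned at 0, the payload size has to be measured independently of position(), for example via remaining(). A minimal sketch of such a calculation (the wrapper object is hypothetical, and this is not necessarily the fix that was eventually merged):

import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry

object PayloadSize {
  // Measure the payload via remaining(), which for a freshly wrapped read-only
  // buffer equals the full data length, instead of position(), which is 0 there.
  def getPayloadByteSize[T](record: (PutRecordsRequestEntry, T)): Int = record match {
    case (request, _) => request.partitionKey.length + request.data.asByteBuffer.remaining()
  }
}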

mberchon commented on May 30, 2022

Unit tests

  import java.nio.ByteBuffer
  import java.nio.charset.StandardCharsets
  import java.util.UUID

  import org.junit.Test
  import org.scalatest.matchers.should.Matchers._
  import software.amazon.awssdk.core.SdkBytes
  import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry

  @Test
  def `byte buffer wrapping`(): Unit = {

    val payload = s"My test payload ${UUID.randomUUID().toString}".padTo(1024*1000,'X').getBytes(StandardCharsets.UTF_8)
    val partition = UUID.randomUUID().toString

    val req = PutRecordsRequestEntry
      .builder()
      .partitionKey(partition)
      .data(SdkBytes.fromByteArray(payload))
      .build()

    req.data.asByteBuffer.position() shouldBe payload.length
    //Fail with: org.scalatest.exceptions.TestFailedException: 0 was not equal to 1024000, took 0.122 sec
  }
  @Test
  def `byte buffer allocate`(): Unit = {

    val payload = s"My test payload ${UUID.randomUUID().toString}".padTo(1024*1000,'X').getBytes(StandardCharsets.UTF_8)
    val partition = UUID.randomUUID().toString
    val bb = ByteBuffer.allocate(1024*1024)
    bb.put(payload)

    bb.position() shouldBe payload.length
    //Success

    val req = PutRecordsRequestEntry
      .builder()
      .partitionKey(partition)
      .data(SdkBytes.fromByteBuffer(bb))
      .build()

    req.data.asByteBuffer.position() shouldBe payload.length
    //Fail with: org.scalatest.exceptions.TestFailedException: 0 was not equal to 1024000, took 0.122 sec
  }

mberchon commented on May 30, 2022

My workaround: add a throttle stage upstream of the Kinesis flow, using the following weight function:

def weightFunction(request: PutRecordsRequestEntry): Int =
  request.partitionKey().length + request.data().asByteArray().length

.throttle(nbShard * 1024 * 1024, 1.second, nbShard * 1024 * 1024, { case (req, _) => weightFunction(req) }, ThrottleMode.Shaping)
.via(KinesisFlow.withContext[Ctx](streamName, KinesisFlowSettings.byNumberOfShards(nbShard).withMaxBatchSize(1)))
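
The same throttle stage as a self-contained sketch (shardThrottle is a hypothetical helper name; Ctx and nbShard are the placeholders from the snippet above). It can then be composed with KinesisFlow.withContext exactly as shown there:

import scala.concurrent.duration._

import akka.NotUsed
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Flow
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry

def weightFunction(request: PutRecordsRequestEntry): Int =
  request.partitionKey().length + request.data().asByteArray().length

// Budget nbShard * 1 MiB per second and cost each element by its byte size,
// shaping (delaying) elements instead of failing the stream.
def shardThrottle[Ctx](nbShard: Int): Flow[(PutRecordsRequestEntry, Ctx), (PutRecordsRequestEntry, Ctx), NotUsed] =
  Flow[(PutRecordsRequestEntry, Ctx)]
    .throttle(
      nbShard * 1024 * 1024,                    // cost budget per interval (bytes)
      1.second,                                 // interval
      nbShard * 1024 * 1024,                    // maximum burst
      { case (req, _) => weightFunction(req) }, // cost per element
      ThrottleMode.Shaping
    )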

mberchon commented on May 30, 2022

@ennru I think this can be closed

jtjeferreira commented on Dec 27, 2023

Thank you for your effort! Implemented with #3035

ennru commented on Jan 4, 2024