alpakka
Kinesis push throughput (maxBytesPerSecond) throttling ineffective
Versions used
- "com.typesafe.akka" %% "akka-stream" % "2.6.19"
- "com.lightbend.akka" %% "akka-stream-alpakka-kinesis" % "3.0.4"
- "software.amazon.awssdk" % "sdk-core" % "2.17.172"
Expected Behavior
KinesisFlow should be throttled to 1 MB/s per shard, regardless of which factory method was used to create the ByteBuffer payload.
Actual Behavior
Flow throughput is unlimited: the maxBytesPerSecond throttle never takes effect.
Relevant logs
akka.stream.alpakka.kinesis.scaladsl.KinesisFlow line 72
private def getPayloadByteSize[T](record: (PutRecordsRequestEntry, T)): Int = record match {
case (request, _) => request.partitionKey.length + request.data.asByteBuffer.position()
}
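request.data.asByteBuffer.position() returns 0 because SdkBytes always hands back a freshly wrapped array, whose position starts at 0. The computed payload size is therefore only the partition key length.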
software.amazon.awssdk.core.BytesWrapper line 55
public final ByteBuffer asByteBuffer() {
return ByteBuffer.wrap(bytes).asReadOnlyBuffer();
}
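To illustrate the cause without the AWS SDK, here is a minimal sketch using only java.nio. The object and method names (BufferPositionDemo, sizeByPosition, sizeByRemaining) are hypothetical and exist only for this example. It shows that a wrapped read-only buffer reports position 0, while remaining() reports the number of readable bytes. Using remaining() is one possible way to measure the payload; it is not necessarily what the eventual fix does.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object BufferPositionDemo {
  // Size as measured by the position-based formula quoted above.
  def sizeByPosition(buf: ByteBuffer): Int = buf.position()

  // Size as the number of readable bytes between position and limit.
  def sizeByRemaining(buf: ByteBuffer): Int = buf.remaining()

  def main(args: Array[String]): Unit = {
    // 1024 ASCII characters, so exactly 1024 bytes in UTF-8.
    val bytes = "payload".padTo(1024, 'X').getBytes(StandardCharsets.UTF_8)

    // Same construction as BytesWrapper.asByteBuffer in the excerpt above.
    val wrapped = ByteBuffer.wrap(bytes).asReadOnlyBuffer()

    println(sizeByPosition(wrapped))  // 0
    println(sizeByRemaining(wrapped)) // 1024
  }
}
```

With the position-based measure, every record weighs only as much as its partition key, so a byte-rate limit is effectively never reached.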
Unit test
@Test
def `byte buffer wrapping`(): Unit = {
val payload = s"My test payload ${UUID.randomUUID().toString}".padTo(1024*1000,'X').getBytes(StandardCharsets.UTF_8)
val partition = UUID.randomUUID().toString
val req = PutRecordsRequestEntry
.builder()
.partitionKey(partition)
.data(SdkBytes.fromByteArray(payload))
.build()
req.data.asByteBuffer.position() shouldBe payload.length
// Fails with: org.scalatest.exceptions.TestFailedException: 0 was not equal to 1024000, took 0.122 sec
}
@Test
def `byte buffer allocate`(): Unit = {
val payload = s"My test payload ${UUID.randomUUID().toString}".padTo(1024*1000,'X').getBytes(StandardCharsets.UTF_8)
val partition = UUID.randomUUID().toString
val bb = ByteBuffer.allocate(1024*1024)
bb.put(payload)
bb.position() shouldBe payload.length
//Success
val req = PutRecordsRequestEntry
.builder()
.partitionKey(partition)
.data(SdkBytes.fromByteBuffer(bb))
.build()
req.data.asByteBuffer.position() shouldBe payload.length
// Fails with: org.scalatest.exceptions.TestFailedException: 0 was not equal to 1024000, took 0.122 sec
}
My workaround: add a throughput throttle upstream of KinesisFlow, using the following weight function:
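// Requires: import scala.concurrent.duration._ and akka.stream.ThrottleMode
// Weight of a record in bytes: partition key length plus payload length.
def weightFunction(request: PutRecordsRequestEntry): Int = request.partitionKey().length + request.data().asByteArray().length
// Allow nbShard MiB per second, with a burst of the same size.
.throttle(nbShard * 1024 * 1024, 1.second, nbShard * 1024 * 1024, { case (req, _) => weightFunction(req) }, ThrottleMode.Shaping)
.via(KinesisFlow.withContext[Ctx](streamName, KinesisFlowSettings.byNumberOfShards(nbShard).withMaxBatchSize(1)))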
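As a rough check of what this throttle admits, the sketch below works through the arithmetic on plain values. ThrottleBudget, weight and budget are hypothetical names for this example only. weight mirrors weightFunction but takes a String and a byte array instead of a PutRecordsRequestEntry, so it runs without the AWS SDK.

```scala
object ThrottleBudget {
  // Per-shard byte rate used in the workaround: 1 MiB per second.
  val BytesPerShardPerSecond: Int = 1024 * 1024

  // Same formula as weightFunction, on plain values (assumption for this sketch).
  def weight(partitionKey: String, data: Array[Byte]): Int =
    partitionKey.length + data.length

  // Total cost the throttle admits per second for a given shard count.
  def budget(nbShard: Int): Int = nbShard * BytesPerShardPerSecond

  def main(args: Array[String]): Unit = {
    val key = java.util.UUID.randomUUID().toString // 36 characters
    val data = new Array[Byte](1024 * 1000)        // payload size used in the unit tests
    val w = weight(key, data)
    println(s"record weight: $w bytes")
    println(s"budget for 2 shards: ${budget(2)} bytes/s")
    println(s"records admitted per second: ${budget(2) / w}")
  }
}
```

With the 1,024,000-byte payload from the unit tests and a 36-character UUID key, each record weighs 1,024,036 bytes, so a 2-shard budget of 2,097,152 bytes/s admits two such records per second.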
@ennru I think this can be closed
Thank you for your effort! Implemented with #3035