aws_kinesis_stream sink batching is not working
Problem
Records are not batched together before being sent to Kinesis; each record is sent to Kinesis individually.
Configuration
acknowledgements:
  enabled: true
sinks:
  kinesis:
    type: 'aws_kinesis_streams'
    inputs:
      - 'events_source'
    encoding:
      codec: 'json'
    auth:
      access_key_id: '#######'
      secret_access_key: '#######'
    stream_name: '#######'
    region: '######'
    # using default batching configuration
sources:
  events_source:
    type: 'http_server'
    address: '127.0.0.1:5000'
    auth:
      username: 'user'
      password: 'pass'
    encoding: 'json'
    method: 'POST'
    path: '/data'
    response_code: 202
Version
vector 0.38.0 (aarch64-apple-darwin)
Debug Output
No response
Example Data
I tried debugging it myself and downloaded the Vector source. I added this print line to the file vector/src/sinks/aws_kinesis/streams/record.rs:
async fn send(
    &self,
    records: Vec<Self::T>,
    stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E, HttpResponse>> {
    let rec_count = records.len();
    let total_size = records
        .iter()
        .fold(0, |acc, record| acc + record.data().as_ref().len());
    println!("Sending {} records to stream {}. size: {}", rec_count, stream_name, total_size); // Debugging line
    // ... rest of the original send implementation unchanged
This print always reports 1 record, no matter what batch configuration I use.
After modifying the code, it seems that the partition_key is used as the batching key, which is a bug since the partition key should only be a field used by Kinesis to spread data across shards.
Additional Context
No response
References
No response
I think there may be some, understandable, confusion here. The batch settings for the aws_kinesis sink correspond to how the events are batched into AWS API PutRecords calls. That is, if the batch size is set to 500, it'll send 500 records in a single request. However, each event is always a single Kinesis record. It seems like you might be expecting it to batch within a Kinesis record.
Can you share more about your use case for putting multiple events into a single Kinesis record?
Related:
- https://github.com/vectordotdev/vector/issues/1407
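For reference, those batch settings can be set explicitly on the sink. A minimal sketch, assuming the standard Vector batch options (max_events, max_bytes, timeout_secs) apply to this sink; the values below are only illustrative, not a recommendation:

sinks:
  kinesis:
    type: 'aws_kinesis_streams'
    # ... same inputs/encoding/auth/stream_name/region as in the original configuration ...
    batch:
      max_events: 500      # records per PutRecords request
      max_bytes: 5000000   # approximate payload cap per request
      timeout_secs: 1      # flush a partial batch after this long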
Hi! I'm not trying to batch them into a single Kinesis record. The print line I added to the Vector source (shown above) indicates that each PutRecords call only sends 1 Vector event instead of 500.
I think the issue might be in the batched_partitioned function used in vector/src/sinks/aws_kinesis/sink.rs, but I didn't find its definition anywhere. As a reminder, when I modified the Vector code to use a constant partition key, the batching works, so I assume the batched_partitioned function uses the partition key for batching when it shouldn't.
When using:
fn gen_partition_key(_number_of_shards: usize) -> String {
    // test modification: always return the same key (parameter unused; type added here for illustration)
    "fixed_partition_key".to_string()
}
the debugging print reports multiple records per batch instead of 1.
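To illustrate the behaviour described above, here is a standalone sketch (not Vector's actual batched_partitioned code): if records are grouped into batches by their partition key and every record gets a fresh random key, every batch contains exactly one record, while a constant key lets records accumulate into a single batch.

use std::collections::HashMap;

// Group payloads into batches keyed by partition key.
fn batch_by_partition_key(records: &[(String, String)]) -> HashMap<String, Vec<String>> {
    let mut batches: HashMap<String, Vec<String>> = HashMap::new();
    for (partition_key, payload) in records {
        batches.entry(partition_key.clone()).or_default().push(payload.clone());
    }
    batches
}

fn main() {
    // A unique key per record (what the sink appears to do): 3 records -> 3 batches of size 1.
    let random_keyed = vec![
        ("key-a".to_string(), "event 1".to_string()),
        ("key-b".to_string(), "event 2".to_string()),
        ("key-c".to_string(), "event 3".to_string()),
    ];
    assert_eq!(batch_by_partition_key(&random_keyed).len(), 3);

    // A constant key (the test modification above): 3 records -> 1 batch of size 3.
    let fixed_keyed = vec![
        ("fixed_partition_key".to_string(), "event 1".to_string()),
        ("fixed_partition_key".to_string(), "event 2".to_string()),
        ("fixed_partition_key".to_string(), "event 3".to_string()),
    ];
    assert_eq!(batch_by_partition_key(&fixed_keyed)["fixed_partition_key"].len(), 3);
}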
Oh I see, thanks for the additional detail @romkimchi1002! It does look like there is a bug here then, likely with the partition key as you noted.
thanks!
I hereby confirm that this problem also exists for the aws_kinesis_firehose sink.
We were able to work around it by selecting a (locally) constant field as partition_key_field:
[sinks.firehose_al]
type = "aws_kinesis_firehose"
inputs = ["input"]
stream_name = "firehose-stream-name"
encoding.codec = "json"
partition_key_field = "hostname" # Workaround for https://github.com/vectordotdev/vector/issues/20575
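For the aws_kinesis_streams sink from the original report, the analogous workaround would look roughly like the sketch below, assuming partition_key_field is pointed at a field (here 'hostname') that is constant across the events you want batched. Note that a constant partition key also sends all of those records to the same shard, so this trades shard distribution for batching.

sinks:
  kinesis:
    type: 'aws_kinesis_streams'
    inputs:
      - 'events_source'
    encoding:
      codec: 'json'
    stream_name: '#######'
    region: '######'
    # ... auth as in the original configuration ...
    partition_key_field: 'hostname'  # workaround: any (locally) constant field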
With the submission of #20653 I think this issue can now be closed.
Agreed, thanks @steven-aerts . Closed by #20653
@jszwedko
I’m looking into this right now. The issue is about sending multiple events in a single Kinesis Record. The reason this matters is that Kinesis pricing is based on record size, with a minimum billable size of 5KB per record. If you send 10 events that are each 1KB in size separately, you’ll be charged for 50KB in total. However, if you batch 5 events into a single record, you would only be charged for 10KB.
ref: https://aws.amazon.com/firehose/pricing/?nc1=h_ls
Hi @jesseha-aiderx, a combination of the reduce & remap transforms can be used to address this; see the sketch below. The alternative would be a more complex batching setup in the sink itself, but that would require some thought and a PR, so I would encourage you to experiment with reduce.
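A rough sketch of that suggestion (the field name, merge strategy, and limits below are assumptions to adapt to your events, not a tested recipe): a reduce transform merges several events into one before they reach the sink, and a remap step can then reshape the merged event if needed.

transforms:
  combine_events:
    type: 'reduce'
    inputs:
      - 'events_source'
    max_events: 5                 # assumed: merge up to 5 events into one
    expire_after_ms: 1000         # assumed: also flush after 1 second of inactivity
    merge_strategies:
      message: 'concat_newline'   # assumed field; joins payloads with newlines

sinks:
  kinesis:
    type: 'aws_kinesis_streams'
    inputs:
      - 'combine_events'
    # ... stream_name/region/auth/encoding as before ...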
@pront You're right, I think I can do this with remap and reduce. It would be even better if the sink provided this functionality, though. Haha. I'll try it. Thank you for your reply!