
aws_kinesis_stream sink batching is not working

Open romkimchi1002 opened this issue 1 year ago • 6 comments

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

Records are not batched together before being sent to Kinesis; each record is sent to Kinesis individually.

Configuration

acknowledgements:
  enabled: true

sinks:
  kinesis:
    type: 'aws_kinesis_streams'
    inputs:
      - 'events_source'
    encoding:
      codec: 'json'
    auth:
      access_key_id: '#######'
      secret_access_key: '#######'
    stream_name: '#######'
    region: '######'
    # using default batching configuration

sources:
  events_source:
    type: 'http_server'
    address: '127.0.0.1:5000'
    auth: 
      username: 'user'
      password: 'pass'
    encoding: 'json'
    method: 'POST'
    path: '/data'
    response_code: 202

Version

vector 0.38.0 (aarch64-apple-darwin)

Debug Output

No response

Example Data

I tried debugging it myself and downloaded the Vector source. I added this print line to vector/src/sinks/aws_kinesis/streams/record.rs:

async fn send(
        &self,
        records: Vec<Self::T>,
        stream_name: String,
    ) -> Result<KinesisResponse, SdkError<Self::E, HttpResponse>> {
        let rec_count = records.len();
        let total_size = records
            .iter()
            .fold(0, |acc, record| acc + record.data().as_ref().len());

        println!("Sending {} records to stream {}. size: {}", rec_count, stream_name, total_size); // Debugging line

This print always reports 1 record, no matter what batch configuration I use.

After modifying the code, it appears that the partition_key is used as the batching key. That looks like a bug, since the partition key should only be a field used by Kinesis to spread data across shards.

Additional Context

No response

References

No response

romkimchi1002 avatar May 29 '24 13:05 romkimchi1002

I think there may be some, understandable, confusion here. The batch settings for the aws_kinesis sink correspond to how events are batched into AWS API PutRecords calls. That is, if the batch size is set to 500, it'll send 500 records in a single request. However, each event is always a single Kinesis record. It seems like you might be expecting it to batch within a Kinesis record.

Can you share more about your use-case for putting multiple events into a single Kinesis record?
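
For illustration, those PutRecords-level settings live under the sink's batch options. A minimal sketch, reusing the configuration from the report with illustrative values:

sinks:
  kinesis:
    type: 'aws_kinesis_streams'
    inputs:
      - 'events_source'
    encoding:
      codec: 'json'
    stream_name: '#######'
    region: '######'
    batch:
      max_events: 500   # up to 500 records per PutRecords request (the API limit)
      timeout_secs: 1   # flush a partially filled batch after 1 second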

jszwedko avatar May 29 '24 21:05 jszwedko

Related:

  • https://github.com/vectordotdev/vector/issues/1407

jszwedko avatar May 29 '24 21:05 jszwedko

> I think there may be some, understandable, confusion here. The batch settings for the aws_kinesis sink correspond to how events are batched into AWS API PutRecords calls. That is, if the batch size is set to 500, it'll send 500 records in a single request. However, each event is always a single Kinesis record. It seems like you might be expecting it to batch within a Kinesis record.
>
> Can you share more about your use-case for putting multiple events into a single Kinesis record?

Hi! I'm not trying to batch them into a single Kinesis record. The line I added in the Vector source shows that each PutRecords call sends only 1 Vector event instead of 500.

async fn send(
        &self,
        records: Vec<Self::T>,
        stream_name: String,
    ) -> Result<KinesisResponse, SdkError<Self::E, HttpResponse>> {
        let rec_count = records.len();
        let total_size = records
            .iter()
            .fold(0, |acc, record| acc + record.data().as_ref().len());

        println!("Sending {} records to stream {}. size: {}", rec_count, stream_name, total_size); // Debugging line

[screenshot of the debug output, showing 1 record per call]

I think the issue might be in the batched_partitioned function in vector/src/sinks/aws_kinesis/sink.rs, but I didn't find its definition anywhere. As a reminder, when I modified the Vector code to use a constant partition key, the batching works, so I assume batched_partitioned uses the partition key for batching when it shouldn't.

when using a constant partition key (the parameter type below is assumed, just to make the snippet valid Rust):

fn gen_partition_key(_number_of_shards: usize) -> String {
    // always return the same key, purely to test the batching behavior
    "fixed_partition_key".to_string()
}

the debugging print gives: [screenshot of the debug output, now showing multiple records per PutRecords call]

romkimchi1002 avatar Jun 02 '24 08:06 romkimchi1002

Oh I see, thanks for the additional detail @romkimchi1002 ! It does look like there is a bug here then; likely with the partition key as you noted.

jszwedko avatar Jun 03 '24 16:06 jszwedko

thanks!

romkimchi1002 avatar Jun 04 '24 08:06 romkimchi1002

I hereby confirm that this problem also exists for the aws_kinesis_firehose sink. We were able to work around it by selecting a (locally) constant field as partition_key_field:

[sinks.firehose_al]
  type = "aws_kinesis_firehose"
  inputs = ["input"]
  stream_name = "firehose-stream-name"
  encoding.codec = "json"
  partition_key_field = "hostname" # Workaround for https://github.com/vectordotdev/vector/issues/20575

steven-aerts avatar Jun 12 '24 07:06 steven-aerts

With the submission of #20653 I think this issue can now be closed.

steven-aerts avatar Jul 04 '24 03:07 steven-aerts

Agreed, thanks @steven-aerts . Closed by #20653

jszwedko avatar Jul 08 '24 13:07 jszwedko

@jszwedko

> I think there may be some, understandable, confusion here. The batch settings for the aws_kinesis sink correspond to how events are batched into AWS API PutRecords calls. That is, if the batch size is set to 500, it'll send 500 records in a single request. However, each event is always a single Kinesis record. It seems like you might be expecting it to batch within a Kinesis record. Can you share more about your use-case for putting multiple events into a single Kinesis record?

I'm looking into this right now. The issue is about sending multiple events in a single Kinesis record. The reason this matters is that Kinesis Firehose pricing is based on record size, rounded up to a minimum billable size of 5KB per record. If you send 10 events that are each 1KB in size separately, you'll be charged for 50KB in total; if you instead pack them 5 events per record, the same 10 events become 2 records and you'd only be charged for 10KB.
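
To make the arithmetic explicit (my own worked numbers, using the 5KB rounding described on the pricing page referenced below):

  10 records of 1KB each:  10 x roundup(1KB to 5KB) = 10 x 5KB = 50KB billed
   2 records of 5KB each:   2 x roundup(5KB to 5KB) =  2 x 5KB = 10KB billed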

ref: https://aws.amazon.com/firehose/pricing/?nc1=h_ls

jesseha-aiderx avatar Mar 07 '25 07:03 jesseha-aiderx

Hi @jesseha-aiderx, a combination of the reduce and remap transforms can be used to address this; a rough sketch follows below. The alternative would be a more complex batching setup in the sink, but that would require some thought and a PR, so I would encourage you to experiment with reduce.
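
The sketch below assumes downstream consumers can handle newline-delimited text inside each Kinesis record; the transform name, max_events value, and the concat_newline merge strategy are illustrative choices, not a tested recipe (a remap step to re-encode each event as JSON before the reduce is omitted here):

transforms:
  pack_events:
    type: 'reduce'
    inputs:
      - 'events_source'
    max_events: 5                # combine up to 5 events into one
    expire_after_ms: 1000        # flush a partially filled group after 1 second
    merge_strategies:
      message: 'concat_newline'  # join the message fields with newlines

sinks:
  kinesis:
    type: 'aws_kinesis_streams'
    inputs:
      - 'pack_events'
    encoding:
      codec: 'text'              # write the concatenated message as the record body
    stream_name: '#######'
    region: '######'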

pront avatar Mar 07 '25 16:03 pront

@pront You're right. I think I can do this with remap and reduce. It would be even better if the sink provided this functionality, though. Haha. I'll try it. Thank you for your reply!

jesseha-aiderx avatar Mar 08 '25 07:03 jesseha-aiderx