
enhancement request: Support for Firehose Dynamic Partitioning for S3 prefixes

Open kbret1 opened this issue 1 year ago • 4 comments

Is there an existing issue for this?

  • [X] I have searched the existing issues

Enhancement description

AWS supports Firehose dynamic partitioning of S3 prefixes by using keys in the streaming data. https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

Examples: I noticed integration tests on GitHub referencing partitionKeyFromQuery for the S3 prefix using JQ:
https://github.com/localstack/localstack/blob/433de41113d1a6d7780435d87a3d8cd649f9b0f4/tests/integration/test_firehose.py
https://github.com/localstack/localstack/blob/14ff4cce2a84e3885a1a293092f75fd533ac537f/tests/integration/templates/firehose_kinesis_as_source.yaml

The following Terraform example uses prefix and error_output_prefix and should generate files on S3 like s3://<bucket>/data/account_id=1234/year=2023/month=04/day=01/hour=22/ or s3://<bucket>/errors/year=2023/month=04/day=01/hour=22/<error-output-type>/

resource "aws_kinesis_firehose_delivery_stream" "extended_s3_stream" {
  name        = "event-datalake-stream"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn        = aws_iam_role.firehose_role.arn
    bucket_arn      = aws_s3_bucket.bucket.arn
    buffer_size     = 1
    buffer_interval = 60

    # https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    dynamic_partitioning_configuration {
      enabled = "true"
    }

    # Example prefix using partitionKeyFromQuery, applicable to the JQ processor
    prefix              = "data/account_id=!{partitionKeyFromQuery:account_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
    #prefix              = "data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
    error_output_prefix = "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/"

    processing_configuration {
      enabled = "true"

      # Multi-record deaggregation processor example
      processors {
        type = "RecordDeAggregation"
        parameters {
          parameter_name  = "SubRecordType"
          parameter_value = "JSON"
        }
      }

      # New line delimiter processor example
      processors {
        type = "AppendDelimiterToRecord"
      }

      # JQ processor example (note: jq requires double-quoted strings, escaped inside the HCL string)
      processors {
        type = "MetadataExtraction"
        parameters {
          parameter_name  = "JsonParsingEngine"
          parameter_value = "JQ-1.6"
        }
        parameters {
          parameter_name  = "MetadataExtractionQuery"
          parameter_value = "{account_id: .detail.metadata.resource | match(\"./account/(.*?)/\").captures[0].string}"
        }
      }
    }
  }
}
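
For reference, here is a record the JQ query above would match, sent against LocalStack. This is only a sketch: the stream name comes from the Terraform above, while the payload, endpoint, and credentials are illustrative.

import json

import boto3

# Hypothetical payload: .detail.metadata.resource contains "/account/1234/",
# so the MetadataExtractionQuery above would extract account_id = "1234".
record = {
    "detail": {
        "metadata": {"resource": "arn:aws:iam::000000000000/account/1234/user"}
    }
}

firehose = boto3.client(
    "firehose",
    endpoint_url="http://localhost:4566",  # LocalStack edge endpoint
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)
firehose.put_record(
    DeliveryStreamName="event-datalake-stream",
    Record={"Data": json.dumps(record).encode()},
)

# On real AWS this record should land under a key like:
#   data/account_id=1234/year=2023/month=04/day=01/hour=22/...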

🧑‍💻 Implementation

No response

Anything else?

https://aws.amazon.com/blogs/big-data/kinesis-data-firehose-now-supports-dynamic-partitioning-to-amazon-s3/
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

kbret1 avatar Apr 19 '23 12:04 kbret1

I also want to use dynamic partitioning for ExtendedS3DestinationConfiguration; however, when I put a record to Firehose, firehose.provider reports Unsupported Firehose processor type 'MetadataExtraction':

localstack-localstack-1  | 2023-08-08T01:53:47.892  WARN --- [   asgi_gw_1] l.s.firehose.provider      : Unsupported Firehose processor type 'MetadataExtraction'
localstack-localstack-1  | 2023-08-08T01:53:47.902  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS firehose.PutRecord => 200

The warning happens in _preprocess_records, which indicates that the only supported ProcessorType is Lambda; the Firehose built-in MetadataExtraction processor used for dynamic partitioning is not supported. https://github.com/localstack/localstack/blob/6969bcbf31716576bd62b0ec743d8055ada4740b/localstack/services/firehose/provider.py#L690

Looking into the test test_delivery_stream_with_kinesis_as_source (test_firehose.py): it creates a MetadataExtraction-enabled delivery stream but does not seem to put any records, so the issue never surfaces in the test.

https://github.com/localstack/localstack/blob/6969bcbf31716576bd62b0ec743d8055ada4740b/tests/aws/test_firehose.py#L366-L385
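
For illustration, evaluating a MetadataExtractionQuery essentially means running the JQ program against each record and substituting the results into the !{partitionKeyFromQuery:...} placeholders of the prefix. A minimal sketch of that, assuming the jq PyPI bindings (an assumption for the example, not what LocalStack uses):

import json
import re

import jq  # PyPI "jq" bindings -- an assumption, not a LocalStack dependency

def extract_partition_keys(record: bytes, query: str) -> dict:
    """Run a MetadataExtractionQuery such as '{account_id: ...}' on one record."""
    return jq.compile(query).input(json.loads(record)).first()

def render_prefix(prefix: str, keys: dict) -> str:
    """Substitute !{partitionKeyFromQuery:name} placeholders with extracted values."""
    return re.sub(
        r"!\{partitionKeyFromQuery:([^}]+)\}",
        lambda m: str(keys[m.group(1)]),
        prefix,
    )

keys = extract_partition_keys(b'{"entityType": "order"}', "{entityType: .entityType}")
print(render_prefix("crud-events/!{partitionKeyFromQuery:entityType}/", keys))
# -> crud-events/order/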

Is there any plan to support dynamic partitioning for the S3 destination in Firehose? It's a great feature that removes the need for a Lambda just to extract a key, and supporting it would make LocalStack even more useful.

guitarrapc avatar Aug 08 '23 02:08 guitarrapc

Hi. I am also interested in using Dynamic Partitioning in localstack. Any builds supporting this yet?

freywaid avatar Aug 09 '23 17:08 freywaid

Might be only partially related, but even without support for dynamic partitioning, the timestamp and firehose error-output-type prefixes should work. I have tested this on an AWS account and it works as expected. Here is an example Terraform config: https://github.com/sdabhi23/streaming-data-pipeline/blob/main/terraform/kinesis_firehose.tf
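
For what it's worth, the !{timestamp:...} namespace uses Joda-Time-style patterns (yyyy, MM, dd, HH). A rough sketch of the expansion, handling only the tokens used in this thread (the token mapping is illustrative, not LocalStack's code):

import re
from datetime import datetime, timezone

# Joda-Time tokens used in this thread, mapped to strftime (partial, illustrative).
TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}

def render_timestamps(prefix: str, now: datetime) -> str:
    return re.sub(
        r"!\{timestamp:([^}]+)\}",
        lambda m: now.strftime(TOKENS[m.group(1)]),
        prefix,
    )

now = datetime(2023, 4, 1, 22, 0, tzinfo=timezone.utc)
print(render_timestamps("data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/", now))
# -> data/year=2023/month=04/day=01/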

Does this require a separate issue? I am not sure.

sdabhi23 avatar Mar 23 '24 12:03 sdabhi23

Just hit the same issue. Here is the CloudFormation setup to reproduce it:

Resources:
  RawDLKDataStagingBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: raw-dlk-data-staging

  FRTAppPlatformBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: frt-app-platform

  AppEuropeTestEntityCRUDTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: app-europe-test-entity-crud-v1

  FirehoseRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: "firehose.amazonaws.com"
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: "FirehosePolicy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "s3:PutObject"
                  - "s3:GetBucketLocation"
                  - "s3:GetObject"
                  - "s3:ListBucket"
                Resource:
                  - !GetAtt RawDLKDataStagingBucket.Arn
                  - !Join
                    - ""
                    - - !GetAtt RawDLKDataStagingBucket.Arn
                      - "/*"

  KinesisDataFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: my-firehose-delivery-stream
      ExtendedS3DestinationConfiguration:
        DynamicPartitioningConfiguration:
          Enabled: true
        BucketARN: !GetAtt RawDLKDataStagingBucket.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
        Prefix: crud-events/!{partitionKeyFromQuery:entityType}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/
        BufferingHints:
          SizeInMBs: 5
          IntervalInSeconds: 300
        CompressionFormat: UNCOMPRESSED
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Parameters: [ ]
              Type: AppendDelimiterToRecord
            - Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{entityType:.entityType}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
              Type: MetadataExtraction


  FirehoseToSNSSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref AppEuropeTestEntityCRUDTopic
      Protocol: firehose
      Endpoint: !GetAtt KinesisDataFirehose.Arn

You can deploy it using the following init script:

#!/bin/bash

DIRNAME=$(dirname "$0")
PWD=$(pwd)
STACK_NAME="my-app"
TEMPLATE_FILE="$DIRNAME/cloudformation.yml"
LOCALSTACK_ENDPOINT="http://localhost:4566"
AWS_REGION="eu-central-1"

echo "initializing localstack venn-platform with variables:"
echo "DIRNAME            : $DIRNAME"
echo "PWD                : $PWD"
echo "STACK_NAME         : $STACK_NAME"
echo "TEMPLATE_FILE      : $TEMPLATE_FILE"
echo "LOCALSTACK_ENDPOINT: $LOCALSTACK_ENDPOINT"
echo "AWS_REGION         : $AWS_REGION"

# Deploy the CloudFormation stack
echo "Deploying CloudFormation stack: $STACK_NAME"
aws cloudformation deploy \
    --stack-name "$STACK_NAME" \
    --template-file "$TEMPLATE_FILE" \
    --capabilities CAPABILITY_IAM \
    --endpoint-url "$LOCALSTACK_ENDPOINT" \
    --region "$AWS_REGION" \
    --output text

# Describe the status of the deployment
echo "Describing status of CloudFormation stack: $STACK_NAME"
aws cloudformation describe-stacks --stack-name "$STACK_NAME" \
    --endpoint-url "$LOCALSTACK_ENDPOINT" \
    --region "$AWS_REGION" \
    --output text


arthurvaverko avatar May 03 '24 11:05 arthurvaverko