enhancement request: Support for Firehose Dynamic Partitioning for S3 prefixes
Is there an existing issue for this?
- [X] I have searched the existing issues
Enhancement description
AWS supports Firehose dynamic partitioning of S3 prefixes by using keys in the streaming data. https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
Examples: I noticed integration tests on GitHub referencing `partitionKeyFromQuery` for the S3 prefix using JQ:
https://github.com/localstack/localstack/blob/433de41113d1a6d7780435d87a3d8cd649f9b0f4/tests/integration/test_firehose.py
https://github.com/localstack/localstack/blob/14ff4cce2a84e3885a1a293092f75fd533ac537f/tests/integration/templates/firehose_kinesis_as_source.yaml
The following Terraform example uses `prefix` and `error_output_prefix`, which should generate a file on S3 like:
s3://
resource "aws_kinesis_firehose_delivery_stream" "extended_s3_stream" {
name = "event-datalake-stream"
destination = "extended_s3"
extended_s3_configuration {
role_arn = aws_iam_role.firehose_role.arn
bucket_arn = aws_s3_bucket.bucket.arn
buffer_size = 1
buffer_interval = 60
# https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
dynamic_partitioning_configuration {
enabled = "true"
}
# Example prefix using partitionKeyFromQuery, applicable to JQ processor
prefix = "data/account_id=!{partitionKeyFromQuery:account_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
#prefix = "data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
error_output_prefix = "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/"
processing_configuration {
enabled = "true"
# Multi-record deaggregation processor example
processors {
type = "RecordDeAggregation"
parameters {
parameter_name = "SubRecordType"
parameter_value = "JSON"
}
}
# New line delimiter processor example
processors {
type = "AppendDelimiterToRecord"
}
# JQ processor example
processors {
type = "MetadataExtraction"
parameters {
parameter_name = "JsonParsingEngine"
parameter_value = "JQ-1.6"
}
parameters {
parameter_name = "MetadataExtractionQuery"
parameter_value = "{account_id:.detail.metadata.resource | match('./account/(.*?)/').captures[0].string}"
}
}
}
}
}
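To make the `MetadataExtraction` step concrete, here is a small sketch of what the JQ query above evaluates to for a single record. The record shape and values are invented for illustration, and it uses the `jq` Python bindings (`pip install jq`) purely to stand in for Firehose's JQ-1.6 engine:

```python
import jq  # pip install jq; stands in for Firehose's JQ-1.6 engine

# Hypothetical record shape; only .detail.metadata.resource matters here
record = {
    "detail": {
        "metadata": {
            "resource": "service:/account/123456789012/some-resource"
        }
    }
}

# Same MetadataExtractionQuery as in the Terraform config above,
# using jq's double-quoted string syntax for the regex
query = '{account_id: .detail.metadata.resource | match("./account/(.*?)/").captures[0].string}'

print(jq.first(query, record))
# -> {'account_id': '123456789012'}
```

Firehose would then substitute that value into `!{partitionKeyFromQuery:account_id}`, yielding an object key like `data/account_id=123456789012/year=.../...`.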
🧑‍💻 Implementation
No response
Anything else?
https://aws.amazon.com/blogs/big-data/kinesis-data-firehose-now-supports-dynamic-partitioning-to-amazon-s3/
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
I also want to use dynamic partitioning with `ExtendedS3DestinationConfiguration`, but when I put a record to Firehose, `firehose.provider` reports `Unsupported Firehose processor type 'MetadataExtraction'`:

```
localstack-localstack-1 | 2023-08-08T01:53:47.892 WARN --- [ asgi_gw_1] l.s.firehose.provider : Unsupported Firehose processor type 'MetadataExtraction'
localstack-localstack-1 | 2023-08-08T01:53:47.902 INFO --- [ asgi_gw_1] localstack.request.aws : AWS firehose.PutRecord => 200
```

The warning is raised in `_preprocess_records`, which indicates that the only supported `ProcessorType` is `Lambda`; the built-in dynamic-partitioning processor `MetadataExtraction` is not supported:

https://github.com/localstack/localstack/blob/6969bcbf31716576bd62b0ec743d8055ada4740b/localstack/services/firehose/provider.py#L690
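For context, a rough sketch of what handling this processor type could look like. This is not LocalStack's actual code: the helper name is made up, and it assumes a JQ engine such as the `jq` Python bindings is available:

```python
import jq  # assumed JQ engine; Firehose itself specifies JQ-1.6


def extract_partition_keys(processor: dict, record: dict) -> dict:
    """Hypothetical helper: evaluate a MetadataExtraction processor
    against one decoded record and return its partition keys."""
    params = {
        p["ParameterName"]: p["ParameterValue"]
        for p in processor.get("Parameters", [])
    }
    if params.get("JsonParsingEngine", "").startswith("JQ"):
        # e.g. '{entityType:.entityType}' -> {"entityType": "customer"}
        return jq.first(params["MetadataExtractionQuery"], record)
    raise NotImplementedError(
        f"unsupported engine: {params.get('JsonParsingEngine')}"
    )
```

The returned keys would then be substituted into the `!{partitionKeyFromQuery:...}` placeholders of the configured S3 prefix before the object is written.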
Looking into the test `test_delivery_stream_with_kinesis_as_source` (`test_firehose.py`), it creates a `MetadataExtraction`-enabled Firehose but does not appear to put any records, so the issue never surfaces in the test:

https://github.com/localstack/localstack/blob/6969bcbf31716576bd62b0ec743d8055ada4740b/tests/aws/test_firehose.py#L366-L385
Is there any plan to support dynamic partitioning for the Firehose S3 destination? It's a great feature for avoiding a Lambda that does nothing but extraction, and supporting it would make LocalStack extremely useful.
Hi. I am also interested in using Dynamic Partitioning in localstack. Any builds supporting this yet?
Might be partially related here, but even without support for dynamic partitioning, the `timestamp` and Firehose `error-output-type` prefixes should work. I have tested this on an AWS account and it works as expected. Here is an example Terraform config: https://github.com/sdabhi23/streaming-data-pipeline/blob/main/terraform/kinesis_firehose.tf

Does this require a separate issue? I am not sure.
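For what it's worth, the substitution those prefix expressions imply is straightforward to emulate locally. A minimal sketch; the function name and the partial Java-to-strftime pattern map are my own, not LocalStack's implementation:

```python
import re
from datetime import datetime, timezone

# Partial map from Firehose's Java-style date patterns to strftime
_JAVA_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}


def render_prefix(prefix: str, error_output_type: str = "processing-failed") -> str:
    now = datetime.now(timezone.utc)

    def substitute(match: re.Match) -> str:
        namespace, arg = match.group(1), match.group(2)
        if namespace == "timestamp":
            return now.strftime(_JAVA_TO_STRFTIME.get(arg, arg))
        if namespace == "firehose" and arg == "error-output-type":
            return error_output_type
        return match.group(0)  # leave unknown expressions untouched

    return re.sub(r"!\{(\w+):([^}]+)\}", substitute, prefix)


print(render_prefix(
    "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
    "hour=!{timestamp:HH}/!{firehose:error-output-type}/"
))
# e.g. errors/year=2023/month=08/day=08/hour=01/processing-failed/
```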
Just hit the same issue; here is the CloudFormation setup I used to reproduce it:
```yaml
Resources:
  RawDLKDataStagingBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: raw-dlk-data-staging

  FRTAppPlatformBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: frt-app-platform

  AppEuropeTestEntityCRUDTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: app-europe-test-entity-crud-v1

  FirehoseRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: "firehose.amazonaws.com"
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: "FirehosePolicy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "s3:PutObject"
                  - "s3:GetBucketLocation"
                  - "s3:GetObject"
                  - "s3:ListBucket"
                Resource:
                  - !GetAtt RawDLKDataStagingBucket.Arn
                  - !Join
                    - ""
                    - - !GetAtt RawDLKDataStagingBucket.Arn
                      - "/*"

  KinesisDataFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: my-firehose-delivery-stream
      ExtendedS3DestinationConfiguration:
        DynamicPartitioningConfiguration:
          Enabled: true
        BucketARN: !GetAtt RawDLKDataStagingBucket.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
        Prefix: crud-events/!{partitionKeyFromQuery:entityType}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/
        BufferingHints:
          SizeInMBs: 5
          IntervalInSeconds: 300
        CompressionFormat: UNCOMPRESSED
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Parameters: []
              Type: AppendDelimiterToRecord
            - Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{entityType:.entityType}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
              Type: MetadataExtraction

  FirehoseToSNSSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref AppEuropeTestEntityCRUDTopic
      Protocol: firehose
      Endpoint: !GetAtt KinesisDataFirehose.Arn
```
You can deploy it using the following init script:
```bash
#!/bin/bash

DIRNAME=$(dirname "$0")
PWD=$(pwd)
STACK_NAME="my-app"
TEMPLATE_FILE="$DIRNAME/cloudformation.yml"
LOCALSTACK_ENDPOINT="http://localhost:4566"
AWS_REGION="eu-central-1"

echo "initializing localstack venn-platform with variables:"
echo "DIRNAME            : $DIRNAME"
echo "PWD                : $PWD"
echo "STACK_NAME         : $STACK_NAME"
echo "TEMPLATE_FILE      : $TEMPLATE_FILE"
echo "LOCALSTACK_ENDPOINT: $LOCALSTACK_ENDPOINT"
echo "AWS_REGION         : $AWS_REGION"

# Deploy the CloudFormation stack
echo "Deploying CloudFormation stack: $STACK_NAME"
aws cloudformation deploy \
  --stack-name "$STACK_NAME" \
  --template-file "$TEMPLATE_FILE" \
  --capabilities CAPABILITY_IAM \
  --endpoint-url "$LOCALSTACK_ENDPOINT" \
  --region "$AWS_REGION" \
  --output text

# Describe the status of the deployment
echo "Describing status of CloudFormation stack: $STACK_NAME"
aws cloudformation describe-stacks --stack-name "$STACK_NAME" \
  --endpoint-url "$LOCALSTACK_ENDPOINT" \
  --region "$AWS_REGION" \
  --output text
```
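Once the stack is deployed, the warning can be reproduced by putting a single record. A minimal sketch, assuming LocalStack on http://localhost:4566 and an illustrative payload:

```python
import json

import boto3

# Point the Firehose client at LocalStack (dummy credentials)
firehose = boto3.client(
    "firehose",
    endpoint_url="http://localhost:4566",
    region_name="eu-central-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

# Put one record matching the MetadataExtractionQuery '{entityType:.entityType}'
firehose.put_record(
    DeliveryStreamName="my-firehose-delivery-stream",
    Record={"Data": json.dumps({"entityType": "customer", "id": 1}).encode()},
)
# LocalStack then logs:
#   WARN ... l.s.firehose.provider : Unsupported Firehose processor type 'MetadataExtraction'
```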