aws-doc-sdk-examples
[Epic] - Kinesis Firehose (feature scenario)
Spec for Using Kinesis Firehose
The following spec is to be used to implement this workflow in the AWS SDKs:
- [x] #6463
- [ ] #6464
- [ ] #6465
Spec Overview
Create a robust, production-grade script demonstrating how to use Amazon Kinesis Data Firehose to put individual records (`PutRecord`) and batches of records (`PutRecordBatch`) to a delivery stream.
Pre-requisites
- AWS Account: Ensure you have an active AWS account.
- AWS CLI Configured: Configure the AWS CLI with credentials that have appropriate permissions to access Kinesis Firehose.
- Setup: Follow the instructions below to set up the required AWS resources.
Setup
- Deploy the Kinesis Firehose delivery stream and required resources (a quick status check for the deployed stream is sketched after this list):
  - Use the `kinesis-firehose/resources/cdk` project within this directory to deploy the following required resources:
    - Kinesis Firehose delivery stream
    - S3 bucket
    - IAM role
- Create a sample file of 2,550 records:
  - Use the `resources/mock_data.py` script to create mock web traffic data.
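Before running the script, it can help to confirm that the deployed delivery stream is ready. The sketch below uses boto3's `describe_delivery_stream`; the stream name is a placeholder and should be replaced with the name produced by the CDK project.

```python
import boto3

# Assumes the CDK stack above has been deployed; 'your-delivery-stream-name'
# is a placeholder for the stream name the stack creates.
firehose = boto3.client("firehose", region_name="us-east-1")
description = firehose.describe_delivery_stream(
    DeliveryStreamName="your-delivery-stream-name"
)
status = description["DeliveryStreamDescription"]["DeliveryStreamStatus"]
print(f"Delivery stream status: {status}")  # Expect 'ACTIVE' before putting records
```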
Script Specification
1. Setup AWS SDK
- Initialize the AWS SDK in your chosen language (see the initialization sketch after this list).
- Ensure the necessary permissions are set up (IAM roles/policies) to allow interaction with the Firehose service.
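For example, in Python (boto3) initialization might look like the sketch below; the region is an assumption, and credentials are expected to come from the environment or an AWS CLI profile.

```python
import boto3

# Region is an assumption; boto3 resolves credentials from the environment,
# shared config, or an IAM role, so none are hard-coded here.
session = boto3.Session(region_name="us-east-1")
firehose_client = session.client("firehose")
```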
2. Define Configuration Parameters
- Define configuration parameters such as `delivery_stream_name`, region, batch size, and logging configuration (one possible grouping is sketched below).
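One possible way to group these parameters is a small dataclass, as sketched below; the values shown are illustrative defaults rather than required settings.

```python
from dataclasses import dataclass


@dataclass
class FirehoseConfig:
    # Illustrative defaults; adjust to your stream and workload.
    delivery_stream_name: str = "your-delivery-stream-name"
    region: str = "us-east-1"
    max_batch_records: int = 500              # Firehose hard limit per PutRecordBatch request
    max_batch_bytes: int = 4 * 1024 * 1024    # 4 MB hard limit per request
    log_level: str = "INFO"


config = FirehoseConfig()
```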
3. PutRecord API
- Function to put a single record into the Firehose stream (a minimal call shape is sketched after this list).
- Input: Data record (e.g., a JSON object or plain text).
- Output: Response from Firehose service indicating success or failure.
- Error Handling: Implement retries with exponential back-off and jitter.
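Stripped of the retry logic (which the full example later in this spec adds), the call shape looks roughly like the sketch below; the stream name and sample payload are placeholders.

```python
import boto3

firehose_client = boto3.client("firehose", region_name="us-east-1")

# Minimal PutRecord call shape; retries, back-off, and logging are layered on
# in the full example later in this spec.
response = firehose_client.put_record(
    DeliveryStreamName="your-delivery-stream-name",   # placeholder
    Record={"Data": '{"event": "page_view"}\n'.encode("utf-8")},
)
print(response["RecordId"])  # Firehose returns an ID for the accepted record
```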
4. PutRecordBatch API
- Function to put a batch of records into the Firehose stream (a minimal call shape is sketched after this list).
- Input: List of data records.
- Output: Response from Firehose service indicating success or failure for each record.
- Error Handling: Implement retries with exponential back-off and jitter.
- Pagination: Handle large batches by splitting them into smaller chunks.
- Batch Sizing: Optimize batch size to balance throughput and cost, considering the hard limits (500 records or 4MB per request).
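Before chunking and retries are layered on (see the full example later in this spec), the batch call shape looks roughly like the sketch below; the stream name and records are placeholders. `RequestResponses` is positionally aligned with the request, which is what makes resubmitting only the failed records possible.

```python
import boto3

firehose_client = boto3.client("firehose", region_name="us-east-1")

# Placeholder records; real payloads would come from your data source.
records = [{"Data": f"record {i}\n".encode("utf-8")} for i in range(10)]

response = firehose_client.put_record_batch(
    DeliveryStreamName="your-delivery-stream-name",   # placeholder
    Records=records,
)

# FailedPutCount > 0 means some records were rejected; failed entries carry
# an ErrorCode and can be collected for resubmission.
if response["FailedPutCount"] > 0:
    failed = [rec for rec, res in zip(records, response["RequestResponses"])
              if "ErrorCode" in res]
    print(f"{len(failed)} records need to be resubmitted")
```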
5. Logging
- Implement structured logging to capture the success and failure of API calls (one possible approach is sketched after this list).
- Log retries, exceptions, batch operations, and any critical information for debugging and monitoring.
- Provide real-time updates on the script's activity.
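One lightweight way to get structured output with the standard library alone is a JSON formatter, as sketched below; the logger name and fields are illustrative choices, not requirements.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Emit each log record as a single JSON line for easier ingestion and searching."""

    def format(self, record):
        return json.dumps({
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
        })


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("firehose_demo")   # illustrative logger name
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("PutRecordBatch attempt succeeded")
# -> {"timestamp": "...", "level": "INFO", "message": "PutRecordBatch attempt succeeded"}
```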
6. Monitoring Metrics
- Check the `IncomingBytes` and `IncomingRecords` metrics to ensure there is incoming traffic.
- Monitor `FailedPutCount` for batch operations (a simple way to track it is sketched after this list).
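`FailedPutCount` is returned on every `PutRecordBatch` response rather than fetched separately, so one simple approach is to tally it in the sending path, as sketched below; the helper and stream name are illustrative.

```python
import boto3

firehose_client = boto3.client("firehose", region_name="us-east-1")


def send_batch_tracking_failures(records, stream_name="your-delivery-stream-name"):
    """Send one batch and return its FailedPutCount so the caller can aggregate it."""
    response = firehose_client.put_record_batch(
        DeliveryStreamName=stream_name,   # placeholder
        Records=records,
    )
    failed = response["FailedPutCount"]
    if failed:
        # Feed this count into your logging/alerting alongside the CloudWatch checks.
        print(f"{failed} records failed in this batch")
    return failed
```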
Example Flow
- Initialize SDK and Set Configuration Parameters:
  - Configure the AWS SDK with access keys and region.
  - Define the delivery stream name.
  - Set up logging configurations.
- PutRecord Function:
  - Prepare a data record.
  - Use the `PutRecord` API to send the data to the delivery stream.
  - Implement retries with exponential back-off and jitter for transient errors.
  - Log the result of the operation.
  - Handle and log exceptions.
- PutRecordBatch Function:
  - Prepare a batch of data records.
  - Use the `PutRecordBatch` API to send the data batch to the delivery stream.
  - Implement retries with exponential back-off and jitter for transient errors.
  - Handle large batches by splitting them into smaller chunks.
  - Optimize batch size to balance throughput and cost, considering the hard limits (500 records or 4MB per request).
  - Log the result of each operation and check `FailedPutCount` in the response.
  - Handle and log exceptions.
- Monitor Metrics:
  - Check `IncomingBytes` and `IncomingRecords` to ensure there is incoming traffic.
  - Continuously monitor `FailedPutCount` for batch operations.
Example Pseudocode
```python
import logging
import random
import time
from datetime import datetime, timedelta, timezone

import boto3

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize the Kinesis Firehose client
firehose_client = boto3.client('firehose', region_name='us-east-1')

# Define the delivery stream name
delivery_stream_name = 'your-delivery-stream-name'

# Batch size configuration (Firehose hard limits per PutRecordBatch request)
max_batch_size_records = 500
max_batch_size_bytes = 4 * 1024 * 1024  # 4 MB


def exponential_backoff_with_jitter(retry_count):
    """Return a delay that grows exponentially with the retry count, plus random jitter."""
    base_delay = 0.1  # Base delay in seconds
    max_delay = 2.0   # Cap on the exponential component, in seconds
    delay = min(max_delay, base_delay * (2 ** retry_count))
    jitter = random.uniform(0, delay)
    return delay + jitter


def put_record(data):
    """Send a single record to the delivery stream, retrying transient failures."""
    data_bytes = data.encode('utf-8')
    max_retries = 5
    for retry in range(max_retries):
        try:
            response = firehose_client.put_record(
                DeliveryStreamName=delivery_stream_name,
                Record={'Data': data_bytes}
            )
            logger.info(f"Record successfully sent: {response['RecordId']}")
            return
        except Exception as e:
            logger.error(f"Failed to send record: {e}")
            time.sleep(exponential_backoff_with_jitter(retry))
    logger.error("Max retries reached. Failed to send record")


def put_record_batch(data_batch):
    """Send a batch of records, resubmitting any that Firehose reports as failed."""
    records = [{'Data': data.encode('utf-8')} for data in data_batch]
    max_retries = 5
    for retry in range(max_retries):
        try:
            response = firehose_client.put_record_batch(
                DeliveryStreamName=delivery_stream_name,
                Records=records
            )
            failed_count = response['FailedPutCount']
            if failed_count == 0:
                logger.info("All records successfully sent")
                return
            logger.warning(f"Failed to send {failed_count} records")
            failed_records = []
            for idx, record_response in enumerate(response['RequestResponses']):
                if 'ErrorCode' in record_response:
                    logger.error(f"Record {idx} failed: {record_response['ErrorMessage']}")
                    failed_records.append(records[idx])
            records = failed_records  # Resubmit only the failed records
        except Exception as e:
            logger.error(f"Failed to send record batch: {e}")
        time.sleep(exponential_backoff_with_jitter(retry))
    logger.error("Max retries reached. Failed to send record batch")


# Example usage
put_record("Single record data")


def put_records_in_batches(data_records):
    """Split the input into chunks that respect both the record-count and byte-size limits."""
    batch, batch_bytes = [], 0
    for data in data_records:
        record_bytes = len(data.encode('utf-8'))
        if batch and (len(batch) >= max_batch_size_records
                      or batch_bytes + record_bytes > max_batch_size_bytes):
            logger.info(f"Sending a batch of {len(batch)} records")
            put_record_batch(batch)
            batch, batch_bytes = [], 0
        batch.append(data)
        batch_bytes += record_bytes
    if batch:
        put_record_batch(batch)


# Example batch usage
data_batch = ["Batch record data 1", "Batch record data 2"]  # ... add more records as needed
put_records_in_batches(data_batch)


def monitor_metrics():
    """Log the IncomingBytes and IncomingRecords sums for the last 10 minutes."""
    cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
    metrics = ['IncomingBytes', 'IncomingRecords']
    now = datetime.now(timezone.utc)
    for metric in metrics:
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/Firehose',
            MetricName=metric,
            Dimensions=[{'Name': 'DeliveryStreamName', 'Value': delivery_stream_name}],
            StartTime=now - timedelta(minutes=10),
            EndTime=now,
            Period=60,
            Statistics=['Sum']
        )
        for point in response['Datapoints']:
            logger.info(f"{metric} - {point['Timestamp']}: {point['Sum']}")


# Monitor metrics
monitor_metrics()
```
Batch Sizing Considerations
- Hard Limits: The `PutRecordBatch` API has a hard limit of 500 records or 4MB per request.
- Throughput: Larger batches improve throughput but may increase latency.
- Cost: Smaller batches may lead to higher costs due to more frequent API calls.
- Optimization: Adjust the batch size to find the best balance between throughput, latency, and cost; monitor and adjust based on the specific workload and requirements (a starting-point calculation is sketched below).
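As a rough starting point, the two hard limits can be combined with an estimate of the average record size; the sketch below uses a 2 KB average purely as an example figure.

```python
MAX_RECORDS_PER_BATCH = 500               # Firehose hard limit per PutRecordBatch request
MAX_BYTES_PER_BATCH = 4 * 1024 * 1024     # 4 MB hard limit per request


def suggested_batch_size(avg_record_bytes):
    """Largest batch size that stays under both hard limits for a given average record size."""
    by_bytes = MAX_BYTES_PER_BATCH // max(avg_record_bytes, 1)
    return max(1, min(MAX_RECORDS_PER_BATCH, by_bytes))


# Example: ~2 KB mock web-traffic records (illustrative figure)
print(suggested_batch_size(2 * 1024))  # -> 500; the record-count limit binds first
```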
Notes
- Error Handling: Comprehensive error handling with retries and exponential back-off with jitter.
- Logging: Structured and informative logging for real-time monitoring and debugging.
- Pagination: Handle large data batches by splitting them into smaller chunks.
- Monitoring: Regularly check and log metrics to ensure the system is functioning correctly.