feat(kinesisfirehose): add built-in data processors to decompress CloudWatch logs and extract messages
Issue # (if applicable)
Closes #33691. Closes #20242.
Reason for this change
Amazon Data Firehose supports the following convenient conversions:
- Decompress CloudWatch logs
- Extract message after decompression of CloudWatch Logs
- Add a new line delimiter when delivering data to Amazon S3
This PR adds all three processors because:
- Decompression (without CloudWatchLogProcessing) may need AppendDelimiterToRecord
- CloudWatchLogProcessing requires Decompression
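At the CloudFormation level, these conversions map onto the delivery stream's `ProcessingConfiguration`. The following sketch of the rendered shape assumes the processor types and parameter names from the `AWS::KinesisFirehose::DeliveryStream` Processor schema; the ordering reflects the constraint that CloudWatchLogProcessing requires Decompression to run first:

```typescript
// Sketch: the ProcessingConfiguration the three conversions above would
// render to. Processor Type and ParameterName values follow the
// AWS::KinesisFirehose::DeliveryStream Processor schema (an assumption
// about the exact rendered output, not taken from this PR's code).
const processingConfiguration = {
  Enabled: true,
  Processors: [
    {
      // Decompression must come before CloudWatchLogProcessing.
      Type: 'Decompression',
      Parameters: [{ ParameterName: 'CompressionFormat', ParameterValue: 'GZIP' }],
    },
    {
      Type: 'CloudWatchLogProcessing',
      Parameters: [{ ParameterName: 'DataMessageExtraction', ParameterValue: 'true' }],
    },
    // Appends a newline delimiter to each record delivered to S3.
    { Type: 'AppendDelimiterToRecord' },
  ],
};
```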
Description of changes
- Introduce `processors` (plural) prop to specify multiple processors
- Deprecate `processor` (singular) prop in favor of `processors`
- Add built-in processor classes:
  - `DecompressionProcessor` - Decompress CloudWatch logs
  - `CloudWatchLogProcessingProcessor` - Extract message after decompression of CloudWatch Logs
  - `AppendDelimiterToRecordProcessor` - Add a new line delimiter when delivering data to Amazon S3
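The deprecation can be handled with a small resolution step. A self-contained sketch (the helper name and error message here are hypothetical, not taken from the PR):

```typescript
// Hypothetical helper: merge the deprecated singular `processor` prop with
// the new plural `processors` prop, rejecting ambiguous input.
interface DestinationProps<P> {
  processor?: P;   // deprecated in favor of `processors`
  processors?: P[];
}

function resolveProcessors<P>(props: DestinationProps<P>): P[] {
  if (props.processor !== undefined && props.processors !== undefined) {
    throw new Error('Only one of `processor` and `processors` may be specified.');
  }
  // Fall back to the deprecated prop so existing users keep working.
  return props.processors ?? (props.processor !== undefined ? [props.processor] : []);
}
```

Normalizing both props into one array up front means the downstream rendering logic only ever deals with a list.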
Describe any new or updated permissions being added
N/A - No permissions are required by these built-in processors.
Description of how you validated changes
Added unit tests and integ tests
see also https://github.com/aws/aws-cdk/issues/33691#issuecomment-2713012245
Checklist
- [x] My code adheres to the CONTRIBUTING GUIDE and DESIGN GUIDELINES
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 82.39%. Comparing base (be383a9) to head (a4d86ac).
Additional details and impacted files
```
@@           Coverage Diff           @@
##             main   #33749   +/-   ##
=======================================
  Coverage   82.39%   82.39%
=======================================
  Files         120      120
  Lines        6960     6960
  Branches     1175     1175
=======================================
  Hits         5735     5735
  Misses       1120     1120
  Partials      105      105
```
| Flag | Coverage Δ |
|---|---|
| suite.unit | 82.39% <ø> (ø) |

Flags with carried forward coverage won't be shown.

| Components | Coverage Δ |
|---|---|
| packages/aws-cdk | ∅ <ø> (∅) |
| packages/aws-cdk-lib/core | 82.39% <ø> (ø) |
Clarification Request
Compatibility check failed due to:
err - IFACE aws-cdk-lib.aws_kinesisfirehose.DataProcessorConfig: formerly required property 'processorIdentifier' is optional: returned from aws-cdk-lib.aws_kinesisfirehose.LambdaFunctionProcessor.bind (...and 1 more...) [weakened:aws-cdk-lib.aws_kinesisfirehose.DataProcessorConfig]
`processorIdentifier` was used to supply the ARN of the Lambda function, but the built-in processors need no identifier, only their own parameters. `processorIdentifier` is now left unused to avoid changing the parameter order of `LambdaFunctionProcessor`.
- Is it acceptable to add the above exception to `allowed-breaking-changes.txt`?
- If not, is there another solution?
Hi @Tietew, since this is a stable library we cannot accept any breaking changes. I think the best thing to do is to deprecate the property in the interface, return an empty object or undefined from it, and update the logic in the renderDataProcessor method (https://github.com/aws/aws-cdk/blob/734ca662c82cbedf06393d223a870381ad55520c/packages/aws-cdk-lib/aws-kinesisfirehose/lib/private/helpers.ts#L113) to ignore the case where the processorIdentifier is empty. Let me know what you think.
@paulhcsun Thank you for the comment. I'll update it to return an empty dummy object instead of undefined.
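The dummy-object approach can be sketched as follows. The interfaces are simplified stand-ins for the aws-cdk-lib ones, and the empty-name check is an assumption about how the helper would detect a dummy identifier:

```typescript
// Simplified stand-ins for the aws-cdk-lib interfaces.
interface DataProcessorIdentifier { parameterName: string; parameterValue: string; }
interface ProcessorParameter { parameterName: string; parameterValue: string; }
interface DataProcessorConfig {
  processorType: string;
  processorIdentifier: DataProcessorIdentifier; // dummy for built-in processors
  parameters?: ProcessorParameter[];
}

// Render helper: skip the identifier (and the RoleArn it implies) when a
// built-in processor returned a dummy one.
function renderDataProcessor(config: DataProcessorConfig, roleArn: string) {
  const parameters: ProcessorParameter[] = [];
  if (config.processorIdentifier.parameterName !== '') {
    // Legacy Lambda path: the identifier carries e.g. the LambdaArn.
    parameters.push({ parameterName: 'RoleArn', parameterValue: roleArn });
    parameters.push(config.processorIdentifier);
  }
  parameters.push(...(config.parameters ?? []));
  return { type: config.processorType, parameters };
}
```

This keeps `DataProcessorConfig` unchanged for existing Lambda processors while letting built-in processors render without an identifier.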
snapshot change is caused by #33777
I just tried using AppendDelimiterToRecord by copying the class into my own project. If a delivery stream is created with this as a processor, the deployment fails because the service does not accept the empty processorIdentifier.parameterValue.
As a result, I had to comment that out and then cast the return value, which was accepted. So it seems the definition of DataProcessorConfig may be incorrect in requiring the parameter: the validation of parameterValue appears to require a non-empty, non-whitespace value.
```ts
export class AppendDelimiterToRecordProcessor implements IDataProcessor {
  public readonly props: DataProcessorProps = {};

  constructor() {}

  bind(_scope: Construct, _options: DataProcessorBindOptions): DataProcessorConfig {
    return {
      processorType: 'AppendDelimiterToRecord',
      // processorIdentifier: { parameterName: '', parameterValue: '' },
    } as DataProcessorConfig;
  }
}
```
If done with the level 1 construct, this works:
```ts
processors: [
  {
    type: 'AppendDelimiterToRecord',
  },
],
```
So it seems that the definition of DataProcessorConfig may be incorrect in requiring the parameter.
Agreed, but we cannot remove it or change it to optional due to unallowed breaking change. See also my clarification request https://github.com/aws/aws-cdk/pull/33749#issuecomment-2717693038 and @paulhcsun's reply https://github.com/aws/aws-cdk/pull/33749#issuecomment-2725904933
You can use the following escape hatch as workaround:
```ts
const stream = new firehose.DeliveryStream(...);
(stream.node.defaultChild as firehose.CfnDeliveryStream).addPropertyOverride(
  'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
  {
    Enabled: true,
    Processors: [{ Type: 'AppendDelimiterToRecord' }],
  },
);
```
To avoid BC, have you considered the Interface Extension with Type Guard Pattern?
Keep the interface unchanged but use type guards to detect extended configs:
```ts
// Keep original interface unchanged
export interface DataProcessorConfig {
  readonly processorType: string;
  readonly processorIdentifier: DataProcessorIdentifier;
}

// Create extended interface
export interface ExtendedDataProcessorConfig extends DataProcessorConfig {
  readonly parameters: CfnDeliveryStream.ProcessorParameterProperty[];
}

// Type guard function
function isExtendedConfig(config: DataProcessorConfig): config is ExtendedDataProcessorConfig {
  return 'parameters' in config;
}

// In the helper function
function renderDataProcessor(processor: IDataProcessor, scope: Construct, role: iam.IRole) {
  const processorConfig = processor.bind(scope, { role });

  // Use type guard to check for extended config
  if (isExtendedConfig(processorConfig)) {
    return {
      type: processorConfig.processorType,
      parameters: processorConfig.parameters,
    };
  }

  // Legacy path unchanged
  const parameters = [{ parameterName: 'RoleArn', parameterValue: role.roleArn }];
  parameters.push(processorConfig.processorIdentifier);
  // ... rest of existing logic
}
```
Another option is the Optional Property Pattern: add optional properties to the existing interface.
```ts
export interface DataProcessorConfig {
  readonly processorType: string;
  readonly processorIdentifier: DataProcessorIdentifier;
  // Add new optional properties - not breaking!
  readonly parameters?: CfnDeliveryStream.ProcessorParameterProperty[];
  readonly useDirectParameters?: boolean;
}

// Built-in processors can use the new properties
export class DecompressionProcessor implements IDataProcessor {
  bind(_scope: Construct, _options: DataProcessorBindOptions): DataProcessorConfig {
    return {
      processorType: 'Decompression',
      processorIdentifier: { parameterName: '', parameterValue: '' }, // Dummy for compatibility
      parameters: [
        { parameterName: 'CompressionFormat', parameterValue: 'GZIP' },
      ],
      useDirectParameters: true,
    };
  }
}
```
@pahud Thanks for your suggestion. I'm thinking about it.
```
aws-cdk-lib: error JSII5003: "aws-cdk-lib.aws_kinesisfirehose.AppendDelimiterToRecordProcessor#bind" changes the return type to "aws-cdk-lib.aws_kinesisfirehose.ExtendedDataProcessorConfig" when implementing aws-cdk-lib.aws_kinesisfirehose.IDataProcessor. Change it to "aws-cdk-lib.aws_kinesisfirehose.DataProcessorConfig"
```
JSII rejected returning a subclass from `bind()`.
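TypeScript itself allows the narrower return type, but JSII requires the declared signature to match the interface exactly. A minimal sketch of the workaround, using simplified stand-ins for the interfaces involved: declare the base type and return the extended shape.

```typescript
// Simplified stand-ins for the interfaces involved.
interface DataProcessorConfig { processorType: string; }
interface ExtendedDataProcessorConfig extends DataProcessorConfig {
  parameters: { parameterName: string; parameterValue: string }[];
}

class AppendDelimiterToRecordProcessor {
  // Declared return type stays DataProcessorConfig to satisfy JSII5003;
  // the returned object still carries the extra `parameters` field at runtime.
  bind(): DataProcessorConfig {
    const config: ExtendedDataProcessorConfig = {
      processorType: 'AppendDelimiterToRecord',
      parameters: [],
    };
    return config;
  }
}
```

Callers (such as the render helper) can then use a type guard to recover the extended shape, since the widening is only in the declared type.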
AWS CodeBuild CI Report
- CodeBuild project: AutoBuildv2Project1C6BFA3F-wQm2hXv2jqQv
- Commit ID: 3b517854b11ab6077a7481432fe8d0065de84316
- Result: SUCCEEDED
- Build Logs (available for 30 days)
LGTM. Thanks!
Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).
This pull request has been removed from the queue for the following reason: pull request branch update failed.
The pull request can't be updated
rebase conflict between base and head.
You should update or rebase your pull request manually. If you do, this pull request will automatically be requeued once the queue conditions match again.
If you think this was a flaky issue, you can requeue the pull request, without updating it, by posting a @mergifyio requeue comment.
@mergifyio update
update
✅ Branch has been successfully updated
Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).
Comments on closed issues and PRs are hard for our team to see. If you need help, please open a new issue that references this one.