feat(kinesisfirehose): add built-in data processors to decompress CloudWatch logs and extract messages

Open Tietew opened this issue 9 months ago • 8 comments

Issue # (if applicable)

Closes #33691. Closes #20242.

Reason for this change

Data Firehose supports the following convenient conversions:

This PR adds the above three processors because:

  • Decompression (without CloudWatchLogProcessing) may need AppendDelimiterToRecord
  • CloudWatchLogProcessing requires Decompression
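
The dependency stated in the list above can be written as a small check. This is a minimal sketch and not code from this PR: `ProcessorType` and `validateProcessorTypes` are hypothetical names, and only the three processor type strings and the "requires" rule come from the description.

```typescript
// Hypothetical names for illustration; only the type strings come from this PR.
type ProcessorType = 'Decompression' | 'CloudWatchLogProcessing' | 'AppendDelimiterToRecord';

// Returns the problems found in a processor chain (empty when none are found).
function validateProcessorTypes(types: ProcessorType[]): string[] {
  const errors: string[] = [];
  const has = (t: ProcessorType): boolean => types.indexOf(t) !== -1;

  // Rule from the PR description: message extraction only works on decompressed logs.
  if (has('CloudWatchLogProcessing') && !has('Decompression')) {
    errors.push('CloudWatchLogProcessing requires Decompression');
  }
  return errors;
}
```

A chain containing only `CloudWatchLogProcessing` would be reported, while `Decompression` on its own, or combined with `AppendDelimiterToRecord`, would pass.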

Description of changes

  • introduce processors (plural) prop to specify multiple processors
  • deprecate processor (singular) prop in favor of processors
  • add built-in processor classes:
    • DecompressionProcessor - Decompress CloudWatch logs
    • CloudWatchLogProcessingProcessor - Extract message after decompression of CloudWatch Logs
    • AppendDelimiterToRecordProcessor - Add a new line delimiter when delivering data to Amazon S3
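
For orientation, the sketch below shows the kind of CloudFormation `ProcessingConfiguration` these three processors are expected to map to. The interfaces are local stand-ins rather than the aws-cdk-lib types, and the `DataMessageExtraction` parameter name is an assumption based on the Firehose processor parameter list; `CompressionFormat: GZIP` also appears later in this thread.

```typescript
// Local stand-ins for the CloudFormation property shapes (not imported from aws-cdk-lib).
interface ProcessorParameter {
  ParameterName: string;
  ParameterValue: string;
}
interface Processor {
  Type: string;
  Parameters?: ProcessorParameter[];
}

const processors: Processor[] = [
  // Decompress the gzipped CloudWatch Logs payload.
  { Type: 'Decompression', Parameters: [{ ParameterName: 'CompressionFormat', ParameterValue: 'GZIP' }] },
  // Assumed parameter name: extract only the log message from each event.
  { Type: 'CloudWatchLogProcessing', Parameters: [{ ParameterName: 'DataMessageExtraction', ParameterValue: 'true' }] },
  // No parameters: append a newline delimiter to each record.
  { Type: 'AppendDelimiterToRecord' },
];

const processingConfiguration = { Enabled: true, Processors: processors };
```

Note that the last entry carries no `Parameters` at all, which is why a mandatory identifier on the processor config becomes a problem later in this thread.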

Describe any new or updated permissions being added

N/A - No permissions are required by these built-in processors.

Description of how you validated changes

Added unit tests and integ tests

see also https://github.com/aws/aws-cdk/issues/33691#issuecomment-2713012245

Checklist


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license

Tietew avatar Mar 12 '25 11:03 Tietew

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Project coverage is 82.39%. Comparing base (be383a9) to head (a4d86ac).

Additional details and impacted files
@@           Coverage Diff           @@
##             main   #33749   +/-   ##
=======================================
  Coverage   82.39%   82.39%           
=======================================
  Files         120      120           
  Lines        6960     6960           
  Branches     1175     1175           
=======================================
  Hits         5735     5735           
  Misses       1120     1120           
  Partials      105      105           
Flag Coverage Δ
suite.unit 82.39% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Components Coverage Δ
packages/aws-cdk ∅ <ø> (∅)
packages/aws-cdk-lib/core 82.39% <ø> (ø)

codecov[bot] avatar Mar 12 '25 11:03 codecov[bot]

Clarification Request

Compatibility check failed due to:

err - IFACE aws-cdk-lib.aws_kinesisfirehose.DataProcessorConfig: formerly required property 'processorIdentifier' is optional: returned from aws-cdk-lib.aws_kinesisfirehose.LambdaFunctionProcessor.bind (...and 1 more...) [weakened:aws-cdk-lib.aws_kinesisfirehose.DataProcessorConfig]

processorIdentifier was used to supply the ARN of the Lambda function. Built-in processors need no identifier, only their own parameters.

processorIdentifier is not used now to avoid changing the order of parameters of LambdaFunctionProcessor.

  • Is it acceptable to add the above exception to allowed-breaking-changes.txt?
  • If not, is there any other solution?

Tietew avatar Mar 12 '25 12:03 Tietew

Hi @Tietew, since this is a stable library, we cannot accept any breaking changes. I think the best approach is to deprecate the property in the interface, return an empty object or undefined for it, and update the logic in the renderDataProcessor method https://github.com/aws/aws-cdk/blob/734ca662c82cbedf06393d223a870381ad55520c/packages/aws-cdk-lib/aws-kinesisfirehose/lib/private/helpers.ts#L113 to skip processorIdentifier when it is empty. Let me know what you think.
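
The suggestion above could look roughly like the following. This is only a sketch under stated assumptions: the interfaces are local stand-ins for the aws-cdk-lib types, `renderProcessor` is a hypothetical simplification of renderDataProcessor, and treating "empty" as an empty name or a blank value is an assumption.

```typescript
// Local stand-ins for the types discussed in this thread.
interface DataProcessorIdentifier {
  readonly parameterName: string;
  readonly parameterValue: string;
}
interface DataProcessorConfig {
  readonly processorType: string;
  readonly processorIdentifier: DataProcessorIdentifier;
}
interface RenderedParameter {
  parameterName: string;
  parameterValue: string;
}
interface RenderedProcessor {
  type: string;
  parameters: RenderedParameter[];
}

// Hypothetical helper: emit RoleArn and the identifier only when the identifier is non-empty.
function renderProcessor(config: DataProcessorConfig, roleArn: string): RenderedProcessor {
  const parameters: RenderedParameter[] = [];
  const id = config.processorIdentifier;
  const hasIdentifier = id.parameterName !== '' && id.parameterValue.trim() !== '';

  if (hasIdentifier) {
    parameters.push({ parameterName: 'RoleArn', parameterValue: roleArn });
    parameters.push({ parameterName: id.parameterName, parameterValue: id.parameterValue });
  }
  return { type: config.processorType, parameters };
}
```

With this shape, a Lambda processor keeps both parameters, while a built-in processor that returns the empty placeholder renders with none.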

paulhcsun avatar Mar 14 '25 22:03 paulhcsun

@paulhcsun Thank you for the comment. I'll update the PR to return an empty dummy object instead of undefined.

Tietew avatar Mar 15 '25 15:03 Tietew

snapshot change is caused by #33777

Tietew avatar Mar 15 '25 16:03 Tietew

I just tried using AppendDelimiterToRecord by copying the class into my own project. If a delivery stream is created with this as a processor, the deployment will fail as it doesn't accept the empty processorIdentifier.parameterValue.

As a result, I had to comment that out and cast the return value, which was accepted. So it seems that the definition of DataProcessorConfig may be incorrect in requiring the parameter. The validation appears to require parameterValue to contain at least one non-whitespace character.

export class AppendDelimiterToRecordProcessor implements IDataProcessor {
  public readonly props: DataProcessorProps = {};

  constructor() {}

  bind(_scope: Construct, _options: DataProcessorBindOptions): DataProcessorConfig {
    return {
      processorType: 'AppendDelimiterToRecord',
      // processorIdentifier: { parameterName: '', parameterValue: '' },
    } as DataProcessorConfig;
  }
}

If done with the level 1 construct, this works:

          processors: [
            {
              type: 'AppendDelimiterToRecord',
            }
          ],

dmeehan1968 avatar Mar 17 '25 20:03 dmeehan1968

> So it seems that the definition of DataProcessorConfig may be incorrect in requiring the parameter.

Agreed, but we cannot remove it or make it optional because that would be a disallowed breaking change. See also my clarification request https://github.com/aws/aws-cdk/pull/33749#issuecomment-2717693038 and @paulhcsun's reply https://github.com/aws/aws-cdk/pull/33749#issuecomment-2725904933

You can use the following escape hatch as a workaround:

const stream = new firehose.DeliveryStream(...);
(stream.node.defaultChild as firehose.CfnDeliveryStream).addPropertyOverride('ExtendedS3DestinationConfiguration.ProcessingConfiguration', {
  Enabled: true,
  Processors: [{ Type: 'AppendDelimiterToRecord' }],
});

Tietew avatar Mar 18 '25 01:03 Tietew

To avoid a breaking change (BC), have you considered the Interface Extension with Type Guard Pattern?

Keep the interface unchanged but use type guards to detect extended configs:

// Keep original interface unchanged
export interface DataProcessorConfig {
  readonly processorType: string;
  readonly processorIdentifier: DataProcessorIdentifier;
}

// Create extended interface
export interface ExtendedDataProcessorConfig extends DataProcessorConfig {
  readonly parameters: CfnDeliveryStream.ProcessorParameterProperty[];
}

// Type guard function
function isExtendedConfig(config: DataProcessorConfig): config is ExtendedDataProcessorConfig {
  return 'parameters' in config;
}

// In the helper function
function renderDataProcessor(processor: IDataProcessor, scope: Construct, role: iam.IRole) {
  const processorConfig = processor.bind(scope, { role });

  // Use type guard to check for extended config
  if (isExtendedConfig(processorConfig)) {
    return {
      type: processorConfig.processorType,
      parameters: processorConfig.parameters,
    };
  }

  // Legacy path unchanged
  const parameters = [{ parameterName: 'RoleArn', parameterValue: role.roleArn }];
  parameters.push(processorConfig.processorIdentifier);
  // ... rest of existing logic
}

Another option is the Optional Property Pattern.

Add optional properties to the existing interface:

export interface DataProcessorConfig {
  readonly processorType: string;
  readonly processorIdentifier: DataProcessorIdentifier;

  // Add new optional properties - not breaking!
  readonly parameters?: CfnDeliveryStream.ProcessorParameterProperty[];
  readonly useDirectParameters?: boolean;
}

// Built-in processors can use the new properties
export class DecompressionProcessor implements IDataProcessor {
  bind(_scope: Construct, _options: DataProcessorBindOptions): DataProcessorConfig {
    return {
      processorType: 'Decompression',
      processorIdentifier: { parameterName: '', parameterValue: '' }, // Dummy for compatibility
      parameters: [
        { parameterName: 'CompressionFormat', parameterValue: 'GZIP' },
      ],
      useDirectParameters: true,
    };
  }
}

pahud avatar Aug 04 '25 23:08 pahud

@pahud Thanks for your suggestion. I'm thinking about it.

Tietew avatar Aug 05 '25 02:08 Tietew

aws-cdk-lib: error JSII5003: "aws-cdk-lib.aws_kinesisfirehose.AppendDelimiterToRecordProcessor#bind" changes the return type to "aws-cdk-lib.aws_kinesisfirehose.ExtendedDataProcessorConfig" when implementing aws-cdk-lib.aws_kinesisfirehose.IDataProcessor. Change it to "aws-cdk-lib.aws_kinesisfirehose.DataProcessorConfig"

JSII rejects returning a subtype from bind().
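
The error arises because the implementing method declares a narrower return type than the interface method. The sketch below shows the TypeScript side only, with local stand-in types and a hypothetical class name: the declared return type stays `DataProcessorConfig`, while the returned object still carries the extra `parameters` field that a type guard can detect at runtime. Whether jsii accepts this shape for an exported API is not verified here.

```typescript
// Local stand-ins for the types discussed in this thread.
interface DataProcessorConfig {
  readonly processorType: string;
  readonly processorIdentifier: { readonly parameterName: string; readonly parameterValue: string };
}
interface ExtendedDataProcessorConfig extends DataProcessorConfig {
  readonly parameters: { parameterName: string; parameterValue: string }[];
}
interface IDataProcessor {
  bind(): DataProcessorConfig;
}

// Type guard: detects the extended shape at runtime.
function isExtendedConfig(config: DataProcessorConfig): config is ExtendedDataProcessorConfig {
  return 'parameters' in config;
}

// Hypothetical class name; the declared return type matches the interface exactly.
class AppendDelimiterSketch implements IDataProcessor {
  bind(): DataProcessorConfig {
    // Built as the extended type first, then returned as the base type.
    const config: ExtendedDataProcessorConfig = {
      processorType: 'AppendDelimiterToRecord',
      processorIdentifier: { parameterName: '', parameterValue: '' },
      parameters: [],
    };
    return config;
  }
}
```

The trade-off is that the extra field is invisible in the public signature, so callers have to rely on the type guard rather than on the declared type.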

Tietew avatar Aug 06 '25 03:08 Tietew

AWS CodeBuild CI Report

  • CodeBuild project: AutoBuildv2Project1C6BFA3F-wQm2hXv2jqQv
  • Commit ID: 3b517854b11ab6077a7481432fe8d0065de84316
  • Result: SUCCEEDED
  • Build Logs (available for 30 days)

Powered by github-codebuild-logs, available on the AWS Serverless Application Repository

aws-cdk-automation avatar Aug 06 '25 04:08 aws-cdk-automation

LGTM. Thanks!

Abogical avatar Oct 30 '25 13:10 Abogical

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

mergify[bot] avatar Oct 30 '25 14:10 mergify[bot]

This pull request has been removed from the queue for the following reason: pull request branch update failed.

The pull request can't be updated

rebase conflict between base and head.

You should update or rebase your pull request manually. If you do, this pull request will automatically be requeued once the queue conditions match again. If you think this was a flaky issue, you can requeue the pull request, without updating it, by posting a @mergifyio requeue comment.

mergify[bot] avatar Oct 30 '25 14:10 mergify[bot]

@mergifyio update

Tietew avatar Oct 30 '25 14:10 Tietew

update

✅ Branch has been successfully updated

mergify[bot] avatar Oct 30 '25 14:10 mergify[bot]

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

mergify[bot] avatar Oct 30 '25 15:10 mergify[bot]

Comments on closed issues and PRs are hard for our team to see. If you need help, please open a new issue that references this one.

github-actions[bot] avatar Oct 30 '25 15:10 github-actions[bot]