
Duplicate notify delivery by c7n-mailer in a GCP environment

Open nvarscar opened this issue 2 years ago • 5 comments

Describe the bug

Messages from a Pub/Sub subscription are returned in arbitrary order (unless published with an ordering key, which these aren't). The mailer, however, assumes they are ordered: it ACKs the batch based on the publishTime of the last message in the array. Without ordering, "last" doesn't necessarily mean "most recent," so whether a message gets ACKed is effectively a coin flip: it is only purged if it is older than the last message in the batch. Any message younger than that cutoff is NACKed after ack_deadline expires and returned to the queue to be processed again in the next cycle, causing duplicate deliveries.
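The failure mode can be sketched with a toy model (plain Python, not actual mailer code): represent each message by its publish-order index, deliver them in an arbitrary order, and acknowledge everything published at or before the batch's last element, which is what acking by the last message's publishTime amounts to.

```python
# Toy model of ACK-by-last-publishTime (illustrative, not mailer code).
# Each message is represented by its publish-order index; Pub/Sub may
# deliver them in any order.
delivered = [3, 1, 4, 0, 2]  # arbitrary delivery order from the subscription

# Acknowledging by the *last* message's publishTime only covers messages
# published at or before it.
cutoff = delivered[-1]
redelivered = [m for m in delivered if m > cutoff]

print(redelivered)  # messages 3 and 4 are younger than the cutoff -> duplicates
```

Only when the newest message happens to arrive last does the whole batch get purged; otherwise the younger messages come back in the next cycle, which matches the shrinking run counts in the logs below.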

What did you expect to happen?

I expected each notify message to be delivered exactly once; instead, messages are often delivered multiple times.

Cloud Provider

Google Cloud (GCP)

Cloud Custodian version and dependency information

Custodian:   0.9.28
Python:      3.10.10 (main, Feb 16 2023, 02:49:39) [Clang 14.0.0 (clang-1400.0.29.202)]
Platform:    posix.uname_result(sysname='Darwin', nodename='foo.local', release='22.5.0', version='Darwin Kernel Version 22.5.0: Thu Jun  8 22:22:20 PDT 2023; root:xnu-8796.121.3~7/RELEASE_ARM64_T6000', machine='arm64')
Using venv:  True
Docker: False
Installed: 

argcomplete==3.1.1
attrs==23.1.0
boto3==1.26.157
botocore==1.29.157
c7n==0.9.28
cachetools==5.3.1
certifi==2023.5.7
charset-normalizer==3.1.0
docutils==0.18.1
google-api-core==2.11.1
google-api-python-client==2.90.0
google-auth==2.20.0
google-auth-httplib2==0.1.0
google-cloud-appengine-logging==1.3.0
google-cloud-audit-log==0.2.5
google-cloud-core==2.3.2
google-cloud-logging==3.5.0
google-cloud-monitoring==2.15.0
google-cloud-storage==2.9.0
google-crc32c==1.5.0
google-resumable-media==2.5.0
googleapis-common-protos==1.59.1
grpc-google-iam-v1==0.12.6
grpcio==1.54.2
grpcio-status==1.54.2
httplib2==0.22.0
idna==3.4
importlib-metadata==5.2.0
jmespath==1.0.1
jsonschema==4.17.3
proto-plus==1.22.2
protobuf==4.23.3
pyasn1==0.5.0
pyasn1-modules==0.3.0
pyparsing==3.1.0
pyrate-limiter==2.10.0
pyrsistent==0.19.3
python-dateutil==2.8.2
pyyaml==6.0
requests==2.31.0
retrying==1.3.4
rsa==4.9
s3transfer==0.6.1
six==1.16.0
tabulate==0.9.0
typing-extensions==4.6.3
uritemplate==4.1.1
urllib3==1.26.16
zipp==3.15.0

Policy

policies:
 - &label_policy
    name: ensure_gcp_instance_labels
    description: |
      Report resources without labels
    resource: gcp.instance
    filters:
      - <<: *label_filters
    actions:
      - type: notify
        slack_template: slack_alert
        slack_msg_color: danger
        to:
          - "slack://#{slack_channel}"
        transport:
          type: pubsub
          topic: "{topic}"
  - <<: *label_policy
    name: ensure_gcp_instance_template_labels
    resource: gcp.instance-template
... and so on

Relevant log/traceback output

Policy execution:

2023-07-05 21:32:44,776: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_instance_labels matched:54 time:1.11
2023-07-05 21:32:45,310: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_instance_template_labels matched:36 time:1.64
2023-07-05 21:32:46,052: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_disk_labels matched:103 time:2.38
2023-07-05 21:32:46,660: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_pubsub_topic_labels matched:255 time:2.99
2023-07-05 21:33:04,083: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_bq_dataset_labels matched:92 time:20.41
2023-07-05 21:33:04,470: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_cloud_function_labels matched:30 time:20.80
2023-07-05 21:33:04,976: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_cloud_run_service_labels matched:34 time:21.31
2023-07-05 21:33:05,074: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_secret_labels matched:36 time:21.40
2023-07-05 21:33:05,527: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_bucket_labels matched:132 time:21.86



Mailer run 1:

2023-07-05 21:39:38,502 - custodian-mailer - INFO - Downloading messages from the GCP PubSub Subscription.
2023-07-05 21:39:41,739 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_disk_labels gcp.disk:103 slack:slack_alert to custodian-reports
2023-07-05 21:39:42,901 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_instance_labels gcp.instance:54 slack:slack_alert to custodian-reports
2023-07-05 21:39:43,268 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_pubsub_topic_labels gcp.pubsub-topic:255 slack:slack_alert to custodian-reports
2023-07-05 21:39:43,881 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bq_dataset_labels gcp.bq-dataset:92 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,072 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_function_labels gcp.function:30 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,258 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_run_service_labels gcp.cloud-run-service:34 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,456 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_secret_labels gcp.secret:36 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,651 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bucket_labels gcp.bucket:132 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,866 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_instance_template_labels gcp.instance-template:36 slack:slack_alert to custodian-reports
2023-07-05 21:39:46,578 - custodian-mailer - INFO - No messages left in the gcp topic subscription, now exiting c7n_mailer.


Mailer run 2:

2023-07-05 21:41:48,049 - custodian-mailer - INFO - Downloading messages from the GCP PubSub Subscription.
2023-07-05 21:41:51,626 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_run_service_labels gcp.cloud-run-service:34 slack:slack_alert to custodian-reports
2023-07-05 21:41:52,658 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_disk_labels gcp.disk:103 slack:slack_alert to custodian-reports
2023-07-05 21:41:52,856 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_pubsub_topic_labels gcp.pubsub-topic:255 slack:slack_alert to custodian-reports
2023-07-05 21:41:53,462 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bq_dataset_labels gcp.bq-dataset:92 slack:slack_alert to custodian-reports
2023-07-05 21:41:53,662 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_secret_labels gcp.secret:36 slack:slack_alert to custodian-reports
2023-07-05 21:41:53,865 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bucket_labels gcp.bucket:132 slack:slack_alert to custodian-reports
2023-07-05 21:41:54,103 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_function_labels gcp.function:30 slack:slack_alert to custodian-reports
2023-07-05 21:41:56,105 - custodian-mailer - INFO - No messages left in the gcp topic subscription, now exiting c7n_mailer.


Mailer run 3:

2023-07-05 21:53:39,169 - custodian-mailer - INFO - Downloading messages from the GCP PubSub Subscription.
2023-07-05 21:53:42,847 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_secret_labels gcp.secret:36 slack:slack_alert to custodian-reports
2023-07-05 21:53:43,080 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bucket_labels gcp.bucket:132 slack:slack_alert to custodian-reports
2023-07-05 21:53:43,295 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_run_service_labels gcp.cloud-run-service:34 slack:slack_alert to custodian-reports
2023-07-05 21:53:45,192 - custodian-mailer - INFO - No messages left in the gcp topic subscription, now exiting c7n_mailer.


Mailer run 4:

2023-07-05 22:17:56,370 - custodian-mailer - INFO - Downloading messages from the GCP PubSub Subscription.
2023-07-05 22:17:59,814 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_secret_labels gcp.secret:36 slack:slack_alert to custodian-reports
2023-07-05 22:18:00,057 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bucket_labels gcp.bucket:132 slack:slack_alert to custodian-reports
2023-07-05 22:18:02,235 - custodian-mailer - INFO - No messages left in the gcp topic subscription, now exiting c7n_mailer.

Extra information or context

Notice the order of the c7n policy run and how it determines which messages the mailer ACKed:

- Run 1: 9 messages total; mailer finished on policy ensure_gcp_instance_template_labels (n=2), so the next run gets 9-2=7
- Run 2: 7 messages total; mailer finished on policy ensure_gcp_cloud_function_labels (n=6), so the next run gets 9-6=3
- Run 3: 3 messages total; mailer finished on policy ensure_gcp_cloud_run_service_labels (n=7), so the next run gets 9-7=2
- Run 4: 2 messages total; mailer finished on policy ensure_gcp_bucket_labels (n=9), so the next run gets 9-9=0

nvarscar avatar Jul 05 '23 23:07 nvarscar

hmm.. we should switch this to using ack by message id, rather than assuming total ordering

kapilt avatar Jul 19 '23 23:07 kapilt

If anyone runs into the same issue as me, the workaround so far is to implement your own QueueProcessor and use it in place of the regular one by duplicating the CLI code. At least, I didn't find a way to plug custom classes into the module.

from typing import List
from c7n_mailer.cli import *
from c7n_mailer.gcp_mailer import gcp_queue_processor

class MailerGcpQueueProcessor(gcp_queue_processor.MailerGcpQueueProcessor):
    """Custom processor that acknowledges messages by ack ID instead of publish time"""
    def run(self):
        self.logger.info("Downloading messages from the GCP PubSub Subscription.")

        # Get first set of messages to process
        messages = self.receive_messages()
        while messages and messages.get("receivedMessages"):
            # Process received messages
            ack_ids = []
            for message in messages["receivedMessages"]:
                self.process_message(message, message["message"]["publishTime"])
                ack_ids.append(message["ackId"])

            # Acknowledge and purge processed messages then get next set of messages
            if ack_ids:
                self.ack_messages(ack_ids)
            messages = self.receive_messages()

        self.logger.info("No messages left in the gcp topic subscription, now exiting c7n_mailer.")

    def ack_messages(self, ids: List[str]):
        """Acknowledge and Discard messages with specific IDs"""
        return self.client.execute_command(
            "acknowledge", {"subscription": self.subscription, "body": {"ackIds": ids}}
        )

# Copy of the mailer CLI function
def main():
    ....
    if args_dict.get("run"):
        ....
        processor = MailerGcpQueueProcessor(mailer_config, logger)
        ....

nvarscar avatar Nov 16 '23 19:11 nvarscar

@nvarscar could you submit a pull request?

kapilt avatar Nov 16 '23 19:11 kapilt

Last time I couldn't get past the agreement page as the process requires a CLA Manager of the organization to sign it and I don't have any idea who that would be in my organization. I wish I could.

nvarscar avatar Nov 16 '23 19:11 nvarscar