Duplicate notify delivery by c7n-mailer in a GCP environment
Describe the bug
Messages from a PubSub subscription are returned in arbitrary order (unless they are published with an ordering key, which they aren't). The mailer expects the messages to be ordered: it uses the publishTime of the last message in the received array to ACK them. Without ordering, however, last doesn't necessarily mean most recent. What really happens is that a message is ACKed only if it happens to be no newer than whichever message ends up last in the array, which is effectively random. Any other message stays unacknowledged and, after ack_deadline, is returned to the queue to be processed in the next cycle, thus causing duplicates.
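The effect can be reproduced without PubSub at all. The sketch below is a simplified model of the behaviour described above, not the mailer's code: `pull_batch` and `ack_up_to` are hypothetical stand-ins for an unordered pull and for an acknowledgement that covers everything published up to a given time, and integer publish times are used for brevity.

```python
import random


def pull_batch(queue, rng):
    """Return all outstanding messages in arbitrary order (no ordering key)."""
    batch = list(queue)
    rng.shuffle(batch)
    return batch


def ack_up_to(queue, publish_time):
    """Drop (ACK) every message published at or before publish_time."""
    return [m for m in queue if m["publishTime"] > publish_time]


def simulate(n_messages, seed):
    """Run mailer cycles until the queue is empty; count deliveries per message."""
    rng = random.Random(seed)
    queue = [{"id": i, "publishTime": i} for i in range(1, n_messages + 1)]
    deliveries = {m["id"]: 0 for m in queue}
    runs = 0
    while queue:
        runs += 1
        batch = pull_batch(queue, rng)
        for message in batch:
            deliveries[message["id"]] += 1  # one notification sent per delivery
        # ACK based on whichever message happens to be last in the batch
        queue = ack_up_to(queue, batch[-1]["publishTime"])
    return runs, deliveries


runs, deliveries = simulate(9, seed=0)
print(f"mailer runs needed: {runs}")
print(f"deliveries per message: {deliveries}")
```

In this model the newest message is re-sent in every run until it happens to land last in a batch, while older messages are re-sent until some message at least as new as them lands last.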
What did you expect to happen?
Notify messages should be delivered exactly once. Instead, they are often delivered multiple times.
Cloud Provider
Google Cloud (GCP)
Cloud Custodian version and dependency information
Custodian: 0.9.28
Python: 3.10.10 (main, Feb 16 2023, 02:49:39) [Clang 14.0.0 (clang-1400.0.29.202)]
Platform: posix.uname_result(sysname='Darwin', nodename='foo.local', release='22.5.0', version='Darwin Kernel Version 22.5.0: Thu Jun 8 22:22:20 PDT 2023; root:xnu-8796.121.3~7/RELEASE_ARM64_T6000', machine='arm64')
Using venv: True
Docker: False
Installed:
argcomplete==3.1.1
attrs==23.1.0
boto3==1.26.157
botocore==1.29.157
c7n==0.9.28
cachetools==5.3.1
certifi==2023.5.7
charset-normalizer==3.1.0
docutils==0.18.1
google-api-core==2.11.1
google-api-python-client==2.90.0
google-auth==2.20.0
google-auth-httplib2==0.1.0
google-cloud-appengine-logging==1.3.0
google-cloud-audit-log==0.2.5
google-cloud-core==2.3.2
google-cloud-logging==3.5.0
google-cloud-monitoring==2.15.0
google-cloud-storage==2.9.0
google-crc32c==1.5.0
google-resumable-media==2.5.0
googleapis-common-protos==1.59.1
grpc-google-iam-v1==0.12.6
grpcio==1.54.2
grpcio-status==1.54.2
httplib2==0.22.0
idna==3.4
importlib-metadata==5.2.0
jmespath==1.0.1
jsonschema==4.17.3
proto-plus==1.22.2
protobuf==4.23.3
pyasn1==0.5.0
pyasn1-modules==0.3.0
pyparsing==3.1.0
pyrate-limiter==2.10.0
pyrsistent==0.19.3
python-dateutil==2.8.2
pyyaml==6.0
requests==2.31.0
retrying==1.3.4
rsa==4.9
s3transfer==0.6.1
six==1.16.0
tabulate==0.9.0
typing-extensions==4.6.3
uritemplate==4.1.1
urllib3==1.26.16
zipp==3.15.0
Policy
policies:
  - &label_policy
    name: ensure_gcp_instance_labels
    description: |
      Report resources without labels
    resource: gcp.instance
    filters:
      - <<: *label_filters
    actions:
      - type: notify
        slack_template: slack_alert
        slack_msg_color: danger
        to:
          - "slack://#{slack_channel}"
        transport:
          type: pubsub
          topic: "{topic}"
  - <<: *label_policy
    name: ensure_gcp_instance_template_labels
    resource: gcp.instance-template
... and so on
Relevant log/traceback output
Policy execution:
2023-07-05 21:32:44,776: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_instance_labels matched:54 time:1.11
2023-07-05 21:32:45,310: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_instance_template_labels matched:36 time:1.64
2023-07-05 21:32:46,052: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_disk_labels matched:103 time:2.38
2023-07-05 21:32:46,660: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_pubsub_topic_labels matched:255 time:2.99
2023-07-05 21:33:04,083: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_bq_dataset_labels matched:92 time:20.41
2023-07-05 21:33:04,470: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_cloud_function_labels matched:30 time:20.80
2023-07-05 21:33:04,976: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_cloud_run_service_labels matched:34 time:21.31
2023-07-05 21:33:05,074: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_secret_labels matched:36 time:21.40
2023-07-05 21:33:05,527: c7n_org:INFO Ran account:dev region:global policy:ensure_gcp_bucket_labels matched:132 time:21.86
Mailer run 1:
2023-07-05 21:39:38,502 - custodian-mailer - INFO - Downloading messages from the GCP PubSub Subscription.
2023-07-05 21:39:41,739 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_disk_labels gcp.disk:103 slack:slack_alert to custodian-reports
2023-07-05 21:39:42,901 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_instance_labels gcp.instance:54 slack:slack_alert to custodian-reports
2023-07-05 21:39:43,268 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_pubsub_topic_labels gcp.pubsub-topic:255 slack:slack_alert to custodian-reports
2023-07-05 21:39:43,881 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bq_dataset_labels gcp.bq-dataset:92 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,072 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_function_labels gcp.function:30 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,258 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_run_service_labels gcp.cloud-run-service:34 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,456 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_secret_labels gcp.secret:36 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,651 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bucket_labels gcp.bucket:132 slack:slack_alert to custodian-reports
2023-07-05 21:39:44,866 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_instance_template_labels gcp.instance-template:36 slack:slack_alert to custodian-reports
2023-07-05 21:39:46,578 - custodian-mailer - INFO - No messages left in the gcp topic subscription, now exiting c7n_mailer.
Mailer run 2:
2023-07-05 21:41:48,049 - custodian-mailer - INFO - Downloading messages from the GCP PubSub Subscription.
2023-07-05 21:41:51,626 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_run_service_labels gcp.cloud-run-service:34 slack:slack_alert to custodian-reports
2023-07-05 21:41:52,658 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_disk_labels gcp.disk:103 slack:slack_alert to custodian-reports
2023-07-05 21:41:52,856 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_pubsub_topic_labels gcp.pubsub-topic:255 slack:slack_alert to custodian-reports
2023-07-05 21:41:53,462 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bq_dataset_labels gcp.bq-dataset:92 slack:slack_alert to custodian-reports
2023-07-05 21:41:53,662 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_secret_labels gcp.secret:36 slack:slack_alert to custodian-reports
2023-07-05 21:41:53,865 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bucket_labels gcp.bucket:132 slack:slack_alert to custodian-reports
2023-07-05 21:41:54,103 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_function_labels gcp.function:30 slack:slack_alert to custodian-reports
2023-07-05 21:41:56,105 - custodian-mailer - INFO - No messages left in the gcp topic subscription, now exiting c7n_mailer.
Mailer run 3:
2023-07-05 21:53:39,169 - custodian-mailer - INFO - Downloading messages from the GCP PubSub Subscription.
2023-07-05 21:53:42,847 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_secret_labels gcp.secret:36 slack:slack_alert to custodian-reports
2023-07-05 21:53:43,080 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bucket_labels gcp.bucket:132 slack:slack_alert to custodian-reports
2023-07-05 21:53:43,295 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_cloud_run_service_labels gcp.cloud-run-service:34 slack:slack_alert to custodian-reports
2023-07-05 21:53:45,192 - custodian-mailer - INFO - No messages left in the gcp topic subscription, now exiting c7n_mailer.
Mailer run 4:
2023-07-05 22:17:56,370 - custodian-mailer - INFO - Downloading messages from the GCP PubSub Subscription.
2023-07-05 22:17:59,814 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_secret_labels gcp.secret:36 slack:slack_alert to custodian-reports
2023-07-05 22:18:00,057 - custodian-mailer - INFO - Sending account:foo policy:ensure_gcp_bucket_labels gcp.bucket:132 slack:slack_alert to custodian-reports
2023-07-05 22:18:02,235 - custodian-mailer - INFO - No messages left in the gcp topic subscription, now exiting c7n_mailer.
Extra information or context
Notice the order of the c7n policy run and how it impacts which messages were ACKed by the mailer (n is the policy's position in the policy execution order):
Run 1: 9 messages total; mailer finished on policy ensure_gcp_instance_template_labels (n=2), so next run will be 9-2=7
Run 2: 7 messages total; mailer finished on policy ensure_gcp_cloud_function_labels (n=6), so next run will be 9-6=3
Run 3: 3 messages total; mailer finished on policy ensure_gcp_cloud_run_service_labels (n=7), so next run will be 9-7=2
Run 4: 2 messages total; mailer finished on policy ensure_gcp_bucket_labels (n=9), so next run will be 9-9=0
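The same arithmetic can be checked against the logs above. This is only a sketch: it assumes the messages were published in the order the policies appear in the policy execution log, and that everything published up to the last message handled in a run is ACKed. The names `publish_order`, `last_in_run` and `remaining_after` are made up for the illustration.

```python
# Policies in the order they appear in the policy execution log
# (assumed here to also be the publish order).
publish_order = [
    "ensure_gcp_instance_labels",
    "ensure_gcp_instance_template_labels",
    "ensure_gcp_disk_labels",
    "ensure_gcp_pubsub_topic_labels",
    "ensure_gcp_bq_dataset_labels",
    "ensure_gcp_cloud_function_labels",
    "ensure_gcp_cloud_run_service_labels",
    "ensure_gcp_secret_labels",
    "ensure_gcp_bucket_labels",
]

# Policy of the last message sent in each mailer run, read from the mailer logs.
last_in_run = [
    "ensure_gcp_instance_template_labels",
    "ensure_gcp_cloud_function_labels",
    "ensure_gcp_cloud_run_service_labels",
    "ensure_gcp_bucket_labels",
]


def remaining_after(last_policy):
    """Messages still unacknowledged once everything up to last_policy is ACKed."""
    n = publish_order.index(last_policy) + 1
    return publish_order[n:]


run_sizes = []
outstanding = list(publish_order)
for last_policy in last_in_run:
    run_sizes.append(len(outstanding))
    outstanding = remaining_after(last_policy)

print(run_sizes)         # messages handled in runs 1 to 4
print(len(outstanding))  # messages left after run 4
```

The computed run sizes match the number of "Sending" lines in each mailer run (9, 7, 3, 2), which is consistent with the ACK being tied to the publishTime of the last message in the batch.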
hmm.. we should switch this to using ack by message id, rather than assuming total ordering
If anyone runs into the same issue as me, the workaround so far is to implement your own QueueProcessor and use it instead of the regular one by duplicating the cli code. At least, I didn't find a way to plug custom classes into the module.
from typing import List

from c7n_mailer.cli import *
from c7n_mailer.gcp_mailer import gcp_queue_processor


class MailerGcpQueueProcessor(gcp_queue_processor.MailerGcpQueueProcessor):
    """Custom processor that acknowledges by ack IDs instead of datetime"""

    def run(self):
        self.logger.info("Downloading messages from the GCP PubSub Subscription.")

        # Get first set of messages to process
        messages = self.receive_messages()

        while messages and len(messages["receivedMessages"]) > 0:
            # Process received messages, remembering the ackId of each one
            ack_ids = []
            for message in messages["receivedMessages"]:
                self.process_message(message, message["message"]["publishTime"])
                ack_ids.append(message["ackId"])

            # Acknowledge and purge processed messages then get next set of messages
            if ack_ids:
                self.ack_messages(ack_ids)
            messages = self.receive_messages()

        self.logger.info("No messages left in the gcp topic subscription, now exiting c7n_mailer.")

    def ack_messages(self, ids: List[str]):
        """Acknowledge and discard messages with specific ack IDs"""
        return self.client.execute_command(
            "acknowledge", {"subscription": self.subscription, "body": {"ackIds": ids}}
        )


# Copy of the mailer CLI function
def main():
    ....
    if args_dict.get("run"):
        ....
        processor = MailerGcpQueueProcessor(mailer_config, logger)
        ....
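To see why acknowledging by ackId avoids the duplicates, the loop above can be exercised against an in-memory stand-in. This is a rough sketch only: `FakeSubscription` and `drain` are hypothetical helpers written for this illustration and do not use the PubSub client or c7n_mailer.

```python
class FakeSubscription:
    """In-memory stand-in for a subscription (hypothetical, not a PubSub client)."""

    def __init__(self, payloads):
        self.outstanding = {f"ack-{i}": p for i, p in enumerate(payloads)}

    def pull(self, max_messages=3):
        """Return up to max_messages unacknowledged messages."""
        items = list(self.outstanding.items())[:max_messages]
        return [{"ackId": ack_id, "message": {"data": data}} for ack_id, data in items]

    def acknowledge(self, ack_ids):
        """Remove exactly the messages whose ackIds are given."""
        for ack_id in ack_ids:
            self.outstanding.pop(ack_id, None)


def drain(subscription, handle):
    """Process batches and ACK only what was handled, by ackId."""
    batch = subscription.pull()
    while batch:
        ack_ids = []
        for received in batch:
            handle(received["message"]["data"])
            ack_ids.append(received["ackId"])
        subscription.acknowledge(ack_ids)
        batch = subscription.pull()


sent = []
subscription = FakeSubscription(["disk", "instance", "bucket", "secret"])
drain(subscription, sent.append)
print(sent)
print(subscription.outstanding)
```

Because each acknowledgement names the exact messages that were handled, the order in which they arrive no longer matters, and nothing is left outstanding after a single pass.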
@nvarscar could you submit a pull request?
Last time I couldn't get past the agreement page as the process requires a CLA Manager of the organization to sign it and I don't have any idea who that would be in my organization. I wish I could.