
Celery frequently and randomly disconnecting with RabbitMQ

Open sharonkwong opened this issue 4 months ago • 1 comment

I set up Celery + Beat containers in ECS and used AWS MQ as the broker. However, my setup seems very prone to disconnections, where Celery workers are suddenly disconnected from MQ.

I end up seeing the logs below, and at some point Celery stops retrying the connection and my entire background-task container stops working.

Error logs from Celery:

Unrecoverable error: OperationalError('_ssl.c:1000: The handshake operation timed out')", "exc_info": ["<class 'kombu.exceptions.OperationalError'>", "OperationalError('_ssl.c:1000: The handshake operation timed out')", "<traceback object at 0x7f442b213200>"

/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:495
**Message:** {"event": "consumer: Cannot connect to amqps://admin:**@instance_did.mq.us-west-2.on.aws:5671//: timed out.\nTrying again in 16.00 seconds... (8/10)

Timed out waiting for UP message from <ForkProcess(ForkPoolWorker-313, started daemon)>


<class 'celery.beat.SchedulingError'>", "SchedulingError(\"Couldn't apply scheduled task scheduler-stress-test: _ssl.c:1000: The handshake operation timed out\")", "<traceback object at 0x7f36272f7840>
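
For reference, the handshake timeout can be reproduced outside of Celery entirely. Below is a minimal sketch that times a raw TLS handshake against the broker endpoint (the hostname is a placeholder for your instance):

import socket
import ssl
import time

# Placeholder endpoint - substitute your actual AWS MQ host
HOST = "b-XXXXXX.mq.us-west-2.on.aws"
PORT = 5671

ctx = ssl.create_default_context()
start = time.monotonic()
with socket.create_connection((HOST, PORT), timeout=30) as sock:
    tcp_done = time.monotonic()
    with ctx.wrap_socket(sock, server_hostname=HOST) as tls:
        tls_done = time.monotonic()
        print(f"TCP connect: {tcp_done - start:.2f}s")
        print(f"TLS handshake: {tls_done - tcp_done:.2f}s ({tls.version()})")

If the handshake time measured this way is erratic, the problem is likely on the network or broker side rather than in the Celery settings below.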

Here is my Django settings.py config for Celery:

import json

import boto3

# IS_LOCAL, AWS_REGION_NAME, NON_LOCAL_ENVIRONMENT, REDIS_URL and TIME_ZONE
# are defined earlier in settings.py


# Function to fetch RabbitMQ credentials from AWS Secrets Manager
def get_rabbitmq_broker_url():
    """
    Fetch RabbitMQ credentials from AWS Secrets Manager and construct broker URL.
    Uses the same pattern as database password fetching.
    """
    if IS_LOCAL:
        return ""  # Local development uses Redis broker

    try:
        secrets_client = boto3.client("secretsmanager", region_name=AWS_REGION_NAME)
        print(
            f"allen about to fetch secret at {NON_LOCAL_ENVIRONMENT}/rabbitmq/credentials"
        )
        secret_response = secrets_client.get_secret_value(
            SecretId=f"{NON_LOCAL_ENVIRONMENT}/rabbitmq/credentials"
        )
        creds = json.loads(secret_response["SecretString"])

        # Construct the AMQP URL - FIXED: Use amqps:// for AWS MQ SSL connections
        host = creds["host"]  # now "b-XXXXXX.mq.us-west-2.on.aws:5671"
        username = creds["username"]
        password = creds["password"]
        vhost = creds.get("vhost", "/")

        # URL-encode the vhost if it's "/"
        if vhost == "/":
            encoded_vhost = "%2F"
        else:
            encoded_vhost = vhost

        if ":5671" in host:
            # Host already includes port, use as-is
            final_mq_url = f"amqps://{username}:{password}@{host}/{encoded_vhost}"
        else:
            # Host doesn't include port, add it
            final_mq_url = f"amqps://{username}:{password}@{host}:5671/{encoded_vhost}"

        print(
            f"Connecting to RabbitMQ: host={host}, username={username}, vhost={vhost}"
        )
        print(f"DEBUG: Celery broker URL: {final_mq_url}")
        return final_mq_url

    except Exception as e:
        print(f"Error fetching RabbitMQ credentials from Secrets Manager: {e}")
        return ""


# Celery Configuration
if IS_LOCAL:
    # Use Redis as both broker and result backend for local development
    CELERY_BROKER_URL = REDIS_URL
    print(f"Running with local Redis broker: {CELERY_BROKER_URL}")
else:
    # Use RabbitMQ in production
    CELERY_BROKER_URL = get_rabbitmq_broker_url()
    if not CELERY_BROKER_URL:
        raise RuntimeError("RabbitMQ broker URL not found in environment")
    print(f"Running with RabbitMQ broker: {CELERY_BROKER_URL}")

CELERY_RESULT_BACKEND = REDIS_URL
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = TIME_ZONE
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 minutes

# Message persistence - ensures tasks survive broker restarts
CELERY_TASK_DEFAULT_DELIVERY_MODE = 2  # Persistent messages (maps to task_default_delivery_mode)

# Task publishing settings for reliability
CELERY_TASK_PUBLISH_RETRY = True  # Retry publishing failed tasks
CELERY_TASK_PUBLISH_RETRY_POLICY = {
    "max_retries": 3,
    "interval_start": 0,
    "interval_step": 1,
    "interval_max": 5,
}
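# With interval_start=0, interval_step=1 and interval_max=5, the three
# publish retries above fire after waits of roughly 0s, 1s and 2s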

# Redis Cluster configuration to avoid CROSSSLOT errors
CELERY_REDIS_BACKEND_USE_REDIS_CLUSTER = True
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = {
    "global_keyprefix": "{beat}:"  # Hash tag ensures all keys go to the same slot
}

# https://gist.github.com/fjsj/da41321ac96cf28a96235cb20e7236f6

# RabbitMQ Specific Celery Settings - only apply when not local
if not IS_LOCAL:
    CELERY_BROKER_TRANSPORT_OPTIONS = {
        "confirm_publish": True,  # Enable publisher confirmation for exactly-once delivery
        "max_retries": 3,
        "interval_start": 1,
        "interval_step": 2,
        "interval_max": 10,
        # Connection timeout settings - optimized for AWS MQ with native SSL
        "socket_timeout": 30,
        "connect_timeout": 30,
        "heartbeat": 60,
        # SSL-specific settings - simplified for amqps:// connections
        "ssl_handshake_timeout": 30,  # Increased for AWS MQ latency
        "ssl_timeout": 30,  # Increased timeout
        # Connection pool settings
        "socket_keepalive": True,  # Enable TCP keep-alive
        "socket_keepalive_options": {
            1: 30,
            2: 10,
            3: 3,
        },
        # Additional reliability settings
        "blocked_connection_timeout": 60,
        "connection_attempts": 3,
        "retry_on_timeout": True,  # Retry on timeout errors
        # Additional AWS MQ specific settings
        "failover_strategy": "round-robin",  # Better failover handling
    }

    # RabbitMQ connection settings for better reliability
    CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
    CELERY_BROKER_CONNECTION_RETRY = True
    CELERY_BROKER_CONNECTION_MAX_RETRIES = 10

    # Add connection pool settings - INCREASED for better throughput
    CELERY_BROKER_POOL_LIMIT = None
    CELERY_BROKER_CONNECTION_TIMEOUT = 30
    CELERY_BROKER_HEARTBEAT = 60

    # Channel management settings to prevent ChannelPromise issues
    CELERY_BROKER_CHANNEL_ERROR_RETRY = True  # Retry on channel errors

    # AWS recommended settings for Celery 5.5+ with RabbitMQ
    # These reduce memory pressure on the broker
    CELERY_TASK_CREATE_MISSING_QUEUES = False  # Prevent queue churn
    CELERY_WORKER_ENABLE_REMOTE_CONTROL = (
        True  # Enable for Flower monitoring (slight memory cost)
    )
    CELERY_TASK_DEFAULT_QUEUE_TYPE = (
        "quorum"  # Use quorum queues for better reliability
    )

    CELERY_BROKER_NATIVE_DELAYED_DELIVERY_QUEUE_TYPE = "classic"

    CELERY_BROKER_TRANSPORT_OPTIONS["queue_arguments"] = {
        "x-dead-letter-strategy": "at-least-once"
    }


CELERY_TASK_ACKS_LATE = True
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = True
# CELERY_TASK_REJECT_ON_WORKER_LOST = True


CELERY_EVENT_QUEUE_EXPIRES = 60.0
CELERY_EVENT_QUEUE_TTL = 3600  # 1 hour message expiration
CELERY_TASK_SEND_SENT_EVENT = True
CELERY_WORKER_SEND_TASK_EVENTS = True  # Ensure worker events for Flower

TASK_HTTP_CONNECT_TIMEOUT = 5
TASK_HTTP_READ_TIMEOUT = 60

# Celery Logging Configuration
# Disable Celery's built-in logging setup so it uses Django's configuration
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
CELERY_WORKER_LOG_COLOR = False  # Disable colors in production logs

# Task execution logging settings
CELERY_TASK_EAGER_PROPAGATES = True  # Propagate exceptions in eager mode
CELERY_TASK_STORE_EAGER_RESULT = True  # Store results even in eager mode

# Worker logging settings for better debugging
CELERY_WORKER_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"
CELERY_WORKER_TASK_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s"

# Additional debugging configuration
CELERY_TASK_ANNOTATIONS = {
    "*": {
        "preserve_name": True,  # Keep original task names for debugging
    }
}

# Enable task result metadata for better debugging
CELERY_RESULT_EXTENDED = True
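
As a sanity check, the broker URL and connection options above can be exercised directly with kombu, outside of a worker. A sketch, assuming it runs in a context where the CELERY_BROKER_URL built earlier is available:

from kombu import Connection

with Connection(
    CELERY_BROKER_URL,
    heartbeat=60,        # mirrors CELERY_BROKER_HEARTBEAT
    connect_timeout=30,  # mirrors CELERY_BROKER_CONNECTION_TIMEOUT
) as conn:
    conn.ensure_connection(max_retries=3)
    print("Connected:", conn.as_uri())  # as_uri() masks the password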

My Dockerfile command to start Celery:

CMD ["celery", "-A", "server", "worker", "-E", "--autoscale=2,1", "--without-heartbeat", "--max-tasks-per-child=1000", "--time-limit=600", "--loglevel=info", "-n", "worker@%h"]

And my main.tf for my MQ instance:

# RabbitMQ password is provided via variable (like other passwords in your infrastructure)

# Create security group for RabbitMQ
resource "aws_security_group" "rabbitmq" {
  name        = "${var.environment}-rabbitmq-sg"
  description = "Security group for RabbitMQ broker"
  vpc_id      = var.vpc_id

  # RabbitMQ AMQP port
  ingress {
    from_port   = 5671
    to_port     = 5671
    protocol    = "tcp"
    cidr_blocks = [var.vpc_cidr]
    description = "RabbitMQ AMQP over TLS"
  }

  # RabbitMQ Management Console (optional, for debugging)
  ingress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = [var.vpc_cidr]
    description = "RabbitMQ Management Console"
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name        = "${var.environment}-rabbitmq-sg"
    Environment = var.environment
  }

  lifecycle {
    create_before_destroy = true
  }
}

# Create the RabbitMQ broker
resource "aws_mq_broker" "rabbitmq" {
  broker_name        = "${var.environment}-project-rabbitmq"
  engine_type        = "RabbitMQ"
  engine_version     = "3.13"
  host_instance_type = var.instance_type
  security_groups    = [aws_security_group.rabbitmq.id]
  subnet_ids         = var.deployment_mode == "SINGLE_INSTANCE" ? [var.private_subnet_ids[0]] : var.private_subnet_ids

  deployment_mode = var.deployment_mode
  storage_type    = "ebs"

  # Configure the admin user
  user {
    username = var.rabbitmq_username
    password = var.rabbitmq_password
  }

  # Enable logging
  logs {
    general = true
  }

  # Maintenance window (adjust as needed)
  maintenance_window_start_time {
    day_of_week = "sunday"
    time_of_day = "02:00"
    time_zone   = "UTC"
  }

  # Auto minor version upgrade
  auto_minor_version_upgrade = true
  publicly_accessible        = false

  tags = {
    Name        = "${var.environment}-project-rabbitmq"
    Environment = var.environment
  }
}

# Store RabbitMQ credentials in AWS Secrets Manager
resource "aws_secretsmanager_secret" "rabbitmq_credentials" {
  name        = "${var.environment}/rabbitmq/credentials"
  description = "RabbitMQ broker credentials for ${var.environment}"

  tags = {
    Environment = var.environment
  }
}

resource "aws_secretsmanager_secret_version" "rabbitmq_credentials" {
  secret_id     = aws_secretsmanager_secret.rabbitmq_credentials.id
  secret_string = jsonencode({
    username = var.rabbitmq_username
    password = var.rabbitmq_password
    host     = replace(aws_mq_broker.rabbitmq.instances[0].endpoints[0], "amqps://", "")
    port     = "5671"
    vhost    = "/"
    # Add management API URL for Flower
    management_url = aws_mq_broker.rabbitmq.instances[0].console_url
  })

  depends_on = [aws_mq_broker.rabbitmq]
}

# Create CloudWatch log group for RabbitMQ logs
resource "aws_cloudwatch_log_group" "rabbitmq_logs" {
  name              = "/aws/amazonmq/broker/${aws_mq_broker.rabbitmq.broker_name}"
  retention_in_days = var.log_retention_days

  tags = {
    Environment = var.environment
  }
}
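
With general logging enabled on the broker, disconnects can be correlated against the broker-side view. A sketch that pulls the last hour of entries from the log group defined above (the group name assumes environment = "prod" and the broker_name pattern used here):

import time

import boto3

logs = boto3.client("logs", region_name="us-west-2")

response = logs.filter_log_events(
    logGroupName="/aws/amazonmq/broker/prod-project-rabbitmq",  # assumed name
    startTime=int((time.time() - 3600) * 1000),  # last hour, epoch millis
)
for event in response["events"]:
    print(event["message"])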

sharonkwong • Jul 24 '25 16:07

Why are you opening the same issue in multiple repos? SchedulingError("Couldn't apply scheduled task scheduler-stress-test: _ssl.c:1000: The handshake operation timed out"): check this to debug your issue.

auvipy • Jul 26 '25 06:07