LambdaInvokeFunctionOperator throwing ReadTimeout error even when the actual lambda invocation completed within the time limits
Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
When invoking lambda functions using LambdaInvokeFunctionOperator, the task continues to run even after the actual lambda invocation is completed. It then throws a ReadTimeoutError.
It is more common with Lambda functions that take more than 13 minutes to run. For Lambda functions that take more than 4 minutes, this is common when multiple tasks with LambdaInvokeFunctionOperator are triggered(I.e., Invoke the same lambda).
I have followed the recommended settings as mentioned here:
My AWS connection extra parameter has the following json
{
"config_kwargs": {
"connect_timeout": 5,
"read_timeout": 900,
"tcp_keepalive": true,
"retries": {
"max_attempts": 0
}
}
}
I did set the max timeout to 15 minutes for the lambda function on AWS.
For the mentioned recommendations on the docs:
I have noticed this issue even with lambda functions that take 4 minutes to run. However, ReadTimeouts occur relatively rarely and mostly happen when running multiple invocations in quick succession.
I have updated
sysctl.confas below:
echo "net.ipv4.tcp_keepalive_time = 320" >> /etc/sysctl.conf
echo "net.ipv4.tcp_keepalive_intvl = 60" >> /etc/sysctl.conf
echo "net.ipv4.tcp_keepalive_probes = 20" >> /etc/sysctl.conf
What you think should happen instead?
Tasks Should not run beyond the actual completion of the lambda invocations.
How to reproduce
Use the following DAG:
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 1, 1),
}
with DAG('invoke_lambda_dag', default_args=default_args, schedule_interval=None, catchup=False) as dag:
invoke_lambda_task = LambdaInvokeFunctionOperator(
task_id='invoke_lambda',
function_name='runForFifteenMinutes',
payload='{"key1": "value1","key2": "value2","key3": "value3"}',
aws_conn_id='aws'
)
Create an AWS connection with the following json in the extra(You might need to add AWS aws_session_token and region_name to the extra:
{
"config_kwargs": {
"connect_timeout": 5,
"read_timeout": 900,
"tcp_keepalive": true,
"retries": {
"max_attempts": 0
}
}
}
On AWS, create a lambda function, and update timeout to 15 minutes(That is the max possible value)
You can add time.sleep(780)(13 minutes) to your lambda code so that it runs for 13 minutes.
Also decrease sleep time to 4 minutes and trigger the DAG multiple times quickly to reproduce ReadTimeout's
Operating System
ubuntu-22.04
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.27.0
Deployment
Docker-Compose
Deployment details
Docker file update as below
FROM apache/airflow:2.9.3
ADD requirements.txt .
RUN pip install apache-airflow==${AIRFLOW_VERSION} -r requirements.txt
RUN apt-get update && apt-get install -y procps
# Set TCP keepalive settings
RUN echo "net.ipv4.tcp_keepalive_time = 600" >> /etc/sysctl.conf && \
echo "net.ipv4.tcp_keepalive_intvl = 60" >> /etc/sysctl.conf && \
echo "net.ipv4.tcp_keepalive_probes = 20" >> /etc/sysctl.conf
# Apply sysctl settings
RUN sysctl -p
Docker-compose.yml
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.9.3
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed.
# Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Use this option ONLY for quick checks. Installing requirements at container
# startup is done EVERY TIME the service is started.
# A better way is to build a custom image or extend the official image
# as described in https://airflow.apache.org/docs/docker-stack/build.html.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.9.3}
build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
# yamllint enable rule:line-length
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
# WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
# for other purpose (development, test and especially production usage) build/extend Airflow image.
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
# The following line can be used to set a custom config file, stored in the local config folder
# If you want to use it, outcomment it and replace airflow.cfg with the name of your config file
# AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always
redis:
# Redis is limited to 7.2-bookworm due to licencing change
# https://redis.io/blog/redis-adopts-dual-source-available-licensing/
image: redis:7.2-bookworm
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 30s
retries: 50
start_period: 30s
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
# yamllint disable rule:line-length
test:
- "CMD-SHELL"
- 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- ${AIRFLOW_PROJ_DIR:-.}:/sources
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
# You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
# or by explicitly targeted on the command line e.g. docker-compose up flower.
# See: https://docs.docker.com/compose/profiles/
flower:
<<: *airflow-common
command: celery flower
profiles:
- flower
ports:
- "5555:5555"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
volumes:
postgres-db-volume:
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
cc @vincbeck @o-nikolas looks like a valid bug
Hi @rawwar i have tried the steps you have mentioned from my local, but not been able to re produce the issue, in my case its getting succeeded. is there any other information help to reproduce this issue? could you please provide.
First task:
Between there are couple of tasks triggered with seconds gap.
last task with after 4minutes.
@gopidesupavan and I connected last week to replicate the issue. But the problem is only happening on my local setup. I am testing this out on a fresh EC2 instance and will provide an update.
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.
I am also getting the same issue when i am deploying it on AWS. Can anyone help for resolution?
@sachinbct , can you please share more details about your env/setup?
I am getting the same issue from AWS MWAA (Managed Workflows for Apache Airflow).
My extras config contains the following:
{
"config_kwargs": {
"read_timeout": 900,
"connect_timeout": 900,
"tcp_keepalive": true
}
However, the LambdaFunctionInvokeOperator is failing after 5:30 minutes. Lambda function invocation itself is successful after ~1.5 minutes.
@michael-lok - I had also done the same extra configs. Even I tried to change botocore_config which we pass to InvoleAWSLambdaOperator as below - config = Config( connect_timeout=900, read_timeout=900, retries={ 'max_attempts': 5, 'mode': 'adaptive' }, tcp_keepalive=True ) but still no luck. Airflow setup includes, input and output S3 buckets on AWS
@sachinbct , I think this is only happening on deployments on AWS. And, the fix needs to be done in botocore: https://github.com/boto/botocore/pull/3140 . I verified this fix and its working once you update the botocore with these changes