sagemaker-python-sdk

Importing sagemaker library in airflow, makes airflow fail to upload logs to s3.

Status: Open · cosmic-heart opened this issue 6 months ago · 0 comments

Describe the bug

Importing the SageMaker library at the top of an Airflow plugin or hook declaration file causes Airflow to fail to upload task logs to S3.

To reproduce

Set up a sample DAG with the plugin shown below. The main problem is the import of the sagemaker library.

Sample dag

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging

LOGGER = logging.getLogger(__name__)


def test_function(**_):
    LOGGER.info("Success_test_dag")


with DAG(
    dag_id="test_dag",
    description=__doc__,
    schedule_interval=None,
    default_args={
        "depends_on_past": False,
        "owner": "Data Platform",
        "start_date": datetime(2025, 4, 29),
        "retries": 1,
    },
    catchup=False,
    max_active_runs=1,
) as dag:
    PythonOperator(
        task_id="test_task",
        python_callable=test_function,
        provide_context=True,
    )

Sample plugin

plugins/sample_hook.py

from airflow.hooks.base import BaseHook
from airflow.plugins_manager import AirflowPlugin

import sagemaker

class SampleHook(BaseHook):
    ...

class SamplePlugin(AirflowPlugin):
    name = "sample_hook"
    hooks = [SampleHook]
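
A common workaround for import-time side effects is to defer the heavy import into the method that needs it, so merely loading the plugin module does not touch any global state. Below is a minimal, runnable sketch of that lazy-import pattern; the stdlib `colorsys` module stands in for `sagemaker` here (an illustrative assumption, not this library's documented behavior), and `get_client` is a hypothetical method name:

```python
import sys


class SampleHook:
    """Hook that defers its heavy dependency until first use."""

    def get_client(self):
        # Deferred import: nothing runs at plugin-load time.
        # `colorsys` stands in for `sagemaker` so this sketch is runnable.
        import colorsys
        return colorsys


hook = SampleHook()
client = hook.get_client()
# The dependency is only loaded once the method is actually called.
print("module loaded:", "colorsys" in sys.modules)
```

With the real hook, `import sagemaker` would sit inside the method body instead of at module top level, so the plugin manager can load the file without triggering whatever the import does.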

Env variables:

AIRFLOW__LOGGING__REMOTE_LOGGING=True
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=s3://airflow/logs
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=s3_airflow_logging

Expected behavior

Task logs should be uploaded to and retrieved from S3 regardless of whether the sagemaker library is imported in a plugin.

Actual behavior: when you run the DAG, it shows as successful, but Airflow fails to upload and retrieve the logs from S3, even though the s3_airflow_logging connection is configured correctly. If you remove the line that imports the SageMaker library, the logs are stored in and retrieved from S3 as expected.
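
One way to narrow this down is to snapshot the root logger's handlers before and after the suspect import; if the list changes, the import is mutating global logging state, which is a plausible (though unconfirmed here) reason for the remote S3 log handler being displaced. A stdlib-only sketch, with an explicit `addHandler` call standing in for whatever `import sagemaker` may do at import time (a hypothetical side effect, for illustration only):

```python
import logging


def handler_names():
    """Names of the handlers currently attached to the root logger."""
    return [type(h).__name__ for h in logging.getLogger().handlers]


before = handler_names()

# Stand-in for an import-time side effect: a library attaching its own
# handler to the root logger (hypothetical, for illustration only).
logging.getLogger().addHandler(logging.NullHandler())

after = handler_names()
print("before:", before)
print("after:", after)
```

In the real setup, the two snapshots would go around `import sagemaker` in the plugin file, with the results compared in the worker logs.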

Screenshots or logs

When sagemaker is not imported

da08261f9df2
 ▼ Log message source details
*** Found logs in s3:
***   * s3://airflow/logs/dag_id=test_dag/run_id=manual__2025-06-16T10:10:39.397088+00:00/task_id=test_task/attempt=1.log
 ▲▲▲ Log group end
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:256226136576032}} {{local_task_job_runner.py:123}} ▼ Pre task execution logs
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:256226136576032}} {{taskinstance.py:2613}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.test_task manual__2025-06-16T10:10:39.397088+00:00 [queued]>
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:256226136576032}} {{taskinstance.py:2613}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.test_task manual__2025-06-16T10:10:39.397088+00:00 [queued]>
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:256226136576032}} {{taskinstance.py:2866}} INFO - Starting attempt 1 of 2
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:256226136576032}} {{taskinstance.py:2889}} INFO - Executing <Task(PythonOperator): test_task> on 2025-06-16 10:10:39.397088+00:00
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:256226136576032}} {{standard_task_runner.py:72}} INFO - Started process 29 to run task
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:29, thread = MainThread:256226136576032}} {{standard_task_runner.py:104}} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'test_task', 'manual__2025-06-16T10:10:39.397088+00:00', '--job-id', '65', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpmn1u32bm']
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:29, thread = MainThread:256226136576032}} {{standard_task_runner.py:105}} INFO - Job 65: Subtask test_task
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:29, thread = MainThread:256226136576032}} {{task_command.py:467}} INFO - Running <TaskInstance: test_dag.test_task manual__2025-06-16T10:10:39.397088+00:00 [running]> on host da08261f9df2
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:29, thread = MainThread:256226136576032}} {{taskinstance.py:3132}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='Data Platform' AIRFLOW_CTX_DAG_ID='test_dag' AIRFLOW_CTX_TASK_ID='test_task' AIRFLOW_CTX_EXECUTION_DATE='2025-06-16T10:10:39.397088+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-06-16T10:10:39.397088+00:00'
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:29, thread = MainThread:256226136576032}} {{taskinstance.py:731}} ▲▲▲ Log group end
[2025-06-16, 15:40:42 IST] {{process = ForkPoolWorker-3:29, thread = MainThread:256226136576032}} {{test.py:10}} INFO - Success_test_dag
[2025-06-16, 15:40:43 IST] {{process = ForkPoolWorker-3:29, thread = MainThread:256226136576032}} {{python.py:240}} INFO - Done. Returned value was: None
[2025-06-16, 15:40:43 IST] {{process = ForkPoolWorker-3:29, thread = MainThread:256226136576032}} {{taskinstance.py:340}} ▼ Post task execution logs
[2025-06-16, 15:40:43 IST] {{process = ForkPoolWorker-3:29, thread = MainThread:256226136576032}} {{taskinstance.py:352}} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=test_task, run_id=manual__2025-06-16T10:10:39.397088+00:00, execution_date=20250616T101039, start_date=20250616T101042, end_date=20250616T101043
[2025-06-16, 15:40:43 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:256226136576032}} {{local_task_job_runner.py:266}} INFO - Task exited with return code 0
[2025-06-16, 15:40:43 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:256226136576032}} {{taskinstance.py:3895}} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2025-06-16, 15:40:43 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:256226136576032}} {{local_task_job_runner.py:245}} ▲▲▲ Log group end

When sagemaker is imported

da08261f9df2
 ▼ Log message source details
*** No logs found on s3 for ti=<TaskInstance: test_dag.test_task manual__2025-06-16T10:18:26.179356+00:00 [success]>
*** Found local files:
***   * /opt/airflow/logs/dag_id=test_dag/run_id=manual__2025-06-16T10:18:26.179356+00:00/task_id=test_task/attempt=1.log
 ▲▲▲ Log group end
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:248620777738272}} {{local_task_job_runner.py:123}} ▼ Pre task execution logs
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:248620777738272}} {{taskinstance.py:2613}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.test_task manual__2025-06-16T10:18:26.179356+00:00 [queued]>
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:248620777738272}} {{taskinstance.py:2613}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.test_task manual__2025-06-16T10:18:26.179356+00:00 [queued]>
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:248620777738272}} {{taskinstance.py:2866}} INFO - Starting attempt 1 of 2
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:248620777738272}} {{taskinstance.py:2889}} INFO - Executing <Task(PythonOperator): test_task> on 2025-06-16 10:18:26.179356+00:00
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:248620777738272}} {{standard_task_runner.py:72}} INFO - Started process 38 to run task
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:38, thread = MainThread:248620777738272}} {{standard_task_runner.py:104}} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'test_task', 'manual__2025-06-16T10:18:26.179356+00:00', '--job-id', '71', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpegkx_0gj']
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:38, thread = MainThread:248620777738272}} {{standard_task_runner.py:105}} INFO - Job 71: Subtask test_task
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:38, thread = MainThread:248620777738272}} {{task_command.py:467}} INFO - Running <TaskInstance: test_dag.test_task manual__2025-06-16T10:18:26.179356+00:00 [running]> on host da08261f9df2
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:38, thread = MainThread:248620777738272}} {{taskinstance.py:3132}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='Data Platform' AIRFLOW_CTX_DAG_ID='test_dag' AIRFLOW_CTX_TASK_ID='test_task' AIRFLOW_CTX_EXECUTION_DATE='2025-06-16T10:18:26.179356+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-06-16T10:18:26.179356+00:00'
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:38, thread = MainThread:248620777738272}} {{taskinstance.py:731}} ▲▲▲ Log group end
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:38, thread = MainThread:248620777738272}} {{test.py:10}} INFO - Success_test_dag
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:38, thread = MainThread:248620777738272}} {{python.py:240}} INFO - Done. Returned value was: None
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:38, thread = MainThread:248620777738272}} {{taskinstance.py:340}} ▼ Post task execution logs
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:38, thread = MainThread:248620777738272}} {{taskinstance.py:352}} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=test_task, run_id=manual__2025-06-16T10:18:26.179356+00:00, execution_date=20250616T101826, start_date=20250616T101829, end_date=20250616T101829
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:248620777738272}} {{local_task_job_runner.py:266}} INFO - Task exited with return code 0
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:248620777738272}} {{taskinstance.py:3895}} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2025-06-16, 15:48:29 IST] {{process = ForkPoolWorker-3:26, thread = MainThread:248620777738272}} {{local_task_job_runner.py:245}} ▲▲▲ Log group end

System information Version:

  • airflow: 2.10.4
  • python: 3.9
  • sagemaker: 2.233.0

cosmic-heart · Jun 16 '25 10:06