HTTPOperator should have the possibility to retry at the page level using the http hook run_with_advanced_retry

Open lopezvit opened this issue 10 months ago • 3 comments

Description

It should be possible for the HTTPOperator to retry individual requests without failing the whole task: when it is paginating, some pages might fail.

Use case/motivation

I'm requesting all the pages from an API endpoint (TeamTailor JobPosts), a total of 438 pages. Sometimes, while going through these pages, the server returns a 500 error. If only that particular page were retried with exponential backoff, the whole task would probably succeed, whereas retrying the whole task might just reproduce the problem.
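
To make the requested behaviour concrete, here is a minimal standard-library sketch of page-level retry with exponential backoff. It is not Airflow code: `fetch_all_pages`, `fetch_page`, `next_page` and the `sleep` parameter are hypothetical names chosen for illustration, and any exception raised by `fetch_page` is assumed to mean "this page failed".

```python
from __future__ import annotations

import time
from typing import Any, Callable, Optional


def fetch_all_pages(
    fetch_page: Callable[[Any], Any],            # hypothetical: performs one request
    next_page: Callable[[Any], Optional[Any]],   # hypothetical: None means "no more pages"
    first_page: Any,
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> list[Any]:
    """Fetch every page, retrying each failed page on its own."""
    responses = []
    page = first_page
    while page is not None:
        for attempt in range(1, max_attempts + 1):
            try:
                response = fetch_page(page)
                break  # this page succeeded, stop retrying it
            except Exception:
                if attempt == max_attempts:
                    raise  # this page failed on every attempt, so give up
                # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
                sleep(base_delay * 2 ** (attempt - 1))
        responses.append(response)
        page = next_page(response)
    return responses
```

Pages that were already fetched are kept, so a transient error on one page costs only a few extra requests for that page instead of a restart from the first page.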

Related issues

No response

Are you willing to submit a PR?

  • [ ] Yes I am willing to submit a PR!

lopezvit avatar Apr 25 '24 07:04 lopezvit

I have created this extended version and it seems to be working correctly. Sorry, I don't have time to create a PR:

from __future__ import annotations

from airflow.configuration import conf
from airflow.providers.http.operators.http import HttpOperator
from airflow.utils.context import Context
from requests import Response
from requests.auth import AuthBase
from typing import Any, Callable


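class RetryHttpOperator(HttpOperator):
    """HttpOperator that retries each request (including each page) via ``run_with_advanced_retry``.

    ``retry_args`` is passed to the hook's ``run_with_advanced_retry`` as its retry arguments.
    """
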
    def __init__(self, *, endpoint: str | None = None, method: str = "POST", data: dict[str, Any] | str | None = None,
                 headers: dict[str, str] | None = None, pagination_function: Callable[..., Any] | None = None,
                 response_check: Callable[..., bool] | None = None, response_filter: Callable[..., Any] | None = None,
                 extra_options: dict[str, Any] | None = None, http_conn_id: str = "http_default",
                 log_response: bool = False, auth_type: type[AuthBase] | None = None, tcp_keep_alive: bool = True,
                 tcp_keep_alive_idle: int = 120, tcp_keep_alive_count: int = 20, tcp_keep_alive_interval: int = 30,
                 deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
                 retry_args: dict[Any, Any] | None = None,
                 **kwargs: Any) -> None:
        super().__init__(endpoint=endpoint, method=method, data=data, headers=headers,
                         pagination_function=pagination_function, response_check=response_check,
                         response_filter=response_filter, extra_options=extra_options, http_conn_id=http_conn_id,
                         log_response=log_response, auth_type=auth_type, tcp_keep_alive=tcp_keep_alive,
                         tcp_keep_alive_idle=tcp_keep_alive_idle, tcp_keep_alive_count=tcp_keep_alive_count,
                         tcp_keep_alive_interval=tcp_keep_alive_interval, deferrable=deferrable, **kwargs)
        self._retry_args = retry_args

    def execute_sync(self, context: Context) -> Any:
        self.log.info("Calling HTTP method")
        # Retry the first request according to retry_args; without them, fall back to a plain run.
        if self._retry_args:
            response = self.hook.run_with_advanced_retry(self._retry_args, self.endpoint, self.data, self.headers,
                                                         self.extra_options)
        else:
            response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options)
        response = self.paginate_sync(response=response)
        return self.process_response(context=context, response=response)

    def paginate_sync(self, response: Response) -> Response | list[Response]:
        if not self.pagination_function:
            return response

        all_responses = [response]
        while True:
            next_page_params = self.pagination_function(response)
            if not next_page_params:
                break
            # Retry only this page, so a transient error does not fail the whole task.
            if self._retry_args:
                response = self.hook.run_with_advanced_retry(self._retry_args,
                                                             **self._merge_next_page_parameters(next_page_params))
            else:
                response = self.hook.run(**self._merge_next_page_parameters(next_page_params))
            all_responses.append(response)
        return all_responses

lopezvit avatar Apr 25 '24 10:04 lopezvit

Hi, I can work on this issue. Could you please assign me?

boraberke avatar Apr 25 '24 11:04 boraberke

Sorry, I'm not a collaborator, so I cannot assign anybody... maybe @eladkal could?

lopezvit avatar Apr 25 '24 15:04 lopezvit