HttpOperator should be able to retry at the page level using the HTTP hook's run_with_advanced_retry
Description
It should be possible for the HttpOperator to retry individual requests without failing the whole task, since when pagination is used, some pages might fail.
Use case/motivation
I'm requesting all the pages from an API endpoint (TeamTailor JobPosts), a total of 438 pages. Sometimes, while going through these pages, the server returns error 500. If that particular page were retried with exponential backoff, the whole task would probably succeed, whereas retrying the whole task might just reproduce the problem.
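A minimal, Airflow-free sketch of the requested behavior (all names below — `make_flaky_fetcher`, `fetch_page_with_retry`, `fetch_all_pages` — are hypothetical, for illustration only): each page is retried with exponential backoff, so a transient 500 on one page does not fail the whole run.

```python
import time


class ServerError(Exception):
    """Stands in for an HTTP 500 response."""


def make_flaky_fetcher(fail_times: int):
    """Return a fake page fetcher whose page 3 fails `fail_times` times before succeeding."""
    calls: dict[int, int] = {}

    def fetch(page: int) -> dict:
        calls[page] = calls.get(page, 0) + 1
        if page == 3 and calls[page] <= fail_times:
            raise ServerError("HTTP 500")
        return {"page": page, "items": [f"job-{page}"]}

    return fetch


def fetch_page_with_retry(fetch, page: int, max_attempts: int = 4, base_delay: float = 0.01):
    """Retry a single page with exponential backoff instead of failing the task."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch(page)
        except ServerError:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))  # 0.01s, 0.02s, 0.04s, ...


def fetch_all_pages(fetch, n_pages: int) -> list[dict]:
    return [fetch_page_with_retry(fetch, p) for p in range(1, n_pages + 1)]


pages = fetch_all_pages(make_flaky_fetcher(fail_times=2), n_pages=5)
print(len(pages))  # 5: page 3 failed twice but was retried, so the run still succeeded
```

With a task-level retry, those two 500s would have failed and restarted the whole 438-page walk; here only page 3 is re-fetched.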
Related issues
No response
Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I have created this extended version and it seems to be working correctly; sorry, I don't have time to create a PR:
```python
from __future__ import annotations

from typing import Any, Callable

from requests import Response
from requests.auth import AuthBase

from airflow.configuration import conf
from airflow.providers.http.operators.http import HttpOperator
from airflow.utils.context import Context


class RetryHttpOperator(HttpOperator):
    """HttpOperator that retries each request (including each paginated page)
    via HttpHook.run_with_advanced_retry instead of failing the whole task."""

    def __init__(
        self,
        *,
        endpoint: str | None = None,
        method: str = "POST",
        data: dict[str, Any] | str | None = None,
        headers: dict[str, str] | None = None,
        pagination_function: Callable[..., Any] | None = None,
        response_check: Callable[..., bool] | None = None,
        response_filter: Callable[..., Any] | None = None,
        extra_options: dict[str, Any] | None = None,
        http_conn_id: str = "http_default",
        log_response: bool = False,
        auth_type: type[AuthBase] | None = None,
        tcp_keep_alive: bool = True,
        tcp_keep_alive_idle: int = 120,
        tcp_keep_alive_count: int = 20,
        tcp_keep_alive_interval: int = 30,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        retry_args: dict[Any, Any] | None = None,  # tenacity.Retrying kwargs; None disables per-request retries
        **kwargs: Any,
    ) -> None:
        super().__init__(
            endpoint=endpoint,
            method=method,
            data=data,
            headers=headers,
            pagination_function=pagination_function,
            response_check=response_check,
            response_filter=response_filter,
            extra_options=extra_options,
            http_conn_id=http_conn_id,
            log_response=log_response,
            auth_type=auth_type,
            tcp_keep_alive=tcp_keep_alive,
            tcp_keep_alive_idle=tcp_keep_alive_idle,
            tcp_keep_alive_count=tcp_keep_alive_count,
            tcp_keep_alive_interval=tcp_keep_alive_interval,
            deferrable=deferrable,
            **kwargs,
        )
        self._retry_args = retry_args

    def execute_sync(self, context: Context) -> Any:
        self.log.info("Calling HTTP method")
        if self._retry_args:
            response = self.hook.run_with_advanced_retry(
                self._retry_args, self.endpoint, self.data, self.headers, self.extra_options
            )
        else:
            response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options)
        response = self.paginate_sync(response=response)
        return self.process_response(context=context, response=response)

    def paginate_sync(self, response: Response) -> Response | list[Response]:
        if not self.pagination_function:
            return response
        all_responses = [response]
        while True:
            next_page_params = self.pagination_function(response)
            if not next_page_params:
                break
            # Retry each page individually so one transient 500 does not fail the task.
            if self._retry_args:
                response = self.hook.run_with_advanced_retry(
                    self._retry_args, **self._merge_next_page_parameters(next_page_params)
                )
            else:
                response = self.hook.run(**self._merge_next_page_parameters(next_page_params))
            all_responses.append(response)
        return all_responses
```
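The paginate_sync loop above can be exercised without Airflow using small stand-ins: a `run` callable that fails transiently and a `pagination_function` that returns the next request's params or `None`, mirroring HttpOperator's contract. All helper names here (`run_with_retry`, `paginate_with_retry`, `fake_run`, `next_page`) are hypothetical.

```python
def run_with_retry(run, params: dict, max_attempts: int = 3):
    """Per-request retry loop; a stand-in for HttpHook.run_with_advanced_retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run(**params)
        except RuntimeError:
            if attempt == max_attempts:
                raise


def paginate_with_retry(run, pagination_function, first_params: dict) -> list:
    """Mirrors paginate_sync: follow next-page params, retrying each page in place."""
    response = run_with_retry(run, first_params)
    all_responses = [response]
    while True:
        next_page_params = pagination_function(response)
        if not next_page_params:
            break
        response = run_with_retry(run, next_page_params)
        all_responses.append(response)
    return all_responses


failures = {"left": 1}  # make page 2 fail exactly once


def fake_run(page: int) -> dict:
    if page == 2 and failures["left"] > 0:
        failures["left"] -= 1
        raise RuntimeError("HTTP 500")
    return {"page": page}


def next_page(response: dict):
    # Three pages in total; returning None stops the loop, like pagination_function.
    return {"page": response["page"] + 1} if response["page"] < 3 else None


responses = paginate_with_retry(fake_run, next_page, {"page": 1})
print([r["page"] for r in responses])  # [1, 2, 3]: page 2's 500 was retried in place
```

In the real operator, `retry_args` would instead be a dict of tenacity keyword arguments (e.g. a stop condition and an exponential wait) passed straight through to `run_with_advanced_retry`.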
Hi, I can work on this issue. Could you please assign me?
Sorry, I'm not a collaborator, so I cannot assign anybody... maybe @eladkal could?