
Getting 404 not found error when writing airflow logs to google cloud storage

Open sai3563 opened this issue 3 years ago • 3 comments

Apache Airflow version

2.3.4

What happened

Hello, I've come across something that I think is most likely a bug in Airflow.

After I set up remote logging to Google Cloud Storage in Airflow, I noticed that whenever Airflow tried to save logs to the cloud, I would get this error:

google.api_core.exceptions.NotFound: 404 GET https://storage.googleapis.com/download/storage/v1/b/airflow_logs/o/var%2Flog%2Fairflow%2Fdag_id%3Dtest_dag%2Frun_id%3Dscheduled__2022-09-16T16%3A34%3A00%2B00%3A00%2Ftask_id%3Dbranch_decision%2Fattempt%3D1.log?alt=media: No such object: airflow_logs/var/log/airflow/dag_id=test_dag/run_id=scheduled__2022-09-16T16:34:00+00:00/task_id=branch_decision/attempt=1.log: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)

But on google cloud storage, logs were being saved just fine. On analyzing the relevant source code, I found this

    def gcs_write(self, log, remote_log_location):
        """
        Writes the log to the remote_log_location. Fails silently if no log
        was created.

        :param log: the log to write to the remote_log_location
        :type log: str
        :param remote_log_location: the log's location in remote storage
        :type remote_log_location: str (path)
        """
        try:
            blob = storage.Blob.from_string(remote_log_location, self.client)
            old_log = blob.download_as_bytes().decode()
            log = '\n'.join([old_log, log]) if old_log else log
        except Exception as e:  # pylint: disable=broad-except
            if not hasattr(e, 'resp') or e.resp.get('status') != '404':  # pylint: disable=no-member
                log = f'*** Previous log discarded: {str(e)}\n\n' + log
                self.log.info("Previous log discarded: %s", e)

        try:
            blob = storage.Blob.from_string(remote_log_location, self.client)
            blob.upload_from_string(log, content_type="text/plain")
        except Exception as e:  # pylint: disable=broad-except
            self.log.error('Could not write logs to %s: %s', remote_log_location, e)

The gcs_write function is used to write logs, but before writing, it first tries to read the existing log inside a try block. Since it is trying to read the file before the file has ever been written, the read fails and control passes to the except block. There it checks whether the exception lacks a resp attribute, which it does. These are the attributes the exception actually has:

['__annotations__', '__cause__', '__class__', '__context__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__suppress_context__', '__traceback__', '__weakref__', '_details', '_error_info', '_errors', '_response', 'args', 'code', 'details', 'domain', 'errors', 'grpc_status_code', 'message', 'metadata', 'reason', 'response', 'with_traceback']

So the if condition is met and the error is printed. As soon as that part is done, it moves on to the write step and saves the log successfully. Weird.
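The mismatch above can be shown with a minimal sketch. The stub classes below stand in for the real exception types (they are not the google libraries): the legacy `googleapiclient` HttpError carries a `resp` attribute, while `google.api_core.exceptions.NotFound` exposes the status via `code` instead, so the old `hasattr(e, 'resp')` guard misfires on a plain missing-object 404:

```python
class StubHttpError(Exception):
    """Mimics the legacy googleapiclient HttpError, which carries `resp`."""
    def __init__(self):
        super().__init__("404")
        self.resp = {"status": "404"}

class StubNotFound(Exception):
    """Mimics google.api_core.exceptions.NotFound: has `code`, no `resp`."""
    def __init__(self, message):
        super().__init__(message)
        self.code = 404  # the HTTP status lives here, not in `resp`

def warns_about_discarded_log(exc):
    # The exact condition from the old gcs_write except-branch.
    return not hasattr(exc, "resp") or exc.resp.get("status") != "404"

print(warns_about_discarded_log(StubHttpError()))   # False: correctly silenced
print(warns_about_discarded_log(StubNotFound("No such object: ...")))  # True: spurious warning
```

The guard was written for the exception shape of an older client library, so every first-attempt log write with the newer library trips it.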

What you think should happen instead

The error should not be thrown, as logs are being successfully written to Google Cloud Storage.

How to reproduce

Enable remote logging in Airflow, run any DAG that generates logs, and check the Airflow worker logs.
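For context, remote GCS logging is enabled via the `[logging]` section of `airflow.cfg`; the bucket matches the one in the error above, and the connection id is a placeholder:

```ini
[logging]
remote_logging = True
remote_base_log_folder = gs://airflow_logs
remote_log_conn_id = google_cloud_default
```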

Operating System

Ubuntu 20.04 LTS

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==5.1.0
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-google==8.3.0 <-- This is the relevant library
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-mongo==3.0.0
apache-airflow-providers-mysql==3.2.0
apache-airflow-providers-slack==5.1.0
apache-airflow-providers-sqlite==3.2.1

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

sai3563 avatar Sep 17 '22 07:09 sai3563

Thanks for opening your first issue here! Be sure to follow the issue template!

boring-cyborg[bot] avatar Sep 17 '22 07:09 boring-cyborg[bot]

Feel free to fix it - apparently we need to check another field for the 404. The reason it tries to get the log is to make sure an existing log is not being overwritten, but first of all it should only check whether the file exists rather than retrieve it, and secondly, it should not print the warning when the file does not exist.
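That suggestion can be sketched as follows. The helper `merge_with_previous_log` is a hypothetical name, not the provider's API; the real `google.cloud.storage.Blob` offers `exists()` (a metadata GET that returns False on a miss instead of raising) and `download_as_bytes()`, and the `FakeBlob` stub below is only there so the logic can run standalone:

```python
def merge_with_previous_log(blob, log):
    """Prepend the previous remote log, if any, without triggering a 404."""
    if blob.exists():  # probe for the object instead of downloading it blindly
        old_log = blob.download_as_bytes().decode()
        if old_log:
            log = f"{old_log}\n{log}"
    return log  # returned unchanged, and silently, when no previous log exists

class FakeBlob:
    """Stand-in for google.cloud.storage.Blob, for illustration only."""
    def __init__(self, data=None):
        self._data = data
    def exists(self):
        return self._data is not None
    def download_as_bytes(self):
        return self._data

print(merge_with_previous_log(FakeBlob(None), "new"))    # returns "new"
print(merge_with_previous_log(FakeBlob(b"old"), "new"))  # returns "old\nnew"
```

With this shape, the common first-attempt case never enters an exception handler at all, so there is nothing to misclassify.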

potiuk avatar Sep 19 '22 11:09 potiuk

Assigned you!

potiuk avatar Sep 19 '22 11:09 potiuk

Hi! @potiuk how are you?!! Do you have any update about this issue? we're using v2.1.0 deployed by helm chart 8.5.0 and have this issue too. The logs are in the GCS OK but all dags show that message.

Thanks!!

jholowaty avatar Aug 09 '23 12:08 jholowaty

All the information is here. If you want to take care of it and implement it - it's still ready for someone to take a look, diagnose and fix.

potiuk avatar Aug 09 '23 13:08 potiuk

Asking me in particular about the issue makes little sense, as I am not working on it. Someone might - you can ping and ask the person who is assigned, but pinging me to ping that person introduces an unnecessary step along the way, so I would not advise doing it.

potiuk avatar Aug 09 '23 13:08 potiuk

@sai3563 how are you? how can i fix this?

@potiuk my apologies!!

jholowaty avatar Aug 09 '23 13:08 jholowaty

@eladkal @potiuk I believe this is resolved now.

I'm not seeing this error in my prod instance. I checked code of the latest version.

    def gcs_write(self, log, remote_log_location) -> bool:
        """
        Write the log to the remote location and return `True`; fail silently and return `False` on error.

        :param log: the log to write to the remote_log_location
        :param remote_log_location: the log's location in remote storage
        :return: whether the log is successfully written to remote location or not.
        """
        try:
            blob = storage.Blob.from_string(remote_log_location, self.client)
            old_log = blob.download_as_bytes().decode()
            log = f"{old_log}\n{log}" if old_log else log
        except Exception as e:
            if not self.no_log_found(e):
                log += self._add_message(
                    f"Error checking for previous log; if exists, may be overwritten: {e}"
                )
                self.log.warning("Error checking for previous log: %s", e)
        try:
            blob = storage.Blob.from_string(remote_log_location, self.client)
            blob.upload_from_string(log, content_type="text/plain")
        except Exception as e:
            self.log.error("Could not write logs to %s: %s", remote_log_location, e)
            return False
        return True

    @staticmethod
    def no_log_found(exc):
        """
        Given exception, determine whether it is result of log not found.

        :meta private:
        """
        if (exc.args and isinstance(exc.args[0], str) and "No such object" in exc.args[0]) or getattr(
            exc, "resp", {}
        ).get("status") == "404":
            return True
        return False

Since no_log_found returns True when the error indicates a 404, and the except block of gcs_write checks if not self.no_log_found(e):, the spurious warning is no longer printed.
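To see the fix in action, here is a standalone copy of the no_log_found heuristic from the code above, applied to a message shaped like the one in this issue; a plain Exception stands in for google.api_core.exceptions.NotFound, whose first positional arg is the error message:

```python
def no_log_found(exc):
    """Copy of the provider helper: does this exception mean 'log not found'?"""
    if (exc.args and isinstance(exc.args[0], str) and "No such object" in exc.args[0]) or getattr(
        exc, "resp", {}
    ).get("status") == "404":
        return True
    return False

# A 404 message like the one in this issue is now recognized...
print(no_log_found(Exception("404 GET ...: No such object: airflow_logs/...attempt=1.log")))  # True
# ...while unrelated failures still surface the warning.
print(no_log_found(Exception("503 Service Unavailable")))  # False
```

Note that it matches both the message text and the legacy `resp` attribute, so it covers old and new client exception shapes.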

Closing this.

sai3563 avatar Apr 25 '24 08:04 sai3563