
"io.UnsupportedOperation: read" when processing log statement for s3 multipart upload

Open hhagblom opened this issue 4 years ago • 2 comments

Problem description

  • What are you trying to achieve? I use Airflow's MsSQLToS3Operator, which in turn depends on smart_open to save extracted SQL Server datasets to S3.

  • What is the expected result? The expected result is that the extracted dataset is uploaded to S3.

  • What are you seeing instead? I get the following stack trace:

[2021-06-09 09:43:17,500] {mssql_to_s3_operator.py:276} INFO - Uploading!
[2021-06-09 09:43:17,657] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/developer/airflow/dags/modified_operators/mssql_to_s3_operator_mod.py", line 100, in execute
    self.build_fetch_query(hook)
  File "/home/developer/airflow/dags/modified_operators/mssql_to_s3_operator_mod.py", line 169, in build_fetch_query
    self.get_records_batch(hook, query_filter)
  File "/home/developer/airflow/dags/modified_operators/mssql_to_s3_operator_mod.py", line 277, in get_records_batch
    fout.write(results)
  File "/home/developer/.local/lib/python3.6/site-packages/smart_open/s3.py", line 839, in write
    self._upload_next_part()
  File "/home/developer/.local/lib/python3.6/site-packages/smart_open/s3.py", line 870, in _upload_next_part
    self._total_bytes / 1024.0 ** 3,
  File "/usr/lib/python3.6/logging/__init__.py", line 1308, in info
    self._log(INFO, msg, args, **kwargs)
  File "/usr/lib/python3.6/logging/__init__.py", line 1444, in _log
    self.handle(record)
  File "/usr/lib/python3.6/logging/__init__.py", line 1454, in handle
    self.callHandlers(record)
  File "/usr/lib/python3.6/logging/__init__.py", line 1516, in callHandlers
    hdlr.handle(record)
  File "/usr/lib/python3.6/logging/__init__.py", line 861, in handle
    rv = self.filter(record)
  File "/usr/lib/python3.6/logging/__init__.py", line 720, in filter
    result = f.filter(record)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 157, in filter
    record.__dict__[k] = self.redact(v)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 203, in redact
    return tuple(self.redact(subval) for subval in item)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 203, in <genexpr>
    return tuple(self.redact(subval) for subval in item)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 205, in redact
    return list(self.redact(subval) for subval in item)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 205, in <genexpr>
    return list(self.redact(subval) for subval in item)
io.UnsupportedOperation: read

Analysis performed

In the code for the MsSQL operator, I can see this call:

    with smart_open.smart_open(url, "wb") as fout:

    # In smart_open/s3.py, open():
    if mode not in constants.BINARY_MODES:
        raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))

I have tried changing "wb" to "wb+", which might fix the problem, but that mode is rejected further down in the smart_open call stack.
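For reference, here is a minimal sketch of the write path when it works (the bucket and key are hypothetical). Only the plain binary modes "rb" and "wb" pass the check above, which is why "wb+" is not a viable workaround:

    import smart_open

    # Hypothetical bucket/key. smart_open's S3 backend accepts only "rb"/"wb",
    # so passing "wb+" here raises NotImplementedError instead of helping.
    with smart_open.open("s3://my-bucket/extracts/dataset.csv", "wb") as fout:
        fout.write(b"col1,col2\n1,2\n")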

The error message comes from the following snippet in s3.py:

    #
    # Internal methods.
    #
    def _upload_next_part(self):
        part_num = self._total_parts + 1
        logger.info(
            "%s: uploading part_num: %i, %i bytes (total %.3fGB)",
            self,
            part_num,
            self._buf.tell(),
            self._total_bytes / 1024.0 ** 3,
        )

I think the bug might be exposed by Airflow's secrets_masker.py, but I'm unsure whether the bug lives there or in smart_open itself.
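To make the suspected interaction concrete, here is a rough sketch. The logger.info() call above passes the writer object itself as the first %s argument, and Airflow's secrets masker walks every log argument, expanding anything iterable into a list. A write-only stream is iterable (io.IOBase provides __iter__), but iterating it ends up calling read(), which fails. The redact() below is a simplification based on the traceback, not Airflow's actual code, and a local file opened "wb" stands in for smart_open's S3 writer:

    import io
    from collections.abc import Iterable

    def redact(item):
        # Simplified stand-in for Airflow's secrets_masker.redact(), modelled
        # on the traceback above: tuples and other iterables are walked
        # element by element so secrets can be masked.
        if isinstance(item, (tuple, set)):
            return tuple(redact(subval) for subval in item)
        if isinstance(item, Iterable) and not isinstance(item, (str, bytes)):
            return list(redact(subval) for subval in item)
        return item

    # A plain write-only file stands in for smart_open's S3 writer: both are
    # iterable through the io.IOBase interface, but neither can be read.
    with open("scratch.bin", "wb") as fout:
        log_args = (fout, 1, 5242880, 0.005)  # mirrors the logger.info() args
        try:
            redact(log_args)
        except io.UnsupportedOperation as exc:
            print(type(exc).__name__, exc)    # prints: UnsupportedOperation read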

I should mention that this worked fine in Airflow 2.0.1; upgrading to 2.1.0 exposed the problem. I have tried in Docker containers with Python 3.6, 3.7 and 3.8, and get the same error regardless.

Versions

Linux-5.10.25-linuxkit-x86_64-with-glibc2.2.5
Python 3.8.10 (default, May 12 2021, 15:56:47)
[GCC 8.3.0]
smart_open 5.1.0

hhagblom commented on Jun 09 '21 14:06

Thank you for the detailed report. I'm still unsure about what's happening here. The biggest mystery for me is:

  • You're uploading a file, so smart_open should be writing, but
  • The error is "io.UnsupportedOperation: read", so something is trying to read from a write-only stream

It's possible that the newer version of Airflow is expecting the log stream to be read-writeable (why?), but smart_open doesn't support this for S3 yet. Can you investigate and let me know if this hypothesis is correct? Depending on the answer, we have several options:

  • Treat this as a problem in Airflow, and see if it's possible for them to adjust their logging
  • Treat this as a problem in smart_open, and make read-write streams work for S3
  • Treat this as a problem in your application, and write logs to e.g. local disk before uploading to S3 (a rough sketch of this follows below)
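One reading of that last option is to stage the extract on local disk and upload it in one shot with boto3, so smart_open's multipart writer (and the log statement that trips the masker) never enters the picture. A rough sketch, with hypothetical paths and bucket/key names:

    import boto3

    # Hypothetical local path and bucket/key names.
    local_path = "/tmp/extract.csv"
    with open(local_path, "wb") as fout:
        fout.write(b"col1,col2\n1,2\n")   # stand-in for the extracted dataset

    # Upload the finished file with boto3's managed transfer.
    s3 = boto3.client("s3")
    s3.upload_file(local_path, "my-bucket", "extracts/extract.csv")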

mpenkov commented on Jun 10 '21 02:06

We’ll try to fix this on Airflow’s side.

uranusjr commented on Jun 11 '21 15:06