"io.UnsupportedOperation: read" when processing log statement for s3 multipart upload
Problem description
- What are you trying to achieve? I use the Airflow MsSQLToS3Operator, which in turn depends on smart_open to save extracted SQL Server datasets to S3.
- What is the expected result? The dataset I extract is uploaded to S3.
- What are you seeing instead? I get the following stack trace:
[2021-06-09 09:43:17,500] {mssql_to_s3_operator.py:276} INFO - Uploading!
[2021-06-09 09:43:17,657] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/home/developer/airflow/dags/modified_operators/mssql_to_s3_operator_mod.py", line 100, in execute
self.build_fetch_query(hook)
File "/home/developer/airflow/dags/modified_operators/mssql_to_s3_operator_mod.py", line 169, in build_fetch_query
self.get_records_batch(hook, query_filter)
File "/home/developer/airflow/dags/modified_operators/mssql_to_s3_operator_mod.py", line 277, in get_records_batch
fout.write(results)
File "/home/developer/.local/lib/python3.6/site-packages/smart_open/s3.py", line 839, in write
self._upload_next_part()
File "/home/developer/.local/lib/python3.6/site-packages/smart_open/s3.py", line 870, in _upload_next_part
self._total_bytes / 1024.0 ** 3,
File "/usr/lib/python3.6/logging/__init__.py", line 1308, in info
self._log(INFO, msg, args, **kwargs)
File "/usr/lib/python3.6/logging/__init__.py", line 1444, in _log
self.handle(record)
File "/usr/lib/python3.6/logging/__init__.py", line 1454, in handle
self.callHandlers(record)
File "/usr/lib/python3.6/logging/__init__.py", line 1516, in callHandlers
hdlr.handle(record)
File "/usr/lib/python3.6/logging/__init__.py", line 861, in handle
rv = self.filter(record)
File "/usr/lib/python3.6/logging/__init__.py", line 720, in filter
result = f.filter(record)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 157, in filter
record.__dict__[k] = self.redact(v)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 203, in redact
return tuple(self.redact(subval) for subval in item)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 203, in <genexpr>
return tuple(self.redact(subval) for subval in item)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 205, in redact
return list(self.redact(subval) for subval in item)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/log/secrets_masker.py", line 205, in <genexpr>
return list(self.redact(subval) for subval in item)
io.UnsupportedOperation: read
Analysis performed
In the code for the MsSQL operator, I can see this call:
# In the MsSQL operator
with smart_open.smart_open(url, "wb") as fout:
    ...

# In s3.py open()
if mode not in constants.BINARY_MODES:
    raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))
I have tried changing "wb" to "wb+", which I thought might fix the problem, but that mode is rejected further down in the smart_open call stack.
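For reference, the accepted modes can be checked directly; a quick look, assuming smart_open 5.x, where the s3 module imports smart_open.constants:
from smart_open import constants

# The S3 layer only implements plain binary read and write, so any "+"
# (read-write) mode fails the check quoted above.
print(constants.BINARY_MODES)             # e.g. ('rb', 'wb')
print("wb+" in constants.BINARY_MODES)    # False, hence the NotImplementedError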
The log statement that triggers the error is in the following snippet in s3.py:
#
# Internal methods.
#
def _upload_next_part(self):
    part_num = self._total_parts + 1
    logger.info(
        "%s: uploading part_num: %i, %i bytes (total %.3fGB)",
        self,  # the S3 writer object itself is passed as a logging argument
        part_num,
        self._buf.tell(),
        self._total_bytes / 1024.0 ** 3,
    )
I think the bug might be exposed by Airflow's secrets_masker.py, but I'm unsure whether the bug is in Airflow or in smart_open itself.
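A minimal sketch of that hypothesis (the class below is a made-up stand-in for smart_open's S3 writer, not the real class): anything that iterates over an io stream goes through readline(), which falls back to read(), and a write-only BufferedIOBase raises exactly this error.
import io

class WriteOnlyStream(io.BufferedIOBase):
    """Hypothetical stand-in for a write-only stream such as smart_open's S3 writer."""
    def writable(self):
        return True
    def write(self, b):
        return len(b)

stream = WriteOnlyStream()
# Treating the stream as an iterable (as a generic "redact every element"
# loop would) calls readline(), which falls back to read(); the default
# BufferedIOBase.read() raises io.UnsupportedOperation: read.
list(subval for subval in stream)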
I should mention that this worked fine in Airflow 2.0.1; upgrading to 2.1.0 exposed the problem. I have tried it in a Docker container with Python 3.6, 3.7 and 3.8, and I hit the same problem regardless.
Versions
Linux-5.10.25-linuxkit-x86_64-with-glibc2.2.5
Python 3.8.10 (default, May 12 2021, 15:56:47)
[GCC 8.3.0]
smart_open 5.1.0
Thank you for the detailed report. I'm still unsure about what's happening here. The biggest mystery for me is:
- You're uploading a file, so smart_open should be writing, but
- The error is "io.UnsupportedOperation: read", so something is trying to read from a write-only stream
It's possible that the newer version of Airflow is expecting the log stream to be read-writeable (why?), but smart_open doesn't support this for S3 yet. Can you investigate and let me know if this hypothesis is correct? Depending on the answer, we have several options:
- Treat this as a problem in Airflow, and see if it's possible for them to adjust their logging
- Treat this as a problem in smart_open, and make read-write streams work for S3
- Treat this as a problem in your application, and write logs to e.g. local disk before uploading to S3 (a rough sketch of this option follows below)
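For the third option, a rough sketch (the helper, bucket, and key names are placeholders; it assumes boto3 is importable, which it is wherever smart_open's S3 support works): stage the extracted batch on local disk, then upload the finished file in one boto3 call, so smart_open's multipart-upload logging path never runs.
import tempfile

import boto3


def upload_via_tempfile(results: bytes, bucket: str, key: str) -> None:
    # Buffer the batch on local disk first, then upload the completed file;
    # this sidesteps the logger.info() call inside _upload_next_part().
    with tempfile.NamedTemporaryFile() as tmp:
        tmp.write(results)
        tmp.flush()
        boto3.client("s3").upload_file(tmp.name, bucket, key)


# Hypothetical replacement for fout.write(results) in get_records_batch():
# upload_via_tempfile(results, "my-bucket", "exports/dataset.csv")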
We’ll try to fix this on Airflow’s side.