
SNOW-742076: PUT file_stream: io.UnsupportedOperation: File or stream is not seekable

Open · pp-gborodin opened this issue 2 years ago · 5 comments

  1. What version of Python are you using?
Python 3.10.6 (main, Nov 14 2022, 16:10:14) [GCC 11.3.0]
  1. What operating system and processor architecture are you using?
Linux-5.15.0-58-generic-x86_64-with-glibc2.35
  1. What are the component versions in the environment (pip freeze)?
apache-airflow-providers-snowflake==2.2.0
  1. What did you do?

I tried to upload data to Snowflake with the PUT command through a Unix pipe. I don't want to store a temporary file on my system, so I tried to stream the data from Postgres to Snowflake through a pipe. Here is a code sample:

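```python
import os
from threading import Thread

# `File` (a stream wrapper) and `snowflake_connection` (an open
# snowflake.connector connection) are defined elsewhere and are not
# shown in this sample.


def _get_data_from_postgres(file):
    # Write end of the pipe; stands in for rows streamed out of Postgres.
    with os.fdopen(file, 'wb') as f:
        f.write(b'fruit,count\n')
        f.write(b'apple,3\n')
        f.write(b'orange,5\n')


def put_data_to_snowflake(f):
    conn = snowflake_connection
    with conn.cursor() as cur:
        # Raises io.UnsupportedOperation: the connector seeks the stream.
        cur.execute(f"PUT file:///pipe.csv @fruits OVERWRITE = TRUE", file_stream=File(f))


def _put_data_to_snowflake(file):
    # Read end of the pipe, passed to the connector as file_stream.
    with os.fdopen(file, 'rb') as f:
        put_data_to_snowflake(f)


rf, wf = os.pipe()

get_thread = Thread(target=_get_data_from_postgres, args=(wf,))
put_thread = Thread(target=_put_data_to_snowflake, args=(rf,))

get_thread.start()
put_thread.start()

timeout = 30
get_thread.join(timeout=timeout)
put_thread.join(timeout=timeout)
```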
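The read end of an OS pipe is not seekable regardless of what is written into it, so the failure can be reproduced without Snowflake at all. A minimal standard-library sketch, assuming a POSIX system such as the Linux host above; `pipe_is_seekable` is a hypothetical helper name, not part of the connector:

```python
import io
import os


def pipe_is_seekable():
    """Probe the read end of an OS pipe with seek(0, os.SEEK_END),
    the same call that fails in the connector's traceback.

    Returns (seekable(), message), where message is the text of the
    io.UnsupportedOperation raised by the seek, if any.
    """
    rf, wf = os.pipe()
    os.close(wf)  # nothing needs to be written to show the problem
    with os.fdopen(rf, "rb") as reader:
        try:
            reader.seek(0, os.SEEK_END)
            message = "seek succeeded"
        except io.UnsupportedOperation as exc:
            message = str(exc)
        return reader.seekable(), message
```

On Linux this returns `(False, "File or stream is not seekable.")`, the same message as the last line of the traceback.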
  1. What did you expect to see?

I expected the data to be uploaded to the `@fruits` stage.

  1. What happened instead?
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/xometry-dags/dags/common/dwh_snowflake.py", line 37, in _put_data_to_snowflake
    put_data_to_snowflake(f, conn, table)
  File "/opt/xometry-dags/dags/common/dwh_snowflake.py", line 27, in put_data_to_snowflake
    cur.execute(f"PUT file:///pipe.csv @fruits OVERWRITE = TRUE", file_stream=File(f))
  File "/home/***/.local/lib/python3.7/site-packages/snowflake/connector/cursor.py", line 794, in execute
    sf_file_transfer_agent.execute()
  File "/home/***/.local/lib/python3.7/site-packages/snowflake/connector/file_transfer_agent.py", line 359, in execute
    self._init_file_metadata()
  File "/home/***/.local/lib/python3.7/site-packages/snowflake/connector/file_transfer_agent.py", line 973, in _init_file_metadata
    src_file_size=self._source_from_stream.seek(0, os.SEEK_END),
io.UnsupportedOperation: File or stream is not seekable.
```
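The failing line computes `src_file_size` as `seek(0, os.SEEK_END)`: on a seekable stream, seeking to offset 0 relative to the end returns the new absolute position, which equals the length in bytes. The sketch below isolates that one expression; `stream_size` is a hypothetical helper, not connector code, and the rewind to position 0 is an added assumption that the caller wants to read from the start afterwards.

```python
import io
import os


def stream_size(stream):
    """Return the size in bytes of a seekable binary stream.

    Uses the same seek(0, os.SEEK_END) expression as the traceback, so
    it raises io.UnsupportedOperation on a pipe, exactly like the PUT.
    """
    size = stream.seek(0, os.SEEK_END)  # new position == total length
    stream.seek(0)  # rewind so the data can still be read
    return size


data = b"fruit,count\napple,3\norange,5\n"
print(stream_size(io.BytesIO(data)))
```

For the three CSV lines from the sample this prints `29`. Passing the pipe's read end instead raises before any size is returned, because a pipe has no end position to seek to.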
  1. Thoughts

I see that the Snowflake connector calls `seek(0, os.SEEK_END)` on the stream to calculate the file size. Is this mandatory? Can I upload the data without knowing its size? If so, how?
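One possible workaround, sketched under stated assumptions rather than taken from the connector's documentation: since the stream must support `seek`, read the pipe into bounded in-memory `io.BytesIO` buffers (which are seekable) and PUT each buffer as its own file. The generator `seekable_chunks` and the `pipe_part_{i}.csv` naming are hypothetical. The sketch assumes newline-terminated CSV rows with no newlines inside quoted fields, and it repeats the header line in every chunk so each staged file is a complete CSV. Memory use stays near `max_bytes` plus one row, and nothing is written to local disk.

```python
import io
from typing import BinaryIO, Iterator


def seekable_chunks(stream: BinaryIO, max_bytes: int = 64 * 1024 * 1024) -> Iterator[io.BytesIO]:
    """Split a non-seekable binary CSV stream into seekable BytesIO chunks.

    Each chunk starts with the stream's first (header) line and contains
    whole lines only, so no row is split across two chunks.
    """
    header = stream.readline()
    chunk = io.BytesIO()
    chunk.write(header)
    for line in stream:  # binary streams iterate line by line
        chunk.write(line)
        if chunk.tell() >= max_bytes:
            chunk.seek(0)  # rewind so the consumer reads from the start
            yield chunk
            chunk = io.BytesIO()
            chunk.write(header)
    if chunk.tell() > len(header):  # emit the tail unless it is header-only
        chunk.seek(0)
        yield chunk
```

Inside `put_data_to_snowflake`, the loop might look like `for i, chunk in enumerate(seekable_chunks(f)): cur.execute(f"PUT file:///pipe_part_{i}.csv @fruits OVERWRITE = TRUE", file_stream=chunk)`. This trades the temporary file for bounded RAM; whether the connector could skip the size check entirely is a separate question.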

pp-gborodin, Feb 10 '23 10:02