Also allow passing buffer instead of path for retrieve_file and store_file methods in SFTPHook
The FTPSHook already accepts a buffer (such as StringIO/BytesIO) instead of a path, so I've refactored the retrieve_file and store_file methods in SFTPHook to support buffers as well. This avoids having to create a local file just to download a file from the server: you can download it directly into a buffer. Likewise, if you need to upload content, you no longer have to create a local file first. I simply delegate to the underlying paramiko getfo and putfo methods; nothing fancy was needed, as the paramiko library already supports this. This shortcoming was described as a pitfall of the SFTPHook.
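To illustrate the buffer-based flow described above, here is a minimal sketch. `FakeSFTPClient` is a hypothetical in-memory stand-in that only mimics the positional arguments of paramiko's `getfo(remotepath, fl)` and `putfo(fl, remotepath)`; the helper names `download_to_buffer` and `upload_from_buffer` are likewise invented for this example and are not part of the SFTPHook.

```python
from io import BytesIO


class FakeSFTPClient:
    """Hypothetical in-memory stand-in for paramiko.SFTPClient (illustration only)."""

    def __init__(self, files=None):
        self.files = dict(files or {})

    def getfo(self, remotepath, fl, prefetch=True):
        # Write the remote content into the given file-like object.
        fl.write(self.files[remotepath])

    def putfo(self, fl, remotepath, confirm=True):
        # Read the file-like object and store its content remotely.
        self.files[remotepath] = fl.read()


def download_to_buffer(client, remote_full_path):
    """Download a remote file into memory without touching the local disk."""
    buffer = BytesIO()
    client.getfo(remote_full_path, buffer)
    return buffer.getvalue()


def upload_from_buffer(client, content, remote_full_path):
    """Upload in-memory bytes without creating a local file first."""
    client.putfo(BytesIO(content), remote_full_path)


client = FakeSFTPClient({"/in/report.csv": b"a,b\n1,2\n"})
downloaded = download_to_buffer(client, "/in/report.csv")
upload_from_buffer(client, b"hello", "/out/greeting.txt")
```

With a real connection, the same two helpers would receive a paramiko `SFTPClient` instead of the stand-in.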
We also use our in-house StreamOperator, which executes an operator in a multi-threaded way within the same worker (similar to expand, but distributing the work over multiple threads within one worker instead of over multiple workers). With it, we discovered that when the SFTPHook is used in a PythonOperator to retrieve a file, the connections don't get closed. That seems plausible, as each thread has to instantiate its own SFTPHook because the underlying paramiko library isn't thread-safe. When you look at the source code, you'll notice that each operation within the SFTPHook gets a connection but never closes it; you have to call the close_conn method manually.
def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
    """
    Transfer the remote file to a local location.

    If local_full_path is a string path, the file will be put
    at that location.

    :param remote_full_path: full path to the remote file
    :param local_full_path: full path to the local file or a file-like buffer
    :param prefetch: controls whether prefetch is performed (default: True)
    """
    conn = self.get_conn()
    if isinstance(local_full_path, BytesIO):
        conn.getfo(remote_full_path, local_full_path, prefetch=prefetch)
    else:
        conn.get(remote_full_path, local_full_path, prefetch=prefetch)
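The leak described above can be reproduced in isolation. The following is only a sketch: `FakeConnection` is a hypothetical stand-in that counts open connections, and the thread pool plays the role of the multi-threaded operator. Neither is Airflow or paramiko code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeConnection:
    """Hypothetical stand-in for an SFTP connection that tracks how many are open."""

    open_count = 0
    _lock = threading.Lock()

    def __init__(self):
        with FakeConnection._lock:
            FakeConnection.open_count += 1

    def close(self):
        with FakeConnection._lock:
            FakeConnection.open_count -= 1


def fetch_without_close(name):
    conn = FakeConnection()  # one connection per task, like one hook per thread
    return f"contents of {name}"  # conn is never closed


def fetch_with_close(name):
    conn = FakeConnection()
    try:
        return f"contents of {name}"
    finally:
        conn.close()  # released even if the transfer raises


def run(worker, names):
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(worker, names))


names = [f"file_{i}.csv" for i in range(8)]
run(fetch_without_close, names)
leaked = FakeConnection.open_count  # connections left open by the first run
run(fetch_with_close, names)
still_open = FakeConnection.open_count - leaked  # extra connections left by the second run
```

Every task in the first run leaves one connection open, while the second run leaves none behind.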
In the past we didn't use the SFTPHook but used the paramiko library directly, like this, and didn't have any issues:
from io import BytesIO

from paramiko import SFTPClient, Transport


def get_file(remote_full_path: str) -> str:
    buffer = BytesIO()
    # get_connection and sftp_conn are assumed to be defined elsewhere
    conn = get_connection(conn_id=sftp_conn)
    with Transport((conn.host, conn.port)) as transport:
        transport.connect(username=conn.login, password=conn.password)
        with SFTPClient.from_transport(transport) as sftp:
            sftp.getfo(remote_full_path, buffer)  # Download file to buffer
    return buffer.getvalue().decode("utf-8")
So I think we should refactor the SFTP hook so that it always releases the SFTP connection after each operation, to avoid connection leaks:
def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
    """
    Transfer the remote file to a local location.

    If local_full_path is a string path, the file will be put
    at that location.

    :param remote_full_path: full path to the remote file
    :param local_full_path: full path to the local file or a file-like buffer
    :param prefetch: controls whether prefetch is performed (default: True)
    """
    with self.get_sftp_conn() as conn:
        if isinstance(local_full_path, BytesIO):
            conn.getfo(remote_full_path, local_full_path, prefetch=prefetch)
        else:
            conn.get(remote_full_path, local_full_path, prefetch=prefetch)
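The description does not show how `get_sftp_conn` is implemented, so the following is only one possible shape, sketched with `contextlib.contextmanager`. `SketchHook`, `FakeSFTPClient`, and `_open_client` are hypothetical names used for illustration; a real hook would open a paramiko client instead.

```python
from contextlib import contextmanager


class FakeSFTPClient:
    """Hypothetical stand-in for an SFTP client that records whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class SketchHook:
    """Illustrative hook that hands out a fresh connection per operation."""

    def __init__(self):
        self.clients = []  # kept only so the example can inspect them afterwards

    def _open_client(self):
        client = FakeSFTPClient()
        self.clients.append(client)
        return client

    @contextmanager
    def get_sftp_conn(self):
        client = self._open_client()
        try:
            yield client
        finally:
            client.close()  # runs on normal exit and when the body raises


hook = SketchHook()

with hook.get_sftp_conn() as conn:
    closed_inside = conn.closed  # still open while the block is running

try:
    with hook.get_sftp_conn() as conn:
        raise OSError("simulated transfer failure")
except OSError:
    pass

all_closed = all(client.closed for client in hook.clients)
```

The caller never has to remember to close anything: both the successful and the failing operation leave their connection closed.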
Otherwise every method would need an explicit try/finally like the one below, whereas with the context manager above you don't need to worry about the connection:
def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
    """
    Transfer the remote file to a local location.

    If local_full_path is a string path, the file will be put
    at that location.

    :param remote_full_path: full path to the remote file
    :param local_full_path: full path to the local file or a file-like buffer
    :param prefetch: controls whether prefetch is performed (default: True)
    """
    # Acquire the connection before the try block so that conn is always
    # bound when the finally clause runs.
    conn = self.get_sftp_conn()
    try:
        if isinstance(local_full_path, BytesIO):
            conn.getfo(remote_full_path, local_full_path, prefetch=prefetch)
        else:
            conn.get(remote_full_path, local_full_path, prefetch=prefetch)
    finally:
        conn.close()
I've added a test case for both changes.