pyinfra
pyinfra copied to clipboard
Implement secure temporary file handling
Is your feature request related to a problem? Please describe
I have noticed that the temporary files created by pyinfra are world-readable and often have a predictable name. For example, when using pyinfra.operations.server.script()
, the uploaded script is stored in the temporary directory with permission bits set to 755. This has security implications if the script contains sensitive information. Another example would be pyinfra.operations.apt.deb()
which downloads data and stores it in a temporary file with a predictable name (SHA-1 hash of the download URL). In this example, a malicious actor could simply place a Debian package with the same name in the temporary directory, and because the internal call to pyinfra.operations.files.download()
does not use the force
parameter, the malicious actor's Debian package would be installed.
Describe the solution you'd like
In my opinion, the most robust solution would be to store all temporary deploy files in a secure temporary directory managed by pyinfra. I have addressed the issue in pyinfra 3.0b0 using a config file (config.py
) that looks as follows:
"""The pyinfra config file."""
import shlex
import sys
import typing
import uuid
import pyinfra
import pyinfra.api.host
import pyinfra.connectors.ssh
import pyinfra.facts.server
import pyinfra.operations.python
def pyinfra_temp_path(self: pyinfra.api.host.Host, *_: typing.Any) -> str:
"""Override for ``pyinfra.api.host.Host.get_temp_filename``.
pyinfra creates predictably named world-readable temporary files.
This override introduces more secure handling.
"""
def create_temp_dir(user: str) -> str:
class TempDir(typing.TypedDict): # pylint: disable=missing-class-docstring
path: str
created: bool
queued: bool
temp_dirs = vars(pyinfra_temp_path).setdefault(self.name, {})
temp_dir: TempDir = temp_dirs.setdefault(
user, {"path": "", "created": False, "queued": False}
)
# One temporary directory per host and user combination should
# be created.
if not temp_dir["path"]:
temp_dir["path"] = "/tmp/iac." + uuid.uuid4().hex[:10]
# Temporary directories should only be created when needed
# during the execution stage.
if pyinfra.state.is_executing:
if not temp_dir["created"]:
# A fact is used to execute commands so that operation
# parameters are automatically inherited.
self.get_fact(
pyinfra.facts.server.Command,
(
shlex.join(["mkdir", "-m", "700", "--", temp_dir["path"]])
+ " && "
+ shlex.join(["chown", "--", user, temp_dir["path"]])
),
)
temp_dir["created"] = True
elif not self.in_op:
# If this function is not called as part of an operation,
# an operation has to be queued to ensure that the directory
# creation code is executed during the execution stage.
if not temp_dir["queued"]:
pyinfra.operations.python.call(
name=f'Create directory "{temp_dir["path"]}"',
function=create_temp_dir,
user=user,
)
temp_dir["queued"] = True
return temp_dir["path"]
# SFTP uploads are executed with the user specified in the connector.
# If an operation uses privilege escalation, any uploads end up in
# the temporary directory and are moved to the desired location with
# escalated privileges. By using a separate temporary directory for
# this process, the connected user does not have to receive access
# to the superuser's temporary directory.
user = None
if isinstance(self.connector, pyinfra.connectors.ssh.SSHConnector):
# Using stack frame objects to check whether this function is
# being called as part of the put operation is simpler than
# monkey patching operation functions.
# pylint: disable-next=protected-access
caller_frame = sys._getframe().f_back
if caller_frame:
if (
caller_frame.f_code.co_filename.endswith("/pyinfra/operations/files.py")
and caller_frame.f_code.co_name == "put"
):
user = self.get_fact(
pyinfra.facts.server.User, _sudo=False, _su_user=None, _doas=False
)
if user is None:
user = self.get_fact(pyinfra.facts.server.User)
# Any arguments passed to the function are ignored because it should
# always return a random path to prevent collisions.
return create_temp_dir(user) + "/" + uuid.uuid4().hex[:14]
pyinfra.api.host.Host.get_temp_filename = pyinfra_temp_path # type: ignore
With this approach, pyinfra creates a temporary directory with a random name and permission bits set to 700 (/tmp
is used statically because I don't require $TMPDIR
evaluation). All temporary file/directory paths generated by pyinfra are scoped to this directory. The temporary directory is created when needed during the execution stage to prevent needless creation of temporary directories. If an SFTP upload is executed with an unprivileged user, an additional temporary directory for that user is created. In addition, all arguments passed to pyinfra.api.host.Host.get_temp_filename()
are ignored, because the returned path should always be random to prevent collisions.