
locking.py seems multiprocess-safe but not thread-safe

Open · justinsteven opened this issue 1 year ago · 3 comments

Have you checked borgbackup docs, FAQ, and open GitHub issues?

Mostly

Is this a BUG / ISSUE report or a QUESTION?

Kind of?

System information. For client/server mode post info for both machines.

Your borg version (borg -V).

N/A

Operating system (distribution) and version.

Linux

Hardware / network configuration, and filesystems used.

N/A

How much data is handled by borg?

N/A

Full borg command line that led to the problem (leaving out excludes and passwords)

N/A

Describe the problem you're observing.

I am making a bad decision that I will own, but I think I've come across something interesting in doing so.

I have adapted and repurposed src/borg/locking.py into my own Python scripts, which are responsible for, among other things, synchronising borg repos to multiple disks at once using rsync. I repurposed locking.py so that I can take non-exclusive locks (borg with-lock seems to give exclusive locks only), letting multiple rsyncs safely read from a given borg repo at once while being sure that borg won't write to it. I also prefer being able to use with Lock(): in my code rather than wrapping rsync in borg with-lock.
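
For context, the intended usage is roughly the following (a minimal sketch: the destination paths are hypothetical, and I'm glossing over exactly which path needs to be locked to keep borg out):

#!/usr/bin/env python3
import subprocess

from borg_lock import Lock

REPO = "myrepo/"
DESTINATIONS = ["/mnt/disk1/myrepo/", "/mnt/disk2/myrepo/"]  # hypothetical mounts

# Hold a shared (non-exclusive) lock for the duration of the syncs, so that
# several rsyncs can read the repo concurrently while writers are kept out.
with Lock(REPO, exclusive=False):
    procs = [subprocess.Popen(["rsync", "-a", "--delete", REPO, dest]) for dest in DESTINATIONS]
    for proc in procs:
        if proc.wait() != 0:
            raise RuntimeError(f"rsync failed with exit code {proc.returncode}")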

In repurposing locking.py I made some changes to make it self-contained and to somewhat appease my linter. My borg_lock.py is as follows, but I don't think the changes I made are causing the problems I'm having; if they are, I apologise. If the reproducer below also reproduces against the real locking.py (I haven't dabbled in borg development, so I don't know how to do this myself, but a possible approach is sketched after the listing), then my borg_lock.py can be ruled out as the cause.

borg_lock.py
#!/usr/bin/env python3
import errno
import json
import os
import socket
import sys
import tempfile
import time
import uuid

# Taken from https://github.com/borgbackup/borg/blob/1525c72549a8e0de3e95fb991f56da54de54a1d6/src/borg/locking.py

ADD, REMOVE, REMOVE2 = "add", "remove", "remove2"
SHARED, EXCLUSIVE = "shared", "exclusive"


def logger(msg, *args):
    sys.stderr.write((msg % args) + "\n")


class TimeoutTimer:
    """
    A timer for timeout checks (can also deal with "never timeout").
    It can also compute and optionally execute a reasonable sleep time (e.g. to avoid
    polling too often or to support thread/process rescheduling).
    """

    def __init__(self, timeout=None, sleep=None):
        """
        Initialize a timer.

        :param timeout: time out interval [s] or None (never timeout, wait forever) [default]
        :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
                      or None (autocompute: use 10% of timeout [but not more than 60s],
                      or 1s for "never timeout" mode)
        """
        if timeout is not None and timeout < 0:
            raise ValueError("timeout must be >= 0")
        self.timeout_interval = timeout
        if sleep is None:
            if timeout is None:
                sleep = 1.0
            else:
                sleep = min(60.0, timeout / 10.0)
        self.sleep_interval = sleep
        self.start_time = None
        self.end_time = None

    def __repr__(self):
        return "<{}: start={!r} end={!r} timeout={!r} sleep={!r}>".format(
            self.__class__.__name__, self.start_time, self.end_time, self.timeout_interval, self.sleep_interval
        )

    def start(self):
        self.start_time = time.time()
        if self.timeout_interval is not None:
            self.end_time = self.start_time + self.timeout_interval
        return self

    def sleep(self):
        if self.sleep_interval >= 0:
            time.sleep(self.sleep_interval)

    def timed_out(self):
        return self.end_time is not None and time.time() >= self.end_time

    def timed_out_or_sleep(self):
        if self.timed_out():
            return True
        else:
            self.sleep()
            return False


class LockError(Exception):
    pass


class LockFailed(LockError):
    pass


class LockTimeout(LockError):
    pass


class NotLocked(LockError):
    pass


class NotMyLock(LockError):
    pass


# From https://github.com/borgbackup/borg/blob/1525c72549a8e0de3e95fb991f56da54de54a1d6/src/borg/platform/base.py#L302
# patched socket.getfqdn() - see https://bugs.python.org/issue5004
def getfqdn(name=""):
    """Get fully qualified domain name from name.

    An empty argument is interpreted as meaning the local host.
    """
    name = name.strip()
    if not name or name == "0.0.0.0":
        name = socket.gethostname()
    try:
        addrs = socket.getaddrinfo(name, None, 0, socket.SOCK_DGRAM, 0, socket.AI_CANONNAME)
    except OSError:
        pass
    else:
        for addr in addrs:
            if addr[3]:
                name = addr[3]
                break
    return name


# for performance reasons, only determine hostname / fqdn / hostid once.
# XXX this sometimes requires live internet access for issuing a DNS query in the background.
hostname = socket.gethostname()
fqdn = getfqdn(hostname)
# some people put the fqdn into /etc/hostname (which is wrong, should be the short hostname)
# fix this (do the same as "hostname --short" cli command does internally):
hostname = hostname.split(".")[0]

# uuid.getnode() is problematic in some environments (e.g. OpenVZ, see #3968) where the virtual MAC address
# is all-zero. uuid.getnode falls back to returning a random value in that case, which is not what we want.
# thus, we offer BORG_HOST_ID where a user can set an own, unique id for each of his hosts.
hostid = os.environ.get("BORG_HOST_ID")
if not hostid:
    hostid = f"{fqdn}@{uuid.getnode()}"


def get_process_id() -> tuple[str, int, int]:
    """
    Return identification tuple (hostname, pid, thread_id) for 'us'.
    This always returns the current pid, which might be different from before, e.g. if daemonize() was used.

    Note: Currently thread_id is *always* zero.
    """
    thread_id = 0
    pid = os.getpid()
    return hostid, pid, thread_id

# END taken from platform/base.py


# Taken from https://github.com/borgbackup/borg/blob/1525c72549a8e0de3e95fb991f56da54de54a1d6/src/borg/platform/posix.pyx#L37
def local_pid_alive(pid):
    """Return whether *pid* is alive."""
    try:
        # This doesn't work on Windows.
        # This does not kill anything, 0 means "see if we can send a signal to this process or not".
        # Possible errors: No such process (== stale lock) or permission denied (not a stale lock).
        # If the exception is not raised that means such a pid is valid and we can send a signal to it.
        os.kill(pid, 0)
        return True
    except OSError as err:
        if err.errno == errno.ESRCH:
            # ESRCH = no such process
            return False
        # Any other error (eg. permissions) means that the process ID refers to a live process.
        return True


def process_alive(host, pid, thread):
    """
    Check whether the (host, pid, thread_id) combination corresponds to a process potentially alive.

    If the process is local, then this will be accurate. If the process is not local, then this
    returns always True, since there is no real way to check.
    """
    assert isinstance(host, str)
    assert isinstance(hostid, str)
    assert isinstance(pid, int)
    assert isinstance(thread, int)

    if host != hostid:
        return True

    if thread != 0:
        # Currently thread is always 0, if we ever decide to set this to a non-zero value,
        # this code needs to be revisited, too, to do a sensible thing
        return True

    return local_pid_alive(pid)
# END taken from platform/posix.py


class ExclusiveLock:
    """An exclusive Lock based on mkdir fs operation being atomic.

    If possible, try to use the contextmanager here like::

        with ExclusiveLock(...) as lock:
            ...

    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """
    id_: tuple[str, int, int]

    def __init__(self, path, timeout=None, sleep=None, id_=None):
        self.timeout = timeout
        self.sleep = sleep
        self.path = os.path.abspath(path)
        self.id_ = id_ or get_process_id()
        self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id_)
        self.kill_stale_locks = True
        self.stale_warning_printed = False

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.unique_name!r}>"

    def acquire(self, timeout=None, sleep=None):
        if timeout is None:
            timeout = self.timeout
        if sleep is None:
            sleep = self.sleep
        parent_path, base_name = os.path.split(self.path)
        unique_base_name = os.path.basename(self.unique_name)
        temp_path = tempfile.mkdtemp(".tmp", base_name + ".", parent_path)
        temp_unique_name = os.path.join(temp_path, unique_base_name)
        try:
            with open(temp_unique_name, "wb"):
                pass
        except OSError as err:
            raise LockFailed(f"Failed to lock {self.path}: {err}") from None
        else:
            timer = TimeoutTimer(timeout, sleep).start()
            while True:
                try:
                    os.replace(temp_path, self.path)
                except OSError:  # already locked
                    if self.by_me():
                        return self
                    self.kill_stale_lock()
                    if timer.timed_out_or_sleep():
                        raise LockTimeout(self.path) from None
                else:
                    temp_path = None  # see finally:-block below
                    return self
        finally:
            # Renaming failed for some reason, so temp_dir still exists and
            # should be cleaned up anyway. Try to clean up, but don't crash.
            try:
                os.unlink(temp_unique_name)
            except:  # noqa
                pass
            try:
                os.rmdir(temp_path)
            except:  # noqa
                pass

    def release(self):
        if not self.is_locked():
            raise NotLocked(self.path)
        if not self.by_me():
            raise NotMyLock(self.path)
        os.unlink(self.unique_name)
        for retry in range(42):
            try:
                os.rmdir(self.path)
            except OSError as err:
                if err.errno in (errno.EACCES,):
                    # windows behaving strangely? -> just try again.
                    continue
                if err.errno not in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                    # EIO or other unexpected error = we cannot operate anyway, so re-throw
                    raise err
                # else:
                # Directory is not empty or doesn't exist any more.
                # this means we lost the race to somebody else -- which is ok.
            return

    def is_locked(self):
        return os.path.exists(self.path)

    def by_me(self):
        return os.path.exists(self.unique_name)

    def kill_stale_lock(self):
        try:
            names = os.listdir(self.path)
        except FileNotFoundError:  # another process did our job in the meantime.
            return False
        except PermissionError:  # win32 might throw this.
            return False
        else:
            for name in names:
                try:
                    host_pid, thread_str = name.rsplit("-", 1)
                    host, pid_str = host_pid.rsplit(".", 1)
                    pid = int(pid_str)
                    thread = int(thread_str, 16)
                except ValueError:
                    # Malformed lock name? Or just some new format we don't understand?
                    logger(f"Found malformed lock {name} in {self.path}. Please check/fix manually.")
                    return False

                if process_alive(host, pid, thread):
                    return False

                if not self.kill_stale_locks:
                    if not self.stale_warning_printed:
                        # Log this at warning level to hint the user at the ability
                        logger(
                            f"Found stale lock {name}, but not deleting because self.kill_stale_locks = False."
                        )
                        self.stale_warning_printed = True
                    return False

                try:
                    os.unlink(os.path.join(self.path, name))
                    logger(f"Killed stale lock {name}.")
                except OSError as err:
                    if not self.stale_warning_printed:
                        # This error will bubble up and likely result in locking failure
                        logger(f"Found stale lock {name}, but cannot delete due to {err}")
                        self.stale_warning_printed = True
                    return False

        try:
            os.rmdir(self.path)
        except OSError as err:
            if err.errno in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                # Directory is not empty or doesn't exist any more = we lost the race to somebody else--which is ok.
                return False
            # EACCES or EIO or ... = we cannot operate anyway
            logger("Failed to remove lock dir: %s", str(err))
            return False

        return True

    def break_lock(self):
        if self.is_locked():
            for name in os.listdir(self.path):
                os.unlink(os.path.join(self.path, name))
            os.rmdir(self.path)

    def migrate_lock(self, old_id, new_id):
        """migrate the lock ownership from old_id to new_id"""
        assert self.id_ == old_id
        new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
        if self.is_locked() and self.by_me():
            with open(new_unique_name, "wb"):
                pass
            os.unlink(self.unique_name)
        self.id_, self.unique_name = new_id, new_unique_name


class LockRoster:
    """
    A Lock Roster to track shared/exclusive lockers.

    Note: you usually should call the methods with an exclusive lock held,
    to avoid conflicting access by multiple threads/processes/machines.
    """

    def __init__(self, path, id_=None):
        self.path = path
        self.id_ = id_ or get_process_id()
        self.kill_stale_locks = True

    def load(self):
        try:
            with open(self.path) as f:
                data = json.load(f)

            # Just nuke the stale locks early on load
            if self.kill_stale_locks:
                for key in (SHARED, EXCLUSIVE):
                    try:
                        entries = data[key]
                    except KeyError:
                        continue
                    elements = set()
                    for host, pid, thread in entries:
                        if process_alive(host, pid, thread):
                            elements.add((host, pid, thread))
                        else:
                            logger(
                                f"Removed stale {key} roster lock for host {host} pid {pid} thread {thread}."
                            )
                    data[key] = list(elements)
        except (FileNotFoundError, ValueError):
            # no or corrupt/empty roster file?
            data = {}
        return data

    def save(self, data):
        with open(self.path, "w") as f:
            json.dump(data, f)

    def remove(self):
        try:
            os.unlink(self.path)
        except FileNotFoundError:
            pass

    def get(self, key):
        roster = self.load()
        return {tuple(e) for e in roster.get(key, [])}

    def empty(self, *keys):
        return all(not self.get(key) for key in keys)

    def modify(self, key, op):
        roster = self.load()
        try:
            elements = {tuple(e) for e in roster[key]}
        except KeyError:
            elements = set()
        if op == ADD:
            elements.add(self.id_)
        elif op == REMOVE:
            # note: we ignore it if the element is already not present anymore.
            # this has been frequently seen in teardowns involving Repository.__del__ and Repository.__exit__.
            elements.discard(self.id_)
        elif op == REMOVE2:
            # needed for callers that do not want to ignore.
            elements.remove(self.id_)
        else:
            raise ValueError("Unknown LockRoster op %r" % op)
        roster[key] = list(list(e) for e in elements)
        self.save(roster)

    def migrate_lock(self, key, old_id, new_id):
        """migrate the lock ownership from old_id to new_id"""
        assert self.id_ == old_id
        # need to switch off stale lock killing temporarily as we want to
        # migrate rather than kill them (at least the one made by old_id).
        killing, self.kill_stale_locks = self.kill_stale_locks, False
        try:
            try:
                self.modify(key, REMOVE2)
            except KeyError:
                # entry was not there, so no need to add a new one, but still update our id
                self.id_ = new_id
            else:
                # old entry removed, update our id and add a updated entry
                self.id_ = new_id
                self.modify(key, ADD)
        finally:
            self.kill_stale_locks = killing


class Lock:
    """
    A Lock for a resource that can be accessed in a shared or exclusive way.
    Typically, write access to a resource needs an exclusive lock (1 writer,
    no one is allowed reading) and read access to a resource needs a shared
    lock (multiple readers are allowed).

    If possible, try to use the contextmanager here like::

        with Lock(...) as lock:
            ...

    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """

    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id_=None):
        self.path = path
        self.is_exclusive = exclusive
        self.sleep = sleep
        self.timeout = timeout
        self.id_ = id_ or get_process_id()
        # globally keeping track of shared and exclusive lockers:
        self._roster = LockRoster(path + ".roster", id_=id_)
        # an exclusive lock, used for:
        # - holding while doing roster queries / updates
        # - holding while the Lock itself is exclusive
        self._lock = ExclusiveLock(path + ".exclusive", id_=id_, timeout=timeout)

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.id_!r}>"

    def acquire(self, exclusive=None, remove=None, sleep=None):
        if exclusive is None:
            exclusive = self.is_exclusive
        sleep = sleep or self.sleep or 0.2
        if exclusive:
            self._wait_for_readers_finishing(remove, sleep)
            self._roster.modify(EXCLUSIVE, ADD)
        else:
            with self._lock:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                self._roster.modify(SHARED, ADD)
        self.is_exclusive = exclusive
        return self

    def _wait_for_readers_finishing(self, remove, sleep):
        timer = TimeoutTimer(self.timeout, sleep).start()
        while True:
            self._lock.acquire()
            try:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                if len(self._roster.get(SHARED)) == 0:
                    return  # we are the only one and we keep the lock!
                # restore the roster state as before (undo the roster change):
                if remove is not None:
                    self._roster.modify(remove, ADD)
            except:  # noqa
                # avoid orphan lock when an exception happens here, e.g. Ctrl-C!
                self._lock.release()
                raise
            else:
                self._lock.release()
            if timer.timed_out_or_sleep():
                raise LockTimeout(self.path)

    def release(self):
        if self.is_exclusive:
            self._roster.modify(EXCLUSIVE, REMOVE)
            if self._roster.empty(EXCLUSIVE, SHARED):
                self._roster.remove()
            self._lock.release()
        else:
            with self._lock:
                self._roster.modify(SHARED, REMOVE)
                if self._roster.empty(EXCLUSIVE, SHARED):
                    self._roster.remove()

    def upgrade(self):
        # WARNING: if multiple read-lockers want to upgrade, it will deadlock because they
        # all will wait until the other read locks go away - and that won't happen.
        if not self.is_exclusive:
            self.acquire(exclusive=True, remove=SHARED)

    def downgrade(self):
        if self.is_exclusive:
            self.acquire(exclusive=False, remove=EXCLUSIVE)

    def got_exclusive_lock(self):
        return self.is_exclusive and self._lock.is_locked() and self._lock.by_me()

    def break_lock(self):
        self._roster.remove()
        self._lock.break_lock()

    def migrate_lock(self, old_id, new_id):
        assert self.id_ == old_id
        self.id_ = new_id
        if self.is_exclusive:
            self._lock.migrate_lock(old_id, new_id)
            self._roster.migrate_lock(EXCLUSIVE, old_id, new_id)
        else:
            with self._lock:
                self._lock.migrate_lock(old_id, new_id)
                self._roster.migrate_lock(SHARED, old_id, new_id)
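
As an aside, to rule out my copy I believe the reproducers below could be pointed at borg's own module by installing borg from source (e.g. pip install -e . in a borg checkout) and swapping the import, though I haven't verified this myself:

# from borg_lock import Lock
from borg.locking import Lock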

The problem I'm having is that, when run in a threaded context, both exclusive and non-exclusive lock operations raise exceptions from within the locking code itself.

Create a repo:

% borg init -e none myrepo

Regularly fail to get exclusive and non-exclusive locks in threads:

#!/usr/bin/env python3
import concurrent.futures
from borg_lock import Lock
import os


def worker_exclusive(*args, **kwargs):
    print("Getting exclusive lock")
    with Lock("myrepo/", exclusive=True):
        print(os.getpid())


def worker_nonexclusive(*args, **kwargs):
    print("Getting non-exclusive lock")
    with Lock("myrepo/", exclusive=False):
        print(os.getpid())


with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(worker_exclusive) for _ in range(8)]
    for future in concurrent.futures.as_completed(futures):
        print(future)

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(worker_nonexclusive) for _ in range(8)]
    for future in concurrent.futures.as_completed(futures):
        print(future)

% python3 thread.py
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
3048259
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
3048259
3048259
3048259
3048259
<Future at 0x7fe92dbb23a0 state=finished returned NoneType>
3048259
3048259
<Future at 0x7fe92dbcbbb0 state=finished raised NotLocked>
<Future at 0x7fe92dbd7190 state=finished raised NotLocked>
<Future at 0x7fe92dbd7af0 state=finished raised NotLocked>
<Future at 0x7fe92dbd7610 state=finished raised NotLocked>
<Future at 0x7fe92dbd7df0 state=finished raised NotLocked>
<Future at 0x7fe92dbcbd30 state=finished raised NotLocked>
3048259
<Future at 0x7fe92dbded60 state=finished returned NoneType>
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
3048259
3048259
3048259
3048259
<Future at 0x7fe92c35dbb0 state=finished raised FileNotFoundError>
<Future at 0x7fe92c3595b0 state=finished raised FileNotFoundError>
<Future at 0x7fe92dbd72b0 state=finished returned NoneType>
<Future at 0x7fe92c35d1c0 state=finished raised NotLocked>
<Future at 0x7fe92dbe45e0 state=finished returned NoneType>
<Future at 0x7fe92c35dbe0 state=finished raised NotLocked>
3048259
<Future at 0x7fe92c3590d0 state=finished raised NotLocked>
<Future at 0x7fe92c35d340 state=finished returned NoneType>

Reliably succeed in getting exclusive and non-exclusive locks across multiple processes:

#!/usr/bin/env python3
from borg_lock import Lock
from multiprocessing import Pool
import os


def worker_exclusive(*args, **kwargs):
    print("Getting exclusive lock")
    with Lock("myrepo/", exclusive=True):
        print(os.getpid())


def worker_nonexclusive(*args, **kwargs):
    print("Getting non-exclusive lock")
    with Lock("myrepo/", exclusive=False):
        print(os.getpid())


with Pool(8) as p:
    print(p.map(worker_exclusive, range(8)))

with Pool(8) as p:
    print(p.map(worker_nonexclusive, range(8)))

% ./multiprocess.py
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
3049067
3049066
3049073
3049069
3049070
3049071
3049068
3049072
[None, None, None, None, None, None, None, None]
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
3049102
3049106
3049104
3049103
3049107
3049105
3049108
3049109
[None, None, None, None, None, None, None, None]

Suspicions and thoughts

I'm guessing that https://github.com/borgbackup/borg/blob/e3f565623dd482ea4a8211644e234a15724d5f3f/src/borg/platform/base.py#L309 could be related: thread_id is hard-coded to zero, so lock identities are keyed off the hostid and PID only. Without the thread ID being part of the key, every thread in a process presumably shares the same unique_name, so (for example) one thread's release() can unlink the lock file another thread still believes it holds, and parallel lock acquisitions and/or releases within the same process interfere with each other.
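
For illustration, I imagine a variant of get_process_id() along these lines (untested; per the comment in process_alive(), stale-lock detection would also need revisiting, since any non-zero thread ID is currently assumed to be alive):

import os
import threading


def get_process_id() -> tuple[str, int, int]:
    # Hypothetical: include the current thread's ident so that each thread
    # in a process gets a distinct identity (and hence a distinct unique_name).
    # `hostid` is the module-level value computed above.
    return hostid, os.getpid(), threading.get_ident()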

I'm also guessing that borg itself doesn't use threading, so this might not be a problem for the project today. It might only become one if borg adopts threading somewhere, or for folks like me who are borrowing parts of borg's internals for their own bad decisions.

So perhaps the locking is working as intended; it just worries me to see locking code that seems to fail under multi-threading. (A possible per-caller workaround is sketched below.)
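
For what it's worth, the existing id_ parameter looks like it offers a per-caller workaround: give each thread its own (host, pid, thread) identity, so threads look like separate processes to the locking code. An untested sketch, with the caveat that process_alive() treats any non-zero thread ID as alive, so locks left behind by dead threads would never be reaped as stale:

import os
import threading

from borg_lock import Lock, hostid


def thread_local_lock(path, exclusive=False, timeout=None):
    # Hypothetical helper: a distinct identity per thread, using the
    # thread ident in the normally-always-zero thread_id slot.
    id_ = (hostid, os.getpid(), threading.get_ident())
    return Lock(path, exclusive=exclusive, timeout=timeout, id_=id_)


with thread_local_lock("myrepo/", exclusive=False):
    ...  # e.g. run rsync here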

justinsteven · May 22 '24 01:05

@justinsteven could you do a PR with a test for this (which fails with the current code) and a fix for the locking code that makes that test succeed?

ThomasWaldmann · May 31 '24 12:05

@ThomasWaldmann sorry for the slow response. I can try, but I'm not sure when I'll be able to get to it. Also, just thinking out loud: a test case would likely fail non-deterministically (though the PoC shown above demonstrates the failure quite reliably on my machine).
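
Something like the following is roughly what I have in mind (an untested sketch against my standalone borg_lock module; in-tree it would presumably import from borg.locking and follow the test suite's conventions):

import concurrent.futures

from borg_lock import Lock


def test_lock_thread_safety(tmp_path):
    lock_path = str(tmp_path / "repo")

    def worker():
        # Raises NotLocked / NotMyLock / FileNotFoundError when the race hits.
        with Lock(lock_path, exclusive=True, timeout=5):
            pass

    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(worker) for _ in range(32)]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raise any exception from the worker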

justinsteven · Jun 25 '24 09:06

If it randomly fails, we could skip it by default. It would be useful nevertheless, just to check that it works / to find out if it doesn't.

ThomasWaldmann · Jun 25 '24 11:06