python-server-sdk

Use a single thread within the SDK

Open zb-polysign opened this issue 3 years ago • 4 comments

Is your feature request related to a problem? Please describe. The SDK currently launches 11 threads by default when calling ldclient.set_config(Config(sdk_key)); ldclient.get(). This is resource intensive in Python: it costs me more than 1 GB of memory per instance of my service that uses the SDK, which is expensive when running a large number of services.
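One way to check a thread count like the one described here is to compare the set of live threads before and after initialization. The following is a minimal sketch, not SDK code: count_new_threads and start_workers are hypothetical names, and the idle worker threads only stand in for whatever the SDK starts.

```python
import threading
from typing import Callable, List


def count_new_threads(init: Callable[[], None]) -> List[str]:
    """Run `init` and return the names of threads alive afterwards that were not alive before."""
    before = set(threading.enumerate())
    init()
    return sorted(t.name for t in threading.enumerate() if t not in before)


if __name__ == "__main__":
    stop = threading.Event()

    def start_workers() -> None:
        # Hypothetical stand-in for SDK initialization: three idle background threads.
        for i in range(3):
            threading.Thread(target=stop.wait, name=f"worker-{i}", daemon=True).start()

    print(count_new_threads(start_workers))
    stop.set()
```

In a real service, the init callable would wrap the SDK setup call, and the returned names show which threads it created.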

Describe the solution you'd like The SDK should use a single thread by default or at least make it easy to configure it to only use a single thread.

Describe alternatives you've considered I've monkey patched the SDK to reduce the number of threads consumed, but will eventually have to abandon the SDK in favor of directly accessing the APIs.

zb-polysign • Oct 28 '22 22:10

Hi. Rewriting the SDK to use async semantics instead of threads is something other customers have suggested, and we're not ruling it out, but it would be a full rewrite. There is no way to patch the current SDK code to be single-threaded without entirely removing major functionality that developers expect (like streaming updates and database integrations).

However, I'm puzzled by the symptom of extremely large memory usage that you're seeing. It's hard for me to see how it could be entirely due to the use of worker threads as you're suggesting, so I'm wondering if there may be something else going on that we should look into. I say that for a few reasons:

  • The majority of the threads you're counting are for very simple tasks like a repeating timer. They are almost always sleeping, and their memory usage should be very minimal.
  • The largest group of worker threads is a fixed-size pool of 5 threads that is used for sending events. Unless the events service on our side is responding so slowly that a request can't be finished prior to the next event flush, 4 of those threads will always be sleeping and will never do anything at all.
  • The worker threads that don't fall into either of those categories are for essential SDK services. The amount of memory they consume is normally proportional to the amount of data they are processing, regardless of which thread is doing the processing.
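To illustrate the first point, here is a minimal sketch of a repeating timer thread that spends nearly all of its time blocked in Event.wait. It is not the SDK's implementation; SleepyTimer and its methods are hypothetical names used only for illustration.

```python
import threading
import time
from typing import Callable


class SleepyTimer:
    """Calls `action` every `interval` seconds on a dedicated thread that otherwise sleeps."""

    def __init__(self, interval: float, action: Callable[[], None]) -> None:
        self._interval = interval
        self._action = action
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()

    def _run(self) -> None:
        # Event.wait blocks without busy-waiting and returns True once stop() is called.
        while not self._stop_event.wait(self._interval):
            self._action()


if __name__ == "__main__":
    timer = SleepyTimer(0.5, lambda: print("tick"))
    timer.start()
    time.sleep(2)
    timer.stop()
```

A thread like this holds only its stack and a few small objects, so its own data footprint is small.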

About that last point: if you have a very large amount of flag/segment data, the SDK has to load, parse, and store that data; and if you are generating a very large number of analytics events, the SDK has to buffer those events before they can be sent. A figure like "more than 1Gb of memory per instance" would be highly abnormal unless one or both of those things was the case. But if one of those things is the case, then I would tend to think the resource consumption would be fairly similar whether we used threads or not.

So, I'm wondering if you could say a little more about 1. what kind of patches you've been doing, 2. what impact on memory usage you saw from them, and 3. the approximate size of your flag/segment data (you can get this by doing curl -H 'Authorization:MY_SDK_KEY' https://sdk.launchdarkly.com/sdk/latest-all and looking at the size of the response).
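For anyone who prefers to run that check from Python, the curl command can be approximated with the standard library. This is a sketch under the assumption that the endpoint accepts the same Authorization header as in the curl example; build_request and payload_size are hypothetical helper names.

```python
import urllib.request

# URL taken from the curl command in the comment above.
LATEST_ALL_URL = "https://sdk.launchdarkly.com/sdk/latest-all"


def build_request(sdk_key: str) -> urllib.request.Request:
    """Build the same GET request as the curl command, passing the SDK key as the Authorization header."""
    return urllib.request.Request(LATEST_ALL_URL, headers={"Authorization": sdk_key})


def payload_size(sdk_key: str, timeout: float = 10.0) -> int:
    """Fetch the flag/segment payload and return its size in bytes."""
    with urllib.request.urlopen(build_request(sdk_key), timeout=timeout) as resp:
        return len(resp.read())


if __name__ == "__main__":
    # Replace MY_SDK_KEY with a real key before running; this performs a network request.
    print(payload_size("MY_SDK_KEY"))
```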

eli-darkly • Oct 28 '22 23:10

I'll preface this by saying that I did my testing inside a Docker container running with platform linux/x86_64 on an M1 machine, so that may cause Python to allocate more memory per thread. That said, the flag/segment data isn't particularly large (< content-length: 19644), and I did all of my testing under zero load (it was all during process startup).

When I was testing, I put in a breakpoint and monitored memory consumption side by side as I stepped through the code. Every time the SDK launched a new thread, I saw an increase of around 75-100 MB in the process's memory consumption. In total I patched out 6 of the threads used by the SDK and saw a ~600 MB reduction in total memory consumption compared to no patch. The patch itself is below. I took the path of changing as little as I could within the SDK to achieve this (I agree it would take a significant rewrite of the SDK to cleanly reduce the number of threads), so a similar effect could be achieved much more cleanly with some changes inside the SDK itself.

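# NOTE: the original post omitted its imports. The lines below are assumptions:
# ReadWriteLock is assumed to be the SDK's own ldclient.rwlock.ReadWriteLock (7.x),
# and LOGGER is assumed to be a standard module-level logger.
import logging
import math
import time
import uuid
from threading import Event, Thread
from typing import Callable, Optional

import ldclient
import ldclient.impl.repeating_task
from ldclient.rwlock import ReadWriteLock

LOGGER = logging.getLogger(__name__)


class PatchedRepeatingTask(ldclient.impl.repeating_task.RepeatingTask):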
    """A patched repeating task designed to only use a single thread across all instances."""

    _TIME_TO_WAIT_FOR_INITIAL_TASK = 1

    __PATCH_LOCK = ReadWriteLock()
    __PATCH_SHARED_STATE = {
        "tasks": {},
        "stop": None,
        "thread": None,
        "thread_started": False,
    }

    @classmethod
    def _ensure_thread(cls) -> None:
        """Create the thread object."""
        try:
            cls.__PATCH_LOCK.rlock()
            if cls.__PATCH_SHARED_STATE["thread"]:
                return
        finally:
            cls.__PATCH_LOCK.runlock()

        try:
            cls.__PATCH_LOCK.lock()
            if not cls.__PATCH_SHARED_STATE["thread"]:
                cls.__PATCH_SHARED_STATE["stop"] = Event()
                cls.__PATCH_SHARED_STATE["thread"] = Thread(target=cls._shared_run)

            return
        finally:
            cls.__PATCH_LOCK.unlock()

    def __init__(self, interval: float, initial_delay: float, callable: Callable):
        self._initial_time = time.time() + initial_delay

        self._ensure_thread()

        try:
            self.__PATCH_LOCK.lock()

            # giving the task a random identifier for later use
            self.__PATCH_SHARED_STATE["tasks"][uuid.uuid4()] = {
                "action": callable,
                "next_time": self._initial_time,
                "interval": interval,
            }
        finally:
            self.__PATCH_LOCK.unlock()

    def start(self):
        """Starts the worker thread."""
        try:
            self.__PATCH_LOCK.lock()

            if not self.__PATCH_SHARED_STATE["thread"]:
                # this thread is already stopped, nothing to do
                return

            if self.__PATCH_SHARED_STATE["thread_started"]:
                return

            self.__PATCH_SHARED_STATE["thread"].start()
            self.__PATCH_SHARED_STATE["thread_started"] = True
        finally:
            self.__PATCH_LOCK.unlock()

    def stop(self):
        """Tells the worker thread to stop."""
        try:
            self.__PATCH_LOCK.lock()

            if not self.__PATCH_SHARED_STATE["thread_started"]:
                return

            self.__PATCH_SHARED_STATE["stop"].set()

            # intentionally replacing the objects so that if a run is using them
            # there is no complicated state management logic
            self.__PATCH_SHARED_STATE["tasks"] = {}
            self.__PATCH_SHARED_STATE["stop"] = None
            self.__PATCH_SHARED_STATE["thread"] = None
            self.__PATCH_SHARED_STATE["thread_started"] = False
        finally:
            self.__PATCH_LOCK.unlock()

    def _run(self):
        raise NotImplementedError("Instance specific run disabled, we shouldn't get here")

    @classmethod
    def _shared_run(cls):
        """Run all tasks that have been registered."""
        try:
            cls.__PATCH_LOCK.rlock()
            stop: Optional[Event] = cls.__PATCH_SHARED_STATE["stop"]
        finally:
            cls.__PATCH_LOCK.runlock()

        if not stop:
            return

        stopped = stop.is_set()
        while not stopped:
            try:
                cls.__PATCH_LOCK.rlock()
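                # shallow copy so that tasks registered concurrently by __init__
                # cannot change the dict's size while it is iterated below; the
                # per-task dicts are still shared, so next_time updates persist
                tasks = dict(cls.__PATCH_SHARED_STATE["tasks"])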
            finally:
                cls.__PATCH_LOCK.runlock()

            current_time = time.time()
            updated_tasks = {}
            earliest_next_task = math.inf
            for task_id, task in tasks.items():
                if task["next_time"] > current_time:
                    earliest_next_task = min(earliest_next_task, task["next_time"])
                    continue

                updated_tasks[task_id] = current_time + task["interval"]
                try:
                    task["action"]()
                except Exception as exc:
                    LOGGER.warning("Unexpected exception on worker thread: %s", exc)

                earliest_next_task = min(earliest_next_task, updated_tasks[task_id])

            if updated_tasks:
                try:
                    cls.__PATCH_LOCK.lock()
                    for task_id, next_time in updated_tasks.items():
                        tasks[task_id]["next_time"] = next_time
                finally:
                    cls.__PATCH_LOCK.unlock()

            delay = (
                cls._TIME_TO_WAIT_FOR_INITIAL_TASK
                if earliest_next_task == math.inf
                else min(earliest_next_task - time.time(), cls._TIME_TO_WAIT_FOR_INITIAL_TASK)
            )
            stopped = stop.wait(delay) if delay > 0 else stop.is_set()


def monkey_patch_repeating_task():
    """Monkeypatch the LaunchDarkly client library to consume fewer threads."""
    if not all(
        (
            hasattr(ldclient.event_processor, "__MAX_FLUSH_THREADS__"),
            hasattr(ldclient.impl.repeating_task, "RepeatingTask"),
            hasattr(ldclient.event_processor, "RepeatingTask"),
            hasattr(ldclient.polling, "RepeatingTask"),
            hasattr(ldclient.repeating_timer, "RepeatingTask"),
            hasattr(ldclient.impl.big_segments, "RepeatingTask"),
            hasattr(ldclient.impl.integrations.files.file_data_source, "RepeatingTask"),
        )
    ):
        raise RuntimeError("Something changed in the LaunchDarkly library, check the changes and fix the patch")

    # patch the base object
    ldclient.impl.repeating_task.RepeatingTask = PatchedRepeatingTask

    # patch all consumption of the object
    ldclient.event_processor.RepeatingTask = PatchedRepeatingTask
    ldclient.polling.RepeatingTask = PatchedRepeatingTask
    ldclient.repeating_timer.RepeatingTask = PatchedRepeatingTask
    ldclient.impl.big_segments.RepeatingTask = PatchedRepeatingTask
    ldclient.impl.integrations.files.file_data_source.RepeatingTask = PatchedRepeatingTask

    # set the number of threads allocated to flushing
    ldclient.event_processor.__MAX_FLUSH_THREADS__ = 1

zb-polysign • Oct 29 '22 04:10

Using one scheduler thread for all repeating tasks is a good idea that I think we should adopt. That still isn't going to get you anywhere near having only a single thread for the whole SDK, though. And I don't think it's possible to use a similar approach to merge the SDK's other worker threads; they are just doing completely different things.
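The single-scheduler idea can be sketched independently of the SDK. The example below is a minimal illustration, not what the SDK does or will do: SharedScheduler is a hypothetical class that keeps all repeating tasks in a heap ordered by next run time and services them from one thread.

```python
import heapq
import threading
import time
from typing import Callable, List, Tuple


class SharedScheduler:
    """Runs any number of repeating tasks on one background thread."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._wake = threading.Event()  # set when a task is added or stop() is called
        self._stopped = False
        # Heap entries: (next_run_time, sequence_number, interval, action).
        self._heap: List[Tuple[float, int, float, Callable[[], None]]] = []
        self._seq = 0
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def add(self, interval: float, action: Callable[[], None], initial_delay: float = 0.0) -> None:
        with self._lock:
            entry = (time.monotonic() + initial_delay, self._seq, interval, action)
            heapq.heappush(self._heap, entry)
            self._seq += 1
        self._wake.set()

    def stop(self) -> None:
        with self._lock:
            self._stopped = True
        self._wake.set()
        self._thread.join()

    def _run(self) -> None:
        while True:
            with self._lock:
                if self._stopped:
                    return
                self._wake.clear()
                now = time.monotonic()
                due = []
                while self._heap and self._heap[0][0] <= now:
                    due.append(heapq.heappop(self._heap))
                # Reschedule each due task before running it.
                for _, seq, interval, action in due:
                    heapq.heappush(self._heap, (now + interval, seq, interval, action))
                timeout = self._heap[0][0] - now if self._heap else None
            for _, _, _, action in due:
                try:
                    action()
                except Exception as exc:  # keep one failing task from killing the thread
                    print(f"task failed: {exc}")
            # Sleep until the next task is due, or until add()/stop() wakes us.
            self._wake.wait(timeout)
```

The trade-off of this design is that a slow task delays every other task sharing the thread, which is one reason it only suits short, timer-style work.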

I am still very puzzled as to how simply starting a thread in Python, especially very simple ones with almost no data of their own like the timer threads, could cause such a large jump in memory usage. And in order to try to reproduce this, I would need to know a bit more about your operating environment beyond just the host OS and the fact that you're using Docker. What version of Python is this? And is it the standard CPython runtime, or a variant? Also, what mechanism are you using to measure memory usage?
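The measurement mechanism matters because tools such as top report both virtual and resident memory. On Linux, each new thread reserves address space for its stack (the default commonly follows ulimit -s), which raises the virtual figure without necessarily raising resident memory. The sketch below, which is Linux-only and uses the hypothetical helpers parse_status_kb and read_memory_kb, prints both figures before and after starting idle threads so they can be compared. It is offered as a way to check, not as a diagnosis of this report.

```python
import threading
from typing import Dict


def parse_status_kb(text: str) -> Dict[str, int]:
    """Extract VmSize (virtual) and VmRSS (resident), in kB, from /proc/<pid>/status text."""
    out: Dict[str, int] = {}
    for line in text.splitlines():
        key, _, rest = line.partition(":")
        if key in ("VmSize", "VmRSS"):
            out[key] = int(rest.split()[0])
    return out


def read_memory_kb() -> Dict[str, int]:
    """Linux only: read the current process's memory figures."""
    with open("/proc/self/status") as f:
        return parse_status_kb(f.read())


if __name__ == "__main__":
    stop = threading.Event()
    before = read_memory_kb()
    threads = [threading.Thread(target=stop.wait, daemon=True) for _ in range(10)]
    for t in threads:
        t.start()
    after = read_memory_kb()
    stop.set()
    for key in ("VmSize", "VmRSS"):
        print(f"{key}: {before[key]} kB -> {after[key]} kB")
```

If the virtual figure grows much more than the resident one, threading.stack_size() can be called before threads are created to request a smaller stack for them.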

eli-darkly • Nov 01 '22 19:11

I'm sure most of this can use a wider variety of versions, but dumping anything that might be helpful:

  • Host OS: macOS 12.6
  • Docker version: 4.12.0
  • Docker base image: python:3.7-slim
  • Docker platform: --platform linux/x86_64
  • SDK version: launchdarkly-server-sdk==7.5.1
  • Python version: the built-in one for the image, with no special configurations/variations
  • Memory measured using top and htop

zb-polysign • Nov 01 '22 20:11