
Asyncio support

pawamoy opened this issue 5 years ago • 33 comments

Follow up of #924 and #1079.

Is your feature request related to a problem? Please describe.

Sometimes one needs to simulate an asynchronous user behavior. Hatching more users does not solve the problem.

Describe the solution you'd like

Support defining async tasks or something similar.

Describe alternatives you've considered

One could also write a custom client. Maybe this would solve the problem. It would be great to collect examples or snippets on how to do so in this issue, and add them in https://docs.locust.io/en/stable/testing-other-systems.html or another doc page.

pawamoy avatar Jan 31 '20 16:01 pawamoy

You can achieve this by spawning greenlets within your locust tasks. Here's a small example:


from gevent.pool import Pool
from locust import HttpLocust, TaskSet, task, constant

class MyLocust(HttpLocust):
    host = "https://docs.locust.io"
    wait_time = constant(5)
    class task_set(TaskSet):
        @task
        def dual_greenlet_task(self):
            def do_http_request_or_whatever():
                print("yay, running in separate greenlet")
                response = self.client.get("/")
                print("status code:", response.status_code)
            pool = Pool()
            pool.spawn(do_http_request_or_whatever)
            pool.spawn(do_http_request_or_whatever)
            pool.join()

Locust is heavily reliant on Gevent, and as far as I know gevent and python async are not 100% compatible. Therefore I don't see locust supporting python async any time soon. There's been discussion about it in the gevent project: https://github.com/gevent/gevent/issues/982

heyman avatar Jan 31 '20 16:01 heyman
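
For readers on current Locust versions (2.x), where HttpLocust and the nested task_set were replaced by HttpUser, roughly the same pattern would look like the sketch below (an adaptation, not part of the original comment):

from gevent.pool import Pool
from locust import HttpUser, task, constant

class MyUser(HttpUser):
    host = "https://docs.locust.io"
    wait_time = constant(5)

    @task
    def dual_greenlet_task(self):
        def do_http_request_or_whatever():
            print("yay, running in separate greenlet")
            response = self.client.get("/")
            print("status code:", response.status_code)

        # spawn the two requests concurrently, each in its own greenlet,
        # and wait for both to finish before the task returns
        pool = Pool()
        pool.spawn(do_http_request_or_whatever)
        pool.spawn(do_http_request_or_whatever)
        pool.join()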

Alright thanks for the explanation, example and link 🙂

pawamoy avatar Jan 31 '20 16:01 pawamoy

Is there any reason not to switch to asyncio completely?

redpublic avatar Dec 07 '20 17:12 redpublic

The main reason is that it would be a ton of work to rewrite everything. Apart from that I think it is a great idea :)

cyberw avatar Dec 07 '20 18:12 cyberw

You can achieve this by spawning greenlets within your locust tasks. Here's a small example: [...]

AFAIK, using different threads to make many parallel requests with requests.Session is not a good idea, because requests.Session is not thread safe (https://github.com/psf/requests/issues/1871). You need to use another client to do this.

Dalas avatar Jan 06 '21 15:01 Dalas

AFAIK, using different threads to make many parallel requests with requests.Session is not a good idea, because requests.Session is not thread safe (psf/requests#1871). You need to use another client to do this.

From the description of that issue it sounds like it should only be a problem if you're using the same client to connect to many different hosts. In that case one could create another HttpSession instance.

heyman avatar Jan 06 '21 18:01 heyman
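
A minimal sketch of that suggestion, assuming a recent Locust version where HttpSession takes base_url, request_event and user (the exact constructor signature has changed between releases):

from locust import HttpUser, task, constant
from locust.clients import HttpSession

class TwoHostUser(HttpUser):
    host = "https://docs.locust.io"
    wait_time = constant(1)

    def on_start(self):
        # create a second session for another host so the two clients
        # do not share connection pools or cookies
        self.other_client = HttpSession(
            base_url="https://example.com",   # hypothetical second host
            request_event=self.environment.events.request,
            user=self,
        )

    @task
    def hit_both_hosts(self):
        self.client.get("/")        # default session bound to self.host
        self.other_client.get("/")  # separate session for the other host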

@heyman in my case it happens when I'm executing many requests on the same host (as in your example).

Dalas avatar Jan 11 '21 10:01 Dalas

@heyman sorry about the confusion, it looks like it was my mistake. Everything is OK with requests.Session.

Dalas avatar Jan 11 '21 14:01 Dalas

Is asyncio supported in Locust tasks now?

Monica-Ji avatar Feb 09 '23 12:02 Monica-Ji

Hey maintainers, theoretically, do you see any potential issues with using the asgiref project to execute async functions as sync ones?

sergeyglazyrindev avatar Dec 12 '23 21:12 sergeyglazyrindev
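
For reference, a minimal sketch of what the asgiref approach could look like (untested under gevent's monkey-patching, which is exactly the open question; async_to_sync wraps a coroutine function so it can be called from synchronous code):

from asgiref.sync import async_to_sync  # pip install asgiref
from locust import User, task, constant

async def fetch_something(url):
    # placeholder coroutine; a real test would use an async client here
    return url

class AsgirefUser(User):
    wait_time = constant(1)

    @task
    def call_async_code(self):
        # run the coroutine to completion from this synchronous task
        result = async_to_sync(fetch_something)("https://docs.locust.io/")
        print(result)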

Haven't tried it, would love to see a proof of concept!

cyberw avatar Dec 12 '23 21:12 cyberw

Most asyncio wrappers have issues coexisting with gevent, though... but if it works, it would be awesome!

cyberw avatar Dec 12 '23 21:12 cyberw

gevent now has an asyncio version, so that should make integration easier: https://github.com/gfmio/asyncio-gevent

jimdowling avatar Mar 15 '24 08:03 jimdowling

gevent now has an asyncio version, so that should make integration easier: https://github.com/gfmio/asyncio-gevent

Unfortunately that repo hasn't been updated in two years, so I'm not sure if it still works.

I think it would be doable to implement an asyncio version of the locust worker, but it'll take some work.

cyberw avatar Mar 15 '24 08:03 cyberw

Is there a more concrete example of calling an asyncio coroutine inside a Locust gevent task? Sorry, but it's not immediately obvious after reading the code examples above (I can't find where asyncio coroutines are called...).

thx123 avatar Nov 18 '24 17:11 thx123

Asyncio is not supported right now.

We're planning to build a completely asyncio-based worker implementation, but unless there's a simple way to allow gevent and asyncio to coexist (like asyncio-gevent seemed to promise before it went silent), it might be a while.

If someone wants to take it upon themselves to reimplement the worker on asyncio, that would be most welcome :)

cyberw avatar Nov 18 '24 19:11 cyberw

Thanks @cyberw for your prompt reply. After days of struggle, I finally found a way to make asyncio work in Locust. Here is a working example. The basic idea is to create a dedicated thread running an asyncio event loop in each process (since Locust can start multiple workers on multiple CPU cores that run in separate processes), and a helper function that takes a coroutine and its parameters and runs it on the event loop of the process it belongs to.

import asyncio
import logging
import gevent
import gevent.event
import os
import threading
import time
from locust import User, task, between, events
from typing import Any, Callable, Awaitable

# Set the logging level for the root logger to WARNING to suppress INFO messages
logging.getLogger("root").setLevel(logging.WARNING)

def thread_func(loop, coro: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> Any:
    """Run an asyncio coroutine on the shared event loop without blocking gevent."""
    logging.debug('In thread_func()')
    try:
        assert loop.is_running(), "Asyncio event loop is not running!"
        # Schedule the coroutine on the loop running in the dedicated thread
        future = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop)
        # Wait on a gevent event so other greenlets keep running while the
        # coroutine executes in the asyncio thread
        event = gevent.event.Event()
        future.add_done_callback(lambda _: event.set())
        event.wait()
        return future.result(timeout=3)
    except TimeoutError as te:
        logging.exception(f'TimeoutError: {te}')
        future.cancel()
        raise
    except RuntimeError as rte:
        logging.exception(f'RuntimeError: {rte}')
        raise
    except Exception as e:
        logging.exception(f'Other Exception: {e}')
        raise

def run_asyncio(loop, coro: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> Any:
    logging.debug('In run_asyncio()')
    return thread_func(loop, coro, *args, **kwargs)


async def async_foo(param1, param2):
    """asyncio coroutine function to be tested"""
    ...


class AsyncioInLocustTest(User):
    shortest_secs, longest_secs = 0.1, 1
    wait_time = between(shortest_secs, longest_secs)

    # Class-level variables to track process initialization
    shared_loop = None
    shared_thread = None
    initialized_pid = None  # Store the PID of the process where initialization has occurred

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Detect newly forked processes
        current_pid = os.getpid()
        if AsyncioInLocustTest.initialized_pid != current_pid:
            # Run initialization for the current process
            self.init_process_resources(current_pid)

        # Use the shared event loop and thread
        self.loop = AsyncioInLocustTest.shared_loop

    @classmethod
    def init_process_resources(cls, current_pid):
        """Initialize resources once for each new process."""
        cls.initialized_pid = current_pid  # Mark the process as initialized
        print(f"Initializing resources for process PID: {current_pid}")

        # Create a shared asyncio event loop and thread for this process
        cls.shared_loop = asyncio.new_event_loop()
        cls.shared_thread = threading.Thread(target=cls.shared_loop.run_forever, daemon=True)
        cls.shared_thread.start()

    @task
    def asyncio_test(self):
        start_time = time.time()
        try:
            # Placeholder arguments for the example coroutine
            run_asyncio(self.loop, async_foo, "foo_param1", "foo_param2")

            # Fire success event
            events.request.fire(
                request_type="foo",
                name="bar",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=None
            )
        except Exception as e:
            # Fire failure event
            events.request.fire(
                request_type="foo",
                name="bar",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=e
            )
            raise
thx123 avatar Nov 19 '24 04:11 thx123
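
For concreteness, async_foo in the example above could be any coroutine, e.g. an HTTP request made with aiohttp (a hypothetical body for the placeholder, assuming aiohttp is installed):

import aiohttp

async def async_foo(param1, param2):
    """Example body for the placeholder coroutine: fetch a page with aiohttp."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://docs.locust.io/") as response:
            return await response.text()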

This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 20 days.

github-actions[bot] avatar Mar 12 '25 02:03 github-actions[bot]

This issue was closed because it has been marked stale for 20 days with no activity. This does not necessarily mean that the issue is bad, but it most likely means that nobody is willing to take the time to fix it. If you have found Locust useful, then consider contributing a fix yourself!

github-actions[bot] avatar Apr 01 '25 02:04 github-actions[bot]

Thanks @cyberw for your prompt reply. After days of struggle, I finally found a way to make asyncio work in Locust. Here is a working example. [...]

Great, I will give it a try.

jiqiujia avatar Apr 26 '25 10:04 jiqiujia

This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 20 days.

github-actions[bot] avatar Jun 26 '25 02:06 github-actions[bot]

Hi, is there any progress?

GeraldWhitehead avatar Jul 05 '25 11:07 GeraldWhitehead

Hi @cyberw, I understand this is a complex task. Could you share if the Locust team has any plans or estimated timelines for addressing this issue? Also, please let us know if there are ways the open-source community could contribute to help move this forward.

Thanks for your work on this!

deepwzh avatar Jul 31 '25 09:07 deepwzh

Hi, sorry for the lack of response @GeraldWhitehead and @deepwzh. We did actually make an internal PoC before we got swamped doing all sorts of things for the SaaS/cloud solution.

I would love for people to contribute. I see a couple of options:

  • Completely new Runner (which is kind of what I did in the PoC here: https://github.com/locustcloud/aiolocust)

    • Pros: pure async in the runner (so minimal risk for performance issues, clear paradigm), simple implementation
    • Cons: Requires re-implementing a lot of stuff (probably all the way up to changes in main), probably would mean a very reduced feature set.
  • Run asyncio on gevent using something like https://github.com/gfmio/asyncio-gevent

    • Pros: Simple approach, everything outside the Runner, or maybe even the User can continue to use gevent
    • Cons: asyncio-gevent isn't super well-maintained (it came back to life just recently after a 2.5 year hiatus), mixed paradigm, could have sneaky performance issues
  • I don't think jiqiujia's/thx123's approach would work, but if it can be improved beyond a hack, then maybe... At the very least it would have to do asyncio.run_coroutine_threadsafe in the TaskSet.run method instead of inside a specific task.

    • Cons: I don't think it will scale very well

We (Locust Technologies & core maintainers) will get to this eventually, but any help from the community would greatly speed this up and we're willing to assist if someone wants to give it a go themselves.

Right now I think the first approach is the most appealing, but the second one could work too. I'm completely fine with a super-slim feature set (it wouldn't even need TaskSets in a first version; just a single method, User.task or something, could be simpler).

cyberw avatar Aug 04 '25 20:08 cyberw
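
To make the first option more tangible, here is a purely hypothetical sketch of what a minimal asyncio-based User API could look like; none of these class names exist in Locust today, aiohttp is only assumed as an example client, and a real runner would also need stats, ramp-up and master/worker signaling:

import asyncio
import time
import aiohttp

class AsyncUser:
    """Hypothetical base class for an asyncio-based runner (not part of Locust)."""

    async def run(self):
        while True:
            await self.task()
            await asyncio.sleep(1)  # fixed wait time, for simplicity

    async def task(self):
        raise NotImplementedError

class MyAsyncUser(AsyncUser):
    async def task(self):
        start = time.perf_counter()
        async with aiohttp.ClientSession() as session:
            async with session.get("https://docs.locust.io/") as response:
                await response.read()
        print("request took", time.perf_counter() - start, "seconds")

async def main():
    # spawn a handful of users and run them concurrently
    users = [MyAsyncUser() for _ in range(3)]
    await asyncio.gather(*(user.run() for user in users))

# asyncio.run(main())  # commented out: this loop runs forever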

Hello,

Another option on the table for establishing a solid proof of concept is to use Niquests instead of Requests. It is a drop-in replacement with both sync and async interfaces. This would eliminate the need to reimplement from scratch basic things that Locust has done for ages. I already proposed something at least a year ago, but back then it was not accepted (for reasonable reasons). Now things are changing for the better! (i) We are breaking into the top 1000 most-downloaded packages on PyPI. (ii) We were featured in the GitHub SOSS Fund (session 2). (iii) We support things that could be a game changer, like fine-tuned performance metering and modern multiplexing over HTTP/2 and HTTP/3.

There is a chance we could generate the async interfaces automagically. Not sure yet. I am available if there is a need to discuss this further.

regards,

Ousret avatar Aug 18 '25 22:08 Ousret

@Ousret, that sounds really interesting, but I think the majority of the work in this ticket would be implementing async support in Locust itself (async tasks, Users, etc.), not so much the specific HTTP library.

If you want to make an attempt along the lines of what I suggested (using Niquests instead of aiohttp which would be the other obvious candidate) that could be interesting.

cyberw avatar Aug 19 '25 13:08 cyberw

I agree, this specific issue is about async. But just so I understand correctly, is it okay to have a unified HTTP client while providing support for an AsyncHttpUser? If so, I am okay with starting to lay the foundation for it, preserving all the features available in HttpUser.

regards,

Ousret avatar Aug 19 '25 18:08 Ousret

I'm not sure I understand you fully, but if you wanna do something like what I suggested (https://github.com/locustcloud/aiolocust), and implement a NiquestUser, AsyncHttpUser, call it what you will, that would be very interesting!

cyberw avatar Aug 20 '25 10:08 cyberw

Me neither,

Are those to be implemented in the main locust package, or shipped as an additional package, namely aiolocust? If it were to be included in the main locust distribution, that would mean adding a dependency to it, and I am fairly confident that would be undesirable.

I also have a slight doubt about whether an async user is wanted within locust itself.

Sorry for the questions, it's best that we are on the same page before starting anything.

Ousret avatar Aug 20 '25 14:08 Ousret

Sorry for the questions, it's best that we are on the same page before starting anything.

That's probably smart :)

I'm thinking a first version could be an async implementation/rewrite of WorkerRunner - that way we don't have to redo everything, and we could have a very reduced feature set.

The source code can be distributed with Locust in the same package; that's not an issue. Any extra dependencies can be optional extras, like we did here for example: https://github.com/locustio/locust/pull/3168/files#diff-50c86b7ed8ac2cf95bd48334961bf0530cdc77b5a56f852c5c61b89d735fd711 (so you could do pip install locust[niquests] or whatnot).

This async version of locust would probably have to have its own entrypoint/script, like locust-async or something.

There will probably be some work redoing the master-worker signaling. Perhaps that piece of code can be made sync/async-agnostic and somehow shared https://github.com/locustio/locust/blob/master/locust/runners.py#L1333

cyberw avatar Aug 20 '25 18:08 cyberw