Asyncio support
Follow up of #924 and #1079.
Is your feature request related to a problem? Please describe.
Sometimes one needs to simulate asynchronous user behavior. Hatching more users does not solve the problem.
Describe the solution you'd like
Support defining async tasks or something similar.
Describe alternatives you've considered
One could also write a custom client. Maybe this would solve the problem. It would be great to collect examples or snippets on how to do so in this issue, and add them in https://docs.locust.io/en/stable/testing-other-systems.html or another doc page.
You can achieve this by spawning greenlets within your locust tasks. Here's a small example:
from gevent.pool import Pool
from locust import HttpLocust, TaskSet, task, constant

class MyLocust(HttpLocust):
    host = "https://docs.locust.io"
    wait_time = constant(5)

    class task_set(TaskSet):
        @task
        def dual_greenlet_task(self):
            def do_http_request_or_whatever():
                print("yay, running in separate greenlet")
                response = self.client.get("/")
                print("status code:", response.status_code)

            pool = Pool()
            pool.spawn(do_http_request_or_whatever)
            pool.spawn(do_http_request_or_whatever)
            pool.join()
Locust is heavily reliant on Gevent, and as far as I know gevent and python async are not 100% compatible. Therefore I don't see locust supporting python async any time soon. There's been discussion about it in the gevent project: https://github.com/gevent/gevent/issues/982
Alright thanks for the explanation, example and link 🙂
Are there any reasons not to switch to asyncio completely?
The main reason is that it would be a ton of work to rewrite everything. Apart from that I think it is a great idea :)
AFAIK, using different threads to make many parallel requests with requests.Session is not a good idea because requests.Session is not thread-safe (https://github.com/psf/requests/issues/1871). You need to use another client to do this.
From the description of that issue it sounds like it should only be a problem if you're using the same client to connect to many different hosts. In that case one could create another HttpSession instance.
@heyman in my case it happens when I'm executing many requests on the same host (as in your example).
@heyman sorry about the confusion, it looks like it was my mistake. Everything is OK with requests.Session.
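For anyone who does want a second, independent session (as suggested above), here is a rough, untested sketch. It assumes a recent Locust where HttpSession takes base_url, request_event and user; the class and attribute names beyond that are just for illustration:

from locust import HttpUser, task
from locust.clients import HttpSession

class MultiSessionUser(HttpUser):
    host = "https://docs.locust.io"

    def on_start(self):
        # A second, independent HttpSession so parallel greenlets don't share
        # one underlying requests.Session
        self.client2 = HttpSession(
            base_url=self.host,
            request_event=self.environment.events.request,
            user=self,
        )

    @task
    def fetch_with_both_sessions(self):
        self.client.get("/")
        self.client2.get("/")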
Do we support asyncio in Locust tasks now?
Hey maintainers, theoretically, do you see any potential issues with using the asgiref project to execute async functions as sync ones?
Haven't tried it, would love to see a proof of concept!
Most asyncio wrappers have issues coexisting with gevent though... but if it works then it would be awesome!
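To make the asgiref idea concrete, here is an untested sketch. async_to_sync is a real asgiref API, but whether it coexists cleanly with gevent's monkey-patching is exactly the open question; the coroutine and class names are made up for illustration:

import asyncio
from asgiref.sync import async_to_sync
from locust import User, task, constant

async def fetch_something():
    # Stand-in for real async I/O (an aiohttp call, a gRPC aio stub, etc.)
    await asyncio.sleep(0.1)
    return "done"

class AsgirefUser(User):
    wait_time = constant(1)

    @task
    def call_async_code(self):
        # async_to_sync wraps the coroutine function so it can be called
        # synchronously from a gevent-based Locust task
        result = async_to_sync(fetch_something)()
        print(result)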
gevent now has an asyncio version, so that should make integration easier: https://github.com/gfmio/asyncio-gevent
Unfortunately that repo hasn't been updated in two years, so I'm not sure if it works anymore.
I think it would be doable to implement an asyncio version of the locust worker, but it'll take some work.
Is there a more concrete example of calling an asyncio coroutine inside a Locust gevent task? Sorry, but it's not immediately obvious after reading the code examples above (where I can't find where asyncio coroutines are called...).
Asyncio is not supported right now.
We're planning to build a completely asyncio-based worker implementation, but unless there's a simple way to allow gevent and asyncio to coexist (like asyncio-gevent seemed to promise before it went silent) it might be a while.
If someone wants to take it upon themselves to reimplement the worker on asyncio, that would be most welcome :)
Thanks @cyberw for your prompt reply. After days of struggle, I finally found a way to make asyncio work in Locust. Here is a working example. The basic idea is to create a dedicated thread running an asyncio event loop in each process (since Locust can start multiple workers on multiple CPU cores that run in separate processes), and to define a helper function that takes a coroutine and its arguments and runs it on the event loop belonging to the current process.
import asyncio
import logging
import os
import threading
import time
from typing import Any, Callable, Awaitable

import gevent
import gevent.event  # needed for gevent.event.Event below

from locust import User, task, between, events

# Set the logging level for the root logger to WARNING to suppress INFO messages
logging.getLogger("root").setLevel(logging.WARNING)


def thread_func(loop, coro: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> Any:
    """Run an asyncio coroutine on the shared event loop without blocking gevent."""
    logging.debug("In thread_func()")
    try:
        assert loop.is_running(), "Asyncio event loop is not running!"
        # Submit the coroutine to the loop running in the dedicated thread
        future = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop)
        # Wait on a gevent event so other greenlets keep running in the meantime
        event = gevent.event.Event()
        future.add_done_callback(lambda _: event.set())
        event.wait()
        return future.result(timeout=3)
    except TimeoutError as te:
        logging.exception(f"TimeoutError: {te}")
        future.cancel()
        raise te
    except RuntimeError as rte:
        logging.exception(f"RuntimeError: {rte}")
        raise rte
    except Exception as e:
        logging.exception(f"Other Exception: {e}")
        raise e


def run_asyncio(loop, coro: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> Any:
    logging.debug("In run_asyncio()")
    return thread_func(loop, coro, *args, **kwargs)


async def async_foo(param1, param2):
    """asyncio coroutine function to be tested"""
    ...


class AsyncioInLocustTest(User):
    shortest_secs, longest_secs = 0.1, 1
    wait_time = between(shortest_secs, longest_secs)

    # Class-level variables to track process initialization
    shared_loop = None
    shared_thread = None
    initialized_pid = None  # Store the PID of the process where initialization has occurred

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Detect newly forked processes
        current_pid = os.getpid()
        if AsyncioInLocustTest.initialized_pid != current_pid:
            # Run initialization for the current process
            self.init_process_resources(current_pid)
        # Use the shared event loop and thread
        self.loop = AsyncioInLocustTest.shared_loop

    @classmethod
    def init_process_resources(cls, current_pid):
        """Initialize resources once for each new process."""
        cls.initialized_pid = current_pid  # Mark the process as initialized
        print(f"Initializing resources for process PID: {current_pid}")
        # Create a shared asyncio event loop and thread for this process
        cls.shared_loop = asyncio.new_event_loop()
        cls.shared_thread = threading.Thread(target=cls.shared_loop.run_forever, daemon=True)
        cls.shared_thread.start()

    @task
    def asyncio_test(self):
        start_time = time.time()
        try:
            # foo_param1 / foo_param2 are placeholders for whatever arguments async_foo needs
            run_asyncio(self.loop, async_foo, foo_param1, foo_param2)
            # Fire success event
            events.request.fire(
                request_type="foo",
                name="bar",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=None,
            )
        except Exception as e:
            # Fire failure event
            events.request.fire(
                request_type="foo",
                name="bar",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=e,
            )
            raise e
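One usage note on the example above: the per-PID initialization only matters when Locust runs several worker processes (for example via the --processes flag in recent Locust versions, or separately started --worker processes), since each forked process needs its own event loop and loop thread.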
This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 20 days.
This issue was closed because it has been marked stale for 20 days with no activity. This does not necessarily mean that the issue is bad, but it most likely means that nobody is willing to take the time to fix it. If you have found Locust useful, then consider contributing a fix yourself!
Great, I will give it a try.
This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 20 days.
Hi, is there any progress?
Hi @cyberw, I understand this is a complex task. Could you share if the Locust team has any plans or estimated timelines for addressing this issue? Also, please let us know if there are ways the open-source community could contribute to help move this forward.
Thanks for your work on this!
Hi! Sorry for the lack of response @GeraldWhitehead and @deepwzh. We did actually make an internal PoC before we got swamped doing all sorts of things for the SaaS/cloud solution.
I would love for people to contribute. I see a couple of options:
1. A completely new Runner (which is kind of what I did in the PoC here: https://github.com/locustcloud/aiolocust)
   - Pros: pure async in the runner (so minimal risk of performance issues, a clear paradigm), simple implementation
   - Cons: requires re-implementing a lot of stuff (probably all the way up to changes in main), and would probably mean a very reduced feature set
2. Run asyncio on gevent using something like https://github.com/gfmio/asyncio-gevent
   - Pros: simple approach; everything outside the Runner, or maybe even the User, can continue to use gevent
   - Cons: asyncio-gevent isn't super well maintained (it came back to life just recently after a 2.5-year hiatus), mixed paradigm, could have sneaky performance issues
3. Build on jiqiujia's/thx123's approaches. I don't think they would work as-is, but if they can be improved beyond a hack, then maybe... At the very least it would have to do asyncio.run_coroutine_threadsafe in the TaskSet.run method instead of inside a specific task.
   - Cons: I don't think it will scale very well
We (Locust Technologies & core maintainers) will get to this eventually, but any help from the community would greatly speed this up and we're willing to assist if someone wants to give it a go themselves.
Right now I think the first approach is the most appealing, but the second one could work too. I'm completely fine with a super-slim feature set (it wouldn't even need TaskSets in a first version; just a single method, User.task or something, could be simpler).
Hello,
Another option on the table to establish a solid proof of concept is using Niquests instead of Requests. It is a drop-in replacement with both sync and async interfaces, which would eliminate the need to reimplement from scratch basic things that Locust has been doing for ages. I already proposed something at least a year ago, but it was not accepted at the time (for reasonable reasons). Now things are changing for the better: (i) we have broken into the top 1000 most-downloaded packages on PyPI, (ii) we were featured in the GitHub SOSS Fund (session 2), and (iii) we support things that could be a game changer, like fine-grained performance metering and modern multiplexing over HTTP/2 and HTTP/3.
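For anyone unfamiliar with Niquests, this is roughly what its dual interface looks like (a sketch based on its requests-compatible API; not something Locust uses today):

import asyncio

import niquests
from niquests import AsyncSession

# Synchronous, requests-compatible usage
resp = niquests.get("https://docs.locust.io")
print(resp.status_code)

# Asynchronous usage with the same general API shape
async def main():
    async with AsyncSession() as session:
        resp = await session.get("https://docs.locust.io")
        print(resp.status_code)

asyncio.run(main())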
There is a chance we could generate the async interfaces automagically; I'm not sure yet. I'm available if there is a need to discuss this further.
regards,
@Ousret, that sounds really interesting, but I think the majority of work in this ticket would be implementing async support in Locust (async tasks, Users, etc.), not so much the specific HTTP library.
If you want to make an attempt along the lines of what I suggested (using Niquests instead of aiohttp, which would be the other obvious candidate), that could be interesting.
I agree, this specific issue is about async. But just so I understand correctly: is it okay to have a unified HTTP client while providing support for an AsyncHttpUser?
If so, I am okay with starting to lay the foundation for it, preserving all the features available in HttpUser.
regards,
I'm not sure I understand you fully, but if you wanna do something like what I suggested (https://github.com/locustcloud/aiolocust), and implement a NiquestUser, AsyncHttpUser, call it what you will, that would be very interesting!
Me neither,
Is this to be implemented in the locust main package, or shipped as an additional package, namely aiolocust? If it were included in the main locust distribution, that would mean adding a dependency to it, but I am fairly confident that would be undesirable.
I also have a slight doubt about whether an async user is even wanted within locust itself?
Sorry for the questions; it's best that we are on the same page before starting anything.
> Sorry for the questions; it's best that we are on the same page before starting anything.
That's probably smart :)
I'm thinking a first version could be an async implementation/rewrite of WorkerRunner - that way we don't have to redo everything, and we could have a very reduced feature set.
The source code can be distributed with Locust in the same package, that's not an issue, and then any extra dependencies can be extras, like we did here for example: https://github.com/locustio/locust/pull/3168/files#diff-50c86b7ed8ac2cf95bd48334961bf0530cdc77b5a56f852c5c61b89d735fd711 (so you could do pip install locust[niquests] or whatnot)
This async version of locust would probably have to have its own entrypoint/script, like locust-async or something.
There will probably be some work redoing the master-worker signaling. Perhaps that piece of code can be made sync/async-agnostic and somehow shared: https://github.com/locustio/locust/blob/master/locust/runners.py#L1333