Web UI not working when multiprocessing queue has > 320 in size
Prerequisites
- [x] I am using the latest version of Locust
- [x] I am reporting a bug, not asking a question
Description
I am doing load testing with Locust for online incremental machine learning in a large-scale system. The input to the system can't be random, so I have prepared a dataset to serve as the input. The dataset has to be loaded before the load test begins, so I load it at the top of the file as a global variable, and I use a queue because it makes it easy to share one list among many concurrent virtual users.
So, I do this in my code:

```python
from multiprocessing import Queue

import pandas as pd
from locust import HttpUser, task

dataset_dir = "/root/Datasets/csv/"
file_name = "AGR_a_testing.csv"
file_path = dataset_dir + file_name

df = pd.read_csv(file_path, sep=',', header=0)

# Pre-load the first 320 records into a shared queue.
df_queue = Queue()
i = 0
for record in df.to_dict('records'):
    df_queue.put(record)
    i += 1
    if i == 320:
        break

class MLPredictionUser(HttpUser):
    @task
    def make_prediction_request(self):
        while not df_queue.empty():
            try:
                X = df_queue.get()
                # and so on...
```
What makes it interesting is that with `if i == 320: break` the web UI works fine, but with `if i == 321: break` the web UI won't load.
Please help me solve this problem.
Sincerely,
Adil
Command line
```
locust -f locust-test.py --web-host=0.0.0.0 --web-port=8089
```
Locustfile contents
```python
from multiprocessing import Queue

import pandas as pd
from locust import HttpUser, task

dataset_dir = "/root/Datasets/csv/"
file_name = "AGR_a_testing.csv"
file_path = dataset_dir + file_name

df = pd.read_csv(file_path, sep=',', header=0)

# Pre-load the first 320 records into a shared queue.
df_queue = Queue()
i = 0
for record in df.to_dict('records'):
    df_queue.put(record)
    i += 1
    if i == 320:
        break

class MLPredictionUser(HttpUser):
    @task
    def make_prediction_request(self):
        while not df_queue.empty():
            try:
                X = df_queue.get()
                # and so on...
```
Python version
3.12.3
Locust version
locust==2.37.0 locust-cloud==1.21.2
Operating system
Linux Ubuntu WSL 24.04
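One plausible explanation for the sharp 320-record cutoff (an assumption, not something verified here): `multiprocessing.Queue` pushes pickled items through an OS pipe via a background feeder thread, and the default pipe buffer on Linux is 64 KiB. If one pickled record is around 200 bytes, roughly 320 of them fill the pipe, at which point `put()` has to block on the OS, which interacts badly with gevent's cooperative scheduling. A quick way to estimate where the pipe would fill (the record layout below is made up; substitute a real row, e.g. `df.to_dict('records')[0]`):

```python
import pickle

# Hypothetical record; replace with a real row from the dataset.
record = {"attr0": 0.123, "attr1": 4.567, "attr2": "abc", "label": 1}

per_record = len(pickle.dumps(record))   # pickled size of one record, in bytes
pipe_capacity = 64 * 1024                # default pipe buffer size on Linux

records_until_full = pipe_capacity // per_record
print(per_record, records_until_full)
```

If `records_until_full` lands near 320 for a real record, the pipe-buffer hypothesis fits.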
So, I changed the line `df_queue = Queue()` into:

```python
import multiprocessing

manager = multiprocessing.Manager()
df_queue = manager.Queue()
```
And now this error happens:

```
Traceback (most recent call last):
  File "/root/.venv/bin/locust", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/root/.venv/lib/python3.12/site-packages/locust/main.py", line 167, in main
    ) = merge_locustfiles_content(locustfiles)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/.venv/lib/python3.12/site-packages/locust/main.py", line 123, in merge_locustfiles_content
    user_classes, shape_classes = load_locustfile(_locustfile)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/.venv/lib/python3.12/site-packages/locust/util/load_locustfile.py", line 70, in load_locustfile
    loader.exec_module(imported)
  File "<frozen importlib._bootstrap_external>", line 995, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/root/MLOps-Architecture/Serialization_Datasets/locust-test.py", line 34, in <module>
    df_queue = manager.Queue()
    ^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/managers.py", line 726, in temp
    token, exp = self._create(typeid, *args, **kwds)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/managers.py", line 606, in _create
    conn = self._Client(self._address, authkey=self._authkey)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/connection.py", line 525, in Client
    answer_challenge(c, authkey)
  ...
  File "/usr/lib/python3.12/multiprocessing/connection.py", line 395, in _recv
    chunk = read(handle, remaining)
            ^^^^^^^^^^^^^^^^^^^^^^^
BlockingIOError: [Errno 11] Resource temporarily unavailable
```
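For comparison, the same Manager queue works in a plain Python script outside Locust (a minimal check with made-up data), which suggests the error comes from the interaction with Locust's gevent monkey-patching rather than from the Manager itself:

```python
import multiprocessing

manager = multiprocessing.Manager()  # starts a separate server process
q = manager.Queue()

q.put({"row": 0})
item = q.get()
print(item)

manager.shutdown()
```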
Can you please help me solve this problem?
Hi! These are both interesting issues, but I think they are more related to your specific use case in combination with gevent than to Locust specifically. I don't have much experience with pandas.
The first issue you're having might be Locust-related. Can you try moving the initialization code from module level into the `init` event?
You probably want to use a `gevent.queue.Queue` rather than a `multiprocessing.Queue`:
https://www.gevent.org/api/gevent.queue.html
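A minimal sketch of that suggestion. It is written with the standard-library `queue.Queue` here so it runs without gevent installed; `gevent.queue.Queue` exposes the same `put`/`get`/`empty` interface, so in the locustfile you would swap the import:

```python
import queue

# In a locustfile this would be gevent.queue.Queue(); the API is the same.
df_queue = queue.Queue()
for i in range(320):
    df_queue.put({"row": i})

def next_record():
    # get_nowait() fails fast when the dataset is exhausted, instead of
    # blocking the calling greenlet forever.
    try:
        return df_queue.get_nowait()
    except queue.Empty:
        return None

first = next_record()
print(first, df_queue.qsize())
```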
Thank you so much @cyberw and @cgoldberg for the suggestions. After asking Claude 3.7 Sonnet, though, I found a workaround for my problem: simply using `threading.RLock()`. In my case it isn't distributed load testing, just a simple load test from one machine, though Claude said that `multiprocessing.Manager().Queue()` is more suitable for distributed load testing. For now I have a working solution, so please feel free to close this issue.
Sincerely, Adil
My workaround solution
```python
import csv
import threading

class SequentialDatasetReader:
    """
    Thread-safe sequential dataset reader for concurrent virtual users.
    Ensures each record is processed exactly once in original sequence.
    """

    def __init__(self, dataset_path, cycle=True):
        """
        Initialize the dataset reader.

        Args:
            dataset_path: Path to the CSV dataset file
            cycle: If True, restart from beginning when dataset is exhausted
        """
        self.dataset_path = dataset_path
        self.cycle = cycle
        self.lock = threading.RLock()  # Reentrant lock for thread safety
        self.data = []
        self.current_index = 0
        self._load_data()

    def _load_data(self):
        """Load all dataset records into memory."""
        with open(self.dataset_path, 'r') as file:
            reader = csv.DictReader(file)
            self.data = list(reader)
        if not self.data:
            raise ValueError("Dataset is empty")

    def get_next_record(self):
        """
        Thread-safely get the next record from the dataset.

        Returns:
            The next record as a dictionary, or None if dataset is
            exhausted and cycle=False
        """
        with self.lock:
            # Check if we've reached the end
            if self.current_index >= len(self.data):
                if not self.cycle:
                    return None
                # Reset to beginning if cycling
                self.current_index = 0
            # Get the next record and increment the counter
            record = self.data[self.current_index]
            self.current_index += 1
            return record
```
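A quick self-contained check (toy in-memory data instead of a CSV file) that the lock-guarded index in `get_next_record` really hands each record to exactly one concurrent user:

```python
import threading

data = [{"row": i} for i in range(100)]
lock = threading.RLock()
index = [0]   # wrapped in a list so worker threads share one counter
seen = []

def worker():
    # Each "virtual user" pulls records until the dataset is exhausted.
    while True:
        with lock:  # same pattern as get_next_record() above
            if index[0] >= len(data):
                return
            record = data[index[0]]
            index[0] += 1
        seen.append(record["row"])

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(seen), len(set(seen)))  # every record consumed exactly once
```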
This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 20 days.