Concurrency problems in Pipelines
Describe the bug We are running the EmbeddingRetriever and DensePassageRetriever FAQ pipeline performance benchmarks with concurrent client requests, and we hit the exception "IndexError: list index out of range". After some debugging, I found that several query threads enter the EmbeddingRetriever.retrieve() function at the same time. Is self.embed_queries thread-safe? Is it necessary to add a lock here?
Error message
applicationsaiappliedmlworkflowodqa-haystack-api-1 | [2022-08-24 03:46:21 +0000] [9] [ERROR] Exception in ASGI application
applicationsaiappliedmlworkflowodqa-haystack-api-1 | Traceback (most recent call last):
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/pipelines/base.py", line 513, in run
applicationsaiappliedmlworkflowodqa-haystack-api-1 | node_output, stream_id = self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/base.py", line 169, in _dispatch_run
applicationsaiappliedmlworkflowodqa-haystack-api-1 | return self._dispatch_run_general(self.run, **kwargs)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/base.py", line 213, in _dispatch_run_general
applicationsaiappliedmlworkflowodqa-haystack-api-1 | output, stream = run_method(**run_inputs, **run_params)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/retriever/base.py", line 280, in run
applicationsaiappliedmlworkflowodqa-haystack-api-1 | query=query, filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score, request_id=request_id
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/retriever/base.py", line 109, in wrapper
applicationsaiappliedmlworkflowodqa-haystack-api-1 | ret = fn(*args, **kwargs)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/retriever/base.py", line 330, in run_query
applicationsaiappliedmlworkflowodqa-haystack-api-1 | query=query, filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score, request_id=request_id
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/retriever/dense.py", line 1663, in retrieve
applicationsaiappliedmlworkflowodqa-haystack-api-1 | query_emb=query_emb[0], filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score
applicationsaiappliedmlworkflowodqa-haystack-api-1 | IndexError: list index out of range
applicationsaiappliedmlworkflowodqa-haystack-api-1 |
applicationsaiappliedmlworkflowodqa-haystack-api-1 | During handling of the above exception, another exception occurred:
applicationsaiappliedmlworkflowodqa-haystack-api-1 |
applicationsaiappliedmlworkflowodqa-haystack-api-1 | Traceback (most recent call last):
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
applicationsaiappliedmlworkflowodqa-haystack-api-1 | result = await app(self.scope, self.receive, self.send)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | return await self.app(scope, receive, send)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/fastapi/applications.py", line 269, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | await super().call(scope, receive, send)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/applications.py", line 124, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | await self.middleware_stack(scope, receive, send)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/middleware/errors.py", line 184, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | raise exc
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/middleware/errors.py", line 162, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | await self.app(scope, receive, _send)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/middleware/cors.py", line 84, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | await self.app(scope, receive, send)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/exceptions.py", line 93, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | raise exc
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | await self.app(scope, receive, sender)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | raise e
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | await self.app(scope, receive, send)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 670, in call
applicationsaiappliedmlworkflowodqa-haystack-api-1 | await route.handle(scope, receive, send)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 266, in handle
applicationsaiappliedmlworkflowodqa-haystack-api-1 | await self.app(scope, receive, send)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 65, in app
applicationsaiappliedmlworkflowodqa-haystack-api-1 | response = await func(request)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/fastapi/routing.py", line 228, in app
applicationsaiappliedmlworkflowodqa-haystack-api-1 | dependant=dependant, values=values, is_coroutine=is_coroutine
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/fastapi/routing.py", line 162, in run_endpoint_function
applicationsaiappliedmlworkflowodqa-haystack-api-1 | return await run_in_threadpool(dependant.call, **values)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/starlette/concurrency.py", line 41, in run_in_threadpool
applicationsaiappliedmlworkflowodqa-haystack-api-1 | return await anyio.to_thread.run_sync(func, *args)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/anyio/to_thread.py", line 32, in run_sync
applicationsaiappliedmlworkflowodqa-haystack-api-1 | func, *args, cancellable=cancellable, limiter=limiter
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
applicationsaiappliedmlworkflowodqa-haystack-api-1 | return await future
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/usr/local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 867, in run
applicationsaiappliedmlworkflowodqa-haystack-api-1 | result = context.run(func, *args)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/rest_api/controller/search.py", line 72, in query
applicationsaiappliedmlworkflowodqa-haystack-api-1 | result = _process_request(query_pipeline, request)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/telemetry.py", line 113, in wrapper
applicationsaiappliedmlworkflowodqa-haystack-api-1 | return func(*args, **kwargs)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/rest_api/controller/search.py", line 139, in _process_request
applicationsaiappliedmlworkflowodqa-haystack-api-1 | result = pipeline.run(query=request.query, params=params, debug=request.debug)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/pipelines/base.py", line 521, in run
applicationsaiappliedmlworkflowodqa-haystack-api-1 | f"Exception while running node {node_id}
with input {node_input}
: {e}, full stack trace: {tb}"
applicationsaiappliedmlworkflowodqa-haystack-api-1 | Exception: Exception while running node Retriever
with input {'root_node': 'Query', 'params': {'filters': {}, 'Retriever': {'top_k': 1000, 'debug': False}, 'request_id': {'id': '271_1'}, 'Query': {}}, 'query': 'How to get current date in python?', 'node_id': 'Retriever'}
: list index out of range, full stack trace: Traceback (most recent call last):
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/pipelines/base.py", line 513, in run
applicationsaiappliedmlworkflowodqa-haystack-api-1 | node_output, stream_id = self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/base.py", line 169, in _dispatch_run
applicationsaiappliedmlworkflowodqa-haystack-api-1 | return self._dispatch_run_general(self.run, **kwargs)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/base.py", line 213, in _dispatch_run_general
applicationsaiappliedmlworkflowodqa-haystack-api-1 | output, stream = run_method(**run_inputs, **run_params)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/retriever/base.py", line 280, in run
applicationsaiappliedmlworkflowodqa-haystack-api-1 | query=query, filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score, request_id=request_id
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/retriever/base.py", line 109, in wrapper
applicationsaiappliedmlworkflowodqa-haystack-api-1 | ret = fn(*args, **kwargs)
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/retriever/base.py", line 330, in run_query
applicationsaiappliedmlworkflowodqa-haystack-api-1 | query=query, filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score, request_id=request_id
applicationsaiappliedmlworkflowodqa-haystack-api-1 | File "/home/user/haystack/nodes/retriever/dense.py", line 1663, in retrieve
applicationsaiappliedmlworkflowodqa-haystack-api-1 | query_emb=query_emb[0], filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score
applicationsaiappliedmlworkflowodqa-haystack-api-1 | IndexError: list index out of range
Expected behavior No exception should be raised.
Additional context pipelines:
- ElasticsearchDocumentStore->EmbeddingRetriever(deepset/sentence_bert)->Docs2Answers
- FAISSDocumentStore->DensePassageRetriever->Docs2Answers
To Reproduce 0. Remove the '.txt' extension at the end of the attached file names.
- Copy pipelines.haystack-EmbeddingRetriever-pipeline.yml into rest_api/pipeline/.
- Modify docker-compose.yml: PIPELINE_YAML_PATH=/home/user/rest_api/pipeline/pipelines.haystack-EmbeddingRetriever-pipeline.yml, CONCURRENT_REQUEST_PER_WORKER=48, image: dingke1980/elasticsearch-stack-overflow:1.0
- Run docker-compose -f docker-compose.yml up --build
- Copy benchmark.py into the haystack-ui-1 container.
- Run the following command in the container: python3 benchmark.py -p 10 -n 100 -c 1 -t 1000 (a minimal standalone load-generation sketch is shown below for reference)
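The attached benchmark.py is not reproduced here; for reference, a minimal sketch of the kind of concurrent load it generates, assuming the default haystack REST API /query endpoint and payload shape (adjust host, port, and parameters to your deployment):

```python
# Fire N concurrent queries at the REST API from a thread pool.
# API_URL and the payload shape are assumptions based on the default rest_api.
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

API_URL = "http://localhost:8000/query"  # assumed default rest_api address
N_THREADS = 10
N_REQUESTS = 100

def send_query(i: int) -> int:
    payload = {
        "query": "How to get current date in python?",
        "params": {"Retriever": {"top_k": 1000}},
    }
    resp = requests.post(API_URL, json=payload, timeout=60)
    return resp.status_code

with ThreadPoolExecutor(max_workers=N_THREADS) as pool:
    futures = [pool.submit(send_query, i) for i in range(N_REQUESTS)]
    codes = [f.result() for f in as_completed(futures)]

# Summarize how many requests succeeded vs. failed
print({code: codes.count(code) for code in set(codes)})
```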
FAQ Check
- [Y] Have you had a look at our new FAQ page?
System:
- OS: CentOS Linux release 8.4.2105
- GPU/CPU:CPU
- Haystack version (commit or version number): commit 0a4477d315d94acab4d08c44c1939389cdddd548 Author: bogdankostic [email protected] Date: Fri Jun 3 15:01:00 2022 +0200
- DocumentStore: dockerhub: dingke1980/elasticsearch-stack-overflow:1.0
- Reader: NO
- Retriever: EmbeddingRetriever(deepset/sentence_bert) or DensePassageRetriever
pipelines.haystack-EmbeddingRetriever-pipeline.yml.txt benchmark.py.txt
Hi @yuanwu2017! Sorry for the late reply, we have been able to reproduce your issue and are investigating why we see this kind of behavior when doing concurrent requests.
Quick update: I was able to reproduce with v1.8.0 while running the script from outside the container (this should simplify the reproduction steps a bit).
We added a threading.RLock() in the retrieve function. It works, but we think it is only a workaround patch. We hope there is a better solution.
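For illustration, a minimal sketch of that kind of workaround (a hypothetical wrapper, not the exact patch we applied): it simply serializes access to retrieve() with a re-entrant lock so only one thread runs it at a time.

```python
# Sketch of the workaround described above: serialize calls to retrieve()
# with a re-entrant lock. LockedRetriever is a made-up helper for illustration.
import threading

_retrieve_lock = threading.RLock()

class LockedRetriever:
    """Thin wrapper that serializes calls to an existing retriever instance."""

    def __init__(self, retriever):
        self._retriever = retriever

    def retrieve(self, *args, **kwargs):
        with _retrieve_lock:  # only one thread at a time enters retrieve()
            return self._retriever.retrieve(*args, **kwargs)
```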
Thanks for the update @yuanwu2017, that helps. Unfortunately I didn't have the chance to get back to this in the last week, but it's on my plate.
@masci The REST API implements a RequestLimiter using semaphores, and the CONCURRENT_REQUEST_PER_WORKER environment variable defines the semaphore limit. But the issue arises because the pipelines are defined globally and some underlying libraries don't support multithreading.
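For illustration, a minimal sketch of that kind of semaphore-based limiting, assuming a BoundedSemaphore sized from CONCURRENT_REQUEST_PER_WORKER (this is not the actual rest_api code, just the mechanism):

```python
# Illustrative request limiter: at most `limit` requests are processed per
# worker; any extra request is rejected immediately as "busy".
import os
import threading
from contextlib import contextmanager

class RequestLimiter:
    def __init__(self, limit: int):
        self._semaphore = threading.BoundedSemaphore(limit)

    @contextmanager
    def run(self):
        acquired = self._semaphore.acquire(blocking=False)
        if not acquired:
            raise RuntimeError("The server is busy processing requests.")
        try:
            yield
        finally:
            self._semaphore.release()

limiter = RequestLimiter(int(os.getenv("CONCURRENT_REQUEST_PER_WORKER", "4")))

# Usage inside a request handler (pipeline is the globally defined pipeline):
# with limiter.run():
#     result = pipeline.run(query=..., params=...)
```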
The solution would be to move to multiprocessing. I think this can be fixed by increasing the number of workers (and limiting each worker to one request). Why? gunicorn spawns subprocesses, so the global variables are not shared.
UPDATE: I just found an old blog post describing a similar approach: https://medium.com/huggingface/scaling-a-massive-state-of-the-art-deep-learning-model-in-production-8277c5652d5f. Here at Intelijus we use gunicorn with a single thread and multiple workers. This setup avoids the REST API locking up in this scenario.
UPDATE: There are issues in the sentence-transformers repo where Nils discusses a similar subject; his advice is multiprocessing, and he adds that fast tokenizers are not thread-safe. One possible workaround is to switch to a slow tokenizer (see the sketch after the links below).
- https://github.com/UKPLab/sentence-transformers/issues/857#issuecomment-813868305
- https://github.com/UKPLab/sentence-transformers/issues/794#issuecomment-791242063
- https://github.com/UKPLab/sentence-transformers/issues/794#issuecomment-823487317
- https://github.com/UKPLab/sentence-transformers/issues/794#issuecomment-985144497
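For reference, a minimal sketch of the slow-tokenizer option mentioned above (the model name is only an example; whether this can actually be wired into the retriever depends on how it constructs its tokenizer):

```python
# Load the pure-Python ("slow") tokenizer instead of the Rust-based fast one.
from transformers import AutoTokenizer

slow_tokenizer = AutoTokenizer.from_pretrained(
    "deepset/sentence_bert",  # example model; use the one your retriever loads
    use_fast=False,           # fall back to the Python tokenizer implementation
)
print(slow_tokenizer.is_fast)  # False
```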
Thanks @danielbichuetti and @masci for your updates. Increasing the number of workers does not solve this problem, because there is no way to control how many requests clients initiate at the same time; the haystack-api server would then be prone to returning "the server is busy processing requests" errors. We prefer to use a large CONCURRENT_REQUEST_PER_WORKER together with the workaround patch of using an RLock in the retrieve function. This way we can flexibly scale up the number of servers with K8s when we find that request processing is slow.
@yuanwu2017 In our scenario it worked because we are the ones making the requests. We adopted gunicorn with --workers set to 10 and concurrency set to one, so each worker handles just one request at a time. We monitor the API endpoint, and when we detect a slow response we start another pod. When you receive the busy error you can also trigger a new pod launch, or launch a new pod when all gunicorn workers are busy (using statsD).
In my personal opinion, I don't like enabling multi-threading because it will serve more requests but each one will be slower. Hugging Face and others have made a similar choice.
But the issue remains that the Rust-based (fast) tokenizer is not thread-safe. You will have to make a choice: thread-locking mechanisms or multiprocessing.
Thanks @danielbichuetti. We will try both options to see which one is more reasonable in our performance test.
@yuanwu2017 I was reading some of our projects' codebases and I just found something that might be useful, though it would require some re-architecture of your project. I will share it anyway; it might be useful some day.
When a request is received at a FastAPI endpoint, we publish a message to Kafka and return a job id. Processing servers consume the message and process it, and another endpoint returns the response; this endpoint is not an AI/ML endpoint, it just reads the results and shows them to the user. When the Kafka queues grow too large, KEDA launches more replicas of the processing services. This is one way to implement it cost-effectively.
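A rough sketch of that flow, with topic names, endpoints, and the result store all made up for illustration (the processing workers that actually run the pipeline are not shown):

```python
# Enqueue-and-poll pattern: the API only publishes jobs and reads back results.
import json
import uuid

from fastapi import FastAPI, HTTPException
from kafka import KafkaProducer  # kafka-python

app = FastAPI()
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",  # assumed broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
results = {}  # in practice a shared store (Redis, DB, ...), not an in-memory dict

@app.post("/query")
def submit_query(query: str):
    # Publish the job and return immediately with a job id.
    job_id = str(uuid.uuid4())
    producer.send("query-jobs", {"job_id": job_id, "query": query})
    return {"job_id": job_id}

@app.get("/results/{job_id}")
def get_result(job_id: str):
    # Read-only endpoint: return the result once a worker has written it.
    if job_id not in results:
        raise HTTPException(status_code=404, detail="Result not ready")
    return results[job_id]
```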
I don't have anything to add to the solution here, but just wanted to say that I've had similar issues.
If I start the rest_api with `uvicorn application:app`, I get issues with `thread.lock` taking up 90+% of the runtime. If I start it with `gunicorn application:app -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker -t 300`, I don't have this issue - presumably for the reasons stated by others above.
The problem is that I can't seem to use the `--reload` parameter (which exists in both uvicorn and gunicorn) when using gunicorn - this makes development slow and annoying as it doesn't hot reload on file changes. Additionally, I can't figure out how to use vscode debugging while using gunicorn, whereas it is easy with uvicorn.
Does anyone know a solution for this? If not, it would be great if the rest_api could handle this by default, as well as address whatever multiprocessing issues @danielbichuetti has noted above.
I created PR #3709 that hopefully fixes this issue.
The problem is that the EmbeddingRetriever uses the `InferenceProcessor`, which defines an instance variable `self.baskets`. In the `dataset_from_dicts` method, we append the preprocessed queries to `self.baskets`, which seems to be shared among the threads.
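A minimal illustration of why that kind of shared instance state breaks under concurrent requests (names are made up; this is not the actual haystack code):

```python
# Two threads reset and append to the same instance-level list, then each
# reads back "its" data - but the list is shared, so contents get mixed up.
import threading

class Processor:
    def __init__(self):
        self.baskets = []  # shared mutable state across all callers

    def dataset_from_dicts(self, dicts):
        self.baskets = []          # thread A's reset can wipe thread B's data
        for d in dicts:
            self.baskets.append(d)
        return list(self.baskets)  # may contain another thread's entries

processor = Processor()
errors = []

def worker(name):
    for _ in range(1000):
        out = processor.dataset_from_dicts([name])
        if out != [name]:
            errors.append(out)

threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"{len(errors)} inconsistent results observed")
```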
@yuanwu2017 #3709 is now merged, so you won't need your workaround on the main branch anymore :)