
Concurrency problems in Pipelines

Open yuanwu2017 opened this issue 2 years ago • 10 comments

Describe the bug
We are running the EmbeddingRetriever and DensePassageRetriever FAQ pipeline performance benchmarks with concurrent client requests, but we hit the exception "IndexError: list index out of range". I did some debugging and found that several query threads enter the EmbeddingRetriever.retrieve() function at the same time. Is self.embed_queries thread-safe? Is it necessary to add a lock here?

Error message (from the applicationsaiappliedmlworkflowodqa-haystack-api-1 container):

```
[2022-08-24 03:46:21 +0000] [9] [ERROR] Exception in ASGI application
Traceback (most recent call last):
  File "/home/user/haystack/pipelines/base.py", line 513, in run
    node_output, stream_id = self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
  File "/home/user/haystack/nodes/base.py", line 169, in _dispatch_run
    return self._dispatch_run_general(self.run, **kwargs)
  File "/home/user/haystack/nodes/base.py", line 213, in _dispatch_run_general
    output, stream = run_method(**run_inputs, **run_params)
  File "/home/user/haystack/nodes/retriever/base.py", line 280, in run
    query=query, filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score, request_id=request_id
  File "/home/user/haystack/nodes/retriever/base.py", line 109, in wrapper
    ret = fn(*args, **kwargs)
  File "/home/user/haystack/nodes/retriever/base.py", line 330, in run_query
    query=query, filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score, request_id=request_id
  File "/home/user/haystack/nodes/retriever/dense.py", line 1663, in retrieve
    query_emb=query_emb[0], filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score
IndexError: list index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/usr/local/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/fastapi/applications.py", line 269, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.7/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.7/site-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/starlette/exceptions.py", line 93, in __call__
    raise exc
  File "/usr/local/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.7/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.7/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 670, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 266, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/usr/local/lib/python3.7/site-packages/fastapi/routing.py", line 228, in app
    dependant=dependant, values=values, is_coroutine=is_coroutine
  File "/usr/local/lib/python3.7/site-packages/fastapi/routing.py", line 162, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
  File "/usr/local/lib/python3.7/site-packages/starlette/concurrency.py", line 41, in run_in_threadpool
    return await anyio.to_thread.run_sync(func, *args)
  File "/usr/local/lib/python3.7/site-packages/anyio/to_thread.py", line 32, in run_sync
    func, *args, cancellable=cancellable, limiter=limiter
  File "/usr/local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/user/rest_api/controller/search.py", line 72, in query
    result = _process_request(query_pipeline, request)
  File "/home/user/haystack/telemetry.py", line 113, in wrapper
    return func(*args, **kwargs)
  File "/home/user/rest_api/controller/search.py", line 139, in _process_request
    result = pipeline.run(query=request.query, params=params, debug=request.debug)
  File "/home/user/haystack/pipelines/base.py", line 521, in run
    f"Exception while running node {node_id} with input {node_input}: {e}, full stack trace: {tb}"
Exception: Exception while running node Retriever with input {'root_node': 'Query', 'params': {'filters': {}, 'Retriever': {'top_k': 1000, 'debug': False}, 'request_id': {'id': '271_1'}, 'Query': {}}, 'query': 'How to get current date in python?', 'node_id': 'Retriever'}: list index out of range, full stack trace: Traceback (most recent call last):
  File "/home/user/haystack/pipelines/base.py", line 513, in run
    node_output, stream_id = self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
  File "/home/user/haystack/nodes/base.py", line 169, in _dispatch_run
    return self._dispatch_run_general(self.run, **kwargs)
  File "/home/user/haystack/nodes/base.py", line 213, in _dispatch_run_general
    output, stream = run_method(**run_inputs, **run_params)
  File "/home/user/haystack/nodes/retriever/base.py", line 280, in run
    query=query, filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score, request_id=request_id
  File "/home/user/haystack/nodes/retriever/base.py", line 109, in wrapper
    ret = fn(*args, **kwargs)
  File "/home/user/haystack/nodes/retriever/base.py", line 330, in run_query
    query=query, filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score, request_id=request_id
  File "/home/user/haystack/nodes/retriever/dense.py", line 1663, in retrieve
    query_emb=query_emb[0], filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score
IndexError: list index out of range
```

Expected behavior
No exception.

Additional context
Pipelines:

  1. ElasticsearchDocumentStore->EmbeddingRetriever(deepset/sentence_bert)->Docs2Answers
  2. FAISSDocumentStore->DensePassageRetriever->Docs2Answers

To Reproduce
  0. Remove the '.txt' extension from the end of the attached file names.

  1. Copy pipelines.haystack-EmbeddingRetriever-pipeline.yml into rest_api/pipeline/.
  2. Modify docker-compose.yml: set PIPELINE_YAML_PATH=/home/user/rest_api/pipeline/pipelines.haystack-EmbeddingRetriever-pipeline.yml, CONCURRENT_REQUEST_PER_WORKER=48, and image: dingke1980/elasticsearch-stack-overflow:1.0.
  3. Run docker-compose -f docker-compose.yml up --build.
  4. Copy benchmark.py into the haystack-ui-1 container.
  5. Run the following command in the container: python3 benchmark.py -p 10 -n 100 -c 1 -t 1000

FAQ Check

System:

  • OS: CentOS Linux release 8.4.2105
  • GPU/CPU: CPU
  • Haystack version (commit or version number): commit 0a4477d315d94acab4d08c44c1939389cdddd548, Author: bogdankostic [email protected], Date: Fri Jun 3 15:01:00 2022 +0200
  • DocumentStore: Docker Hub image dingke1980/elasticsearch-stack-overflow:1.0
  • Reader: No
  • Retriever: EmbeddingRetriever (deepset/sentence_bert) or DensePassageRetriever

Attachments: pipelines.haystack-EmbeddingRetriever-pipeline.yml.txt, benchmark.py.txt

yuanwu2017 avatar Aug 24 '22 04:08 yuanwu2017

Hi @yuanwu2017! Sorry for the late reply. We have been able to reproduce your issue and are investigating why we see this kind of behavior with concurrent requests.

bogdankostic avatar Aug 30 '22 12:08 bogdankostic

Quick update: I was able to reproduce this with v1.8.0 while running the script from outside the container (this should simplify the reproduction steps a bit).

masci avatar Sep 01 '22 13:09 masci

We added a threading.RLock() in the retrieve function. It works, but we consider it only a workaround patch and hope there is a better solution.
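For illustration, a minimal sketch of what such a lock-based workaround can look like (not the actual patch; the monkey-patched method name is just the one discussed in this issue):

```python
import functools
import threading

_retrieve_lock = threading.RLock()  # one lock shared by every request thread

def synchronized(fn):
    """Serialize all calls to fn: only one thread may run it at a time."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        with _retrieve_lock:
            return fn(*args, **kwargs)
    return wrapper

# Applied by monkey-patching the non-thread-safe method, e.g.:
#     EmbeddingRetriever.retrieve = synchronized(EmbeddingRetriever.retrieve)
# This restores correctness but turns concurrent queries into a queue,
# which is why it is only a workaround.
```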

yuanwu2017 avatar Sep 09 '22 04:09 yuanwu2017

Thanks for the update @yuanwu2017, that helps. Unfortunately I didn't have the chance to get back to this in the last week, but it's on my plate.

masci avatar Sep 09 '22 07:09 masci

@masci The REST API implements a RequestLimiter using semaphores, and the CONCURRENT_REQUEST_PER_WORKER environment variable sets the semaphore limit. But the issue arises because the pipelines are defined globally and some underlying libraries don't support multithreading.
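A limiter of that shape can be sketched roughly like this (a hypothetical sketch; the actual rest_api implementation may differ in names and details):

```python
import os
import threading

from fastapi import HTTPException

class RequestLimiter:
    """Cap in-flight requests per worker; reject the rest instead of queueing."""

    def __init__(self, limit: int):
        self.semaphore = threading.BoundedSemaphore(limit)

    def __enter__(self):
        # Fail fast with 503 when all concurrency slots are taken.
        if not self.semaphore.acquire(blocking=False):
            raise HTTPException(status_code=503, detail="The server is busy processing requests.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.semaphore.release()

concurrency_limiter = RequestLimiter(int(os.getenv("CONCURRENT_REQUEST_PER_WORKER", "4")))

# Inside the /query endpoint:
#     with concurrency_limiter:
#         result = pipeline.run(query=request.query, params=params)
```

Note that the semaphore only caps how many threads run at once; it does nothing to make the globally shared pipeline objects safe for the threads it does admit.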

The solution would be to move to multiprocessing. I think this can be fixed by increasing the number of workers (and limiting each worker to one request). Why? gunicorn spawns subprocesses, and global variables are not shared between them.
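A gunicorn setup along those lines could look like the following config (a sketch with illustrative values; gunicorn.conf.py is plain Python):

```python
# gunicorn.conf.py -- start with: gunicorn application:app -c gunicorn.conf.py
# Each worker is a separate OS process with its own copy of the pipeline and
# model, so no tokenizer or processor state is shared between requests that
# run in different workers.
bind = "0.0.0.0:8000"
workers = 10                                    # illustrative; tune per host
worker_class = "uvicorn.workers.UvicornWorker"  # async worker per process
timeout = 300
# Pair this with CONCURRENT_REQUEST_PER_WORKER=1 so each process handles one
# request at a time and excess requests are rejected instead of interleaved.
```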

UPDATE: I just found an old blog post describing a similar approach: https://medium.com/huggingface/scaling-a-massive-state-of-the-art-deep-learning-model-in-production-8277c5652d5f. Here at Intelijus we run gunicorn with a single thread and multiple workers. This setup keeps the current REST API from locking up in this scenario.

UPDATE: There are issues on the sentence-transformers repo where Nils discusses a similar subject; his advice is multiprocessing, and he adds that fast tokenizers are not thread-safe. One alternative is to switch to a slow tokenizer (see the sketch after the links below).

  • https://github.com/UKPLab/sentence-transformers/issues/857#issuecomment-813868305
  • https://github.com/UKPLab/sentence-transformers/issues/794#issuecomment-791242063
  • https://github.com/UKPLab/sentence-transformers/issues/794#issuecomment-823487317
  • https://github.com/UKPLab/sentence-transformers/issues/794#issuecomment-985144497
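The slow-tokenizer option maps to the use_fast flag in transformers; a minimal sketch, assuming the deepset/sentence_bert model named in this issue:

```python
from transformers import AutoTokenizer

# Default: the fast (Rust-backed) tokenizer, reported above as not thread-safe.
fast_tokenizer = AutoTokenizer.from_pretrained("deepset/sentence_bert")

# Alternative: the slow, pure-Python tokenizer. It avoids the Rust
# implementation's threading issues at the cost of per-call latency.
slow_tokenizer = AutoTokenizer.from_pretrained("deepset/sentence_bert", use_fast=False)
```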

danielbichuetti avatar Sep 10 '22 10:09 danielbichuetti

Thanks @danielbichuetti and @masci for your updates. Increasing the number of workers does not solve this problem, because there is no way to control how many requests clients send at the same time; the haystack-api server would then be prone to returning a "server is busy processing requests" error. We prefer to use a large CONCURRENT_REQUEST_PER_WORKER together with the workaround patch of an RLock in the retrieve function. That way, we can flexibly add more servers via K8S when we notice the server is processing requests slowly.

yuanwu2017 avatar Sep 11 '22 16:09 yuanwu2017

@yuanwu2017 In our scenario it worked because we are the ones making the requests. We adopted gunicorn with --workers set to 10 and the concurrency per worker set to one, so each worker handles just one request at a time. We monitor the API endpoint, and when we detect a slow response we start another pod. You can also trigger a new pod launch when you receive the busy error, or when all gunicorn workers are busy (using statsD).

In my personal opinion, I don't like multi-threading here because it serves more requests but makes each one slower. Hugging Face and others have made a similar choice.

But the underlying issue is that the Rust-based tokenizer is not thread-safe. You will have to make a choice: thread-locking mechanisms or multiprocessing.

danielbichuetti avatar Sep 11 '22 16:09 danielbichuetti

Thanks @danielbichuetti. We will try both options to see which one is more reasonable in our performance test.

yuanwu2017 avatar Sep 11 '22 16:09 yuanwu2017

@yuanwu2017 I was reading through some of our projects' codebases and found something that might be useful, though it would require some re-architecture of your project. I'll share it anyway; it might be useful some day.

When a request is received at a FastAPI endpoint, we publish a message to Kafka and return a job id. Processing servers consume the message and process it, and a second endpoint returns the response. That second endpoint is not an AI/ML endpoint; it just reads the results and shows them to the user. When the Kafka queues grow too large, KEDA launches more replicas of the processing services. This is one way to implement it cost-effectively; a sketch follows below.
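Sketched in FastAPI terms (hypothetical topic, broker address, and job-store names; the results store stands in for Redis, a DB, or similar, written to by the processing workers):

```python
import json
import uuid

from fastapi import FastAPI
from kafka import KafkaProducer  # kafka-python

app = FastAPI()
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",  # assumed broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
results_store: dict = {}  # stand-in for Redis/DB filled in by the workers

@app.post("/query")
def submit_query(query: str):
    # Enqueue the expensive pipeline work and return immediately with a job id.
    job_id = str(uuid.uuid4())
    producer.send("query-jobs", {"job_id": job_id, "query": query})
    return {"job_id": job_id}

@app.get("/results/{job_id}")
def get_result(job_id: str):
    # Plain lookup endpoint: no AI/ML work happens here.
    return results_store.get(job_id, {"status": "pending"})
```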

danielbichuetti avatar Sep 11 '22 20:09 danielbichuetti

I don't have anything to add to the solution here, but just wanted to say that I've had similar issues.

If I start the rest_api with `uvicorn application:app`, I get issues with thread.lock taking up 90+% of the runtime. If I start it with `gunicorn application:app -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker -t 300`, I don't have this issue - presumably for the reasons stated by others above.

The problem is that I can't seem to use the --reload parameter (which exists in both uvicorn and gunicorn) when using gunicorn - this makes development slow and annoying, as it doesn't hot-reload on file changes. Additionally, I can't figure out how to use VS Code debugging with gunicorn, whereas it is easy with uvicorn.

Does anyone know a solution for this? If not, it would be great if the rest_api could handle this by default, as well as address whatever multiprocessing issues @danielbichuetti has noted above.

nickchomey avatar Sep 12 '22 21:09 nickchomey

I created PR #3709 that hopefully fixes this issue. The problem is that the EmbeddingRetriever uses the InferenceProcessor, which defines an instance variable self.baskets. In the dataset_from_dicts method, we append the preprocessed queries to self.baskets, which is shared among the threads.
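The race boils down to a few lines: when every request thread shares one processor instance, one thread's reset of self.baskets can land between another thread's write and read. A toy reproduction of the pattern (not the actual Haystack code; the interleaving is timing-dependent, so it may take several runs to show up):

```python
import threading

class ToyProcessor:
    """Mimics the shape of the bug: per-call work stored on the instance."""

    def __init__(self):
        self.baskets = []  # instance state shared by every request thread

    def dataset_from_dicts(self, dicts):
        self.baskets = []           # thread A resets the list...
        self.baskets.extend(dicts)  # ...thread B may reset it again here...
        return list(self.baskets)   # ...so A can read back an empty list

proc = ToyProcessor()
empty_results = []

def run_query(i):
    out = proc.dataset_from_dicts([{"query": f"q{i}"}])
    if not out:  # corresponds to query_emb[0] raising IndexError
        empty_results.append(i)

threads = [threading.Thread(target=run_query, args=(i,)) for i in range(64)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"{len(empty_results)} call(s) saw another thread's reset")
```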

bogdankostic avatar Dec 13 '22 13:12 bogdankostic

@yuanwu2017 #3709 is now merged, so you won't need your workaround anymore on the main branch :)

bogdankostic avatar Dec 21 '22 17:12 bogdankostic