can't pass an object parameter to pipeline.run()
**Describe the bug**
I implemented a FastAPITokenStreamingHandler object in the REST API layer, following the example mentioned in this PR, and passed it as a parameter to pipeline.run(). The only difference from that example is that I am using a pipeline instead of a PromptNode. My code looks like this:

```python
params["stream_handler"] = FastAPITokenStreamingHandler(g)
pipeline.run(query=prompt, params={"agent_node": params})
```
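For context, a minimal sketch of what such a handler might look like (the base-class import path matches Haystack v1, and the generator's send() API is an assumption based on the PR example, not its verbatim code):

```python
from haystack.nodes.prompt.invocation_layer.handlers import TokenStreamingHandler


class FastAPITokenStreamingHandler(TokenStreamingHandler):
    """Forwards each generated token to a generator that the REST layer streams to the client."""

    def __init__(self, generator):
        # `generator` is the thread-safe generator (`g`) created per request (assumed API).
        self.generator = generator

    def __call__(self, token_received: str, **kwargs) -> str:
        self.generator.send(token_received)  # hypothetical send(); depends on the generator used
        return token_received
```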
But I then got the following error.

**Error message**
```
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/haystack/pipelines/base.py", line 556, in run
    node_output, stream_id = self._run_node(node_id, node_input)
  File "/opt/conda/lib/python3.10/site-packages/haystack/pipelines/base.py", line 469, in _run_node
    return self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
  File "/opt/conda/lib/python3.10/site-packages/haystack/nodes/base.py", line 201, in _dispatch_run
    return self._dispatch_run_general(self.run, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/haystack/nodes/base.py", line 219, in _dispatch_run_general
    arguments = deepcopy(kwargs)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/conda/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/conda/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/conda/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 161, in deepcopy
    rv = reductor(4)
TypeError: cannot pickle '_thread.lock' object
```
**Expected behavior**
After debugging, I found that the problem lies in this line of code. Is there a particular reason we use a deep copy instead of a shallow copy here? If not, I would suggest switching to a shallow copy, because the deep copy prevents passing an object through to the underlying pipeline.run().
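The failure is easy to reproduce in isolation: deepcopy fails on any object graph containing a thread lock, which is exactly what a streaming handler holding a generator or queue tends to contain:

```python
import threading
from copy import deepcopy


class StreamHandler:
    def __init__(self):
        self.lock = threading.Lock()  # any nested _thread.lock makes the object un-deepcopyable


params = {"agent_node": {"stream_handler": StreamHandler()}}
deepcopy(params)  # TypeError: cannot pickle '_thread.lock' object
```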
**FAQ Check**
- [x] Have you had a look at our new FAQ page?
**System:**
- OS:
- GPU/CPU:
- Haystack version (commit or version number):
- DocumentStore:
- Reader:
- Retriever:
@faaany Thank you for the detailed description of this issue. I see that you already opened a pull request to fix it, which is great! 🤩 @ZanSara will review your pull request.
Hey @faaany sorry for the late reply. I've seen your PR, and I have to say I'm really torn on it... I believe there are reasons why we deepcopy kwargs, but the Pipeline code is so intricate that I have a hard time proving whether the deep copy is necessary without investing a lot of time.
So, before I go on a long chase to understand whether your change is safe, let's figure out how blocking this is.
Is it really necessary to pass the streaming handler as a parameter of the pipeline? What's the difference between this approach and giving such a handler to PromptNode directly?
@ZanSara
For my usage scenario this is necessary, because my pipeline is deployed from a pipeline YAML behind the REST API. The pipeline only runs when my front-end application sends a request to the backend, so I need to pass the streaming handler param at runtime in order to get a streamed response.
If I give such a handler to PromptNode directly, I will see the streamed output on my backend machine, but not in the front-end.
I saw that many users on Discord asked for this feature. I think they have similar problems to mine.
> Hey @faaany sorry for the late reply. I've seen your PR, and I have to say I'm really torn on it... I believe there are reasons why we deepcopy kwargs, but the Pipeline code is so intricate that I have a hard time proving whether the deep copy is necessary without investing a lot of time.
> So, before I go on a long chase to understand whether your change is safe, let's figure out how blocking this is.
> Is it really necessary to pass the streaming handler as a parameter of the pipeline? What's the difference between this approach and giving such a handler to PromptNode directly?
And yes, it is a bit risky to replace the deep copy with a shallow copy. How about this approach: we explicitly drop the stream_handler param from the kwargs before the deep copy, then add it back to the parameter list afterwards? Something like the sketch below.
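A rough sketch of the idea inside _dispatch_run_general (untested; it glosses over the fact that in practice the handler may sit nested inside a per-node params dict):

```python
# Exclude the live handler from the deep copy, then re-attach it.
stream_handler = kwargs.pop("stream_handler", None)
arguments = deepcopy(kwargs)  # safe now: no _thread.lock left in kwargs
if stream_handler is not None:
    kwargs["stream_handler"] = stream_handler     # restore the caller's kwargs
    arguments["stream_handler"] = stream_handler  # pass the original object through
```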
@vblagoje maybe you have more insights on this issue?
Hey @faaany sorry for the late reply :see_no_evil:
> For my usage scenario this is necessary, because my pipeline is deployed from a pipeline YAML behind the REST API. The pipeline only runs when my front-end application sends a request to the backend, so I need to pass the streaming handler param at runtime in order to get a streamed response.
I'm not sure I get this. How do you pass a streaming handler over HTTP? :eyes: Maybe I didn't understand your explanation. Feel free to share a code example or a colab if that helps!
@faaany I'm thinking about an alternative solution to your problem that wouldn't require this change. I'm not sure whether it would work though.
I'm taking into consideration #5697 too.
My idea would be to create a stream handler similar to FastAPITokenStreamingHandler, but one that creates its own thread-safe generator that is publicly accessible.
Then we'd change _process_request_streaming to find the streaming handler being used and, if it exposes such a generator, use it to read the tokens as they are generated. You're already checking whether the last node is a PromptNode, so given that information we can go through it and find the invocation layer's streamer.
It should be accessible with logic similar to this:

```python
from haystack.nodes import PromptNode

prompt_node = PromptNode()
prompt_node.prompt_model.model_invocation_layer.model_input_kwargs["stream_handler"]
```
Not elegant, but it should do the job.
I didn't test it, but I believe it should work conceptually, even if it's a bit convoluted.
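To make the idea concrete, here's a rough sketch of such a generator-backed handler (all names are hypothetical, and the base-class import path assumes Haystack v1):

```python
import queue

from haystack.nodes.prompt.invocation_layer.handlers import TokenStreamingHandler


class GeneratorBackedStreamingHandler(TokenStreamingHandler):
    """Buffers generated tokens in a queue that the REST layer can read as a generator."""

    _SENTINEL = None  # marks the end of the stream

    def __init__(self):
        self.token_queue: queue.Queue = queue.Queue()

    def __call__(self, token_received: str, **kwargs) -> str:
        self.token_queue.put(token_received)
        return token_received

    def close(self):
        # Call once pipeline.run() returns, so the generator below terminates.
        self.token_queue.put(self._SENTINEL)

    def tokens(self):
        """The publicly accessible generator that _process_request_streaming would iterate."""
        while (token := self.token_queue.get()) is not self._SENTINEL:
            yield token
```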
I'd very much prefer that we don't touch the Pipeline.run() method, as it could create a cascade of issues that I really wouldn't want to handle right now. 😅
@silvanocerza thanks for the idea! I will give it a try and let you know how it goes.
Hey @faaany, any news on this? I'm curious to know if you managed to make it work. 👀
Hi @silvanocerza, sorry for my late response! We had a long national holiday here in China, and I also took some additional days off... I wasn't able to set this up before my holidays, but I will look at it this week and get back to you soon. Thank you for your answer!