
can't pass an object parameter to pipeline.run()

faaany opened this issue 2 years ago · 10 comments

Describe the bug

I implemented a FastAPITokenStreamingHandler object in the REST API layer, following the example mentioned in this PR, and passed it as a parameter to pipeline.run(). The only difference from that example is that I am using a pipeline instead of a PromptNode. My code looks like this:

params['stream_handler'] = FastAPITokenStreamingHandler(g)
pipeline.run(query=prompt, params={"agent_node": params})

But then I got the following error message:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/haystack/pipelines/base.py", line 556, in run
    node_output, stream_id = self._run_node(node_id, node_input)
  File "/opt/conda/lib/python3.10/site-packages/haystack/pipelines/base.py", line 469, in _run_node
    return self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
  File "/opt/conda/lib/python3.10/site-packages/haystack/nodes/base.py", line 201, in _dispatch_run
    return self._dispatch_run_general(self.run, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/haystack/nodes/base.py", line 219, in _dispatch_run_general
    arguments = deepcopy(kwargs)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/conda/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/conda/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/conda/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 161, in deepcopy
    rv = reductor(4)
TypeError: cannot pickle '_thread.lock' object

Expected behavior

After debugging, I found that the problem lies in this line of code. Is there a particular reason we use deep copy instead of shallow copy here? If not, I would suggest using shallow copy, because the deep copy prevents passing an object through to the underlying pipeline.run().
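For anyone hitting the same wall, the failure is easy to reproduce outside Haystack: copy.deepcopy falls back to pickle-style reduction for objects it has no dedicated copier for, and a threading.Lock (which streaming handlers wrapping queues or generators typically contain) is not picklable. A minimal sketch, where FakeStreamHandler is a made-up stand-in and not a Haystack class:

```python
# Minimal, self-contained reproduction of the TypeError above.
# FakeStreamHandler is a hypothetical stand-in for an object like
# FastAPITokenStreamingHandler that holds a non-picklable resource.
import copy
import threading

class FakeStreamHandler:
    def __init__(self):
        self._lock = threading.Lock()  # threading locks cannot be pickled

params = {"stream_handler": FakeStreamHandler()}

try:
    copy.deepcopy(params)  # what _dispatch_run_general does to kwargs
except TypeError as err:
    print(err)  # -> cannot pickle '_thread.lock' object
```

Any handler holding a lock, socket, or open stream will fail the same way, so the problem is not specific to FastAPITokenStreamingHandler.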

— faaany, Aug 28 '23

@faaany Thank you for the detailed description of this issue. I see that you already opened a pull request to fix it, which is great! 🤩 @ZanSara will review your pull request.

— julian-risch, Aug 31 '23

Hey @faaany sorry for the late reply. I've seen your PR and I have to say I'm really torn on that... I believe there are reasons why we deepcopy kwargs, but the Pipeline code is so intricate I have a hard time proving whether it's necessary or not without investing a lot of time.

So, before I go into a long chase to understand whether your change is safe or not, let's understand how blocking it is.

Is it really necessary to pass the streaming handler as a parameter of the pipeline? What's the difference between this approach and giving such handler to PromptNode directly?

— ZanSara, Sep 04 '23

@ZanSara

For my use case this is necessary, because my pipeline is deployed from a pipeline YAML behind the REST API. If my front-end application doesn't send any requests to the backend, the pipeline won't run. So I need to pass the streaming handler param at runtime in order to get a streamed response.

If I give such a handler to PromptNode directly, I see the streamed output on my backend machine, but not in the front-end.

I have seen many users on Discord ask for this feature; I think they have problems similar to mine.

— faaany, Sep 06 '23

> Hey @faaany sorry for the late reply. I've seen your PR and I have to say I'm really torn on that... I believe there are reasons why we deepcopy kwargs, but the Pipeline code is so intricate I have a hard time proving whether it's necessary or not without investing a lot of time.
>
> So, before I go into a long chase to understand whether your change is safe or not, let's understand how blocking it is.
>
> Is it really necessary to pass the streaming handler as a parameter of the pipeline? What's the difference between this approach and giving such handler to PromptNode directly?

And yes, replacing the deep copy with a shallow copy is a bit risky. How about this approach: we explicitly drop the stream_handler param from the kwargs before the deep copy, and add it back to the parameter list afterwards?
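As a sketch of that drop-and-restore idea (the function name and the allow-list here are hypothetical, not Haystack API):

```python
# Sketch of the drop-and-restore approach: pop params that cannot be
# deep-copied, deepcopy the rest, then reattach the originals by reference.
import copy

NON_COPYABLE_PARAMS = ("stream_handler",)  # hypothetical allow-list

def deepcopy_kwargs_preserving_handlers(kwargs):
    # Pull out params that hold non-copyable resources (e.g. thread locks).
    preserved = {k: kwargs.pop(k) for k in NON_COPYABLE_PARAMS if k in kwargs}
    copied = copy.deepcopy(kwargs)
    copied.update(preserved)   # reattach the live objects to the copy
    kwargs.update(preserved)   # restore the caller's dict as well
    return copied
```

With something like this, the streaming handler survives by reference while everything else is still defensively copied.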

— faaany, Sep 06 '23

@vblagoje maybe you have more insights on this issue?

— faaany, Sep 11 '23

Hey @faaany sorry for the late reply 🙈

> for my usage scenario, this is necessary, because my pipeline is deployed using a pipeline YAML with REST API. If my front-end application doesn't send any requests to the backend, the pipeline won't run. So I need to pass the streaming handler param during runtime in order to get a streamed response.

I'm not sure I get this. How do you pass a streaming handler over HTTP? 👀 Maybe I didn't understand your explanation. Feel free to share a code example or a colab if that helps!

— ZanSara, Sep 12 '23

@faaany I'm thinking about an alternative solution to your problem that wouldn't require this change. I'm not sure whether it would work though.

I'm taking into consideration #5697 too.

My idea would be to create a stream handler similar to FastAPITokenStreamingHandler, but one that creates its own token generator and makes it publicly accessible.

Then we'd change _process_request_streaming to find the streaming handler in use and, if it exposes such a generator, use it to read the tokens as they are generated. You're already checking whether the last node is a PromptNode, so with that information we can go through it and find the invocation layer's streamer.

It should be accessible with logic similar to this:

prompt_node = PromptNode()
prompt_node.prompt_model.model_invocation_layer.model_input_kwargs["stream_handler"]

Not elegant but it should do the job.

I didn't test it but I believe that conceptually it should work, even if a bit convoluted.
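Under those assumptions, the queue-backed handler could look roughly like this (class and method names are made up; a real version would subclass Haystack's TokenStreamingHandler):

```python
# Rough sketch of a streaming handler with a publicly accessible token
# generator, fed through its own queue. All names are hypothetical.
import queue

class QueueTokenStreamingHandler:
    _DONE = object()  # sentinel marking the end of the stream

    def __init__(self):
        self._queue = queue.Queue()

    def __call__(self, token, **kwargs):
        # Invoked by the model invocation layer for every generated token.
        self._queue.put(token)
        return token

    def finish(self):
        # Called once generation is complete.
        self._queue.put(self._DONE)

    def tokens(self):
        # Publicly accessible generator the REST layer can stream from.
        while True:
            item = self._queue.get()
            if item is self._DONE:
                break
            yield item
```

The REST layer would then locate the handler via the model_invocation_layer lookup above and iterate tokens() into the HTTP response.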

I very much prefer if we don't touch the Pipeline.run() method as it could create a cascade of issues that I really wouldn't want to handle right now. 😅

— silvanocerza, Sep 14 '23

@silvanocerza thanks for the idea! I will give it a try and let you know how it goes.

— faaany, Sep 14 '23

Hey @faaany, any news on this? I'm curious to know if you managed to make it work. 👀

— silvanocerza, Sep 28 '23

> Hey @faaany, any news on this? I'm curious to know if you managed to make it work. 👀

Hi @silvanocerza, sorry for my late response! We had long national holidays here in China and I also took some additional days off... I was not able to set this up before my holidays, but I will look at it this week and get back to you soon. Thank you for answering!

— faaany, Oct 16 '23