
feat: add in-memory repo to improve workflow performance

Open xinlmain opened this issue 6 months ago • 6 comments

Summary

Addresses #20147. See also #10982 and https://github.com/langgenius/dify/pull/19325.

The default workflow-run API is very slow on simple tasks, making it unacceptable for real-time interactions. The API calls spend most of their time interacting with the DB. In a scenario where the API is called by another service, there is little point in saving each call to the DB.

So here I added a fast mode for the workflow API, which can be turned on by setting WORKFLOW_NODE_EXECUTION_REPO_MODE=memory. This enables deploying a dedicated API service for external API calls, with the trade-off of not being able to see execution details in the application logs from the web page.
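A minimal sketch of what such an env-var-selected in-memory repository could look like. These names mirror the PR description, not Dify's actual interfaces; `make_repository` and the method signatures are hypothetical.

```python
import os
from dataclasses import dataclass, field


@dataclass
class InMemoryWorkflowNodeExecutionRepository:
    """Keeps node executions in a plain dict instead of the database.

    Hypothetical sketch: real repositories would implement the shared
    WorkflowNodeExecutionRepository interface.
    """
    _store: dict = field(default_factory=dict)

    def save(self, execution_id: str, execution: dict) -> None:
        self._store[execution_id] = execution

    def get(self, execution_id: str):
        return self._store.get(execution_id)


def make_repository():
    # WORKFLOW_NODE_EXECUTION_REPO_MODE=memory selects the fast path;
    # anything else would fall back to the SQL-backed repository.
    if os.getenv("WORKFLOW_NODE_EXECUTION_REPO_MODE") == "memory":
        return InMemoryWorkflowNodeExecutionRepository()
    raise NotImplementedError("SQL-backed repository omitted in this sketch")
```

The trade-off described above falls directly out of this design: nothing is ever persisted, so the web page's execution-log views have no rows to display.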

Checklist

[!IMPORTANT]
Please review the checklist below before submitting your pull request.

  • [ ] This change requires a documentation update, included: Dify Document
  • [x] I understand that this PR may be closed in case there was no previous discussion or issues. (This doesn't apply to typos!)
  • [x] I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
  • [x] I've updated the documentation accordingly.
  • [x] I ran `dev/reformat` (backend) and `cd web && npx lint-staged` (frontend) to appease the lint gods

xinlmain avatar May 21 '25 08:05 xinlmain

Great PR! However, the two issues (PRs) you've referenced are not currently open. Could you please create a proposal for this implementation? We'd like to discuss some implementation details there. You can refer to issue #19429 for guidance.

laipz8200 avatar May 22 '25 05:05 laipz8200

> Great PR! However, the two issues (PRs) you've referenced are not currently open. Could you please create a proposal for this implementation? We'd like to discuss some implementation details there. You can refer to issue #19429 for guidance.

@laipz8200 Thanks, I have created this one #20147

xinlmain avatar May 23 '25 06:05 xinlmain

+1, great PR. Some minor suggestions: 1. add tests for both implementations of the WorkflowNodeExecutionRepository, or at least for InMemoryWorkflowNodeExecutionRepository, to ensure they are properly maintained and tested in the future; 2. run dev/reformat locally for the style fixes and static type checks.

@bowenliang123 1 & 2 done. I also added an in-memory mode for WorkflowExecutionRepository, so the execution barely has to interact with the DB at all. I've tested both modes locally.

xinlmain avatar Jun 03 '25 09:06 xinlmain

The test failed because of a TiFlash setup error. How can I re-run the test?

xinlmain avatar Jun 05 '25 03:06 xinlmain

@QuantumGhost @laipz8200 Could you help give a review here?

xinlmain avatar Jun 05 '25 03:06 xinlmain

Hello @xinlmain,

To ensure that users can switch persistence strategies in a more effective manner, we are currently making some preparations. Once we make progress, we will promptly share our results with you and proceed with merging this PR. In the meantime, we appreciate your patience.

laipz8200 avatar Jun 05 '25 12:06 laipz8200

> Hello @xinlmain,
>
> To ensure that users can switch persistence strategies in a more effective manner, we are currently making some preparations. Once we make progress, we will promptly share our results with you and proceed with merging this PR. In the meantime, we appreciate your patience.

Hello @laipz8200 , how is this going?

xinlmain avatar Jul 04 '25 07:07 xinlmain

@xinlmain We are planning to merge #21458 soon, please check this solution. I look forward to hearing your advice.

laipz8200 avatar Jul 06 '25 16:07 laipz8200

> @xinlmain We are planning to merge #21458 soon, please check this solution. I look forward to hearing your advice.

@laipz8200 I had a glimpse over #21458, if it's merged, I have to adjust my code to the new interfaces, right? If yes I'm OK with that.

xinlmain avatar Jul 07 '25 06:07 xinlmain

> if it's merged, I have to adjust my code to the new interfaces, right?

Yes. Additionally, would you be open to modifying the in-memory mode to asynchronous storage mode(with Celery)? I believe this could enhance performance without reducing functionality.

For the in-memory option or any other solutions, we currently prefer to provide them in a third-party repository and support them through dynamic imports.

laipz8200 avatar Jul 07 '25 14:07 laipz8200

Hi @xinlmain! I hope you’re doing well. I wanted to let you know that #21458 has been merged. Would you like to begin working on adding another option for the WorkflowNodeExecution and WorkflowRun storage? I’m looking forward to an implementation using Celery to speed up this storage action. You can either rebase your code to this PR or open a new one. Let me know what you prefer!

laipz8200 avatar Jul 14 '25 06:07 laipz8200

> Hi @xinlmain! I hope you're doing well. I wanted to let you know that #21458 has been merged. Would you like to begin working on adding another option for the WorkflowNodeExecution and WorkflowRun storage? I'm looking forward to an implementation using Celery to speed up this storage action. You can either rebase your code to this PR or open a new one. Let me know what you prefer!

@laipz8200 Great, I'm working on it.

To be clear, the Celery approach you’re suggesting is to use Celery as a queue to make the database saving asynchronous. However, before the API call returns, it still needs to wait until all the writes are completed and the execution data is read from the database. Is my understanding correct?

Also, where will the Celery listener reside — in the worker service or in the API service? If we choose Redis as the Celery backend and use the worker service as the listener, I'm somewhat concerned about stability and performance if the real-time API depends on the worker (which was originally designed for long-running jobs?).

That being said, is it possible to keep both the Celery and in-memory implementations?

xinlmain avatar Jul 16 '25 10:07 xinlmain

> However, before the API call returns, it still needs to wait until all the writes are completed and the execution data is read from the database. Is my understanding correct?

It's not quite like that. Please take a look at https://github.com/langgenius/dify/blob/main/api/core/app/apps/workflow/generate_task_pipeline.py. In this script, self._workflow_response_converter is responsible for converting events into API responses, and it does not rely on the database. Meanwhile, self._workflow_cycle_manager handles the storage of WorkflowExecution and WorkflowNodeExecution, which is where we actually utilize our Repository. Here, database operations are sent to Redis and completed in a Celery worker, ensuring that API response times are not blocked by data processing.

Therefore, the Celery solution should achieve performance similar to the in-memory approach while still retaining access to log data.

laipz8200 avatar Jul 16 '25 13:07 laipz8200
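The division of labor described above — response conversion that never touches the DB, with persistence handed off to a worker — can be sketched with a stdlib queue and thread standing in for Celery and Redis. The names `handle_node_succeeded` and `storage_worker` are illustrative, not Dify's actual code.

```python
import queue
import threading

save_queue: "queue.Queue" = queue.Queue()
database: dict = {}  # stand-in for the SQL store


def storage_worker() -> None:
    # Plays the role of the Celery worker draining the Redis queue.
    while True:
        record = save_queue.get()
        if record is None:  # sentinel to stop the worker
            save_queue.task_done()
            break
        database[record["id"]] = record
        save_queue.task_done()


def handle_node_succeeded(event: dict) -> dict:
    # The response converter builds the API event without touching the DB...
    response = {"event": "node_finished", "id": event["id"]}
    # ...while persistence is merely enqueued, to be completed elsewhere,
    # so the API response time is not blocked by data processing.
    save_queue.put({"id": event["id"], "status": "succeeded"})
    return response


threading.Thread(target=storage_worker, daemon=True).start()
```

The key property is that `handle_node_succeeded` returns immediately; the write lands in the store at some later point, which is also the root of the read-after-write caveat discussed further down the thread.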

> > However, before the API call returns, it still needs to wait until all the writes are completed and the execution data is read from the database. Is my understanding correct?
>
> It's not quite like that. Please take a look at https://github.com/langgenius/dify/blob/main/api/core/app/apps/workflow/generate_task_pipeline.py. In this script, self._workflow_response_converter is responsible for converting events into API responses, and it does not rely on the database. Meanwhile, self._workflow_cycle_manager handles the storage of WorkflowExecution and WorkflowNodeExecution, which is where we actually utilize our Repository. Here, database operations are sent to Redis and completed in a Celery worker, ensuring that API response times are not blocked by data processing.

Therefore, the Celery solution should achieve performance similar to the in-memory approach while still retaining access to log data.

@laipz8200 Thanks, now I get it. Say a QueueNodeSucceededEvent comes up: generate_task_pipeline will first make a blocking call to the corresponding handler method of workflow_cycle_manager.

In handle_workflow_node_execution_success's current implementation, it first reads some execution info from the DB, does some processing, then saves it back into the DB.

If we want to go the Celery way, does it mean we need to make handle_workflow_node_execution_success async? It seems we cannot achieve the goal without touching workflow_cycle_manager.py and generate_task_pipeline.py…

I’d appreciate it if you could correct me, cheers

xinlmain avatar Jul 16 '25 17:07 xinlmain

> In handle_workflow_node_execution_success's current implementation, it will first get some execution info from DB, do some processing, then save back into DB.

This was my oversight. I should avoid reading WorkflowExecution and WorkflowNodeExecution from the database by keeping them stored in memory. I have fixed this issue in #22597. Now, the WorkflowCycleManager just needs to use the save method, and we can safely transition it to Celery.

laipz8200 avatar Jul 17 '25 23:07 laipz8200
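The save-only pattern described above — keep WorkflowExecution in memory so reads never hit the database and the backing store only sees writes — might look roughly like the following. `CachingWorkflowExecutionRepository` and its constructor argument are hypothetical names for illustration, not the #22597 diff.

```python
class CachingWorkflowExecutionRepository:
    """Hypothetical sketch of the save-only pattern: reads are served
    from an in-process cache, so the backing store (DB, or a Celery
    task enqueueing the write) only ever receives save calls."""

    def __init__(self, backend_save):
        self._cache: dict = {}
        # backend_save may be synchronous (direct DB write) or
        # asynchronous (e.g. a Celery task's .delay) -- the caller
        # never reads through it, so either works.
        self._backend_save = backend_save

    def save(self, execution_id: str, execution: dict) -> None:
        self._cache[execution_id] = execution
        self._backend_save(execution_id, execution)

    def get(self, execution_id: str) -> dict:
        # Never touches the database during the run.
        return self._cache[execution_id]
```

Because `get` is always served from the cache, the WorkflowCycleManager can safely run against an asynchronous backend without ever observing a not-yet-persisted row.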

Great! I will be working on the Celery based repositories soon.

xinlmain avatar Jul 18 '25 00:07 xinlmain

@laipz8200 Done, updated summary, please have a look. Thanks!

xinlmain avatar Jul 27 '25 08:07 xinlmain

Hi! I'm the autofix.ci troubleshooter bot.

It looks like you correctly set up a CI job that uses the autofix.ci GitHub Action, but the autofix.ci GitHub App has not been installed for this repository. This means that autofix.ci unfortunately does not have the permissions to fix this pull request. If you are the repository owner, please install the app and then restart the CI workflow! 😃

autofix-troubleshooter avatar Jul 28 '25 07:07 autofix-troubleshooter

@laipz8200 how to trigger the above test workflow? It seems to have been stuck for days.

xinlmain avatar Jul 31 '25 06:07 xinlmain

Hi @xinlmain, you can run the unit tests with dev/pytest/pytest_unit_tests.sh

laipz8200 avatar Jul 31 '25 17:07 laipz8200

> Hi @xinlmain, you can run unittests by using dev/pytest/pytest_unit_tests.sh

@laipz8200 Thanks, I finally got them all passing… Please help trigger the test workflow again; thanks so much for your patience!

xinlmain avatar Aug 01 '25 04:08 xinlmain

@laipz8200 Comments resolved. Now it looks a bit like the original in-memory repo, with an async celery path. Please help with the review, very much appreciated!

xinlmain avatar Aug 02 '25 14:08 xinlmain

Thank you. I'll do some tests later.

laipz8200 avatar Aug 03 '25 07:08 laipz8200

@laipz8200 Hope you are doing well! How is the test going? We're kind of waiting for this PR before upgrading to the latest version.

xinlmain avatar Aug 11 '25 02:08 xinlmain

@laipz8200 resolved conflicts, all clear now. Please help with the merge, thank you!

xinlmain avatar Aug 12 '25 02:08 xinlmain

@laipz8200 I'm back with some feedback 🤦🏻‍♀️

Using the celery repository, we occasionally get an assertion error here:

    def workflow_finish_to_stream_response(
        self,
        *,
        session: Session,
        task_id: str,
        workflow_execution: WorkflowExecution,
    ) -> WorkflowFinishStreamResponse:
        created_by = None
        workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_))
        assert workflow_run is not None

My guess is that because WorkflowRun is written asynchronously, reading it from the DB at the end of the API session isn't guaranteed to succeed. If that's true, should we modify this method to fit this situation?

xinlmain avatar Aug 15 '25 04:08 xinlmain

> @laipz8200 I'm back with some feedback 🤦🏻‍♀️
>
> Using the celery repository we occasionally get assert exception here: […]
>
> My guess is that as the writing of WorkflowRun is asynchronous, reading from DB at the end of the api session cannot be guaranteed. If that's true, should we modify this method to fit this situation?

I implemented a fix that removes the dependency on the DB: https://github.com/langgenius/dify/pull/24674

horochx avatar Aug 28 '25 03:08 horochx
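The shape of such a fix — deriving the finish response from the in-memory WorkflowExecution domain object instead of re-reading WorkflowRun from the database — might look like this. These are hypothetical names and fields for illustration, not the actual #24674 diff.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WorkflowExecution:
    # Minimal stand-in for the domain object the pipeline already
    # holds in memory when the workflow finishes.
    id_: str
    status: str
    outputs: Optional[dict] = None


def workflow_finish_to_stream_response(
    task_id: str, workflow_execution: WorkflowExecution
) -> dict:
    # Built entirely from the in-memory object: no session.scalar(...)
    # round-trip, so an in-flight asynchronous write can no longer make
    # the lookup come back empty and trip the assertion.
    return {
        "event": "workflow_finished",
        "task_id": task_id,
        "workflow_run_id": workflow_execution.id_,
        "data": {
            "status": workflow_execution.status,
            "outputs": workflow_execution.outputs or {},
        },
    }
```

With the DB read gone, the response no longer races against the Celery worker's pending write.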

Why did the speed slow down during my test? Is there a specific performance test report?

zengruizhao avatar Aug 29 '25 10:08 zengruizhao

Were the Celery-based repositories not enabled by default in the 1.8.1 release?

lianzhao avatar Sep 04 '25 04:09 lianzhao