feat: add in-memory repo to improve workflow performance
Summary
Addresses #20147. See also #10982 and https://github.com/langgenius/dify/pull/19325.
The default workflow-run API is very slow on simple tasks, making it unacceptable for real-time interactions. The API calls spend most of their time interacting with the DB. In a scenario where the API is called by another service, there is not much sense in saving each call to the DB.
So here I added a fast mode for the workflow API; it can be turned on by setting WORKFLOW_NODE_EXECUTION_REPO_MODE=memory. This enables deploying a dedicated API service for external API calls, with the tradeoff that execution details are not visible in the application logs on the web page.
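For illustration, a minimal sketch of what an in-memory node execution repository could look like; the idea is that WORKFLOW_NODE_EXECUTION_REPO_MODE=memory selects a repository like this instead of the DB-backed one. The class and method names here are assumptions for the sketch, not necessarily the exact WorkflowNodeExecutionRepository interface in dify.

```python
# Sketch only: an in-memory repository that keeps node executions in process
# memory instead of writing them to the database. Method names are illustrative.
from threading import Lock
from typing import Any


class InMemoryWorkflowNodeExecutionRepository:
    def __init__(self) -> None:
        self._executions: dict[str, Any] = {}
        self._lock = Lock()

    def save(self, execution: Any) -> None:
        # Assumes the execution object exposes an `id` attribute.
        with self._lock:
            self._executions[execution.id] = execution

    def get(self, execution_id: str) -> Any | None:
        with self._lock:
            return self._executions.get(execution_id)
```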
Checklist
[!IMPORTANT]
Please review the checklist below before submitting your pull request.
- [ ] This change requires a documentation update, included: Dify Document
- [x] I understand that this PR may be closed in case there was no previous discussion or issues. (This doesn't apply to typos!)
- [x] I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
- [x] I've updated the documentation accordingly.
- [x] I ran `dev/reformat` (backend) and `cd web && npx lint-staged` (frontend) to appease the lint gods
Great PR! However, the two issues (PRs) you’ve referenced are not currently open. Could you please create a proposal for this implementation? We’d like to discuss some implementation details there. You can refer to issue #19429 for guidance.
@laipz8200 Thanks, I have created this one #20147
+1, great PR. Some minor suggestions:
1. Add some tests for both implementations of the WorkflowNodeExecutionRepository, or only for InMemoryWorkflowNodeExecutionRepository, to ensure they are properly maintained and tested in the future (see the sketch below).
2. Run `dev/reformat` locally for the style fixes and static type checks.
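For example, a small parametrized pytest could exercise a repository through the same save/get round trip, reusing the hypothetical InMemoryWorkflowNodeExecutionRepository sketched above; the helper objects here are illustrative, not dify's actual test fixtures.

```python
# Hypothetical round-trip test; the repository and the fake execution object
# are illustrative stand-ins, not dify's real classes or fixtures.
from types import SimpleNamespace

import pytest


@pytest.mark.parametrize(
    "repo_factory",
    [InMemoryWorkflowNodeExecutionRepository],  # a DB-backed factory could be added here as well
)
def test_save_then_get_round_trip(repo_factory):
    repo = repo_factory()
    execution = SimpleNamespace(id="exec-1")  # stand-in for a real WorkflowNodeExecution
    repo.save(execution)
    assert repo.get("exec-1") is execution
```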
@bowenliang123 1 & 2 done. Also I added in-memory mode for WorkflowExecutionRepository, so the execution will merely have to interact with db.
I've tested both modes locally.
@QuantumGhost @laipz8200 Could you help give a review here?
Hello @xinlmain,
To ensure that users can switch persistence strategies in a more effective manner, we are currently making some preparations. Once we make progress, we will promptly share our results with you and proceed with merging this PR. In the meantime, we appreciate your patience.
Hello @laipz8200, how is this going?
@xinlmain We are planning to merge #21458 soon, please check this solution. I look forward to hearing your advice.
@laipz8200 I took a quick look at #21458. If it's merged, I'll have to adjust my code to the new interfaces, right? If so, I'm OK with that.
If it's merged, I'll have to adjust my code to the new interfaces, right?
Yes. Additionally, would you be open to modifying the in-memory mode to an asynchronous storage mode (with Celery)? I believe this could enhance performance without reducing functionality.
For the in-memory option or any other solutions, we currently prefer to provide them in a third-party repository and support them through dynamic imports.
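A rough sketch of what dynamic-import selection could look like; the dotted-path convention and function name are assumptions for illustration, not dify's actual mechanism.

```python
# Hypothetical factory: resolve a repository class from a dotted path taken
# from configuration, so third-party implementations can be plugged in.
import importlib


def load_repository_class(dotted_path: str) -> type:
    """Resolve e.g. 'my_plugin.repos.InMemoryWorkflowNodeExecutionRepository' to a class."""
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
```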
Hi @xinlmain! I hope you’re doing well. I wanted to let you know that #21458 has been merged. Would you like to begin working on adding another option for the WorkflowNodeExecution and WorkflowRun storage? I’m looking forward to an implementation using Celery to speed up this storage action. You can either rebase your code to this PR or open a new one. Let me know what you prefer!
@laipz8200 Great, I'm working on it.
To be clear, the Celery approach you’re suggesting is to use Celery as a queue to make the database saving asynchronous. However, before the API call returns, it still needs to wait until all the writes are completed and the execution data is read from the database. Is my understanding correct?
Also, where will the Celery listener reside: in the worker service or in the API service? If we choose Redis as the Celery backend and use the worker service as the listener, I'm somewhat concerned about stability and performance if the real-time API depends on the worker (which was originally designed for long-running jobs?).
That being said, is it possible to keep both the Celery and in-memory implementations?
However, before the API call returns, it still needs to wait until all the writes are completed and the execution data is read from the database. Is my understanding correct?
It's not quite like that. Please take a look at https://github.com/langgenius/dify/blob/main/api/core/app/apps/workflow/generate_task_pipeline.py. In this script, self._workflow_response_converter is responsible for converting events into API responses, and it does not rely on the database. Meanwhile, self._workflow_cycle_manager handles the storage of WorkflowExecution and WorkflowNodeExecution, which is where we actually utilize our Repository. Here, database operations are sent to Redis and completed in a Celery worker, ensuring that API response times are not blocked by data processing.
Therefore, the Celery solution should achieve performance similar to the in-memory approach while still retaining access to log data.
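In other words, the repository's save can just enqueue the write and return, with the actual DB work done in a Celery worker. A minimal sketch of that idea, assuming a to_dict()-style serialization; the task and class names are illustrative, not the actual implementation.

```python
# Sketch only: a Celery-backed repository whose save() enqueues the write.
# Task name, serialization, and DB session handling are illustrative.
from celery import shared_task


@shared_task
def persist_workflow_node_execution(execution_data: dict) -> None:
    # Runs in the Celery worker: write the record to the database here.
    ...


class CeleryWorkflowNodeExecutionRepository:
    def save(self, execution) -> None:
        # Hand the serialized execution to the worker; the API thread returns immediately.
        persist_workflow_node_execution.delay(execution.to_dict())  # to_dict() is assumed
```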
@laipz8200 Thanks, now I get it. Say a QueueNodeSucceededEvent comes up; the generate_task_pipeline will first make a blocking call to the handle method of workflow_cycle_manager.
In handle_workflow_node_execution_success's current implementation, it first gets some execution info from the DB, does some processing, then saves it back into the DB.
If we want to go the Celery way, does that mean we need to make handle_workflow_node_execution_success async? It seems we cannot achieve the goal without touching workflow_cycle_manager.py and generate_task_pipeline.py...
I’d appreciate it if you could correct me, cheers
In handle_workflow_node_execution_success's current implementation, it first gets some execution info from the DB, does some processing, then saves it back into the DB.
This was my oversight. I should avoid reading WorkflowExecution and WorkflowNodeExecution from the database by keeping them stored in memory. I have fixed this issue in #22597. Now, the WorkflowCycleManager just needs to use the save method, and we can safely transition it to Celery.
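So the cycle manager can keep its in-flight executions in a local dict and only push writes out through the repository. A rough sketch of that idea; the names are illustrative rather than the actual WorkflowCycleManager code.

```python
# Sketch only: cache executions created during the run, so success handlers
# read from memory and the repository is used for write-only save() calls.
class WorkflowCycleManagerSketch:
    def __init__(self, repository) -> None:
        self._repository = repository
        self._node_executions: dict = {}

    def start_node(self, execution) -> None:
        self._node_executions[execution.id] = execution
        self._repository.save(execution)

    def succeed_node(self, execution_id: str, outputs: dict) -> None:
        execution = self._node_executions[execution_id]  # read from memory, not the DB
        execution.outputs = outputs
        execution.status = "succeeded"
        self._repository.save(execution)  # write-only; safe to offload to Celery
```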
Great! I will be working on the Celery-based repositories soon.
@laipz8200 Done, updated summary, please have a look. Thanks!
Hi! I'm the autofix.ci troubleshooter bot.
It looks like you correctly set up a CI job that uses the autofix.ci GitHub Action, but the autofix.ci GitHub App has not been installed for this repository. This means that autofix.ci unfortunately does not have the permissions to fix this pull request. If you are the repository owner, please install the app and then restart the CI workflow! 😃
@laipz8200 How do I trigger the above test workflow? It seems to have been stuck for days.
Hi @xinlmain, you can run the unit tests by using `dev/pytest/pytest_unit_tests.sh`.
@laipz8200 Thanks, I finally got them all passing… Please trigger the test workflow again; thanks so much for your patience!
@laipz8200 Comments resolved. Now it looks a bit like the original in-memory repo, with an async Celery path. Please help with the review, very much appreciated!
Thank you. I'll do some tests later.
@laipz8200 Hope you are doing well! How is the test going? We're kind of waiting for this PR before upgrading to the latest version.
@laipz8200 resolved conflicts, all clear now. Please help with the merge, thank you!
@laipz8200 I'm back with some feedback 🤦🏻♀️
Using the Celery repository, we occasionally get an assert exception here:
```python
def workflow_finish_to_stream_response(
    self,
    *,
    session: Session,
    task_id: str,
    workflow_execution: WorkflowExecution,
) -> WorkflowFinishStreamResponse:
    created_by = None
    workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_))
    assert workflow_run is not None  # <- this assert occasionally fails
```
My guess is that because the writing of WorkflowRun is asynchronous, reading it from the DB at the end of the API session cannot be guaranteed. If that's true, should we modify this method to handle this situation?
I implemented a fix that removes the dependency on the DB: https://github.com/langgenius/dify/pull/24674
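For reference, one way to drop that DB read is to build the finish response from the in-memory WorkflowExecution directly, assuming it already carries the fields the response needs; this is a sketch, not necessarily what #24674 actually does.

```python
# Hypothetical: derive the finish payload from the domain object instead of
# re-selecting WorkflowRun, so a still-pending async write cannot break it.
def build_workflow_finish_payload(task_id: str, workflow_execution) -> dict:
    # Fields other than id_ are assumed to exist on the WorkflowExecution object.
    return {
        "task_id": task_id,
        "workflow_run_id": workflow_execution.id_,
        "status": workflow_execution.status,
        "outputs": workflow_execution.outputs,
        "elapsed_time": workflow_execution.elapsed_time,
    }
```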
Why did the speed slow down during my test? Is there a specific performance test report?
Were the Celery-based repositories not enabled by default in the 1.8.1 release?