langflow
How do I use async APIs?
Describe the bug I want to call the API /api/v1/process/{flow_id} in parallel with 'sync': false, but all requests are executed sequentially. My flow is simple, like a ConversationChain with ChatOpenAI.
I want to serve more than 100 users, so this feature is necessary for me.
I tried it with Celery, but it always returns 'Pending'.
I read that this feature is in development: https://docs.langflow.org/guides/async-tasks#introduction
How can I use this feature?
Hey @songjinu, nice to see you back! Hope all is well on your end. 👋
To call the API `/api/v1/process/{flow_id}` in parallel for over 100 users using the async feature in Langflow, you can use JavaScript's `Promise.all` method. It takes an array of promises and returns a single promise that resolves when all of them have resolved (or immediately, if the array is empty), and rejects with the reason of the first promise that rejects.
Here's an example of how you might implement this:
```typescript
import { api } from "../../controllers/API/api";

/**
 * Calls the process API for a given flow ID.
 * Note: /api/v1/process/{flow_id} is a POST endpoint; passing "sync": false
 * in the body requests background processing (the exact payload shape may
 * vary by Langflow version).
 * @param {string} flowId - The ID of the flow.
 * @returns {Promise<AxiosResponse<any>>} The API response.
 */
async function processFlow(flowId: string) {
  return await api.post(`/api/v1/process/${flowId}`, {
    inputs: {},
    sync: false,
  });
}

/**
 * Processes multiple flows in parallel.
 * @param {string[]} flowIds - An array of flow IDs to process.
 * @returns {Promise<AxiosResponse<any>[]>} An array of API responses.
 */
async function processFlowsParallel(flowIds: string[]) {
  const promises = flowIds.map((flowId) => processFlow(flowId));
  return await Promise.all(promises);
}

// Example usage
const flowIds = ["flowId1", "flowId2", "flowId3"]; // Replace these with actual flow IDs
processFlowsParallel(flowIds)
  .then((responses) => {
    console.log("All flows processed:", responses);
  })
  .catch((error) => {
    console.error("Error processing flows:", error);
  });
```
In this example, `processFlow` takes a `flowId` and returns a promise for the call to the `/api/v1/process/{flow_id}` endpoint. `processFlowsParallel` takes an array of `flowIds`, maps them to an array of promises using `processFlow`, and then uses `Promise.all` to execute them in parallel. This approach should work assuming your API and server infrastructure can handle the concurrent load.
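If the concurrent load is a concern, you can cap how many requests are in flight at once. The helper below is not part of Langflow; it is a generic TypeScript sketch that runs a pool of workers over a list of tasks:

```typescript
// Generic helper: run async tasks with at most `limit` running concurrently.
// Not part of Langflow -- a plain TypeScript sketch.
async function runWithConcurrency<T>(
  tasks: (() => Promise<T>)[],
  limit: number
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0;

  // Each worker repeatedly claims the next unstarted task until none remain.
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const workers = Array.from({ length: Math.min(limit, tasks.length) }, worker);
  await Promise.all(workers);
  return results;
}

// Example: process many flow IDs, at most 10 in flight at a time.
// const responses = await runWithConcurrency(
//   flowIds.map((id) => () => processFlow(id)),
//   10
// );
```

Because JavaScript is single-threaded, the shared `next` counter needs no locking; each worker claims an index synchronously before awaiting its task.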
Regarding the current implementation of the `/api/v1/process/{flow_id}` endpoint in the Langflow codebase, it is an asynchronous function named `process`. This function processes an input for a given `flow_id`. It accepts several parameters, including `session`, `flow_id`, `inputs`, `tweaks`, `clear_cache`, `session_id`, `task_service`, `api_key_user`, `sync`, and `session_service`. The function performs various operations such as loading session data, building and generating results from a graph, and handling exceptions. It returns a `ProcessResponse`, which includes the result of the processing, the status, a task response, the session ID, and the backend name used. You can find more details in the source code.
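When the request is sent with `"sync": false`, the response's task field should carry a task ID that you poll until the task leaves the pending state. The response shape and the `/api/v1/task/{task_id}` status route used in the comments below are assumptions based on the async-tasks guide, so verify them against your Langflow version. The polling loop itself is a generic sketch:

```typescript
// Minimal submit-then-poll sketch for the async process API.
// The status values and endpoint paths are assumptions -- check your version.
interface TaskStatus {
  status: string; // e.g. "PENDING" | "SUCCESS" | "FAILURE" (assumed values)
  result?: unknown;
}

async function pollUntilDone(
  fetchStatus: () => Promise<TaskStatus>,
  intervalMs = 1000,
  maxAttempts = 60
): Promise<TaskStatus> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const status = await fetchStatus();
    if (status.status !== "PENDING") return status;
    // Wait before polling again.
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error("Task did not finish within the polling budget");
}

// Usage (hypothetical endpoints and response shape):
// const submit = await api.post(`/api/v1/process/${flowId}`, {
//   inputs: {},
//   sync: false, // request background processing
// });
// const taskId = submit.data.task.id; // assumed response shape
// const done = await pollUntilDone(() =>
//   api.get(`/api/v1/task/${taskId}`).then((r) => r.data)
// );
```

A loop like this also explains the 'Pending' results you saw with Celery: with `sync: false` the first response is only the task handle, and the actual result has to be fetched once the task completes.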
As for the async feature development in the Langflow codebase, it is currently "in development." Contributions to this implementation are welcome. You can track the progress of this feature in the documentation.
I hope this helps! If you have any other questions, feel free to ask.
Sources
- src/frontend/src/controllers/API/index.ts
- src/backend/langflow/api/v1/endpoints.py
- docs/docs/guides/async-tasks.mdx
About Dosu
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.
Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.
If you want to continue the conversation, start your reply with @dosu-bot.