How to use async APIs?

Open · songjinu opened this issue 10 months ago · 1 comment

Describe the bug

I want to call this API in parallel: /api/v1/process/{flow_id} with 'sync:false'. But all the calls are performed sequentially. My flow is simple, like a ConversationChain with ChatOpenAI.

I want to serve over 100 users, so this feature is necessary for me.

I tried using Celery, but it always returns 'Pending'.

I read that this feature is in development: https://docs.langflow.org/guides/async-tasks#introduction

How can I use this feature?

songjinu, Mar 25 '24 10:03

Hey @songjinu, nice to see you back! Hope all is well on your end. 🚀

To call the /api/v1/process/{flow_id} API in parallel for over 100 users using the async feature in Langflow, you can use the Promise.all method in JavaScript. This method takes an array of promises and returns a single promise that resolves once every promise in the array has resolved (or immediately if the array is empty). It rejects with the reason of the first promise that rejects.

Here's an example of how you might implement this:

import { api } from "../../controllers/API/api";

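/**
 * Calls the process API for a given flow ID.
 * Assumption: the endpoint is a POST route that takes `inputs` and `sync`
 * in the JSON body, matching the parameters described later in this reply.
 * @param {string} flowId - The ID of the flow.
 * @param {Record<string, unknown>} inputs - Input values for the flow.
 * @returns {Promise<AxiosResponse<any>>} The API response.
 */
async function processFlow(flowId: string, inputs: Record<string, unknown> = {}) {
  return api.post(`/api/v1/process/${flowId}`, { inputs, sync: false });
}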

/**
 * Parallelly processes multiple flows.
 * @param {string[]} flowIds - An array of flow IDs to process.
 * @returns {Promise<AxiosResponse<any>[]>} An array of API responses.
 */
async function processFlowsParallel(flowIds: string[]) {
  const promises = flowIds.map(flowId => processFlow(flowId));
  return await Promise.all(promises);
}

// Example usage
const flowIds = ['flowId1', 'flowId2', 'flowId3']; // Replace these with actual flow IDs
processFlowsParallel(flowIds)
  .then(responses => {
    console.log('All flows processed:', responses);
  })
  .catch(error => {
    console.error('Error processing flows:', error);
  });

In this example, processFlow takes a flowId and returns a promise for the call to the /api/v1/process/{flow_id} endpoint. processFlowsParallel maps an array of flowIds to promises using processFlow, which starts every request immediately, and then uses Promise.all to wait for all of them to finish. Note that this only issues the requests concurrently from the client. Whether they are actually processed in parallel depends on whether your API and server infrastructure can handle the concurrent load.
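Since Promise.all starts everything at once and fails on the first rejection, a client serving 100+ users may prefer to cap the number of requests in flight and collect failures per request. Below is a minimal sketch of that idea. The helper name mapWithConcurrency, the Settled type, and the limit parameter are all hypothetical and are not part of Langflow; the worker argument stands in for a function such as processFlow.

```typescript
// Hypothetical result type: mirrors the fulfilled/rejected shape of Promise.allSettled.
type Settled<R> =
  | { status: "fulfilled"; value: R }
  | { status: "rejected"; reason: unknown };

// Hypothetical helper: runs `worker` over `items` with at most `limit` calls in flight.
// Results are returned in the same order as `items`, and one failure does not abort the rest.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  worker: (item: T) => Promise<R>,
): Promise<Settled<R>[]> {
  const results = new Array<Settled<R>>(items.length);
  let next = 0;

  // Each runner claims the next unprocessed index until the list is exhausted.
  async function runner(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      try {
        results[index] = { status: "fulfilled", value: await worker(items[index]) };
      } catch (reason) {
        results[index] = { status: "rejected", reason };
      }
    }
  }

  const runnerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}
```

A call might look like mapWithConcurrency(flowIds, 10, (id) => processFlow(id)), where the limit of 10 is an arbitrary illustrative value to tune against what the server can handle.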

Regarding the current implementation of the /api/v1/process/{flow_id} endpoint in the Langflow codebase, it is an asynchronous function named process. This function is designed to process an input with a given flow_id. It accepts several parameters including session, flow_id, inputs, tweaks, clear_cache, session_id, task_service, api_key_user, sync, and session_service. The function performs various operations such as loading session data, building and generating results from a graph, and handling exceptions. It returns a ProcessResponse which includes the result of the processing, the status, a task response, the session ID, and the backend name used. You can find more details about this in the source code.
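Because the response described above carries a status and a task response, a client calling with sync set to false would typically have to check back until the task leaves its pending state. The following is a rough sketch of such a polling loop, not Langflow's documented client. The TaskStatus shape, the status strings, and the pollUntilDone helper are assumptions for illustration; fetchStatus is a caller-supplied function, so no particular status endpoint is assumed here.

```typescript
// Hypothetical task status shape; the field names are assumptions, not a confirmed schema.
interface TaskStatus<R> {
  status: string; // assumed values such as "PENDING" or "SUCCESS"
  result?: R;
}

interface PollOptions {
  intervalMs?: number;        // delay between attempts
  maxAttempts?: number;       // give up after this many status checks
  pendingStatuses?: string[]; // statuses that mean "not finished yet" (assumed)
}

// Hypothetical helper: calls `fetchStatus` repeatedly until the status is no longer pending.
async function pollUntilDone<R>(
  fetchStatus: () => Promise<TaskStatus<R>>,
  options: PollOptions = {},
): Promise<TaskStatus<R>> {
  const {
    intervalMs = 1000,
    maxAttempts = 30,
    pendingStatuses = ["PENDING", "STARTED"],
  } = options;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const current = await fetchStatus();
    // Any status outside the pending list is treated as final (success or failure).
    if (pendingStatuses.indexOf(current.status.toUpperCase()) === -1) {
      return current;
    }
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
    }
  }
  throw new Error(`Task still pending after ${maxAttempts} attempts`);
}
```

The attempt cap means a task that never leaves the pending state surfaces as an error on the client instead of looping forever.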

As for the async feature development in the Langflow codebase, it is currently "in development." Contributions to this implementation are welcome. You can track the progress of this feature in the documentation.

I hope this helps! If you have any other questions, feel free to ask.

dosubot[bot], Mar 25 '24 10:03