
Access uploaded files in pipelines

Open gqoew opened this issue 1 year ago • 24 comments

Hi there,

Being able to access uploaded files would be a great addition to pipelines. It would greatly expand their potential by no longer limiting them to text input.

It would also be great to enable pipelines to return files in the chat.

Is there any plan to move this feature forward in the near future? Would be happy to test

Related issues: #66 #19 #81

gqoew avatar Jul 17 '24 08:07 gqoew

Looking forward to it. Hopefully, we can add the ability to process files in custom pipelines as soon as possible. This will greatly enhance the scalability of the project. Is there anything I can do? I'd like to help.

g453030291 avatar Jul 19 '24 08:07 g453030291

Hey, is there a way to get the files the user has selected in the Pipelines class? Currently the only arguments are "user_message, model_id, messages and body". In the default RAG pipeline, information such as file names and collection_names is provided, basically information about which file/collection the user has selected in the message. Can this information also be accessed in Pipelines?

chandan-artpark avatar Jul 25 '24 09:07 chandan-artpark

I have the same problem. If we don't have access to user-uploaded files, it limits a lot of functionality 😶 It's also hard to get other parameters passed by the front end, such as whether a new session has been created (which bothers me: even if a new session is created, there is no way to restart with a new context), likes or dislikes, etc.

JiangYain avatar Aug 14 '24 04:08 JiangYain

You can access uploaded files by adding an inlet function. If you upload a file, you should see it in the body:

    async def inlet(self, body: dict, user: dict) -> dict:
        # This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API.
        print(f"inlet:{__name__}")

        print(body)
        print(user)

        return body

InquestGeronimo avatar Aug 20 '24 15:08 InquestGeronimo

@InquestGeronimo Sorry for pinging you, but did the API change? Some weeks ago I tried to make an example pipeline, and it errored out as soon as I attached an image (#66). Is it now "supported"?

That's the main thing that's holding me back from integrating pipelines instead of OpenAI so far. I don't want to lose image capabilities.


EDIT: Looks like something HAS changed! The pipeline doesn't error out anymore. Yay! Guess I'll be using Pipelines now!

@tjbck Care to close this issue? I'm not OP but I guess this is solved.

Fusseldieb avatar Aug 28 '24 16:08 Fusseldieb

Here is a hacky way to access uploaded files. Define an inlet function as suggested by @InquestGeronimo and query each file's url + /content:

async def inlet(self, body: dict, user: dict) -> dict:
    print(f"Received body: {body}")
    files = body.get("files", [])
    for file in files:
        content_url = file["url"] + "/content"
        print(f"file available at {content_url}")
        # read the file content as binary and do something ...
    return body
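
For example, here is a minimal sketch of what that "do something" step could look like. It assumes, as later comments in this thread point out, that the pipeline needs the Open WebUI base URL and an API key to actually fetch the content; OPENWEBUI_URL and OPENWEBUI_API_KEY are placeholder environment variables:

    import os
    import requests

    def fetch_file_bytes(content_url: str) -> bytes:
        """Download the raw bytes of an uploaded file from Open WebUI."""
        url = os.environ["OPENWEBUI_URL"] + content_url
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {os.environ['OPENWEBUI_API_KEY']}"},
        )
        response.raise_for_status()
        return response.content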

S1M0N38 avatar Oct 04 '24 16:10 S1M0N38

Hi @InquestGeronimo, your solution works for me, thanks! I still have an issue: it seems the data is not given as it is. Do you know why? Is there a way to get the original file content?

Here is the original data:

PlayerID;FirstName;LastName;Team;Position;Goals;Assists;Appearances
1;Leo;Messi;Paris Saint-Germain;Forward;672;305;786
2;Cristiano;Ronaldo;Al Nassr;Forward;700;223;900
3;Neymar;Da Silva Santos;Al Hilal;Forward;398;200;600
4;Kylian;Mbappe;Paris Saint-Germain;Forward;300;150;400
5;Robert;Lewandowski;FC Barcelona;Forward;500;150;700
6;Kevin;De Bruyne;Manchester City;Midfielder;100;200;500
7;Luka;Modric;Real Madrid;Midfielder;120;170;600
8;N'Golo;Kante;Chelsea;Midfielder;30;80;400
9;Ruben;Dias;Manchester City;Defender;10;20;250
10;Virgil;Van Dijk;Liverpool;Defender;20;15;250

And here is what I got from the pipeline:

{
  "id": "6547e61d-dc1d-4544-a4fa-b796d40303e5",
  "user_id": "80ce7079-c367-41e5-89f7-7de8534b90e4",
  "hash": "75e40889b84327411325d75964484104733eb18c58ff14ff1d0c8f057defa1e0",
  "filename": "6547e61d-dc1d-4544-a4fa-b796d40303e5_players.csv",
  "data": {
    "content": "PlayerID: 1\nFirstName: Leo\nLastName: Messi\nTeam: Paris Saint-Germain\nPosition: Forward\nGoals: 672\nAssists: 305\nAppearances: 786 PlayerID: 2\nFirstName: Cristiano\nLastName: Ronaldo\nTeam: Al Nassr\nPosition: Forward\nGoals: 700\nAssists: 223\nAppearances: 900 PlayerID: 3\nFirstName: Neymar\nLastName: Da Silva Santos\nTeam: Al Hilal\nPosition: Forward\nGoals: 398\nAssists: 200\nAppearances: 600 PlayerID: 4\nFirstName: Kylian\nLastName: Mbappe\nTeam: Paris Saint-Germain\nPosition: Forward\nGoals: 300\nAssists: 150\nAppearances: 400 PlayerID: 5\nFirstName: Robert\nLastName: Lewandowski\nTeam: FC Barcelona\nPosition: Forward\nGoals: 500\nAssists: 150\nAppearances: 700 PlayerID: 6\nFirstName: Kevin\nLastName: De Bruyne\nTeam: Manchester City\nPosition: Midfielder\nGoals: 100\nAssists: 200\nAppearances: 500 PlayerID: 7\nFirstName: Luka\nLastName: Modric\nTeam: Real Madrid\nPosition: Midfielder\nGoals: 120\nAssists: 170\nAppearances: 600 PlayerID: 8\nFirstName: N'Golo\nLastName: Kante\nTeam: Chelsea\nPosition: Midfielder\nGoals: 30\nAssists: 80\nAppearances: 400 PlayerID: 9\nFirstName: Ruben\nLastName: Dias\nTeam: Manchester City\nPosition: Defender\nGoals: 10\nAssists: 20\nAppearances: 250 PlayerID: 10\nFirstName: Virgil\nLastName: Van Dijk\nTeam: Liverpool\nPosition: Defender\nGoals: 20\nAssists: 15\nAppearances: 250"
  },
  "meta": {
    "name": "players.csv",
    "content_type": "text/csv",
    "size": 579,
    "path": "/app/backend/data/uploads/6547e61d-dc1d-4544-a4fa-b796d40303e5_players.csv",
    "collection_name": "file-6547e61d-dc1d-4544-a4fa-b796d40303e5"
  },
  "created_at": 1729251218,
  "updated_at": 1729251218
}

Regards

jeandelest avatar Oct 18 '24 13:10 jeandelest

i have "solved" the issue with this approach. This works when files are uploaded inside the chat

from typing import Generator, Iterator, List, Optional, Union

from pydantic import BaseModel

# `logger` and `OIFile` are assumed to be defined elsewhere in this module.


class Pipeline:
    class Valves(BaseModel):
        # ... your valve fields ...
        pass

    def __init__(self):
        self.name = "pipeline_custom_name"
        self.valves = self._initialize_valves()
        self.file_contents = {}

    def _initialize_valves(self) -> Valves:
        """Initialize valves using environment variables."""
        return self.Valves(
            # ... initialize your valves here ...
        )

    async def on_startup(self):
        """Called when the server is started."""
        logger.info(f"Server {self.name} is starting.")

    async def on_shutdown(self):
        """Called when the server is stopped."""
        logger.info(f"Server {self.name} is shutting down.")

    async def on_valves_updated(self):
        """Called when the valves are updated."""
        logger.info("Valves updated.")


    async def inlet(self, body: dict, user: dict) -> dict:
        """Modifies form data before the OpenAI API request."""
        logger.info("Processing inlet request")

        # Extract file info for all files in the body
        # here I have created an in-memory dictionary to link users to their owned files
        file_info = self._extract_file_info(body)
        self.file_contents[user["id"]] = file_info
        return body

    def _extract_file_info(self, body: dict) -> list:
        """Extracts the file info from the request body for all files."""
        files = []
        for file_data in body.get("files", []):
            file = file_data["file"]
            file_id = file["id"]
            filename = file["filename"]
            file_content = file["data"]["content"]

            # Create an OIFile object and append it to the list
            files.append(OIFile(file_id, filename, file_content))

        return files
        
    def pipe(
        self, body: dict, user_message: str, model_id: str, messages: List[dict]
    ) -> Union[str, Generator, Iterator]:
        
        logger.info("Starting PIPE process")

        # Extract parameters from body with default fallbacks
        stream = body.get("stream", True)
        max_tokens = body.get("max_tokens", self.valves.LLM_MAX_TOKENS)
        temperature = body.get("temperature", self.valves.LLM_TEMPERATURE)

        # Extract user ID from the body
        user = body.get("user", {})
        user_id = user.get("id", "")

        # Extract user files if available
        if user_id in self.file_contents:
            user_files = self.file_contents[user_id]
        else:
            user_files = None
        
        # ... do your stuff with user_files here and build `result` ...
        return result

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        print(f"outlet:{__name__}")
        print(f"Received body: {body}")
      
        if user["id"] in self.file_contents:
            del self.file_contents[user["id"]]

        return body

        
Open WebUI calls the inlet, the pipe and the outlet every time the user sends a query to the pipeline.
If you create a custom model (from the UI) using your pipeline as the base_model, Open WebUI only calls the pipe method (I don't understand why).

sir3mat avatar Nov 07 '24 09:11 sir3mat

@sir3mat I tried the above method and printed just the body dictionary to see what is being passed. It seems to contain only the chat info, like the RAG prompt and context for the user query. It is not passing the complete document, which is what we want so that we can perform our own custom retrieval through our pipeline.

To reproduce:

from typing import List, Union, Generator, Iterator
    

class Pipeline:
    def __init__(self):
        self.name = "00 Repeater Example"
        pass

    async def on_startup(self):
        # This function is called when the server is started.
        print(f"on_startup")
        pass

    async def on_shutdown(self):
        # This function is called when the server is shutdown.
        print(f"on_shutdown")
        pass

    async def inlet(self, body: dict, user: dict) -> dict:
        return body

    
    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
        return (f"Type of body: {type(body)} \n {body}") #user_message to the UI

rigvedrs avatar Jan 03 '25 18:01 rigvedrs

The body data which is given to pipe() does not contain details like filename or collection_name, so you have to get the details from the inlet function and store them in a variable like this:

class Pipeline:
    def __init__(self):
        self.inlet_details = []

    async def inlet(self, body: dict, user: dict) -> dict:
        print(f"Received body: {body}")
        files = body.get("files", [])
        for file in files:
            self.inlet_details.append({
                "filename": file.get("filename", "unknown"),
                "url": file.get("url", "unknown"),
            })
        return body

Using these additional details you can look up the file in the uploads dir and get the content. There is also an alternative way, where you send a request to a Web UI endpoint using an API key from your account in the Web UI settings. Hope this helps.
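
As a rough sketch of the uploads-dir route, assuming the Open WebUI uploads folder (/app/backend/data/uploads, as seen in the file metadata above) is reachable from the pipeline, for example by mounting it into the pipelines container as discussed further down:

    import os

    UPLOADS_DIR = "/app/backend/data/uploads"  # default Open WebUI uploads path

    def read_uploaded_file(stored_filename: str) -> bytes:
        """Read an uploaded file straight from the (mounted) uploads directory."""
        # stored_filename is the on-disk name, e.g. "<file-id>_players.csv"
        path = os.path.join(UPLOADS_DIR, stored_filename)
        with open(path, "rb") as fh:
            return fh.read()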

chandan-artpark avatar Jan 06 '25 07:01 chandan-artpark

@jeandelest I also have this issue. Open WebUI is segmenting my CSV file and turning it into newlines instead of commas. Let me know if you have found a solution.

savvaki avatar Mar 18 '25 05:03 savvaki

Same issue for uploaded HTML files. The file["content"]["data"] variable does not contain the raw HTML data but some parsed version of it instead - how stupid.

srajangarg avatar Mar 18 '25 05:03 srajangarg

@chandan-artpark I don't think you can directly read from the uploads directory, since the pipelines container is a separate container from open-webui. I guess you would have to mount the uploads folder into the pipelines container.

savvaki avatar Mar 18 '25 05:03 savvaki

I have the same issue. Accessing raw files and handling AIGC requests inside a pipeline still seems not possible with the latest build, v0.5.20.

wangjiyang avatar Mar 19 '25 10:03 wangjiyang

You can get the raw files by cloning the volume from the open webui container into the pipelines container.

InquestGeronimo avatar Mar 19 '25 20:03 InquestGeronimo

I’ve been trying out different ways to upload files. I’m using the inlet function shared by @InquestGeronimo and the pipe function from @sir3mat. The goal is to send file info to an n8n workflow through a webhook.

I’ve tried sending the file URL and even the content, but nothing seems to work—it just doesn’t go through.

Here’s the code I’m using (based on Cole Medin’s n8n pipe):

"""
title: n8n Pipe Function
author: Cole Medin
author_url: https://www.youtube.com/@ColeMedin
version: 0.2.0

This module defines a Pipe class that utilizes N8N for an Agent
"""

from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time
import requests


def extract_event_info(event_emitter) -> tuple[Optional[str], Optional[str]]:
    if not event_emitter or not event_emitter.__closure__:
        return None, None
    for cell in event_emitter.__closure__:
        if isinstance(request_info := cell.cell_contents, dict):
            chat_id = request_info.get("chat_id")
            message_id = request_info.get("message_id")
            return chat_id, message_id
    return None, None


class Pipe:
    class Valves(BaseModel):
        n8n_url: str = Field(
            default="https://n8n.[your domain].com/webhook/[your webhook URL]"
        )
        n8n_bearer_token: str = Field(default="...")
        input_field: str = Field(default="chatInput")
        response_field: str = Field(default="output")
        emit_interval: float = Field(
            default=2.0, description="Interval in seconds between status emissions"
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    def __init__(self):
        self.type = "pipe"
        self.id = "n8n_pipe"
        self.name = "N8N Pipe"
        self.valves = self.Valves()
        self.last_emit_time = 0
        self.file_contents = {}
        pass

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        current_time = time.time()
        if (
            __event_emitter__
            and self.valves.enable_status_indicator
            and (
                current_time - self.last_emit_time >= self.valves.emit_interval or done
            )
        ):
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )
            self.last_emit_time = current_time

    async def inlet(self, body: dict, user: dict) -> dict:
        print(f"Received body: {body}")
        files = body.get("files", [])
        for file in files:
            content_url = file["url"] + "/content"
            print(f"file available at {content_url}")

            self.file_contents[user["id"]] = content_url
        return body

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
        __event_call__: Callable[[dict], Awaitable[dict]] = None,
    ) -> Optional[dict]:
        await self.emit_status(
            __event_emitter__, "info", "/Calling N8N Workflow...", False
        )
        chat_id, _ = extract_event_info(__event_emitter__)
        messages = body.get("messages", [])

        # Extract user ID from the body
        user = body.get("user", {})
        user_id = user.get("id", "")

        # Verify a message is available
        if messages:
            question = messages[-1]["content"]
            try:
                # Invoke N8N workflow
                headers = {
                    "Authorization": f"Bearer {self.valves.n8n_bearer_token}",
                    "Content-Type": "application/json",
                }
                payload = {"sessionId": f"{chat_id}"}
                payload[self.valves.input_field] = question
                payload["files"] = self.file_contents
                response = requests.post(
                    self.valves.n8n_url, json=payload, headers=headers
                )

                if response.status_code == 200:
                    n8n_response = response.json()[self.valves.response_field]
                else:
                    raise Exception(f"Error: {response.status_code} - {response.text}")

                # Set assistant message with chain reply
                body["messages"].append({"role": "assistant", "content": n8n_response})
            except Exception as e:
                await self.emit_status(
                    __event_emitter__,
                    "error",
                    f"Error during sequence execution: {str(e)}",
                    True,
                )
                return {"error": str(e)}
        # If no message is available alert user
        else:
            await self.emit_status(
                __event_emitter__,
                "error",
                "No messages found in the request body",
                True,
            )
            body["messages"].append(
                {
                    "role": "assistant",
                    "content": "No messages found in the request body",
                }
            )

        await self.emit_status(__event_emitter__, "info", "Complete", True)
        return n8n_response

I’ve tested a bunch of things, but still can’t figure it out. Has anyone run into this or know what might be going wrong? Any help would be really appreciated

ignchap27 avatar May 09 '25 01:05 ignchap27

I’ve been trying out different ways to upload files. I’m using the inlet function shared by @InquestGeronimo and the pipe function from @sir3mat. The goal is to send file info to an n8n workflow through a webhook. [...]

If you are using a Pipe class (a type of Function), you need to add the files param to the pipe() method.

Otherwise you need to use a Pipeline class, and there you can use the inlet.

sir3mat avatar May 09 '25 04:05 sir3mat

@sir3mat I just tried adding the files parameter directly to my pipe() method, but it still doesn’t work. Do you think I should consider switching to the Pipeline class instead?

ignchap27 avatar May 09 '25 14:05 ignchap27

@sir3mat I just tried adding the files parameter directly to my pipe() method, but it still doesn’t work. Do you think I should consider switching to the Pipeline class instead?

Sorry, due to the MD formatting of the GitHub response, the param name was parsed wrong. The param name is not "files" but "__files__", like this one: https://github.com/open-webui/open-webui/blob/a3bb7df61058e690a76cebb7681bd5390e77d226/backend/open_webui/functions.py#L231
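
For illustration, a minimal sketch of a Pipe receiving uploads through the __files__ special argument (the structure of each file dict here is an assumption based on what earlier comments in this thread show, and may vary between Open WebUI versions):

    from typing import Optional

    class Pipe:
        def pipe(
            self,
            body: dict,
            __user__: Optional[dict] = None,
            __files__: Optional[list] = None,
        ) -> str:
            # __files__ is injected by Open WebUI when the user attaches files
            files = __files__ or []
            names = [f.get("file", {}).get("filename", "unknown") for f in files]
            return f"Received {len(files)} file(s): {', '.join(names)}"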

sir3mat avatar May 09 '25 14:05 sir3mat

I just happened to have spent my morning writing some documentation about what I call "special arguments" like __file__. The PR is open there and might be of help to you. Also, any feedback is appreciated.

thiswillbeyourgithub avatar May 09 '25 14:05 thiswillbeyourgithub

@sir3mat you just saved me, i've been looking for a solution for weeks

ignchap27 avatar May 09 '25 14:05 ignchap27

Hey all, I also want to access the raw file bytes in my pipeline but can't piece it together based on the thread. @ignchap27 @thiswillbeyourgithub @sir3mat can you pls clarify where the __files__ parameter is supposed to go to access the raw file contents from a custom pipeline?

istranic avatar May 12 '25 16:05 istranic

Hey all, I also want to access the raw file bytes in my pipeline but can't piece it together based on the thread. @ignchap27 @thiswillbeyourgithub @sir3mat can you pls clarify where the __files__ parameter is supposed to go to access the raw file contents from a custom pipeline?

If you are using a Pipeline class, you need the inlet, like this: https://github.com/open-webui/pipelines/issues/164#issuecomment-2461707626

If you are using a Pipe class (a type of OWUI Function), you need to use the __files__ param in the pipe() method.

sir3mat avatar May 12 '25 19:05 sir3mat

Thank you, though I don't see any bytes in the file_info. I was able to get the bytes by sending an API request to the OpenWebUI backend using the file-specific URL that's in the body:

    import os
    import requests

    # `f` is one entry of body["files"] from the inlet
    url = os.environ["OPENWEBUI_URL"] + f["url"] + "/content"
    response = requests.get(
        url,
        headers={
            "Authorization": f"Bearer {os.environ['OPENWEBUI_API_KEY']}"
        },
    )

In the inlet, the url can be found in body["files"][i]["url"].

istranic avatar May 12 '25 20:05 istranic

The endpoint is predefined and lives on OpenWebUI's side; you can access user-uploaded files through {OPENWEBUI_URL}/api/v1/files/{file_id}/content.

One thing to note is that you need to send the Authorization header along with the request, with the value Bearer {API_KEY}. You can retrieve the API key from OpenWebUI's frontend: Settings -> Account -> API Keys -> JWT Token.

Currently, there is no way of knowing which file was just uploaded for the particular request.
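
Putting that together, a minimal sketch of fetching a file by id (OPENWEBUI_URL and OPENWEBUI_API_KEY are placeholder environment variables, as in the earlier example):

    import os
    import requests

    def get_file_content(file_id: str) -> bytes:
        """GET {OPENWEBUI_URL}/api/v1/files/{file_id}/content with a Bearer API key."""
        url = f"{os.environ['OPENWEBUI_URL']}/api/v1/files/{file_id}/content"
        resp = requests.get(
            url,
            headers={"Authorization": f"Bearer {os.environ['OPENWEBUI_API_KEY']}"},
        )
        resp.raise_for_status()
        return resp.content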

0x0wen avatar Oct 30 '25 11:10 0x0wen