Access uploaded files in pipelines
Hi there,
Being able to access uploaded files would be a great addition to pipelines. It would greatly expand their potential by removing the limitation to text-only input.
It would also be great to enable pipelines to return files in the chat.
Is there any plan to move this feature forward in the near future? I would be happy to test.
Related issues: #66 #19 #81
Looking forward to it. Hopefully we can add the ability to process files in custom pipelines soon; it would greatly enhance the project's capabilities. Is there anything I can do? I'd like to help.
Hey, is there a way to get the files the user has selected in the Pipelines class? Currently the only arguments are user_message, model_id, messages, and body. In the default RAG pipeline, information such as file names and collection_names is provided, basically which file/collection the user selected in the message. Can this information also be accessed in Pipelines?
I have the same problem. Not having access to user-uploaded files limits a lot of functionality 😶. It's also hard to get other parameters passed by the front end, such as whether a new session has been created (which bothers me: even if a new session is created, there is no way to restart with a fresh context), likes or dislikes, etc.
You can access uploaded files by adding an inlet function. If you upload a file, you should see it in the body:
```python
async def inlet(self, body: dict, user: dict) -> dict:
    # This function is called before the OpenAI API request is made.
    # You can modify the form data before it is sent to the OpenAI API.
    print(f"inlet:{__name__}")
    print(body)
    print(user)
    return body
```
@InquestGeronimo Sorry for pinging you, but did the API change? Some weeks ago I tried to make an example pipeline, and it errored out as soon as I attached an image (#66). Is it now "supported"?
That's the main thing that's been holding me back from using pipelines instead of OpenAI so far. I don't want to lose image capabilities.
EDIT: Looks like something HAS changed! The pipeline doesn't error out anymore. Yay! Guess I'll be using Pipelines now!
@tjbck Care to close this issue? I'm not OP, but I guess this is solved.
Here is a hacky way to access uploaded files.
Define an inlet function as suggested by @InquestGeronimo and inspect body["files"]:
```python
async def inlet(self, body: dict, user: dict) -> dict:
    print(f"Received body: {body}")
    files = body.get("files", [])
    for file in files:
        content_url = file["url"] + "/content"
        print(f"file available at {content_url}")
        # read the file content as binary and do something ...
    return body
```
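To complete the "read the file content as binary" step above, here is one hedged sketch. The `OPENWEBUI_URL` and `OPENWEBUI_API_KEY` environment variables are my assumptions, not part of the original snippet; later replies in this thread confirm the content endpoint requires a Bearer token:

```python
import os
import requests

def build_content_url(file: dict, base_url: str) -> str:
    """Join the relative file URL from the request body with the instance base URL."""
    return base_url.rstrip("/") + file["url"] + "/content"

def fetch_file_bytes(file: dict) -> bytes:
    """Fetch the raw bytes of an uploaded file via the content endpoint.

    Assumes OPENWEBUI_URL points at the Open WebUI instance and
    OPENWEBUI_API_KEY holds an API key from Settings -> Account.
    """
    url = build_content_url(file, os.environ["OPENWEBUI_URL"])
    response = requests.get(
        url,
        headers={"Authorization": f"Bearer {os.environ['OPENWEBUI_API_KEY']}"},
        timeout=30,
    )
    response.raise_for_status()
    return response.content  # raw file bytes, not the RAG-parsed text
```

This works from a separate pipelines container as well, since it goes over HTTP rather than the shared filesystem.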
Hi @InquestGeronimo, your solution works for me, thanks! I still have an issue: it seems the data is not returned as it is. Do you know why? Is there a way to get the original file content?
Here is the original data:
```
PlayerID;FirstName;LastName;Team;Position;Goals;Assists;Appearances
1;Leo;Messi;Paris Saint-Germain;Forward;672;305;786
2;Cristiano;Ronaldo;Al Nassr;Forward;700;223;900
3;Neymar;Da Silva Santos;Al Hilal;Forward;398;200;600
4;Kylian;Mbappe;Paris Saint-Germain;Forward;300;150;400
5;Robert;Lewandowski;FC Barcelona;Forward;500;150;700
6;Kevin;De Bruyne;Manchester City;Midfielder;100;200;500
7;Luka;Modric;Real Madrid;Midfielder;120;170;600
8;N'Golo;Kante;Chelsea;Midfielder;30;80;400
9;Ruben;Dias;Manchester City;Defender;10;20;250
10;Virgil;Van Dijk;Liverpool;Defender;20;15;250
```
And here is what I got from the pipeline:
```json
{
  "id": "6547e61d-dc1d-4544-a4fa-b796d40303e5",
  "user_id": "80ce7079-c367-41e5-89f7-7de8534b90e4",
  "hash": "75e40889b84327411325d75964484104733eb18c58ff14ff1d0c8f057defa1e0",
  "filename": "6547e61d-dc1d-4544-a4fa-b796d40303e5_players.csv",
  "data": {
    "content": "PlayerID: 1\nFirstName: Leo\nLastName: Messi\nTeam: Paris Saint-Germain\nPosition: Forward\nGoals: 672\nAssists: 305\nAppearances: 786 PlayerID: 2\nFirstName: Cristiano\nLastName: Ronaldo\nTeam: Al Nassr\nPosition: Forward\nGoals: 700\nAssists: 223\nAppearances: 900 PlayerID: 3\nFirstName: Neymar\nLastName: Da Silva Santos\nTeam: Al Hilal\nPosition: Forward\nGoals: 398\nAssists: 200\nAppearances: 600 PlayerID: 4\nFirstName: Kylian\nLastName: Mbappe\nTeam: Paris Saint-Germain\nPosition: Forward\nGoals: 300\nAssists: 150\nAppearances: 400 PlayerID: 5\nFirstName: Robert\nLastName: Lewandowski\nTeam: FC Barcelona\nPosition: Forward\nGoals: 500\nAssists: 150\nAppearances: 700 PlayerID: 6\nFirstName: Kevin\nLastName: De Bruyne\nTeam: Manchester City\nPosition: Midfielder\nGoals: 100\nAssists: 200\nAppearances: 500 PlayerID: 7\nFirstName: Luka\nLastName: Modric\nTeam: Real Madrid\nPosition: Midfielder\nGoals: 120\nAssists: 170\nAppearances: 600 PlayerID: 8\nFirstName: N'Golo\nLastName: Kante\nTeam: Chelsea\nPosition: Midfielder\nGoals: 30\nAssists: 80\nAppearances: 400 PlayerID: 9\nFirstName: Ruben\nLastName: Dias\nTeam: Manchester City\nPosition: Defender\nGoals: 10\nAssists: 20\nAppearances: 250 PlayerID: 10\nFirstName: Virgil\nLastName: Van Dijk\nTeam: Liverpool\nPosition: Defender\nGoals: 20\nAssists: 15\nAppearances: 250"
  },
  "meta": {
    "name": "players.csv",
    "content_type": "text/csv",
    "size": 579,
    "path": "/app/backend/data/uploads/6547e61d-dc1d-4544-a4fa-b796d40303e5_players.csv",
    "collection_name": "file-6547e61d-dc1d-4544-a4fa-b796d40303e5"
  },
  "created_at": 1729251218,
  "updated_at": 1729251218
}
```
Regards
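What is stored in data.content is the RAG loader's flattened "Key: Value" text, not the raw CSV. If you cannot fetch the raw file, a brittle workaround of my own (not an official parser) is to split the flattened text back into records, assuming the first column name marks the start of each record:

```python
import re

def unflatten_records(content: str, first_field: str = "PlayerID") -> list[dict]:
    """Reverse Open WebUI's CSV flattening for inspection purposes.

    Assumes each record starts with `first_field:` and fields are
    "Key: value" pairs separated by newlines; records are separated
    only by a space before the next leading column name. Brittle:
    breaks if a value itself contains `first_field:`.
    """
    # Split just before every occurrence of the first field name.
    chunks = re.split(rf"(?=\b{re.escape(first_field)}:)", content)
    records = []
    for chunk in chunks:
        chunk = chunk.strip()
        if not chunk:
            continue
        record = {}
        for line in chunk.split("\n"):
            if ":" in line:
                key, _, value = line.partition(":")
                record[key.strip()] = value.strip()
        records.append(record)
    return records
```

For anything beyond quick inspection, retrieving the original bytes from the file content endpoint (discussed later in this thread) is the safer route.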
I have "solved" the issue with this approach. It works when files are uploaded inside the chat:
```python
import logging
from typing import Generator, Iterator, List, Optional, Union
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Pipeline:
    class Valves(BaseModel):
        # myValves...
        pass

    def __init__(self):
        self.name = "pipeline_custom_name"
        self.valves = self._initialize_valves()
        self.file_contents = {}

    def _initialize_valves(self) -> Valves:
        """Initialize valves using environment variables."""
        return self.Valves(
            # my valves init
        )

    async def on_startup(self):
        """Called when the server is started."""
        logger.info(f"Server {self.name} is starting.")

    async def on_shutdown(self):
        """Called when the server is stopped."""
        logger.info(f"Server {self.name} is shutting down.")

    async def on_valves_updated(self):
        """Called when the valves are updated."""
        logger.info("Valves updated.")

    async def inlet(self, body: dict, user: dict) -> dict:
        """Modifies form data before the OpenAI API request."""
        logger.info("Processing inlet request")
        # Extract file info for all files in the body.
        # An in-memory dictionary links users to their owned files.
        file_info = self._extract_file_info(body)
        self.file_contents[user["id"]] = file_info
        return body

    def _extract_file_info(self, body: dict) -> list:
        """Extracts the file info from the request body for all files."""
        files = []
        for file_data in body.get("files", []):
            file = file_data["file"]
            file_id = file["id"]
            filename = file["filename"]
            file_content = file["data"]["content"]
            # Create an OIFile object and append it to the list
            files.append(OIFile(file_id, filename, file_content))
        return files

    def pipe(
        self, body: dict, user_message: str, model_id: str, messages: List[dict]
    ) -> Union[str, Generator, Iterator]:
        logger.info("Starting PIPE process")
        # Extract parameters from body with default fallbacks
        stream = body.get("stream", True)
        max_tokens = body.get("max_tokens", self.valves.LLM_MAX_TOKENS)
        temperature = body.get("temperature", self.valves.LLM_TEMPERATURE)
        # Extract user ID from the body
        user = body.get("user", {})
        user_id = user.get("id", "")
        # Extract user files if available
        if user_id in self.file_contents:
            user_files = self.file_contents[user_id]
        else:
            user_files = None
        # DO YOUR STUFF
        return result

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        print(f"outlet:{__name__}")
        print(f"Received body: {body}")
        if user["id"] in self.file_contents:
            del self.file_contents[user["id"]]
        return body
```
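The OIFile type used in _extract_file_info is never defined in the comment above. A minimal stand-in, which is purely my assumption based on the three positional arguments passed to it, could be:

```python
from dataclasses import dataclass

# Hypothetical stand-in for the undefined OIFile type above; it only
# mirrors the three arguments passed in _extract_file_info.
@dataclass
class OIFile:
    file_id: str
    filename: str
    content: str
```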
Open WebUI calls the inlet, the pipe, and the outlet every time the user sends a query to the pipeline.
If you create a custom model (from the UI) using your pipeline as the base_model, Open WebUI only calls the pipe method (I don't understand why).
@sir3mat I tried the above method and printed the body dictionary to see what is being passed. It seems to contain only the chat info, like the RAG prompt and context for the user query. It does not pass the complete document, which is what we want in order to perform our own custom retrieval in the pipeline.
To reproduce:
```python
from typing import List, Union, Generator, Iterator

class Pipeline:
    def __init__(self):
        self.name = "00 Repeater Example"

    async def on_startup(self):
        # This function is called when the server is started.
        print("on_startup")

    async def on_shutdown(self):
        # This function is called when the server is shut down.
        print("on_shutdown")

    async def inlet(self, body: dict, user: dict) -> dict:
        return body

    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
        return f"Type of body: {type(body)} \n {body}"  # echoed back to the UI
```
The body passed to pipe() does not contain details like filename or collection_name, so you have to capture them in the inlet function and store them in an instance variable, like this:

```python
class Pipeline:
    def __init__(self):
        self.inlet_details = []

    async def inlet(self, body: dict, user: dict) -> dict:
        print(f"Received body: {body}")
        files = body.get("files", [])
        for file in files:
            self.inlet_details.append({
                "filename": file.get("filename", "unknown"),
                "url": file.get("url", "unknown"),
            })
        return body
```
Using these additional details you can look up the file in the uploads dir and get the content. There is also an alternative: send a request to a WebUI endpoint using an API key from your account in the WebUI settings. Hope this helps.
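The uploads-dir lookup can be sketched like this. The path is an assumption (the default Open WebUI data directory seen in the meta.path field earlier), and note a later reply's caveat: if pipelines runs in a separate container, this directory must be mounted into it.

```python
from pathlib import Path
from typing import Optional

# Assumed default Open WebUI uploads directory; adjust for your deployment.
UPLOADS_DIR = Path("/app/backend/data/uploads")

def find_upload(file_id: str, uploads_dir: Path = UPLOADS_DIR) -> Optional[Path]:
    """Locate an uploaded file on disk.

    Uploaded files appear to be stored as '<file_id>_<original name>'
    (e.g. '6547e61d-..._players.csv'), so a glob on the id prefix finds them.
    """
    matches = sorted(uploads_dir.glob(f"{file_id}_*"))
    return matches[0] if matches else None
```

The API-key route is the more robust alternative since it does not depend on a shared filesystem.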
@jeandelest I also have this issue. Open Webui is segmenting my CSV file and making it newlines instead of commas. Let me know if you have found a solution.
Same issue for uploaded HTML files. The file["data"]["content"] field does not contain the raw HTML data, just a parsed version of it instead.
@chandan-artpark I don't think you can directly read from the uploads directory, since the pipelines container is separate from the open-webui container. I guess you would have to mount the uploads folder into the pipelines container.
I have the same issue. Accessing raw files and handling AIGC requests inside a pipeline still seems impossible with the latest build, v0.5.20.
You can get the raw files by mounting the volume from the open-webui container into the pipelines container.
I’ve been trying out different ways to upload files. I’m using the inlet function shared by @InquestGeronimo and the pipe function from @sir3mat. The goal is to send file info to an n8n workflow through a webhook.
I’ve tried sending the file URL and even the content, but nothing seems to work: it just doesn’t go through.
Here’s the code I’m using (based on Cole Medin’s n8n pipe):
```python
"""
title: n8n Pipe Function
author: Cole Medin
author_url: https://www.youtube.com/@ColeMedin
version: 0.2.0
This module defines a Pipe class that utilizes N8N for an Agent
"""

from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time
import requests


def extract_event_info(event_emitter) -> tuple[Optional[str], Optional[str]]:
    if not event_emitter or not event_emitter.__closure__:
        return None, None
    for cell in event_emitter.__closure__:
        if isinstance(request_info := cell.cell_contents, dict):
            chat_id = request_info.get("chat_id")
            message_id = request_info.get("message_id")
            return chat_id, message_id
    return None, None


class Pipe:
    class Valves(BaseModel):
        n8n_url: str = Field(
            default="https://n8n.[your domain].com/webhook/[your webhook URL]"
        )
        n8n_bearer_token: str = Field(default="...")
        input_field: str = Field(default="chatInput")
        response_field: str = Field(default="output")
        emit_interval: float = Field(
            default=2.0, description="Interval in seconds between status emissions"
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    def __init__(self):
        self.type = "pipe"
        self.id = "n8n_pipe"
        self.name = "N8N Pipe"
        self.valves = self.Valves()
        self.last_emit_time = 0
        self.file_contents = {}

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        current_time = time.time()
        if (
            __event_emitter__
            and self.valves.enable_status_indicator
            and (
                current_time - self.last_emit_time >= self.valves.emit_interval or done
            )
        ):
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )
            self.last_emit_time = current_time

    async def inlet(self, body: dict, user: dict) -> dict:
        print(f"Received body: {body}")
        files = body.get("files", [])
        for file in files:
            content_url = file["url"] + "/content"
            print(f"file available at {content_url}")
            self.file_contents[user["id"]] = content_url
        return body

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
        __event_call__: Callable[[dict], Awaitable[dict]] = None,
    ) -> Optional[dict]:
        await self.emit_status(
            __event_emitter__, "info", "Calling N8N Workflow...", False
        )
        chat_id, _ = extract_event_info(__event_emitter__)
        messages = body.get("messages", [])
        # Extract user ID from the body
        user = body.get("user", {})
        user_id = user.get("id", "")
        # Initialize so the final return never hits an undefined name
        n8n_response = None
        # Verify a message is available
        if messages:
            question = messages[-1]["content"]
            try:
                # Invoke N8N workflow
                headers = {
                    "Authorization": f"Bearer {self.valves.n8n_bearer_token}",
                    "Content-Type": "application/json",
                }
                payload = {"sessionId": f"{chat_id}"}
                payload[self.valves.input_field] = question
                payload["files"] = self.file_contents
                response = requests.post(
                    self.valves.n8n_url, json=payload, headers=headers
                )
                if response.status_code == 200:
                    n8n_response = response.json()[self.valves.response_field]
                else:
                    raise Exception(f"Error: {response.status_code} - {response.text}")
                # Set assistant message with chain reply
                body["messages"].append({"role": "assistant", "content": n8n_response})
            except Exception as e:
                await self.emit_status(
                    __event_emitter__,
                    "error",
                    f"Error during sequence execution: {str(e)}",
                    True,
                )
                return {"error": str(e)}
        # If no message is available, alert the user
        else:
            await self.emit_status(
                __event_emitter__,
                "error",
                "No messages found in the request body",
                True,
            )
            body["messages"].append(
                {
                    "role": "assistant",
                    "content": "No messages found in the request body",
                }
            )
        await self.emit_status(__event_emitter__, "info", "Complete", True)
        return n8n_response
```
I’ve tested a bunch of things, but still can’t figure it out. Has anyone run into this or know what might be going wrong? Any help would be really appreciated
> I’ve been trying out different ways to upload files. I’m using the inlet function shared by @InquestGeronimo and the pipe function from @sir3mat. The goal is to send file info to an n8n workflow through a webhook. [...] Has anyone run into this or know what might be going wrong?
If you are using a Pipe class (a type of Function) you need to add a files param to the pipe() method.
Otherwise you need to use a Pipeline class, where you can use the inlet.
@sir3mat I just tried adding the files parameter directly to my pipe() method, but it still doesn’t work. Do you think I should consider switching to the Pipeline class instead?
Sorry, GitHub's Markdown formatting mangled the param name in my reply.
The param name is not "files" but "__files__", like this one: https://github.com/open-webui/open-webui/blob/a3bb7df61058e690a76cebb7681bd5390e77d226/backend/open_webui/functions.py#L231
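A minimal sketch of where __files__ goes in a Pipe function. The signature placement follows the linked functions.py; the body and the assumed file-dict shape (each entry wrapping a "file" dict with a "filename") are illustrative assumptions of mine:

```python
from typing import Optional

class Pipe:
    # Minimal Pipe (an Open WebUI Function, not a Pipelines-server Pipeline).
    # Open WebUI injects the uploaded-file metadata as the __files__ argument.
    def pipe(self, body: dict, __files__: Optional[list] = None) -> str:
        files = __files__ or []
        names = [f.get("file", {}).get("filename", "?") for f in files]
        return f"Received {len(files)} file(s): {', '.join(names)}"
```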
I just happened to have spent my morning writing some documentation about what I call "special arguments" like __file__. The PR is open there and might be of help to you. Also, any feedback is appreciated.
@sir3mat you just saved me, i've been looking for a solution for weeks
Hey all, I also want to access the raw file bytes in my pipeline but can't piece it together based on the thread. @ignchap27 @thiswillbeyourgithub @sir3mat can you pls clarify where the __files__ parameter is supposed to go to access the raw file contents from a custom pipeline?
> Hey all, I also want to access the raw file bytes in my pipeline but can't piece it together based on the thread. @ignchap27 @thiswillbeyourgithub @sir3mat can you pls clarify where the __files__ parameter is supposed to go to access the raw file contents from a custom pipeline?
If you are using a Pipeline class, you need the inlet, like this: https://github.com/open-webui/pipelines/issues/164#issuecomment-2461707626
If you are using a Pipe class (a type of OWUI Function), you need to use the __files__ param in the pipe() method.
Thank you, though I don't see any bytes in the file_info. I was able to get the bytes by sending an API request to OpenwebUI backend using the file-specific URL that's in the body:
```python
url = os.environ["OPENWEBUI_URL"] + f["url"] + "/content"
response = requests.get(
    url,
    headers={
        "Authorization": f"Bearer {os.environ['OPENWEBUI_API_KEY']}"
    },
)
```
In the inlet, the URL can be found in body["files"][i]["url"].
The endpoint is predefined and lives on Open WebUI's side; you can access user-uploaded files through {OPENWEBUI_URL}/api/v1/files/{file_id}/content.
One thing to note: you need to send an Authorization header along with the request, with value Bearer {API_KEY}.
You can retrieve the API key from Open WebUI's frontend: Settings -> Account -> API Keys -> JWT Token.
Currently, there is no way of knowing which file was just uploaded for the particular request.