pipelines
pipelines copied to clipboard
Limit concurrency in pipeline
I'm currently running a pipeline whose `Pipeline` class defines an `init_db_connection` method, which is called at startup from `async def on_startup()`. Is there any way to limit concurrency?
class Pipeline:
    """Pipeline that opens a PostgreSQL connection at startup.

    On startup the pipeline connects with ``psycopg2`` using the settings in
    ``self.valves``, then prints the user tables in the database and the
    column names of ``public.do_10vc_speed``. The connection and cursor are
    kept on the instance and closed again on shutdown.
    """

    class Valves(BaseModel):
        # PostgreSQL connection settings, read by ``init_db_connection``.
        DB_HOST: str  # may carry an ``http://``/``https://`` prefix; it is stripped
        DB_PORT: str
        DB_USER: str
        DB_PASSWORD: str
        DB_DATABASE: str
        # The remaining settings are not read by any method in this class.
        DB_TABLES: List[str]
        VLLM_HOST: str
        OPENAI_API_KEY: str
        TEXT_TO_SQL_MODEL: str

    def __init__(self):
        self.name = "01 Database RAG Pipeline vLLM llama"
        # Both handles stay None until init_db_connection() succeeds.
        self.conn = None
        self.cur = None
        self.nlsql_response = ""
        # NOTE(review): the Valves construction is elided in this snippet;
        # it must be a populated Valves instance before on_startup() runs.
        self.valves = ...

    def init_db_connection(self):
        """Connect to PostgreSQL and print the database's tables and columns.

        On success ``self.conn`` and ``self.cur`` hold the open connection and
        cursor. If the connection attempt fails, the error is printed and the
        method returns with ``self.conn`` and ``self.cur`` unchanged instead of
        failing later with an ``AttributeError`` on ``None``.
        """
        connection_params = {
            'dbname': self.valves.DB_DATABASE,
            'user': self.valves.DB_USER,
            'password': self.valves.DB_PASSWORD,
            'host': self.valves.DB_HOST.split('//')[-1],  # Remove the http:// or https:// prefix if present
            'port': self.valves.DB_PORT,
        }

        try:
            self.conn = psycopg2.connect(**connection_params)
        except Exception as e:
            # Startup boundary: report the failure and stop here. Without a
            # connection there is nothing to create a cursor from.
            print(f"Error connecting to PostgreSQL: {e}")
            return
        print("Connection to PostgreSQL established successfully")

        # Create a cursor object
        self.cur = self.conn.cursor()

        # Query to get the list of tables (system schemas excluded)
        self.cur.execute(
            "\n"
            "SELECT table_schema, table_name\n"
            "FROM information_schema.tables\n"
            "WHERE table_type = 'BASE TABLE'\n"
            "AND table_schema NOT IN ('information_schema', 'pg_catalog');\n"
        )

        # Fetch and print the table names
        tables = self.cur.fetchall()
        print("Tables in the database:")
        for schema, table in tables:
            print(f"{schema}.{table}")

        # Query to get the column names of the hard-coded table
        self.cur.execute("""SELECT json_object_keys(to_json(json_populate_record(NULL::public.do_10vc_speed, '{}'::JSON)))""")

        # Fetch and print the column names
        columns = self.cur.fetchall()
        print("Columns in the database:")
        print(f"{columns}")

    async def on_startup(self):
        """Open the database connection when the pipeline starts."""
        self.init_db_connection()

    async def on_shutdown(self):
        """Close the cursor and connection, if they were ever opened."""
        # Either handle may still be None when startup failed or never ran.
        if self.cur is not None:
            self.cur.close()
            self.cur = None
        if self.conn is not None:
            self.conn.close()
            self.conn = None