pipelines icon indicating copy to clipboard operation
pipelines copied to clipboard

Limit concurrency in pipeline

Open Sebulba46 opened this issue 9 months ago • 0 comments

I'm currently running pipeline with class Pipeline, having def init_db_connection. Which runs on startup in async def on_startup(). Is There any way to limit concurrency?

class Pipeline:
    class Valves(BaseModel):
        DB_HOST: str
        DB_PORT: str
        DB_USER: str
        DB_PASSWORD: str
        DB_DATABASE: str
        DB_TABLES: List[str]
        VLLM_HOST: str
        OPENAI_API_KEY: str
        TEXT_TO_SQL_MODEL: str

    def __init__(self):
        self.name = "01 Database RAG Pipeline vLLM llama"
        self.conn = None
        self.cur = None
        self.nlsql_response = ""

        self.valves = ...


  def init_db_connection(self):
          connection_params = {
              'dbname': self.valves.DB_DATABASE,
              'user': self.valves.DB_USER,
              'password': self.valves.DB_PASSWORD,
              'host': self.valves.DB_HOST.split('//')[-1],  # Remove the http:// or https:// prefix if present
              'port': self.valves.DB_PORT
          }
  
          try:
              self.conn = psycopg2.connect(**connection_params)
              print("Connection to PostgreSQL established successfully")
          except Exception as e:
              print(f"Error connecting to PostgreSQL: {e}")
  
          # Create a cursor object
          self.cur = self.conn.cursor()
  
          # Query to get the list of tables
          self.cur.execute("""
              SELECT table_schema, table_name
              FROM information_schema.tables
              WHERE table_type = 'BASE TABLE'
              AND table_schema NOT IN ('information_schema', 'pg_catalog');
          """)
  
          # Fetch and print the table names
          tables = self.cur.fetchall()
          print("Tables in the database:")
          for schema, table in tables:
              print(f"{schema}.{table}")
  
  
          # Query to get the column names
          self.cur.execute("""SELECT json_object_keys(to_json(json_populate_record(NULL::public.do_10vc_speed, '{}'::JSON)))""")
  
          #Fetch and print the column names
          columns = self.cur.fetchall()
          print("Columns in the database:")
          print(f"{columns}")
  
  
      async def on_startup(self):
          self.init_db_connection()
  
      async def on_shutdown(self):
          self.cur.close()
          self.conn.close()

Sebulba46 avatar Mar 06 '25 09:03 Sebulba46