pipelines icon indicating copy to clipboard operation
pipelines copied to clipboard

How can I debug pipelines using breakpoints?

Open risjain opened this issue 10 months ago • 0 comments

Hello,

I am really struggling to develop my custom pipelines and would like to debug them the traditional way, with breakpoints in VSCode. Is that doable?

I tried running the Docker container inside a VM, but got terribly lost trying to modify the Dockerfile to allow breakpoints.

Finally, I tried converting start.sh into a Python file that I could use to achieve the same outcome:

import argparse
import os
import shutil
import subprocess
import sys
from pathlib import Path

import requests

class PipelineManager:
    """Prepare a pipelines directory and launch the uvicorn server.

    Configuration is read from environment variables at construction time:

    * ``PORT`` (default ``"9099"``) and ``HOST`` (default ``"0.0.0.0"``)
    * ``PIPELINES_DIR`` (default ``"pipelines"``)
    * ``RESET_PIPELINES_DIR`` -- ``"true"`` (case-insensitive) enables
      :meth:`reset_pipelines_dir`
    * ``PIPELINES_URLS`` -- ``;``-separated list of URLs to fetch
    """

    # Seconds to wait on each HTTP download before giving up.
    DOWNLOAD_TIMEOUT = 30

    def __init__(self):
        self.port = os.getenv("PORT", "9099")
        self.host = os.getenv("HOST", "0.0.0.0")
        self.pipelines_dir = Path(os.getenv("PIPELINES_DIR", "pipelines"))
        self.reset_pipelines = os.getenv("RESET_PIPELINES_DIR", "false").lower() == "true"
        # "".split(";") yields [""], so blank entries must be dropped or an
        # unset PIPELINES_URLS would produce one bogus empty URL. Surrounding
        # double quotes are removed per entry, which also covers the case
        # where the whole variable value was quoted.
        raw_urls = os.getenv("PIPELINES_URLS", "")
        cleaned = (part.strip().strip('"').strip() for part in raw_urls.split(";"))
        self.pipelines_urls = [url for url in cleaned if url]

    def reset_pipelines_dir(self):
        """Delete and recreate the pipelines directory when reset is enabled.

        Does nothing unless ``RESET_PIPELINES_DIR`` was ``"true"``.
        """
        if self.reset_pipelines:
            print(f"Resetting pipelines directory: {self.pipelines_dir}")
            if self.pipelines_dir.exists():
                shutil.rmtree(self.pipelines_dir)
            self.pipelines_dir.mkdir(parents=True, exist_ok=True)

    def install_requirements(self, requirements_path):
        """Install packages from a requirements file, if the file exists.

        Args:
            requirements_path: Path (str or ``Path``) to a requirements file.

        Raises:
            subprocess.CalledProcessError: If pip exits with a non-zero status.
        """
        req_file = Path(requirements_path)
        if req_file.exists():
            print(f"Installing requirements from {req_file}")
            # Use the running interpreter's pip so packages land in the same
            # environment that executes (or debugs) this script.
            subprocess.run(
                [sys.executable, "-m", "pip", "install", "-r", str(req_file)],
                check=True,
            )
        else:
            print(f"Requirements file not found: {req_file}")

    def download_pipelines(self):
        """Fetch every configured URL into the pipelines directory.

        GitHub ``/blob/`` URLs are downloaded as single files, GitHub
        ``/tree/`` URLs are cloned, and any other URL ending in ``.py`` is
        downloaded directly. Anything else is reported and skipped.
        """
        for url in self.pipelines_urls:
            # Defensive clean-up in case pipelines_urls was assigned directly.
            url = url.strip().strip('"')
            if not url:
                continue
            print(f"Downloading from {url}")

            if url.startswith("https://github.com"):
                if "/blob/" in url:
                    self._download_github_file(url)
                elif "/tree/" in url:
                    self._clone_github_repo(url)
                else:
                    print(f"Skipping unsupported GitHub URL: {url}")
            elif url.endswith(".py"):
                self._download_python_file(url)
            else:
                print(f"Skipping unsupported URL: {url}")

    def _save_url(self, url):
        """Download ``url`` into the pipelines directory, named by its last path segment.

        Raises:
            requests.HTTPError: If the server answers with an error status, so
                an HTML error page is never saved as a ``.py`` file.
        """
        filename = url.split("/")[-1]
        dest = self.pipelines_dir / filename

        response = requests.get(url, timeout=self.DOWNLOAD_TIMEOUT)
        response.raise_for_status()
        # The directory may not exist when setup runs without a reset.
        self.pipelines_dir.mkdir(parents=True, exist_ok=True)
        # Write the raw bytes to avoid a decode/re-encode round trip through
        # the platform's default text encoding.
        dest.write_bytes(response.content)
        print(f"Downloaded {filename}")

    def _download_github_file(self, url):
        """Download a single file from a GitHub ``/blob/`` URL via its ``/raw/`` form."""
        self._save_url(url.replace("/blob/", "/raw/"))

    def _clone_github_repo(self, url):
        """Clone the repository behind a GitHub ``/tree/<ref>/<path>`` URL.

        The first segment after ``/tree/`` is treated as the branch or tag and
        the remainder as the sub-directory for a sparse checkout. A ref whose
        name itself contains ``/`` cannot be told apart from a path here.

        Raises:
            subprocess.CalledProcessError: If a git command fails.
        """
        repo_url, tree_part = url.split("/tree/", 1)
        branch, _, subdir = tree_part.strip("/").partition("/")

        clone_cmd = ["git", "clone", "--depth", "1"]
        if branch:
            clone_cmd += ["--branch", branch]
        if subdir:
            clone_cmd += ["--filter=blob:none", "--sparse"]
        clone_cmd += [repo_url, str(self.pipelines_dir)]
        subprocess.run(clone_cmd, check=True)

        if subdir:
            # NOTE(review): files end up under pipelines_dir/<subdir>, while
            # install_frontmatter_requirements only scans the top level of
            # pipelines_dir -- confirm whether they should be moved up.
            subprocess.run(
                ["git", "sparse-checkout", "set", subdir],
                cwd=self.pipelines_dir,
                check=True,
            )

    def _download_python_file(self, url):
        """Download a ``.py`` file from an arbitrary URL."""
        self._save_url(url)

    def install_frontmatter_requirements(self):
        """Install the requirements declared in each top-level pipeline file.

        Every ``*.py`` file directly inside the pipelines directory is scanned
        for a ``requirements:`` line in its first triple-quoted block.

        Raises:
            subprocess.CalledProcessError: If pip exits with a non-zero status.
        """
        for file in self.pipelines_dir.glob("*.py"):
            content = file.read_text(encoding="utf-8")
            requirements = self._parse_frontmatter(content)
            if requirements:
                print(f"Installing requirements from {file.name}")
                subprocess.run(
                    [sys.executable, "-m", "pip", "install"] + requirements,
                    check=True,
                )

    def _parse_frontmatter(self, content):
        """Extract the requirement list from a file's first ``\"\"\"`` block.

        Args:
            content: Full text of a pipeline file.

        Returns:
            A list of requirement strings (split on commas and/or whitespace)
            from the first ``requirements:`` line, or ``None`` when the file
            has no triple-quoted block or no such line.
        """
        parts = content.split('"""')
        if len(parts) < 2:
            return None
        for line in parts[1].splitlines():
            # Partition on the first colon only, so values that contain
            # colons themselves (e.g. "git+https://...") stay intact.
            key, sep, value = line.strip().partition(":")
            if sep and key.strip().lower() == "requirements":
                return value.replace(",", " ").split()
        return None

    def run_server(self):
        """Run ``uvicorn main:app`` on the configured host and port; blocks until it exits.

        NOTE(review): the server runs in a child process, so breakpoints set
        via a debugger attached to this script may not be hit inside
        ``main:app`` unless the debugger follows subprocesses -- verify.

        Raises:
            subprocess.CalledProcessError: If uvicorn exits with a non-zero status.
        """
        subprocess.run([
            "uvicorn", "main:app",
            "--host", self.host,
            "--port", self.port,
            "--forwarded-allow-ips", "*"
        ], check=True)

if __name__ == "__main__":
    # Command-line entry point: --mode selects which phases to execute.
    #   setup -> prepare the pipelines directory only
    #   run   -> start the server only
    #   full  -> both, setup first (default)
    cli = argparse.ArgumentParser()
    cli.add_argument("--mode", choices=["setup", "run", "full"], default="full")
    mode = cli.parse_args().mode

    # argparse restricts mode to the three choices, so each phase is
    # enabled by every mode except the one that excludes it.
    do_setup = mode != "run"
    do_run = mode != "setup"

    manager = PipelineManager()

    if do_setup:
        manager.reset_pipelines_dir()
        manager.download_pipelines()
        manager.install_frontmatter_requirements()

    if do_run:
        manager.run_server()

None of these are working. Please help me debug this.

risjain avatar Mar 01 '25 03:03 risjain