indexify
Implement Comprehensive Monitoring in Python SDK
Implement Comprehensive Monitoring for Long-running Workflows
Problem Description
Indexify currently lacks a comprehensive built-in solution for monitoring long-running workflows. This makes it difficult for users to track the progress, performance, and resource usage of their pipelines, especially in production environments.
Current Limitations
- Limited Progress Tracking: In `remote_client.py`, the `invoke_graph_with_object` method provides basic event information:

      print(f"[bold green]{event.event_name}[/bold green]: {event.payload}")

  However, this doesn't give a clear picture of overall progress or estimated completion time.
- No Performance Metrics: The `FunctionWorker` class in `function_worker.py` doesn't collect or report any performance metrics:

      class FunctionWorker:
          def __init__(self, workers: int = 1) -> None:
              self._executor: concurrent.futures.ProcessPoolExecutor = (
                  concurrent.futures.ProcessPoolExecutor(max_workers=workers)
              )

  There's no tracking of execution time, memory usage, or CPU utilization.
- Lack of Centralized Logging: The current logging is scattered and inconsistent. For example, in `agent.py`:

      console.print(f"[bold]task-reporter[/bold] uploading output of size: {len(completed_task.outputs or [])}")

  This approach doesn't provide a centralized, queryable log of system events and errors.
- No Real-time Monitoring Interface: There's no built-in way for users to view the current state of their workflows in real time.
Benefits of Implementing Monitoring
- Improved Observability: Users will be able to track the progress of their workflows, identify bottlenecks, and estimate completion times.
- Performance Optimization: Collected metrics will help users optimize their workflows and resource allocation.
- Easier Debugging: Comprehensive logging and error reporting will make it easier to identify and fix issues in complex workflows.
- Resource Management: Monitoring resource usage will help prevent out-of-memory errors and optimize cloud resource allocation.
Proposed Solution
Implement a comprehensive monitoring system with the following components:
- Metrics Collection:
  - Add a `Metrics` class to collect and aggregate performance data.
  - Instrument key methods in `FunctionWorker`, `Graph`, and `RemoteClient` to collect metrics.
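As a starting point for discussion, here is a minimal sketch of what such a `Metrics` class could look like. Everything in it (the method names `increment`, `timer`, and `summary`, and the metric names) is hypothetical and not part of the current SDK. It only covers counters and wall-clock timings; memory and CPU usage would need additional instrumentation.

```python
import threading
import time
from collections import defaultdict
from contextlib import contextmanager
from statistics import mean


class Metrics:
    """Hypothetical in-process collector; not an existing Indexify class."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._durations = defaultdict(list)  # metric name -> list of seconds
        self._counters = defaultdict(int)    # metric name -> running count

    def increment(self, name: str, value: int = 1) -> None:
        """Add `value` to a named counter."""
        with self._lock:
            self._counters[name] += value

    @contextmanager
    def timer(self, name: str):
        """Record the wall-clock duration of the enclosed block."""
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed = time.perf_counter() - start
            with self._lock:
                self._durations[name].append(elapsed)

    def summary(self) -> dict:
        """Return counters plus count/mean/max for every timed metric."""
        with self._lock:
            return {
                "counters": dict(self._counters),
                "timings": {
                    name: {"count": len(v), "mean_s": mean(v), "max_s": max(v)}
                    for name, v in self._durations.items()
                },
            }


# Example usage with made-up metric names.
metrics = Metrics()
with metrics.timer("function_worker.run"):
    sum(range(1000))
metrics.increment("tasks_completed")
```

Because `FunctionWorker` runs functions in a `ProcessPoolExecutor`, a collector like this would only see timings measured in the parent process. Metrics gathered inside a worker process would have to be returned alongside the function result.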
- Centralized Logging:
  - Implement a `Logger` class that provides structured logging with different severity levels.
  - Replace print statements with calls to the logger.
  - Add context information (e.g., graph name, function name) to log messages.
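One possible shape for this, sketched on top of Python's standard `logging` module rather than a custom `Logger` class. The helper `get_logger`, the `JsonFormatter` class, and the context keys `graph` and `function` are assumptions made for illustration, not existing SDK names.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object so logs are easy to query."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # "context" is attached by the LoggerAdapter returned from get_logger.
        entry.update(getattr(record, "context", {}))
        return json.dumps(entry)


def get_logger(name: str, stream=None, **context) -> logging.LoggerAdapter:
    """Hypothetical helper: a logger that adds `context` to every message."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    if not logger.handlers:
        handler = logging.StreamHandler(stream)  # defaults to stderr
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    return logging.LoggerAdapter(logger, {"context": context})


# Example: the task-reporter message from agent.py as a structured log call.
# Output is captured in a buffer here only so it can be inspected.
buffer = io.StringIO()
log = get_logger(
    "indexify.task_reporter", stream=buffer, graph="my_graph", function="extract"
)
log.info("uploading output of size: %d", 3)
```

Each call produces a single JSON line carrying the severity level, the message, and the graph and function names, which a log aggregator can then filter on.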
- Progress Tracking:
  - Extend the `Graph` class to include progress information for each node.
  - Implement a progress calculation algorithm that considers the graph structure.
  - Modify `RemoteClient` to report progress updates.
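To make the progress calculation concrete, here is one simple approach, assuming the graph can be represented as a mapping from each node name to its downstream nodes. The functions `reachable_nodes` and `graph_progress` and the example node names are hypothetical. Progress is the fraction of nodes reachable from the start node that have completed.

```python
from collections import deque


def reachable_nodes(edges: dict, start: str) -> set:
    """Breadth-first search over `edges` (node -> list of downstream nodes)."""
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in edges.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def graph_progress(edges: dict, start: str, completed: set) -> float:
    """Fraction of nodes reachable from `start` that are in `completed`."""
    nodes = reachable_nodes(edges, start)
    return len(nodes & set(completed)) / len(nodes)


# Example graph with made-up node names.
edges = {
    "load": ["chunk"],
    "chunk": ["embed", "summarize"],
    "embed": [],
    "summarize": [],
}
progress = graph_progress(edges, "load", completed={"load", "chunk"})
print(f"{progress:.0%}")  # 2 of 4 reachable nodes done -> 50%
```

This treats every node as one unit of work. If a node fans out into many tasks, a per-task count (completed tasks over known tasks for each node) would likely give a better estimate than a per-node one.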
- Real-time Monitoring Interface:
  - Create a `Monitor` class that aggregates metrics, logs, and progress information.
  - Implement a simple web interface using Flask or FastAPI to display real-time monitoring data.
  - Create visualizations for metrics and progress (e.g., using Plotly).
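A rough sketch of the aggregation side of such a `Monitor` class, using only the standard library. The method names (`record_metric`, `record_progress`, `record_log`, `snapshot`) are hypothetical, and the web layer is deliberately left out: a Flask or FastAPI route could return the output of `snapshot()` as its response body.

```python
import json
import time
from collections import deque


class Monitor:
    """Hypothetical aggregator for metrics, progress, and recent logs."""

    def __init__(self, max_logs: int = 100) -> None:
        self.metrics = {}                   # metric name -> latest value
        self.progress = {}                  # graph name -> fraction complete
        self.logs = deque(maxlen=max_logs)  # keeps only the newest entries

    def record_metric(self, name: str, value: float) -> None:
        self.metrics[name] = value

    def record_progress(self, graph: str, fraction: float) -> None:
        self.progress[graph] = fraction

    def record_log(self, message: str) -> None:
        self.logs.append({"time": time.time(), "message": message})

    def snapshot(self) -> str:
        """Serialize the current state as JSON for a dashboard to poll."""
        return json.dumps(
            {
                "metrics": self.metrics,
                "progress": self.progress,
                "logs": list(self.logs),
            }
        )


# Example usage with made-up names and values.
monitor = Monitor(max_logs=2)
monitor.record_metric("function_worker.mean_s", 0.42)
monitor.record_progress("my_graph", 0.5)
for message in ("task started", "task running", "task finished"):
    monitor.record_log(message)
state = json.loads(monitor.snapshot())
```

The bounded `deque` keeps memory use flat for long-running workflows, at the cost of dropping older log entries from the live view.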
- Alerting System:
  - Add configurable alerts for specific events or metric thresholds.
  - Implement notification mechanisms (e.g., email, Slack) for alerts.
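Threshold-based alerts could be sketched roughly as follows. `AlertRule`, `AlertManager`, and the metric name `memory_mb` are hypothetical. The notifier is just a callable that takes a message, so an email or Slack sender could be plugged in without changing the rule logic.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class AlertRule:
    """Fire when the named metric rises above `threshold`."""
    metric: str
    threshold: float
    message: str


class AlertManager:
    """Hypothetical rule evaluator; `notify` delivers the alert text."""

    def __init__(self, notify: Callable[[str], None]) -> None:
        self._rules: List[AlertRule] = []
        self._notify = notify

    def add_rule(self, rule: AlertRule) -> None:
        self._rules.append(rule)

    def check(self, metrics: dict) -> List[AlertRule]:
        """Evaluate every rule against `metrics` and notify for each breach."""
        fired = []
        for rule in self._rules:
            value = metrics.get(rule.metric)
            if value is not None and value > rule.threshold:
                self._notify(
                    f"{rule.message} "
                    f"({rule.metric}={value}, threshold={rule.threshold})"
                )
                fired.append(rule)
        return fired


# Example: collect notifications in a list instead of sending them anywhere.
sent = []
manager = AlertManager(notify=sent.append)
manager.add_rule(AlertRule("memory_mb", 1024, "Worker memory usage is high"))
fired = manager.check({"memory_mb": 2048})
```

This version notifies on every check while a metric stays above its threshold. A production version would probably want to suppress repeated alerts for the same rule.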
- Testing:
  - Write unit tests for new classes and methods.
  - Update existing tests to work with the new monitoring system.
Related Issues
- #891: Improve error handling in Python SDK