
Possibility to report more metrics in MetricsCollector

Open • mateka opened this issue 11 months ago • 1 comment

I would like to monitor more metrics than the standard PrometheusMetricsCollector collects. To do that, I would write a custom MetricsCollector and a TaskContext. The latter would gather metrics for each task and report them to the scheduler, which would pass them on to a subclass of the MetricsCollector class.

For example, I would like to monitor local tasks running in subprocesses. To do that, I created the following classes:

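import resource
import time


class TaskContext:
    """Measures wall-clock time and peak memory of a task's process"""

    def __init__(self, task_process):
        # luigi passes the TaskProcess wrapping the task when [worker] task_process_context is set
        self._task_process = task_process
        self._start = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        assert self._start is not None
        elapsed = time.perf_counter() - self._start
        # Peak RSS of the task process itself or of any of its waited-for children.
        # On Linux ru_maxrss is in kilobytes, so dividing by 1024 gives megabytes.
        used_memory = max(
            resource.getrusage(resource.RUSAGE_SELF).ru_maxrss,
            resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss,
        )
        # report_task_statistics is the new method proposed in this issue
        self._task_process.status_reporter.report_task_statistics({"memory": used_memory / 1024, "elapsed": elapsed})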
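One caveat about the `/ 1024` above: `ru_maxrss` is reported in kilobytes on Linux but in bytes on macOS, and the `resource` module is Unix-only. Below is a minimal, platform-aware sketch of the same measurement written as a `contextlib` context manager. The names `peak_rss_megabytes` and `task_statistics` are hypothetical, and the `report` callback stands in for the proposed `status_reporter.report_task_statistics`:

```python
import contextlib
import resource
import sys
import time


def peak_rss_megabytes():
    """Peak resident set size of this process or its waited-for children, in MB."""
    peak = max(
        resource.getrusage(resource.RUSAGE_SELF).ru_maxrss,
        resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss,
    )
    # ru_maxrss is in bytes on macOS and in kilobytes on Linux.
    if sys.platform == "darwin":
        return peak / (1024 * 1024)
    return peak / 1024


@contextlib.contextmanager
def task_statistics(report):
    """Time the wrapped block, then pass elapsed seconds and peak MB to `report`.

    `report` is a hypothetical stand-in for status_reporter.report_task_statistics.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # Report even if the block raised, like the TaskContext above.
        report({"memory": peak_rss_megabytes(), "elapsed": time.perf_counter() - start})


collected = []
with task_statistics(collected.append):
    sum(range(100_000))  # placeholder for the task's run()
print(collected[0])
```

Using `finally` mirrors the original `__exit__`, which reports statistics whether or not the task raised.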

and the matching metrics collector:

from luigi.contrib.prometheus_metric import PrometheusMetricsCollector
from prometheus_client import Gauge


class MetricCollector(PrometheusMetricsCollector):
    """PrometheusMetricsCollector extended to collect task's elapsed time and used RAM"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # self.labels and self.registry are set up by PrometheusMetricsCollector.__init__
        self.task_run_execution_time = Gauge(
            'luigi_task_run_execution_time_seconds',
            'luigi task run method execution time in seconds',
            self.labels,
            registry=self.registry
        )
        self.task_execution_memory = Gauge(
            'luigi_task_max_memory_megabytes',
            'luigi task run method max memory usage in megabytes',
            self.labels,
            registry=self.registry
        )

    def handle_task_statistics(self, task, statistics):
        # Called by the scheduler with the dict reported by TaskContext
        labels = self._generate_task_labels(task)
        if "elapsed" in statistics:
            self.task_run_execution_time.labels(**labels).set(statistics["elapsed"])
        if "memory" in statistics:
            self.task_execution_memory.labels(**labels).set(statistics["memory"])
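With one `if` per statistic, every new metric needs another branch in `handle_task_statistics`. A possible alternative is to key the gauges by statistic name. The sketch below is stdlib-only, so `RecordingGauge` is a hypothetical stand-in that mimics the `labels(**kwargs).set(value)` call chain of `prometheus_client.Gauge`, and `make_labels` plays the role of `_generate_task_labels`; `StatisticsHandler` is likewise a hypothetical name:

```python
class RecordingGauge:
    """Hypothetical stand-in for prometheus_client.Gauge: keeps the last value per label set."""

    class _Child:
        def __init__(self, values, key):
            self._values = values
            self._key = key

        def set(self, value):
            self._values[self._key] = value

    def __init__(self):
        self.values = {}  # sorted (label, value) pairs -> last value set

    def labels(self, **labels):
        return self._Child(self.values, tuple(sorted(labels.items())))


class StatisticsHandler:
    """Table-driven variant of handle_task_statistics (hypothetical helper)."""

    def __init__(self, gauges, make_labels):
        # Maps a statistic name such as "elapsed" to the gauge that stores it.
        self._gauges = gauges
        self._make_labels = make_labels

    def handle_task_statistics(self, task, statistics):
        labels = self._make_labels(task)  # compute the label set once per report
        for name, value in statistics.items():
            gauge = self._gauges.get(name)
            if gauge is not None:  # skip statistics no gauge was registered for
                gauge.labels(**labels).set(value)


elapsed_gauge, memory_gauge = RecordingGauge(), RecordingGauge()
handler = StatisticsHandler(
    {"elapsed": elapsed_gauge, "memory": memory_gauge},
    make_labels=lambda task: {"family": task},
)
handler.handle_task_statistics("MyTask", {"elapsed": 1.5, "memory": 42.0, "cpu": 3})
print(elapsed_gauge.values)  # {(('family', 'MyTask'),): 1.5}
```

Unknown keys such as `cpu` are skipped rather than raising, so a worker that reports extra statistics does not break an older collector. Whether that silence is desirable is a design choice.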

The corresponding luigi.cfg:

[scheduler]
metrics_collector = custom
metrics_custom_import = metrics_collector.MetricCollector

[worker]
task_process_context = task_context.TaskContext

[prometheus]
use_task_family_in_labels = true
task_parameters_to_use_in_labels = ["x","y"]
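Both `metrics_custom_import` and `task_process_context` are dotted paths of the form `module.ClassName`, so `metrics_collector.py` must be importable by the scheduler process and `task_context.py` by the worker processes (for example via `PYTHONPATH`). The following is roughly how such a path resolves, as an `importlib` sketch rather than luigi's own loader; `load_class` is a hypothetical name:

```python
import importlib


def load_class(dotted_path):
    """Resolve 'package.module.ClassName' to the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_name)  # ImportError if not on sys.path
    return getattr(module, class_name)  # AttributeError if the class is missing


print(load_class("collections.OrderedDict"))  # <class 'collections.OrderedDict'>
```

With the config above, `load_class("metrics_collector.MetricCollector")` would only succeed in a process that can import `metrics_collector`.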

For this example to work, luigi has to be modified: the scheduler needs a report_task_statistics method, exposed to tasks through TaskStatusReporter. Is this approach small and generic enough to be merged into luigi?
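The proposed plumbing can be reduced to plain Python: the worker-side reporter forwards a statistics dict to the scheduler, which resolves the task and hands both to the collector. All classes here are hypothetical stand-ins, not luigi's real `Scheduler` or `TaskStatusReporter` and not the code from the linked pull request. In luigi, the reporter's call would typically travel over HTTP to a central scheduler:

```python
class MetricsCollector:
    """Stand-in base class: ignores task statistics unless a subclass overrides the hook."""

    def handle_task_statistics(self, task, statistics):
        pass


class RecordingCollector(MetricsCollector):
    """Hypothetical collector that just remembers what it was given."""

    def __init__(self):
        self.seen = []

    def handle_task_statistics(self, task, statistics):
        self.seen.append((task, statistics))


class Scheduler:
    """Hypothetical scheduler fragment: looks up the task and forwards to the collector."""

    def __init__(self, collector):
        self._collector = collector
        self._tasks = {}  # task_id -> task object

    def add_task(self, task_id, task):
        self._tasks[task_id] = task

    def report_task_statistics(self, task_id, statistics):
        task = self._tasks.get(task_id)
        if task is not None:  # ignore reports for tasks the scheduler does not know
            self._collector.handle_task_statistics(task, statistics)


class TaskStatusReporter:
    """Hypothetical worker-side reporter bound to a single task id."""

    def __init__(self, scheduler, task_id):
        self._scheduler = scheduler
        self._task_id = task_id

    def report_task_statistics(self, statistics):
        self._scheduler.report_task_statistics(task_id=self._task_id, statistics=statistics)


collector = RecordingCollector()
scheduler = Scheduler(collector)
scheduler.add_task("MyTask_x_1", "MyTask")
TaskStatusReporter(scheduler, "MyTask_x_1").report_task_statistics({"elapsed": 2.0})
print(collector.seen)  # [('MyTask', {'elapsed': 2.0})]
```

Giving the base `MetricsCollector` a no-op `handle_task_statistics` means existing collectors, which know nothing about the new hook, keep working unchanged.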

mateka, Feb 14 '25 15:02

I created an implementation in https://github.com/spotify/luigi/pull/3343

mateka, Feb 14 '25 15:02