Allow reporting additional task metrics to MetricsCollector
I would like to monitor more metrics than the standard PrometheusMetricsCollector collects. To do that, I would write a custom MetricsCollector and a TaskContext. The TaskContext would gather metrics for each task and report them to the scheduler, which would pass them on to a subclass of MetricsCollector.
For example, I would like to monitor local tasks running in subprocesses. To do that, I created the following classes:
```python
# task_context.py
import resource
import time


class TaskContext:
    """Measures elapsed time and peak memory of the task's process."""

    def __init__(self, task_process):
        self._task_process = task_process
        self._start = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        assert self._start is not None
        elapsed = time.perf_counter() - self._start
        # Peak RSS of this process or of its waited-for child processes.
        # ru_maxrss is in kilobytes on Linux (bytes on macOS).
        used_memory = max(
            resource.getrusage(resource.RUSAGE_SELF).ru_maxrss,
            resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss,
        )
        # report_task_statistics is the TaskStatusReporter method proposed in this issue.
        self._task_process.status_reporter.report_task_statistics(
            {"memory": used_memory / 1024, "elapsed": elapsed}
        )
```
and a collector that exports the reported values as Prometheus gauges:
```python
# metrics_collector.py
from prometheus_client import Gauge

from luigi.contrib.prometheus_metric import PrometheusMetricsCollector


class MetricCollector(PrometheusMetricsCollector):
    """PrometheusMetricsCollector extended to collect the task's elapsed time and used RAM."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.task_run_execution_time = Gauge(
            'luigi_task_run_execution_time_seconds',
            'luigi task run method execution time in seconds',
            self.labels,
            registry=self.registry
        )
        self.task_execution_memory = Gauge(
            'luigi_task_max_memory_megabytes',
            'luigi task run method max memory usage in megabytes',
            self.labels,
            registry=self.registry
        )

    # Hook proposed in this issue: the scheduler calls it with each task's statistics.
    def handle_task_statistics(self, task, statistics):
        labels = self._generate_task_labels(task)
        if "elapsed" in statistics:
            self.task_run_execution_time.labels(**labels).set(statistics["elapsed"])
        if "memory" in statistics:
            self.task_execution_memory.labels(**labels).set(statistics["memory"])
```
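The TaskContext can be exercised without a scheduler by handing it a stand-in task process. The sketch below is a variant of the class above, with hypothetical `StubTaskProcess` and `StubStatusReporter` classes that only record what would be sent to the scheduler. It also assumes you want megabytes on both Linux and macOS, so it divides `ru_maxrss` according to `sys.platform`. Because `__exit__` runs even when the wrapped code raises, statistics are reported for failed tasks as well.

```python
import resource
import sys
import time


class StubStatusReporter:
    """Hypothetical stand-in for TaskStatusReporter: records reports instead of sending them."""

    def __init__(self):
        self.reported = []

    def report_task_statistics(self, statistics):
        self.reported.append(statistics)


class StubTaskProcess:
    """Hypothetical stand-in exposing only the attribute TaskContext uses."""

    def __init__(self):
        self.status_reporter = StubStatusReporter()


def max_rss_megabytes():
    """Peak RSS of this process or its waited-for children, in megabytes."""
    peak = max(
        resource.getrusage(resource.RUSAGE_SELF).ru_maxrss,
        resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss,
    )
    # ru_maxrss is in bytes on macOS and in kilobytes on Linux.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


class TaskContext:
    def __init__(self, task_process):
        self._task_process = task_process
        self._start = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed = time.perf_counter() - self._start
        self._task_process.status_reporter.report_task_statistics(
            {"memory": max_rss_megabytes(), "elapsed": elapsed}
        )
        return False  # never swallow the task's exception


process = StubTaskProcess()
with TaskContext(process):
    time.sleep(0.05)  # stands in for the task's run()

stats = process.status_reporter.reported[0]
print(f"elapsed={stats['elapsed']:.3f}s memory={stats['memory']:.1f}MB")
```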
The corresponding luigi.cfg:
```ini
[scheduler]
metrics_collector = custom
metrics_custom_import = metrics_collector.MetricCollector

[worker]
task_process_context = task_context.TaskContext

[prometheus]
use_task_family_in_labels = true
task_parameters_to_use_in_labels = ["x","y"]
```
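Both `metrics_custom_import` and `task_process_context` take a dotted `module.ClassName` path. For `task_process_context`, luigi's worker imports the module part and looks up the class by name, and `metrics_custom_import` is presumably resolved the same way. The helper below, `import_from_dotted_path`, is hypothetical (not luigi's code) and only illustrates that resolution; the practical consequence is that `metrics_collector.py` must be importable by the scheduler process and `task_context.py` by the worker process.

```python
import importlib


def import_from_dotted_path(path):
    """Resolve a 'package.module.ClassName' string to the named attribute."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.ClassName', got {path!r}")
    # The module must be on sys.path of the process doing the import.
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# Stdlib example; with the luigi.cfg above the inputs would be
# "metrics_collector.MetricCollector" and "task_context.TaskContext".
cls = import_from_dotted_path("collections.OrderedDict")
print(cls)  # <class 'collections.OrderedDict'>
```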
For this example to work, luigi has to be modified: a report_task_statistics method must be added to the scheduler and exposed through TaskStatusReporter, and the scheduler has to pass the statistics on to the metrics collector's handle_task_statistics method. Is this approach small and generic enough to be merged into luigi?
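The proposed plumbing can be sketched with simplified, hypothetical stand-ins for the scheduler, TaskStatusReporter and MetricsCollector. This is not luigi's code: the real scheduler is reached over RPC, stores task objects rather than dicts, and the real TaskStatusReporter takes more constructor arguments. The sketch only shows the assumed flow, where the reporter forwards its task id together with the statistics (much as existing methods like `update_status_message` forward the task id), and the scheduler looks the task up and hands both to the collector.

```python
class MetricsCollector:
    """Base collector; the hook is a no-op so existing collectors keep working."""

    def handle_task_statistics(self, task, statistics):
        pass


class RecordingCollector(MetricsCollector):
    """Hypothetical collector that just stores what it receives."""

    def __init__(self):
        self.received = []

    def handle_task_statistics(self, task, statistics):
        self.received.append((task, statistics))


class Scheduler:
    """Simplified scheduler holding tasks in a dict, called directly instead of over RPC."""

    def __init__(self, metrics_collector):
        self._tasks = {}
        self._metrics_collector = metrics_collector

    def add_task(self, task_id, family):
        self._tasks[task_id] = {"id": task_id, "family": family}

    def report_task_statistics(self, task_id, statistics):
        task = self._tasks.get(task_id)
        if task is not None:  # ignore reports for unknown tasks
            self._metrics_collector.handle_task_statistics(task, statistics)


class TaskStatusReporter:
    """Worker-side reporter bound to a single task id."""

    def __init__(self, scheduler, task_id):
        self._scheduler = scheduler
        self._task_id = task_id

    def report_task_statistics(self, statistics):
        self._scheduler.report_task_statistics(self._task_id, statistics)


collector = RecordingCollector()
scheduler = Scheduler(collector)
scheduler.add_task("MyTask_x_1", "MyTask")
TaskStatusReporter(scheduler, "MyTask_x_1").report_task_statistics({"elapsed": 1.5, "memory": 42.0})
print(collector.received)
# [({'id': 'MyTask_x_1', 'family': 'MyTask'}, {'elapsed': 1.5, 'memory': 42.0})]
```

Making the base hook a no-op keeps such a change backward compatible in this sketch: collectors that do not override `handle_task_statistics` simply ignore the reported statistics.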
I have created an implementation in https://github.com/spotify/luigi/pull/3343