feature: add river inspector that returns various info about the system
Motivation
I want to periodically upload metrics about jobs count, state, and kind.
An API that allows that would be great, but at the same time I have a workaround, so I think this is a low-priority issue.
API
I'm thinking of something similar to the asynq Inspector, but with only a few methods, like:
- JobsCountByState
- JobsCountByKind
- etc...
This API might also be utilized by riverui, which currently uses its own SQL queries.
The API doesn't need to guarantee backward compatibility, as it will probably change in upcoming versions.
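To make the shape a bit more concrete, here's a rough sketch of what such an inspector could look like. Nothing like this exists in River today; the package, interface, and signatures below are purely illustrative:

package riverinspect // hypothetical package, for illustration only

import (
	"context"

	"github.com/riverqueue/river/rivertype"
)

// Inspector is a hypothetical sketch of the proposed API; nothing like this exists in
// River today. The method names simply mirror the ones suggested above.
type Inspector interface {
	// JobsCountByState returns the number of jobs currently in each state.
	JobsCountByState(ctx context.Context) (map[rivertype.JobState]int, error)
	// JobsCountByKind returns the number of jobs per kind, optionally filtered by state.
	JobsCountByKind(ctx context.Context, states ...rivertype.JobState) (map[string]int, error)
}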
Possible solutions
- River middleware that sends telemetry data about each job.
This would work, but many projects enable sampling to reduce the cost of storing large amounts of data.
This approach requires a leader election to make sure only one instance sends data. Btw - I think exposing a leader election as an API would be a great value added by river.
- Collector
This is the most common setup, and a collector could aggregate metrics. If you want to keep things simple, instances could instead send telemetry data straight to the external service.
- Periodic job + SQL
Right now I use a periodic job and custom SQL queries to fetch the data and send it once in a while (a rough sketch of the query is below).
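Roughly, the counting part of that approach boils down to a single GROUP BY over River's river_job table. Only the river_job table and its kind/state columns come from River's schema; the package and function below are an illustrative sketch, not the actual code:

package rivermetrics // hypothetical package name, for illustration only

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

// countJobsByKindAndState counts non-terminal jobs per kind and state with one query.
// Note that this is exactly the kind of aggregate count that can get slow once the
// job table grows very large.
func countJobsByKindAndState(ctx context.Context, pool *pgxpool.Pool) (map[string]map[string]int64, error) {
	rows, err := pool.Query(ctx, `
		SELECT kind, state::text, count(*)
		FROM river_job
		WHERE state IN ('available', 'pending', 'retryable', 'running', 'scheduled')
		GROUP BY kind, state`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	counts := map[string]map[string]int64{}
	for rows.Next() {
		var kind, state string
		var n int64
		if err := rows.Scan(&kind, &state, &n); err != nil {
			return nil, err
		}
		if counts[kind] == nil {
			counts[kind] = map[string]int64{}
		}
		counts[kind][state] = n
	}
	return counts, rows.Err()
}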
Yeah, something like this seems like a good idea.
One problem that was raised recently is that some of those job count queries can get quite slow when job tables become too large. It makes me wonder if we should have some kind of system that tries to count things incrementally and keep track of stats, which would work fine, except then you either have only per-client stats, or need to aggregate them somehow.
We could also leverage the river_client table that already exists (I added it some time ago for future use, but haven't put it to work yet) to have clients write stats periodically, which could then be aggregated by any client easily/cheaply.
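Purely as a sketch of that idea (River does not do any of this today): if each client periodically wrote something like a job_counts object into river_client's metadata column, any client could aggregate across all clients with one cheap query. Only the river_client table and its metadata column actually exist; the key name, the write flow, and the function below are hypothetical:

package rivermetrics // hypothetical package name, for illustration only

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

// aggregateClientJobCounts sums hypothetical per-client job counts. It ASSUMES each
// client periodically writes {"job_counts": {"available": 12, "running": 3, ...}} into
// river_client.metadata, which River does not do today.
func aggregateClientJobCounts(ctx context.Context, pool *pgxpool.Pool) (map[string]int64, error) {
	rows, err := pool.Query(ctx, `
		SELECT counts.key AS state, sum(counts.value::bigint) AS jobs
		FROM river_client,
		     jsonb_each_text(metadata -> 'job_counts') AS counts
		GROUP BY counts.key`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	totals := map[string]int64{}
	for rows.Next() {
		var state string
		var jobs int64
		if err := rows.Scan(&state, &jobs); err != nil {
			return nil, err
		}
		totals[state] = jobs
	}
	return totals, rows.Err()
}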
This would be awesome.
Today, I've implemented this myself as a periodic job + an event listener to get the following stats:
From an event listener:
- Events (job status moved PLUS queue pause/unpause)
- Time from when a job started to when it's completed/canceled (NOT the same as duration worked, this includes retries/etc. It's total time enqueued)
- Time waiting in queue (this is time before it gets worked)
- Run duration (this is the same as duration of job being worked)
From our MetricWorker:
- Jobs in state [whether worked or not!]
Only some of those are replaced by github.com/riverqueue/rivercontrib/otelriver.
Explicitly, otelriver produces the following metrics to my understanding:
- Number of jobs inserted
- Number of job batches inserted
- Duration of job batch insertion
- Number of jobs worked
- Duration of jobs being worked (this is only the duration of the Work() call, not the time from enqueue to completed, which I also find interesting)
The drawback of this approach is that it:
- Is a little annoying to maintain; other people's code is free ;)
- The periodic job pollutes the river job list, making riverui a little harder to use, and I imagine slows down any query that's using river. Eventually, I'll have to manually remove these jobs :scream:.
But, it's nice to have some healthcheck that river is working, and that we aren't succeeding at inserts but then never running Work(). I feel that getting a periodic count of non-terminal jobs helps us monitor that (e.g. if an item is stuck in Available, we know our workers aren't working!)
Minimal copy of what we do:
NOTE: both the PeriodicJob registration and the go listenToQueueMetrics call should happen on every host; river already dedupes the PeriodicJob registration, and the queue events function is entirely in-process, NOT using postgres notify / etc (so it only gets events that happened from this process)
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
)
// MetricReporting is a river job arg that is used to report metrics.
type MetricReporting struct{}
// MetricsQueue is the queue name for the _separate_ queue that runs metric reporting.
const MetricsQueue = "metrics"
// Kind is a string that uniquely identifies the type of job. This must be provided on your job arguments struct.
func (MetricReporting) Kind() string { return "MetricReporting" }
// InsertOpts returns the options to use when inserting this job into the queue.
func (MetricReporting) InsertOpts() river.InsertOpts {
return river.InsertOpts{
// we use a separate queue for metric reporting
// because we want to be able to pause the default queue for circuit breaking purposes
// but still have metrics coming in (to help us realize that the circuit has been broken)
Queue: MetricsQueue,
}
}
// MetricReporter is the worker for Metrics reporting
// MetricReporter will look for all _non-terminal_ jobs in all queues, and report their count.
type MetricReporter struct {
metric *prometheus.GaugeVec
river.WorkerDefaults[MetricReporting]
}
// NewMetricReporter returns a Metric Reporting Worker.
func NewMetricReporter() *MetricReporter {
return &MetricReporter{
metric: promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "myapp",
Subsystem: "river",
Name: "jobs",
Help: "The number of jobs in each [non-terminal] state",
}, []string{"job", "state"}),
}
}
// Work publishes one set of metrics.
func (w *MetricReporter) Work(ctx context.Context, _ *river.Job[MetricReporting]) error {
count := map[string]map[rivertype.JobState]int{}
jobListParams := river.NewJobListParams().States(rivertype.JobStateAvailable, rivertype.JobStatePending, rivertype.JobStateRetryable, rivertype.JobStateRunning, rivertype.JobStateScheduled)
for {
res, err := river.ClientFromContext[pgx.Tx](ctx).JobList(ctx, jobListParams)
if err != nil {
return fmt.Errorf("%w while listing jobs in metrics worker", err)
}
for _, j := range res.Jobs {
if count[j.Kind] == nil {
count[j.Kind] = map[rivertype.JobState]int{}
}
count[j.Kind][j.State]++
}
if res.LastCursor == nil {
break
}
jobListParams = jobListParams.After(res.LastCursor)
}
for kind, states := range count {
for state, c := range states {
w.metric.WithLabelValues(kind, string(state)).Set(float64(c))
}
}
return nil
}
// listenToQueueMetrics subscribes to river client events until the context is canceled and updates prometheus metrics for job runs and queue pause/resume events.
func listenToQueueMetrics[TTx any](ctx context.Context, riverClient *river.Client[TTx]) {
events, cancel := riverClient.Subscribe(
river.EventKindJobCancelled,
river.EventKindJobCompleted,
river.EventKindJobFailed,
river.EventKindJobSnoozed,
river.EventKindQueuePaused,
river.EventKindQueueResumed,
)
defer cancel()
// objectives are how we ask prometheus for percentile buckets.
objectives := map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
counter := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "myapp",
Subsystem: "river",
Name: "events_total",
Help: "The number of events that have occurred.",
}, []string{"event", "job"})
completeDurationSummary := promauto.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "myapp",
Subsystem: "river",
Name: "complete",
Help: "Time it took to set the job completed, discarded, or errored.",
Objectives: objectives,
}, []string{"event", "job"})
queueWaitDurationSummary := promauto.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "myapp",
Subsystem: "river",
Name: "wait",
Help: "Time the job spent waiting in available state before starting execution.",
Objectives: objectives,
}, []string{"event", "job"})
runDurationSummary := promauto.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "myapp",
Subsystem: "river",
Name: "run",
Help: "Time job spent running (measured around job worker.)",
Objectives: objectives,
}, []string{"event", "job"})
for {
select {
case <-ctx.Done():
return
case event, ok := <-events:
if !ok {
return
}
// The counter and summaries are declared with two labels ("event", "job"), so always pass two values.
// QueuePause/QueueResume events will not have an associated Job, so leave the job label empty.
labels := []string{string(event.Kind), ""}
if event.Job != nil {
labels[1] = event.Job.Kind
}
counter.WithLabelValues(labels...).Inc()
// QueuePause/QueueResume events will not have an associated JobStats!
if event.JobStats != nil {
completeDurationSummary.WithLabelValues(labels...).Observe(float64(event.JobStats.CompleteDuration.Milliseconds()))
queueWaitDurationSummary.WithLabelValues(labels...).Observe(float64(event.JobStats.QueueWaitDuration.Milliseconds()))
runDurationSummary.WithLabelValues(labels...).Observe(float64(event.JobStats.RunDuration.Milliseconds()))
}
}
}
}
func main() {
ctx := context.Background()
pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
log.Fatal(err)
}
workers := river.NewWorkers()
river.AddWorker(workers, NewMetricReporter())
cl, err := river.NewClient(
riverpgxv5.New(pool),
&river.Config{
Workers: workers,
PeriodicJobs: []*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(time.Minute),
func() (river.JobArgs, *river.InsertOpts) {
return MetricReporting{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true},
),
},
},
)
if err != nil {
log.Fatal(err)
}
go listenToQueueMetrics(ctx, cl)
if err := cl.Start(ctx); err != nil {
log.Fatal(err)
}
// Start is non-blocking; block forever so the workers and the event listener keep running.
// (A real program would also expose the default prometheus registry over HTTP, e.g. with
// promhttp.Handler(), and wait for a shutdown signal instead of blocking indefinitely.)
select {}
}
Having something like that built into river would be :heart:
Oh, to say a little more explicitly, based on @krhubert's opening:
Subscriptions: "This approach requires a leader election to make sure only one instance sends data. Btw - I think exposing a leader election as an API would be a great value added by river."
Subscriptions should not need leader election -- from my last reading of the code, the metrics are per-process, not per postgres schema. So you need a metric aggregator (e.g. have grafana or datadog use sum() instead of avg() for some things). Today in my codebase we have every worker and every insertOnly client listen to the subscriptions with the above code sample, and I didn't see any duplicative events.
Following on from a related discussion here:
I've implemented a prometheus metrics Collector whose Collect() method ranges over all possible job states and job kinds and queries the river client via JobList().
For each retrieved job kind/state combo I update a prometheus Gauge metric.
This collector is registered and scraped periodically according to my prometheus scrape interval.
This way I get gauge metrics for the jobs of each kind, in each state. I can then, e.g., alert on jobs of a particular kind being stuck in state="retryable" for over an hour, or whatever (if this represents a bad kind of failure).
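For reference, here is a minimal sketch of that kind of collector, assuming it pages through JobList the same way as the example earlier in the thread; the type name, metric name, and error handling below are illustrative, not the actual implementation:

package rivermetrics // hypothetical package name, for illustration only

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivertype"
)

// riverJobsCollector queries job counts at scrape time instead of keeping gauges updated.
type riverJobsCollector[TTx any] struct {
	client *river.Client[TTx]
	desc   *prometheus.Desc
}

func newRiverJobsCollector[TTx any](client *river.Client[TTx]) *riverJobsCollector[TTx] {
	return &riverJobsCollector[TTx]{
		client: client,
		desc: prometheus.NewDesc(
			"myapp_river_jobs", // illustrative metric name
			"Number of river jobs by kind and state.",
			[]string{"kind", "state"}, nil,
		),
	}
}

func (c *riverJobsCollector[TTx]) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

// Collect pages through JobList and emits one gauge sample per kind/state combination.
func (c *riverJobsCollector[TTx]) Collect(ch chan<- prometheus.Metric) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	counts := map[string]map[rivertype.JobState]int{}
	params := river.NewJobListParams().States(
		rivertype.JobStateAvailable, rivertype.JobStatePending, rivertype.JobStateRetryable,
		rivertype.JobStateRunning, rivertype.JobStateScheduled,
	)
	for {
		res, err := c.client.JobList(ctx, params)
		if err != nil {
			return // a real collector might also export a scrape-error metric here
		}
		for _, j := range res.Jobs {
			if counts[j.Kind] == nil {
				counts[j.Kind] = map[rivertype.JobState]int{}
			}
			counts[j.Kind][j.State]++
		}
		if res.LastCursor == nil {
			break
		}
		params = params.After(res.LastCursor)
	}
	for kind, states := range counts {
		for state, n := range states {
			ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(n), kind, string(state))
		}
	}
}

Registering it once at startup (e.g. prometheus.MustRegister(newRiverJobsCollector(riverClient))) is all that's needed; the registry then calls Collect on every scrape, so the counts are only computed when prometheus actually asks for them.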
Another thing you can do to get some job stat metrics is subscribe to events using the river client's .Subscribe(). Then, listening on the returned channel, every time an event is emitted, update "run duration", "complete duration", and "queue wait duration" gauge metrics, as well as a "riverJobEvent" counter metric (labelled by event kind and job kind).