Enable "execute hooks" for worked jobs
Why?
- Opening a span for tracing and stashing it on the ctx
- Generic "starting job" logging
(This is distinct from https://riverqueue.com/docs/subscriptions because it needs to run synchronously and needs direct access to ctx.)
User experience
Define an optional hooks interface
// WorkerWithHooks is a job worker that provides hooks.
type WorkerWithHooks[T JobArgs] interface {
	PreRun(ctx context.Context, job *Job[T]) (context.Context, error) // Naming inspired by cobra's PreRun
}
and then use a type assertion to check if this optional interface is satisfied by a river.Worker.
Implementation
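For example, jobExecutor{}.execute() could run a hook before Work():
ctx, err := e.WorkUnit.PreWork(ctx)
and wrapperWorkUnit{}.PreWork() could call a stored preRun function, similar to how Work() calls the worker. The work unit factory would choose that function when it builds the unit: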
// noOpPreRun is a `PreRun()` function that does nothing.
func noOpPreRun[T JobArgs](ctx context.Context, _ *Job[T]) (context.Context, error) {
	return ctx, nil
}
func (w *workUnitFactoryWrapper[T]) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
	preRun := noOpPreRun[T]
	// To make this type assertion cheaper, it should probably happen when
	// `workUnitFactoryWrapper{}` is created (not when `MakeUnit()` is invoked).
	// A method value taken from a non-nil interface is never nil, so a
	// successful assertion is enough; a `wh.PreRun != nil` check is always true.
	if wh, ok := w.worker.(WorkerWithHooks[T]); ok {
		preRun = wh.PreRun
	}
	return &wrapperWorkUnit[T]{jobRow: jobRow, worker: w.worker, preRun: preRun}
}
// wrapperWorkUnit implements workUnit for a job and Worker.
type wrapperWorkUnit[T JobArgs] struct {
	job    *Job[T] // not set until after UnmarshalJob is invoked
	jobRow *rivertype.JobRow
	worker Worker[T]
	preRun func(ctx context.Context, job *Job[T]) (context.Context, error)
}
Thanks @dhermes.
I'll have to think about this one a bit more. I do like the idea of providing a reasonably easy way for getting generic telemetry, but would like to avoid the job API from becoming too sprawling and difficult to understand. The addition of one function wouldn't seem to be a problem, but once you've added PreRun, it'd feel kind of weird not to also have PostRun (the latter being somewhat more difficult to implement correctly in case it accidentally caused a failure for a successfully completed job), and things might expand from there. Cobra might act as a good example for what to try and avoid — it's fully featured alright, but has a massive surface area and is difficult to grok.
In some of our internal jobs/handlers, we do something like:
type MyWorker struct {
	servicebase.BaseService
}
func (w *MyWorker) Run(ctx context.Context) {
	ctx = w.BaseStart(ctx)
	defer w.BaseFinish(ctx)
	...
}
It does take a little more boilerplate in each job/handler, but it's more visible, and has presented very few problems over the years. I'd be tempted to propose a similar convention for River.
I'll have to think about this one a bit more
SGTM. Thanks for being thoughtful and deliberate! The struct-embedding trick will work great for now.
I'm new to River (and Go in general) but am also implementing telemetry in my workers.
The way I'm approaching it: each worker struct has a private tracer, and in the Work function a span is created from that tracer. I also create a child logger from the span context so all logging is traceable.
@dbhoot @dhermes We settled on this pattern for telemetry:
https://gfx.cafe/util/go/-/blob/master/fxriver/traceWorker.go?ref_type=heads https://gfx.cafe/util/go/-/blob/master/fxriver/hack.go?ref_type=heads
We wrap every worker in a traceWorker at registration time, using a custom registration struct:
https://gfx.cafe/util/go/-/blob/master/fxriver/river.go?ref_type=heads#L153
Not sure if we will wrap more hooks in the future.
@elee1766
Is my reading correct that the trace worker pulls the globally registered tracer from the trace provider? How does this work when telemetry is not enabled (for example, in a dev or local environment)?
I'm essentially doing the same thing, for the most part: I have a private begin function I call in every Work.
import (
	"context"
	"fmt"

	"github.com/riverqueue/river"
	"go.opentelemetry.io/otel/trace"
)
// begin starts a span when a tracer is configured; with a nil tracer it
// returns the original ctx and a no-op end function.
func begin(ictx context.Context, t trace.Tracer, name string) (context.Context, func(...trace.SpanEndOption)) {
	if t == nil {
		return ictx, func(...trace.SpanEndOption) {}
	}
	ctx, span := t.Start(ictx, name)
	return ctx, span.End
}
func (w *Someworker) Work(
	ictx context.Context,
	job *river.Job[SomeArgs],
) error {
	ctx, cleanup := begin(ictx, w.tracer, fmt.Sprintf("%s:%d", job.Args.Kind(), job.ID))
	defer cleanup()
	// log is a project-specific package here, not the standard library's log.
	logger := log.LoggerWithTraceContext(w.log, ctx)
	...
Looks like a fairly straightforward migration
Yes, exactly: it uses the global tracer provider initialized elsewhere. In development, if no initialization is done, the tracers are all no-ops.
I wanted to avoid the mistake of forgetting the hook, hence forcing it at registration time.
Please check out the job insertion and worker middleware added in #632 / v0.13.0-rc.1. I believe that should solve the use cases I'm aware of here. Please also report back if you try it and it works or doesn't! :pray: