
[FEATURE REQUEST] Add Metadata/Headers Field for Distributed Tracing in asynq.Task

Open alabarjasteh opened this issue 1 year ago • 2 comments

Is your feature request related to a problem? Please describe.

I am trying to integrate distributed tracing into the task scheduler, but there is currently no proper way to pass tracing information between clients and workers. I am looking for something analogous to headers in HTTP or RabbitMQ, or metadata in gRPC, that could be used to inject the tracing context on the client (producer) side and extract it on the server (consumer) side. Without such headers, I have to add a "tracingCarrier" to every task struct and unmarshal the Asynq payload into that struct.

Here is an example from the Quickstart modified as explained above:

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

// Metadata, inject, extract, and taskTracer are referenced below but not shown here.

const TypeEmailDelivery = "email:deliver"

type EmailDeliveryPayload struct {
	Metadata

	UserID     int
	TemplateID string
}

// Compile-time check that EmailDeliveryPayload satisfies propagation.TextMapCarrier.
var _ propagation.TextMapCarrier = EmailDeliveryPayload{}

func NewEmailDeliveryTask(ctx context.Context, userID int, tmplID string) (*asynq.Task, error) {
	md := Metadata{}
	md = inject(ctx, md)

	payload, err := json.Marshal(EmailDeliveryPayload{Metadata: md, UserID: userID, TemplateID: tmplID})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeEmailDelivery, payload), nil
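}

// HandleEmailDeliveryTask has to unmarshal the payload first, because the tracing context travels inside it.
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {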
	var p EmailDeliveryPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}

	ctx = extract(ctx, p.Metadata)
	_, span := taskTracer.Start(ctx, "task handler")
	defer span.End()

	log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
	// Email delivery code ...

	return nil
}
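The snippet above does not show how Metadata is defined. As a minimal sketch, assuming Metadata is a plain string map, the following shows one way such a carrier could look and how it survives the JSON round trip inside the payload. The Metadata definition and the roundTrip helper are hypothetical and written for illustration only; the method set (Get, Set, Keys) mirrors what propagation.TextMapCarrier requires, but the example itself uses only the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Metadata is a hypothetical carrier type (an assumption, not shown in the issue).
// Its Get, Set, and Keys methods match the method set of propagation.TextMapCarrier.
type Metadata map[string]string

func (m Metadata) Get(key string) string { return m[key] }

// Set requires a non-nil map, so callers should start from Metadata{}.
func (m Metadata) Set(key, value string) { m[key] = value }

func (m Metadata) Keys() []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	return keys
}

// EmailDeliveryPayload embeds the carrier, as in the example above.
type EmailDeliveryPayload struct {
	Metadata

	UserID     int
	TemplateID string
}

// roundTrip is a hypothetical helper that mimics what happens between
// the producer (json.Marshal) and the consumer (json.Unmarshal).
func roundTrip(p EmailDeliveryPayload) (EmailDeliveryPayload, error) {
	var out EmailDeliveryPayload
	data, err := json.Marshal(p)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	md := Metadata{}
	// With OpenTelemetry, a propagator would write this entry via Inject;
	// the value here is a placeholder, not a real trace context.
	md.Set("traceparent", "example-trace-context")

	out, err := roundTrip(EmailDeliveryPayload{Metadata: md, UserID: 42, TemplateID: "welcome"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Get("traceparent"), out.UserID, out.TemplateID)
}
```

With a carrier like this, inject and extract would most likely be thin wrappers around otel.GetTextMapPropagator().Inject(ctx, md) and otel.GetTextMapPropagator().Extract(ctx, md). The drawback remains that every payload type has to embed the carrier.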

Describe the solution you'd like

I suggest adding a Metadata or Header field of type map[string]interface{} to the asynq.Task struct. This field would be a suitable place to store tracing-related data.
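To make the request concrete, here is a rough sketch of the shape I have in mind. The Task type, NewTaskWithHeaders, and Header below are hypothetical stand-ins written for illustration; they are not asynq's API. The sketch uses map[string]string rather than map[string]interface{}, on the assumption that tracing propagators only need string values.

```go
package main

import "fmt"

// Task is a hypothetical stand-in used only to illustrate the proposal.
// It is not asynq.Task and does not reflect asynq's actual API.
type Task struct {
	typename string
	payload  []byte
	headers  map[string]string
}

// NewTaskWithHeaders (hypothetical) copies the headers so that later
// changes to the caller's map do not affect the task.
func NewTaskWithHeaders(typename string, payload []byte, headers map[string]string) *Task {
	h := make(map[string]string, len(headers))
	for k, v := range headers {
		h[k] = v
	}
	return &Task{typename: typename, payload: payload, headers: h}
}

func (t *Task) Type() string    { return t.typename }
func (t *Task) Payload() []byte { return t.payload }

// Header returns the value for key, or "" if the header is not set.
func (t *Task) Header(key string) string { return t.headers[key] }

func main() {
	// Producer side: tracing data goes into headers, not into the payload.
	headers := map[string]string{"traceparent": "example-trace-context"}
	payload := []byte(`{"UserID":42,"TemplateID":"welcome"}`)
	task := NewTaskWithHeaders("email:deliver", payload, headers)

	// Consumer side: the header can be read without unmarshalling the payload.
	fmt.Println(task.Type(), task.Header("traceparent"), string(task.Payload()))
}
```

The point of this shape is that the payload struct stays free of tracing fields, and middleware could read the headers without knowing the payload type.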

alabarjasteh, Nov 25 '23 09:11

#547 #774

shuqingzai, Dec 22 '23 01:12

👍 I would like to have a similar feature as well.

al-bglhk, Feb 27 '24 02:02