asynq

[FEATURE REQUEST] Add message headers

Open AlexeyBelezeko opened this issue 2 years ago • 12 comments

I'm struggling a bit with passing a trace ID along with a task, because there is no way to read it inside middleware without unmarshaling the payload. This adds unnecessary dependencies to the middleware code.

I suggest adding a map[string]string of headers alongside the payload, which would let users pass meta info with the task.

P.S.: Big thank you for your work. Sorry if I missed something in the docs and this is actually possible.

AlexeyBelezeko avatar Feb 01 '22 09:02 AlexeyBelezeko

@AlexeyBelezeko Thank you for opening a feature request!

Is it possible to use context values?

hibiken avatar Feb 01 '22 14:02 hibiken

Unfortunately not. I inject trace_id and other meta info into the context to use it inside handlers, but that doesn't save me from exposing the inner structure of concrete handlers to the middleware, which I want to avoid.

AlexeyBelezeko avatar Feb 02 '22 10:02 AlexeyBelezeko

Hmm I see. Maybe similar to this feature request #326?

Could you give me a more concrete example of what you're trying to achieve? Code snippet would be very helpful here :)

hibiken avatar Feb 03 '22 01:02 hibiken

In our project we run the asynq server and client in two different processes. To pass the trace ID, I was forced to write middleware like this:

type TaskType1 struct {
	TraceID   string
	OtherData []byte
	URL       string
}

type TaskType2 struct {
	TraceID     string
	Name        string
	FullAddress string
	URL         string
}

func TraceInjectorMiddleware(h asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		var traceID string
		switch t.Type() {
		case "type1":
			task := TaskType1{}
			_ = json.Unmarshal(t.Payload(), &task)
			traceID = task.TraceID
		case "type2":
			task := TaskType2{}
			_ = json.Unmarshal(t.Payload(), &task)
			traceID = task.TraceID
		}
		return h.ProcessTask(context.WithValue(ctx, "traceID", traceID), t)
	})
}

I suggest adding headers:

// Task represents a unit of work to be performed.
type Task struct {
	// typename indicates the type of task to be performed.
	typename string

	// payload holds data needed to perform the task.
	payload []byte

	headers map[string]string

	// opts holds options for the task.
	opts []Option

	// w is the ResultWriter for the task.
	w *ResultWriter
}

func (t *Task) Headers() map[string]string { return t.headers }

And this change will help to simplify middleware like this:

func TraceInjectorMiddleware(h asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		traceID, ok := t.Headers()["traceID"]
		if ok {
			ctx = context.WithValue(ctx, "traceID", traceID)
		}
		return h.ProcessTask(ctx, t)
	})
}

AlexeyBelezeko avatar Feb 04 '22 12:02 AlexeyBelezeko

Would a partial unmarshal work here? Since you don't need the other fields, just unmarshal into a struct that has only the fields you need and that are common to the tasks.

type TaskType1 struct {
	TraceID   string
	OtherData []byte
	URL       string
}

type TaskType2 struct {
	TraceID     string
	Name        string
	FullAddress string
	URL         string
}

type TaskWithTraceID struct {
	TraceID string
}

func TraceInjectorMiddleware(h asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		var traceID string
		task := TaskWithTraceID{}
		_ = json.Unmarshal(t.Payload(), &task)
		traceID = task.TraceID
		return h.ProcessTask(context.WithValue(ctx, "traceID", traceID), t)
	})
}
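
The partial unmarshal works because encoding/json ignores JSON object keys that have no matching field in the target struct. Here is a standalone sketch of that behavior that also surfaces the decode error instead of discarding it. It assumes the payload is JSON, and traceIDFromPayload is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskWithTraceID declares only the field the middleware cares about.
type taskWithTraceID struct {
	TraceID string
}

// traceIDFromPayload (hypothetical helper) extracts TraceID from a JSON
// payload. Keys other than TraceID are skipped by encoding/json.
func traceIDFromPayload(payload []byte) (string, error) {
	var t taskWithTraceID
	if err := json.Unmarshal(payload, &t); err != nil {
		return "", fmt.Errorf("decode trace id: %w", err)
	}
	return t.TraceID, nil
}

func main() {
	payload := []byte(`{"TraceID":"abc123","Name":"n","URL":"https://example.com"}`)
	id, err := traceIDFromPayload(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id) // prints "abc123"
}
```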

Or you could access the payload using another library for json handling like: https://github.com/tidwall/gjson

crossworth avatar Feb 04 '22 13:02 crossworth

@crossworth thank you for your suggestion. But I also want to exclude this info from the payload so that it does not affect the uniqueness of the task. For example, we have a few places in the code that can add tasks to the same queue. Those tasks will have different trace IDs and therefore different payloads. To avoid duplicate work I have to use the TaskID. This will work perfectly with your solution, but I believe that meta info in headers is a more explicit and simpler solution.
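
To illustrate the uniqueness concern, here is a small standard-library sketch. It assumes a deduplication key derived from the task type and the payload bytes; that scheme, the message type, and the uniqueKey function are assumptions made for this example, not asynq's actual implementation. With the trace ID inside the payload, two otherwise identical tasks get different keys; with the trace ID in a separate headers map, the keys match.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// message is a hypothetical task envelope with headers kept apart from the payload.
type message struct {
	Type    string
	Payload []byte
	Headers map[string]string
}

// uniqueKey (assumed scheme) hashes only the type and payload; headers are ignored.
func uniqueKey(m message) string {
	h := sha256.New()
	h.Write([]byte(m.Type))
	h.Write([]byte{0}) // separator between type and payload
	h.Write(m.Payload)
	return hex.EncodeToString(h.Sum(nil))
}

// mustJSON marshals v and panics on failure; fine for a demo with fixed inputs.
func mustJSON(v any) []byte {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return b
}

func main() {
	// Trace ID embedded in the payload: the keys differ.
	a := message{Type: "type1", Payload: mustJSON(map[string]string{"URL": "https://example.com", "TraceID": "t-1"})}
	b := message{Type: "type1", Payload: mustJSON(map[string]string{"URL": "https://example.com", "TraceID": "t-2"})}
	fmt.Println(uniqueKey(a) == uniqueKey(b)) // prints "false"

	// Trace ID carried in headers: the keys match.
	p := mustJSON(map[string]string{"URL": "https://example.com"})
	c := message{Type: "type1", Payload: p, Headers: map[string]string{"traceID": "t-1"}}
	d := message{Type: "type1", Payload: p, Headers: map[string]string{"traceID": "t-2"}}
	fmt.Println(uniqueKey(c) == uniqueKey(d)) // prints "true"
}
```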

AlexeyBelezeko avatar Feb 04 '22 16:02 AlexeyBelezeko

Sorry for the delay in response here!

Let me think about this for a bit more since once a feature is added, it's hard to remove if we decide otherwise later.

hibiken avatar Feb 09 '22 03:02 hibiken

@hibiken Any news? I long for this feature. Thanks for your work

zhangzitao avatar Oct 08 '22 06:10 zhangzitao

@hibiken Any news? If you accept this feature, I can continue to implement it in the PR https://github.com/hibiken/asynq/pull/606

rapita avatar May 10 '23 11:05 rapita

Please delegate ownership of the project to active contributors.

dennypenta avatar May 10 '23 12:05 dennypenta