asynq
[FEATURE REQUEST] Add message headers
I'm struggling a bit with passing a trace ID along with a task, because there is no way to read it inside middleware without unmarshaling the payload. That adds unnecessary dependencies to the middleware code.
I suggest adding a map[string]string of headers alongside the payload, which would let users pass meta info with a task without putting it in the payload.
P.S.: Big thank you for your work. Sorry if I missed something in the docs and this is actually possible.
Unfortunately not. I inject trace_id and other meta info into the context to use it inside Handlers, but that doesn't save me from exposing the inner structure of concrete handlers to the middleware, which I want to avoid.
Hmm, I see. Maybe similar to feature request #326?
Could you give me a more concrete example of what you're trying to achieve? A code snippet would be very helpful here :)
In our project we run the asynq server and client in two different processes. To pass the trace ID, I was forced to write middleware like this:
type TaskType1 struct {
	TraceID   string
	OtherData []byte
	URL       string
}

type TaskType2 struct {
	TraceID     string
	Name        string
	FullAddress string
	URL         string
}

func TraceInjectorMiddleware(h asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		var traceID string
		switch t.Type() {
		case "type1":
			task := TaskType1{}
			_ = json.Unmarshal(t.Payload(), &task)
			traceID = task.TraceID
		case "type2":
			task := TaskType2{}
			_ = json.Unmarshal(t.Payload(), &task)
			traceID = task.TraceID
		}
		return h.ProcessTask(context.WithValue(ctx, "traceID", traceID), t)
	})
}
I suggest adding headers:
// Task represents a unit of work to be performed.
type Task struct {
	// typename indicates the type of task to be performed.
	typename string

	// payload holds data needed to perform the task.
	payload []byte

	// headers holds meta info passed alongside the payload (proposed field).
	headers map[string]string

	// opts holds options for the task.
	opts []Option

	// w is the ResultWriter for the task.
	w *ResultWriter
}

func (t *Task) Headers() map[string]string { return t.headers }

This change would simplify the middleware like this:
func TraceInjectorMiddleware(h asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		traceID, ok := t.Headers()["traceID"]
		if ok {
			ctx = context.WithValue(ctx, "traceID", traceID)
		}
		return h.ProcessTask(ctx, t)
	})
}
Would a partial unmarshal work here? Since you don't need the other fields, just unmarshal into a struct with only the fields that you need and that are common to the tasks.
type TaskType1 struct {
	TraceID   string
	OtherData []byte
	URL       string
}

type TaskType2 struct {
	TraceID     string
	Name        string
	FullAddress string
	URL         string
}

type TaskWithTraceID struct {
	TraceID string
}

func TraceInjectorMiddleware(h asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		var traceID string
		task := TaskWithTraceID{}
		_ = json.Unmarshal(t.Payload(), &task)
		traceID = task.TraceID
		return h.ProcessTask(context.WithValue(ctx, "traceID", traceID), t)
	})
}
Or you could access the payload using another library for JSON handling, like https://github.com/tidwall/gjson
@crossworth thank you for your suggestion. But I also want to keep this info out of the payload so that it does not affect the uniqueness of the task. For example, we have a few places in the code that can add tasks to the same queue. Those tasks will have different trace IDs and therefore different payloads. To avoid duplicated work I have to use the TaskID. That will work perfectly with your solution, but I believe that meta info in headers is a more explicit and simpler solution.
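To illustrate the uniqueness concern, here is a rough application-level sketch of keeping metadata outside the business payload: the task body is an envelope, and the deduplication key is derived from the task type and inner payload only. `envelope`, `wrap`, `unwrap`, and `dedupKey` are hypothetical helpers, not asynq APIs, and the sketch assumes the payload is already valid JSON. The resulting key would still have to be handed to the queue's own uniqueness mechanism, for example as the TaskID mentioned above.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
)

// envelope is a hypothetical wrapper that carries metadata next to the
// business payload instead of inside it.
type envelope struct {
	Headers map[string]string `json:"headers,omitempty"`
	Payload json.RawMessage   `json:"payload"`
}

// wrap serializes headers and an already-encoded JSON payload into one
// byte slice that can be used as the task body.
func wrap(headers map[string]string, payload []byte) ([]byte, error) {
	return json.Marshal(envelope{Headers: headers, Payload: payload})
}

// unwrap is what a middleware would call: it reads the headers without
// knowing anything about the concrete payload type.
func unwrap(body []byte) (envelope, error) {
	var e envelope
	err := json.Unmarshal(body, &e)
	return e, err
}

// dedupKey hashes the task type and the inner payload only, so two tasks
// that differ just in their headers (e.g. trace ID) get the same key.
func dedupKey(typename string, payload []byte) string {
	h := sha256.New()
	h.Write([]byte(typename))
	h.Write([]byte{0}) // separator between type name and payload
	h.Write(payload)
	return hex.EncodeToString(h.Sum(nil))
}
```

With this layout, two producers enqueueing the same work under different trace IDs produce different task bodies but the same deduplication key. The cost is that every handler has to unwrap the envelope first, which is part of why first-class headers would be the simpler option.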
Sorry for the delay in response here!
Let me think about this a bit more, since once a feature is added it's hard to remove if we change our minds later.
@hibiken Any news? I long for this feature. Thanks for your work
@hibiken Any news? If you accept this feature, I can continue to implement it in the PR https://github.com/hibiken/asynq/pull/606
Please delegate ownership of the project to active contributors.