watermill
AMQP message rejected if header is not a string
I've got a message with a header that is not a string, and it gets rejected by watermill:
[watermill] 2019/04/17 15:48:14.371452 subscriber.go:245: level=ERROR msg="Processing message failed, sending nack" amqp_exchange_name=x amqp_queue_name=y err="metadata x-death is not a string, but []interface {}....
The x-death header is added by dead lettering, and I would like to process this message again after it has spent some time in the DL queue.
My flow looks like this:
- try to process the message,
- after a few NACKs the message gets moved to the dead letter queue so that it does not block the main queue,
- there's a policy on the DL queue that moves the message back to the main queue after a few minutes to try to process it again.
The simplest solution is to ignore headers that are not strings and not add them to Metadata; this would work for me.
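A minimal sketch of that "skip non-string headers" idea, for illustration only. It is not watermill's actual marshaler: `stringHeaders` is a hypothetical helper, and a plain `map[string]interface{}` stands in for `amqp.Table` so the example needs no external imports.

```go
package main

import "fmt"

// stringHeaders copies only string-valued headers into a metadata map.
// Hypothetical helper: map[string]interface{} stands in for amqp.Table.
func stringHeaders(headers map[string]interface{}) map[string]string {
	metadata := make(map[string]string, len(headers))
	for key, value := range headers {
		// Non-string values (for example x-death) are skipped
		// instead of causing the whole message to be rejected.
		if s, ok := value.(string); ok {
			metadata[key] = s
		}
	}
	return metadata
}

func main() {
	headers := map[string]interface{}{
		"correlation_id": "abc-123",
		"x-death": []interface{}{
			map[string]interface{}{"count": int64(642), "reason": "expired"},
		},
	}
	fmt.Println(stringHeaders(headers))
}
```

The trade-off is that the dropped headers are lost for any downstream consumer, so the marshaler is no longer transparent for them.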
I'm willing to make a fix/PR, but please advise whether that's a good solution or whether I should do it some other way (maybe flattening
Metadata values are strings because almost all Pub/Subs support only simple types for metadata. So the main thing we need to preserve is keeping amqp.Marshaler transparent, so that you can build pipelines across multiple Pub/Subs.
To solve this and keep backward compatibility, we can add custom marshaling of x-death to a string. What values does it contain? Some ints? And do you know of any other RabbitMQ headers that are not strings?
Unfortunately this will not work for all non-string headers, but the question is whether there are any default non-string headers. If someone has something custom, they should probably implement a custom marshaler.
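One way such custom marshaling could look, as a rough sketch rather than the library's API: string headers pass through unchanged, x-death is serialized to a JSON string, and any other non-string header still produces an error. `Table` and `marshalHeaders` are hypothetical names, and JSON is just one possible string encoding chosen for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Table is a stand-in for amqp.Table (assumption made for this sketch).
type Table map[string]interface{}

// marshalHeaders is a hypothetical helper that turns headers into
// string-only metadata, with special handling for x-death.
func marshalHeaders(headers Table) (map[string]string, error) {
	metadata := make(map[string]string, len(headers))
	for key, value := range headers {
		switch v := value.(type) {
		case string:
			metadata[key] = v
		default:
			if key != "x-death" {
				return nil, fmt.Errorf("metadata %s is not a string, but %T", key, value)
			}
			// JSON keeps the value readable, but loses exact Go types
			// (an int64 comes back as float64 when decoded generically).
			encoded, err := json.Marshal(v)
			if err != nil {
				return nil, fmt.Errorf("cannot encode %s: %w", key, err)
			}
			metadata[key] = string(encoded)
		}
	}
	return metadata, nil
}

func main() {
	md, err := marshalHeaders(Table{
		"correlation_id": "abc-123",
		"x-death": []interface{}{
			Table{"count": int64(642), "reason": "expired"},
		},
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(md["x-death"])
}
```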
So far it's the only one that causes problems. My example value (dumped with spew):
([]interface {})[
(amqp.Table)map[
time:(time.Time)2019-04-16 18:50:19 +0000 UTC
count:(int64)642
exchange:(string)TEST.E.Fanout.xxx.Worker.DL
queue:(string)TEST.Q.xxx.Worker.DL
reason:(string)expired
routing-keys:([]interface {})[(string)TEST.Q.xxx.Worker]
]
(amqp.Table)map[
count:(int64)642
exchange:(string)
queue:(string)TEST.Q.xxx.Worker
reason:(string)rejected
routing-keys:([]interface {})[(string)TEST.Q.xxx.Worker]
time:(time.Time)2019-04-16 18:49:49 +0000 UTC
]
]
OK, custom marshaler works for me, thanks. But nevertheless watermill shouldn't fail on standard headers ;)
Yup, that's not so good :)
I have a couple of ideas for how it can be fixed, but none of them sounds good:
1. Using gob to marshal Rabbit's metadata to message metadata, but it will break all string headers
2. Storing some metadata fields in ctx, but if someone overwrites ctx we will lose this information
3. We can use "special" metadata fields (maybe with some prefix in the name) that will be serialized with gob, to keep compatibility with simple string fields. All other fields will not be marshaled.
4. We can make an explicit blacklist of ignored fields, but we will lose transparency
But 3 and 4 sound the most reasonable. What do you think?
Clarifying 3:
- normal headers will be strings, as they are currently
- complex types will be gob'ed and the key will be something like x-death__gob?
If you mean something like that, I like that :)
Exactly :)
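The suffix scheme agreed on above could be sketched roughly as follows. Everything here is an assumption made for illustration: the `__gob` suffix constant, the helper names `encodeHeaders`/`decodeHeaders`, the base64 step (added so the gob bytes are safe to carry in a string), and `map[string]interface{}` standing in for `amqp.Table`. With the real library, `amqp.Table` and any other concrete types would also have to be passed to `gob.Register`, and nil header values are not handled here.

```go
package main

import (
	"bytes"
	"encoding/base64"
	"encoding/gob"
	"fmt"
	"strings"
	"time"
)

// gobSuffix marks metadata keys whose values are gob-encoded (assumed name).
const gobSuffix = "__gob"

func init() {
	// gob needs every concrete type carried inside an interface registered.
	gob.Register(map[string]interface{}{})
	gob.Register([]interface{}{})
	gob.Register(time.Time{})
}

// encodeHeaders keeps strings as they are and gob-encodes everything else
// under key+gobSuffix.
func encodeHeaders(headers map[string]interface{}) (map[string]string, error) {
	metadata := make(map[string]string, len(headers))
	for key, value := range headers {
		if s, ok := value.(string); ok {
			metadata[key] = s
			continue
		}
		var buf bytes.Buffer
		// Encode a pointer to the interface so gob records the concrete type.
		if err := gob.NewEncoder(&buf).Encode(&value); err != nil {
			return nil, fmt.Errorf("cannot gob-encode %s: %w", key, err)
		}
		metadata[key+gobSuffix] = base64.StdEncoding.EncodeToString(buf.Bytes())
	}
	return metadata, nil
}

// decodeHeaders reverses encodeHeaders.
func decodeHeaders(metadata map[string]string) (map[string]interface{}, error) {
	headers := make(map[string]interface{}, len(metadata))
	for key, value := range metadata {
		if !strings.HasSuffix(key, gobSuffix) {
			headers[key] = value
			continue
		}
		raw, err := base64.StdEncoding.DecodeString(value)
		if err != nil {
			return nil, fmt.Errorf("cannot base64-decode %s: %w", key, err)
		}
		var decoded interface{}
		if err := gob.NewDecoder(bytes.NewReader(raw)).Decode(&decoded); err != nil {
			return nil, fmt.Errorf("cannot gob-decode %s: %w", key, err)
		}
		headers[strings.TrimSuffix(key, gobSuffix)] = decoded
	}
	return headers, nil
}

func main() {
	md, err := encodeHeaders(map[string]interface{}{
		"correlation_id": "abc-123",
		"x-death": []interface{}{
			map[string]interface{}{"count": int64(642), "reason": "expired"},
		},
	})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	headers, err := decodeHeaders(md)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("%#v\n", headers["x-death"])
}
```

Plain string headers stay byte-for-byte identical, so existing consumers are unaffected; only consumers that care about complex headers need to look for the suffixed keys.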
Does anyone know how many data types other than string a header can have, in both the Go and the AMQP context?
I think I found how each AMQP type is mapped to a Go type. https://github.com/rabbitmq/amqp091-go/blob/5c9eb2241d43a1d2ff3ae0c98b7153a38625973d/read.go#L142-L159
I think gob cannot be used in this case because it is possible for an AMQP type to be mapped to nil in Go, and gob cannot encode nil values.