
amqp message rejected if header is not string

Open danielllek opened this issue 5 years ago • 10 comments

I've got a message with a header that is not a string, and it gets rejected by watermill: [watermill] 2019/04/17 15:48:14.371452 subscriber.go:245: level=ERROR msg="Processing message failed, sending nack" amqp_exchange_name=x amqp_queue_name=y err="metadata x-death is not a string, but []interface {}....

The x-death header is added by dead-lettering, and I would like to process this message again after it has spent some time in the DL queue.

My flow looks like this:

  1. try to process the message,
  2. after a few NACKs the message is moved to a dead letter queue so that it doesn't block the main queue,
  3. a policy on the DL queue moves the message back to the main queue after a few minutes so that processing can be retried.

The simplest solution is to ignore headers that are not strings and not add them to Metadata; this would work for me.
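A minimal sketch of that "ignore non-string headers" idea, not watermill's actual code. To keep it dependency-free, `map[string]interface{}` stands in for `amqp.Table`, and the function name `stringHeaders` is hypothetical. It returns the skipped keys as well, so the caller can at least log what was dropped.

```go
package main

import "fmt"

// stringHeaders copies only the string-valued entries of headers into a
// metadata map and returns the keys it skipped.
// Assumption: map[string]interface{} stands in for amqp.Table here.
func stringHeaders(headers map[string]interface{}) (map[string]string, []string) {
	metadata := make(map[string]string, len(headers))
	var skipped []string
	for key, value := range headers {
		s, ok := value.(string)
		if !ok {
			// Non-string header (for example x-death): leave it out
			// instead of failing the whole message.
			skipped = append(skipped, key)
			continue
		}
		metadata[key] = s
	}
	return metadata, skipped
}

func main() {
	headers := map[string]interface{}{
		"correlation_id": "abc-123",
		"x-death":        []interface{}{map[string]interface{}{"count": int64(642)}},
	}
	metadata, skipped := stringHeaders(headers)
	fmt.Println("metadata:", metadata)
	fmt.Println("skipped:", skipped)
}
```

The trade-off is that the dropped headers are lost if the message is republished further down a pipeline.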

I'm willing to make a fix/PR, but please advise whether that's a good solution or whether I should do it some other way (maybe flattening

danielllek avatar Apr 17 '19 14:04 danielllek

Metadata values are strings because almost all Pub/Subs support only simple types for metadata. So the main thing we need to preserve is the transparency of amqp.Marshaler, so that you can build pipelines across multiple Pub/Subs.

So, to solve it and keep backward compatibility, we can add custom marshaling of x-death to a string. What values does it contain? Some ints? And do you know of any other RabbitMQ headers that are not strings?

Unfortunately that will not work for all non-string headers, but the question is whether there are any default non-string headers. If someone has something custom, they should probably implement a custom marshaler.
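One way to picture "custom marshaling of x-death to string" is to serialize the non-string value as JSON. This is only a sketch under assumptions: JSON is a swapped-in choice that nobody in this thread proposed, `marshalHeader` is a hypothetical helper, and `map[string]interface{}` stands in for `amqp.Table`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalHeader returns string values unchanged and JSON-encodes anything
// else, so every header ends up as a string that fits message metadata.
// Assumption: the header value contains only JSON-encodable types.
func marshalHeader(value interface{}) (string, error) {
	if s, ok := value.(string); ok {
		return s, nil
	}
	b, err := json.Marshal(value)
	if err != nil {
		return "", fmt.Errorf("marshaling header value: %w", err)
	}
	return string(b), nil
}

func main() {
	xDeath := []interface{}{
		map[string]interface{}{"count": int64(642), "reason": "expired"},
	}
	s, err := marshalHeader(xDeath)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

The encoding is one-way in terms of types: decoding the JSON back yields float64 for numbers and strings for timestamps, so it does not preserve the original header types on republish.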

roblaszczak avatar Apr 17 '19 18:04 roblaszczak

So far it's the only one that causes problems. My example value (dumped with spew):

([]interface {})[

(amqp.Table)map[
time:(time.Time)2019-04-16 18:50:19 +0000 UTC 
count:(int64)642 
exchange:(string)TEST.E.Fanout.xxx.Worker.DL 
queue:(string)TEST.Q.xxx.Worker.DL 
reason:(string)expired 
routing-keys:([]interface {})[(string)TEST.Q.xxx.Worker]
] 

(amqp.Table)map[
count:(int64)642 
exchange:(string) 
queue:(string)TEST.Q.xxx.Worker 
reason:(string)rejected 
routing-keys:([]interface {})[(string)TEST.Q.xxx.Worker] 
time:(time.Time)2019-04-16 18:49:49 +0000 UTC
]
]

danielllek avatar Apr 18 '19 07:04 danielllek

OK, a custom marshaler works for me, thanks. But nevertheless watermill shouldn't fail on standard headers ;)

danielllek avatar Apr 18 '19 13:04 danielllek

But nevertheless watermill shouldn't fail on standard headers ;)

Yup, it is not so good :)

I have a couple of ideas for how it can be fixed, but none of them sounds good:

  1. Using gob to marshal Rabbit's metadata to message metadata, but it will break all string headers
  2. Storing some metadata fields in ctx, but if someone overwrites ctx we will lose this information
  3. We can use gob serialization only for some "special" metadata fields (maybe marked by a prefix in the name), to keep compatibility with simple string fields. All other fields will not be marshaled.
  4. We can keep an explicit blacklist of ignored fields, but we will lose transparency

But 3 and 4 sound the most reasonable. What do you think?

roblaszczak avatar Apr 18 '19 14:04 roblaszczak

clarifying 3:

  1. normal headers will be strings, as they are currently
  2. complex types will be gob'ed and the key will be something like: x-death__gob?

If you mean something like that, I like that :)
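The scheme clarified above could look roughly like the sketch below. It is not an implementation from watermill: the `__gob` suffix comes from the example key above, while `encodeHeaders`, `decodeHeaders`, the `gobValue` wrapper, and the base64 step (added because metadata values must be strings) are assumptions made for illustration. `map[string]interface{}` again stands in for `amqp.Table`.

```go
package main

import (
	"bytes"
	"encoding/base64"
	"encoding/gob"
	"fmt"
	"strings"
	"time"
)

// gobSuffix marks metadata keys whose values are gob-encoded (hypothetical).
const gobSuffix = "__gob"

// gobValue wraps a header value so gob transmits it as an interface value.
type gobValue struct{ V interface{} }

func init() {
	// Concrete types carried inside interface{} must be registered with gob.
	gob.Register([]interface{}{})
	gob.Register(map[string]interface{}{})
	gob.Register(time.Time{})
}

// encodeHeaders keeps string headers as-is and stores every other value
// gob-encoded (then base64-encoded) under key + gobSuffix.
func encodeHeaders(headers map[string]interface{}) (map[string]string, error) {
	metadata := make(map[string]string, len(headers))
	for key, value := range headers {
		if s, ok := value.(string); ok {
			metadata[key] = s
			continue
		}
		var buf bytes.Buffer
		if err := gob.NewEncoder(&buf).Encode(gobValue{V: value}); err != nil {
			return nil, fmt.Errorf("encoding header %q: %w", key, err)
		}
		metadata[key+gobSuffix] = base64.StdEncoding.EncodeToString(buf.Bytes())
	}
	return metadata, nil
}

// decodeHeaders reverses encodeHeaders, restoring the original Go values.
func decodeHeaders(metadata map[string]string) (map[string]interface{}, error) {
	headers := make(map[string]interface{}, len(metadata))
	for key, value := range metadata {
		if !strings.HasSuffix(key, gobSuffix) {
			headers[key] = value
			continue
		}
		raw, err := base64.StdEncoding.DecodeString(value)
		if err != nil {
			return nil, fmt.Errorf("decoding header %q: %w", key, err)
		}
		var wrapped gobValue
		if err := gob.NewDecoder(bytes.NewReader(raw)).Decode(&wrapped); err != nil {
			return nil, fmt.Errorf("decoding header %q: %w", key, err)
		}
		headers[strings.TrimSuffix(key, gobSuffix)] = wrapped.V
	}
	return headers, nil
}

func main() {
	headers := map[string]interface{}{
		"plain":   "value",
		"x-death": []interface{}{map[string]interface{}{"count": int64(642)}},
	}
	metadata, err := encodeHeaders(headers)
	if err != nil {
		panic(err)
	}
	restored, err := decodeHeaders(metadata)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored["plain"], restored["x-death"])
}
```

Existing string headers pass through untouched, so pipelines that rely on them keep working, and only consumers that understand the suffix need to decode the extra keys.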

danielllek avatar Apr 19 '19 09:04 danielllek

Exactly :)

roblaszczak avatar Apr 19 '19 10:04 roblaszczak

Does anyone know how many data types of a header can be other than string, in Golang context and AMQP context?

checkmunza avatar Jun 03 '23 15:06 checkmunza

Does anyone know how many data types of a header can be other than string, in Golang context and AMQP context?

I think I found how each AMQP type is mapped to a Go type. https://github.com/rabbitmq/amqp091-go/blob/5c9eb2241d43a1d2ff3ae0c98b7153a38625973d/read.go#L142-L159

I think gob cannot be used in this case because it is possible for an AMQP type to be mapped to nil in Go, and gob cannot encode nil values.

checkmunza avatar Jun 03 '23 16:06 checkmunza