kit
kit copied to clipboard
Add AWS' Simple Queue Service support for transport
Hi,
@forepaas, we are currently starting a new project using go-kit and wanted to use AWS' Simple Query Service as a transport layer between some of our microservices.
I took the liberty of forking the project and implementing the support of SQS in go-kit's transport. This was discussed on issue https://github.com/go-kit/kit/issues/858 but was never implemented.
If you have any remarks, suggestions or questions please let me know.
@0marq SQS Consumer. Each sqs message has a default flow: read / process / delete. I see that in your implementation the deleting remain on the user part in different ways and it is not obvious how to do better (ConsumerFinalizerFunc or ConsumerResponseFunc). If a message is not deleted, then it will be processed X time, depends on sqs queue configuration.
Here should be a more obvious way: Add a ConsumerOption with DeleteMessage call which can be used beforeDecode or afterEncode, I can imagine when someone wants to remove and take all the responsibility on the processing message or someone want to delete after successful processing.
SQS (Simple Queue Service), not query.
I actually made my own implementation and using it on my production. The signature like below:
//main.go
//build subscriber
subscriber := sqstransport.NewSubscriber(endpoint, dec, enc, opts...)
for {
out, _ := sqsClient.ReceiveMessage(ctx, input) //handle error
for _, msg := range out.Messages {
// Its caller responsibility if want to have many workers because its more easier to scale.
// Now you are free to extend the visibility timeout, or using hearbeat suggested by AWS.
subscriber.ServeMessage(&msg)
}
}
What do you think?
I actually made my own implementation and using it on my production. The signature like below:
//main.go //build subscriber subscriber := sqstransport.NewSubscriber(endpoint, dec, enc, opts...) for { out, _ := sqsClient.ReceiveMessage(ctx, input) //handle error for _, msg := range out.Messages { // Its caller responsibility if want to have many workers because its more easier to scale. // Now you are free to extend the visibility timeout, or using hearbeat suggested by AWS. subscriber.ServeMessage(&msg) } }
What do you think?
This would lighten the consumer's responsibilities because we would no longer need to handler visibility timeouts, while allowing multiple message processing.
In the beginning I was thinking to go towards this approach but I ended up building a code more complete but also more complex, my motivation was that the consumer should not only process the messages but also fetch the messages. I don't know what's go-kit's opinion on this @peterbourgon ?
There's a great deal of complexity in this PR that I'm not qualified to judge, because I'm not a user of SQS. But I'll write my general expectations. And remember: Go kit transports aren't meant to be feature-complete clients of the technology they wrap — they're meant to provide a simple RPC-style (single request, single response) interface to their underlying transport.
I would expect any kind of message broker or queue transport package to have a New{Subscriber, Consumer, ...} constructor that took enough configuration information to identify a single topic/stream/whatever of messages of the same schema. I would expect the transport to consume messages one-by-one from the topic. Each message should be fed to a DecodeRequestFunc that took the message in its native type and produced a request interface{}. That request should be fed to the endpoint and the result received and fed to an EncodeResponseFunc that (probably optionally) produced a native message type which would be published as a response. Acking or Nacking the original message is I guess implementation dependent.
All of the details I'm seeing in this PR about working with batches of messages, retries, the entire concept of "left messages", handling additional message states, etc. etc. are in my opinion out of scope for a Go kit transport. These details should not be exposed to users.
I understand. I'll probably go towards @xyluet approach which is a lot less complex on Go kit's side :
- The user is responsible of calling ReceiveMessage in his main.
- The user will call Go kit's consumer.ServeMessage to serve each received message.
Given the new input from @peterbourgon I have simplified the code base : Receiving messages, handling multiple messages and updating visibilityTimeouts are left to the user. We simply provide a Consumer.ServeMessage and Producer.Endpoint allowing the normal Go kit message processing and publishing flow, with a certain number of built in before/after & encode/decode functions.
Is this still being worked on? I'd love to use it for a project
My two cents: I don't necessarily think that go-kit has to be the central repository of every possible transport implementation. The readme could mention third-party implementations for easy discoverability.
Another thing that I'd require from these submissions is that they are already released in a third-party repository, have a few users in production, etc. Go-kit is released about once a year, so it's not the right place for testing new transports IMHO.
Totally agree. As documented in #843 when generics lands Go kit will begin a process of paring down its core offerings to a lot less than is currently present.