returning protocol.ResultACK in receiver doesn't acknowledge the event
Given this example of a consumer using Google Pub/Sub:
```go
err = cloudEventsClient.StartReceiver(ctx, func(ctx context.Context, event cloudevents.Event) protocol.Result {
	log.Infow("received event from topic", "event.id", event.Context.GetID())

	var data map[string]interface{}
	if err := event.DataAs(&data); err != nil {
		log.Errorw("error decoding data", "err", err)
	}

	log.Infow("decoded event", "event.id", event.Context.GetID(), "event.data", data)

	// this will lead to the event being consumed by this receiver infinitely
	return protocol.ResultACK

	// this, otoh, will consume the event once
	// return nil
})
```
Returning protocol.ResultACK does not actually acknowledge the message, and it is redelivered indefinitely.
Returning nil, on the other hand, acknowledges the message, but this behavior feels wonky.
Is this expected?
Before I dig deeper, can you please provide the exact module (e.g. v2) and version/commit you're using for the above code? Is there any error printed which would help me debugging the code path? For example, do you see cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err)?
Example receiver:
```go
package main

import (
	"context"
	"fmt"
	"log"

	cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/event"
	"github.com/cloudevents/sdk-go/v2/protocol"
)

func receive(ctx context.Context, event event.Event) error {
	fmt.Printf("------------------ Received MSG ------------------\n")
	return protocol.ResultACK
}

func main() {
	ctx := context.Background()

	t, err := cepubsub.New(context.Background(),
		cepubsub.WithProjectID("helloworld"),
		cepubsub.AllowCreateTopic(true),
		cepubsub.AllowCreateSubscription(true),
		cepubsub.WithSubscriptionAndTopicID("helloworld", "helloworld"))
	if err != nil {
		log.Fatalf("failed to create pubsub protocol, %s", err.Error())
	}

	c, err := cloudevents.NewClient(t)
	if err != nil {
		log.Fatalf("failed to create client, %s", err.Error())
	}

	log.Println("Created client, listening...")

	if err := c.StartReceiver(ctx, receive); err != nil {
		log.Fatalf("failed to start pubsub receiver, %s", err.Error())
	}
}
```
Example sender:
```go
package main

import (
	"context"
	"log"
	"os"

	cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

// Example is a basic data struct.
type Example struct {
	Sequence int    `json:"id"`
	Message  string `json:"message"`
}

func main() {
	t, err := cepubsub.New(context.Background(),
		cepubsub.WithProjectID("helloworld"),
		cepubsub.AllowCreateTopic(true),
		cepubsub.AllowCreateSubscription(true),
		cepubsub.WithTopicID("helloworld"))
	if err != nil {
		log.Printf("failed to create pubsub transport, %s", err.Error())
		os.Exit(1)
	}

	c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
	if err != nil {
		log.Printf("failed to create client, %s", err.Error())
		os.Exit(1)
	}

	event := cloudevents.NewEvent()
	event.SetType("com.cloudevents.sample.sent")
	event.SetSource("github.com/cloudevents/sdk-go/samples/pubsub/sender/")
	_ = event.SetData("application/json", &Example{
		Sequence: 0,
		Message:  "HELLO",
	})

	if result := c.Send(context.Background(), event); cloudevents.IsUndelivered(result) {
		// Log the send result here; err is nil at this point.
		log.Printf("failed to send: %v", result)
		os.Exit(1)
	} else {
		log.Printf("sent, accepted: %t", cloudevents.IsACK(result))
	}

	os.Exit(0)
}
```
Run both while local gpubsub is running. Here's output from receiver:
```
$ go run *.go
2023/02/11 13:59:06 Created client, listening...
{"level":"info","ts":1676116746.776515,"logger":"fallback","caller":"v2@v2.13.0/protocol.go:195","msg":"starting subscriber for Topic \"helloworld\", Subscription \"helloworld\""}
{"level":"info","ts":1676116746.777045,"logger":"fallback","caller":"v2@v2.13.0/protocol.go:198","msg":"conn is&{true true helloworld 0x1400011e340 helloworld <nil> helloworld <nil> {0 0} <nil> <nil> <nil> false }"}
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
...
```
So you send exactly one event and your receiver is stuck in this receive loop, right? Can you please provide detailed information on the module tag/commit used for all the imports?
> Returning nil, on the other hand, acknowledges the message, but this behavior feels wonky.
Btw: I wonder why this feels wonky though? The handler is not the transport. Hence the underlying transport typically handles ACK/NACK.
The point is that returning protocol.ResultACK doesn't actually acknowledge the message and it is retransmitted forever (as shown in final output).
here are the versions of imports I'm using, based on go.mod:
github.com/cloudevents/sdk-go/protocol/pubsub/v2 v2.13.0
github.com/cloudevents/sdk-go/v2 v2.13.0
full go.mod is here
> The point is that returning protocol.ResultACK doesn't actually acknowledge the message
Yeah, sorry (restated my question as I got confused with the other issue you have :) )
> Btw: I wonder why this feels wonky though? The handler is not the transport. Hence the underlying transport typically handles ACK/NACK.
The thing I was referring to as wonky is the fact that protocol.ResultACK results in retry, not that nil results in acknowledgement.
IIRC it is because protocol.ResultACK is also an error type and the code may just check for error and retry. That's why I suggest returning nil if there's no error (and let the transport handle ACK).
Yeah, I did change it to returning nil to work around the issue.
It just took me a while to figure out why protocol.ResultACK wasn't working, especially given this func in the library, which I assumed should be used to separate a normal error from the ResultACK error:
```go
// IsACK true means the recipient acknowledged the event.
func IsACK(target Result) bool {
	// special case, nil target also means ACK.
	if target == nil {
		return true
	}
	return ResultIs(target, ResultACK)
}
```
IIRC, this one is supposed to be used by the underlying protocol (senders/receivers) to make decisions about retries - not by the handler, though.
Still valid. Went the same path as @preslavmihaylov
I was debugging the code to see where the issue happens.
- The Invoker returns error, which is handled by the defer function: https://github.com/cloudevents/sdk-go/blob/310da9018ea36b7c6faa3c06aa95c8d8f7a834db/v2/client/invoker.go#L57-L60
- This calls the Message Finish method, which checks err != nil and then calls Nack. https://github.com/cloudevents/sdk-go/blob/310da9018ea36b7c6faa3c06aa95c8d8f7a834db/protocol/pubsub/v2/message.go#L125-L132
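The two steps above can be modeled roughly as follows. This is a simplified, stdlib-only sketch and not the SDK code behind the links: `fakeMessage`, `invoke` and `errACK` are hypothetical names, and the only behavior taken from the thread is "a deferred call passes the handler's error to Finish, and Finish nacks on any non-nil error".

```go
package main

import (
	"errors"
	"fmt"
)

// errACK is a hypothetical "acknowledged" sentinel that is still an error value.
var errACK = errors.New("ACK")

// fakeMessage records what the transport was told to do.
type fakeMessage struct {
	acked, nacked bool
}

// Finish nacks on any non-nil error, which is the behavior described above.
func (m *fakeMessage) Finish(err error) {
	if err != nil {
		m.nacked = true
		return
	}
	m.acked = true
}

// invoke runs the handler and finishes the message in a deferred call,
// so Finish sees whatever the handler returned.
func invoke(m *fakeMessage, handler func() error) (err error) {
	defer func() { m.Finish(err) }()
	return handler()
}

func main() {
	withACK := &fakeMessage{}
	_ = invoke(withACK, func() error { return errACK })
	fmt.Printf("handler returned ACK sentinel: acked=%t nacked=%t\n", withACK.acked, withACK.nacked)

	withNil := &fakeMessage{}
	_ = invoke(withNil, func() error { return nil })
	fmt.Printf("handler returned nil:          acked=%t nacked=%t\n", withNil.acked, withNil.nacked)
}
```

In this model, a handler that returns the ACK sentinel ends up nacked, and only a nil return is acked.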
@arxeiss as discussed above what are you returning in your receiver handler?
I'm using protocol.ResultACK just as the owner of the ticket.
Yes, I read your message:
> IIRC, this one is supposed to be used by the underlying protocol (senders/receivers) to make decisions about retries - not by the handler, though.
However, when I check the documentation of StartReceiver, I see this.
The return type can be protocol.Result. If we should return nil for ACK, then the function declaration shouldn't allow a protocol.Result return type, since that suggests we could return protocol.ResultACK.
https://github.com/cloudevents/sdk-go/blob/310da9018ea36b7c6faa3c06aa95c8d8f7a834db/v2/client/client.go#L33-L50
> The return type can be protocol.Result. If we should return nil for ACK, then the function declaration shouldn't allow a protocol.Result return type, since that suggests we could return protocol.ResultACK.
That signature is not incorrect, but a bit misleading. If you look at what protocol.Result is, it's an (interface type) definition of type error, i.e., Go's standard convention for error handling returning an error in functions (which can be nil).
As described above, protocol.Result(N)ACK is a concrete but low-level type you shouldn't be using. Instead, if you want to use a protocol.Result in your code (instead of nil), use the protocol-specific type e.g. http.Result (where applicable).
However, and coming back to this issue, it seems the pubsub implementation has a bug here as it treats any result, incl. ACK, as error. Other protocol implementations have a different error handling strategy here and I'm not familiar with PubSub so I'll have to defer a fix to someone familiar with PubSub.
Alternatively, the workaround is to return nil instead of a Result.
Yes, you are right. If you dig deeper, you notice that it satisfies the error interface. But if you only check the doc block of the function, and IntelliSense in the IDE suggests protocol.ResultACK, you might not feel the need to check it.
So in my opinion, there are 2 things to change
- Fix the PubSub error handling so that protocol.ResultACK is not treated as an error.
- Fix the documentation for StartReceiver and replace protocol.Result with error. Then it is clear that the developer should return nil and not protocol.Result.
Do you agree?
> Yes, you are right. If you dig deeper, you notice that it satisfies the error interface. But if you only check the doc block of the function, and IntelliSense in the IDE suggests protocol.ResultACK, you might not feel the need to check it.
Totally agree that this is misleading in our docs.
> Fix error handling taking protocol.ResultACK as not error for PubSub
Yes.
> Fix documentation for StartReceiver and replace protocol.Result with error. Then it is clear that developer should return nil and not protocol.Result.
Yes, with a minor adjustment to include an example such as http.Result as a valid transport response/error in the doc string. We should just be more clear here.
Hello, I've encountered the same issue here. Pub/Sub does not properly support protocol.Result. Here is a PR that fixes the issue, with related tests: https://github.com/cloudevents/sdk-go/pull/1064
Note the 4 cases proposed:
| Function result | Acked/Nacked | Finish result |
|---|---|---|
| nil | ACKED | nil |
| any error except cloudevents.Result | NACKED | the original error |
| cloudevents.Result (param acked) | ACKED | nil |
| cloudevents.Result (param nacked) | NACKED | the protocol.Result |
This issue should be closed, as #1064 is merged