sdk-go

returning protocol.ResultACK in receiver doesn't acknowledge the event

Open presmihaylov opened this issue 2 years ago • 17 comments

Given this example in a consumer using Google Pub/Sub:

	err = cloudEventsClient.StartReceiver(ctx, func(ctx context.Context, event cloudevents.Event) protocol.Result {
		log.Infow("received event from topic", "event.id", event.Context.GetID())

		var data map[string]interface{}
		if err := event.DataAs(&data); err != nil {
			log.Errorw("error decoding data", "err", err)
		}

		log.Infow("decoded event", "event.id", event.Context.GetID(), "event.data", data)

		// this will lead to the event being consumed by this receiver infinitely
		return protocol.ResultACK

		// this, on the other hand, will consume the event once
		// return nil
	})

Returning protocol.ResultACK does not actually acknowledge the message; it is retried infinitely. Returning nil, on the other hand, acknowledges the message, but this behavior feels wonky.

Is this expected?

presmihaylov, Feb 03 '23 10:02

Before I dig deeper, can you please provide the exact module (e.g. v2) and version/commit you're using for the above code? Is there any error printed that would help me debug the code path? For example, do you see cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err)?

embano1, Feb 06 '23 07:02

Example receiver:

package main

import (
	"context"
	"fmt"
	"log"

	cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/event"
	"github.com/cloudevents/sdk-go/v2/protocol"
)

func receive(ctx context.Context, event event.Event) error {
	fmt.Printf("------------------ Received MSG ------------------\n")
	return protocol.ResultACK
}

func main() {
	ctx := context.Background()
	t, err := cepubsub.New(context.Background(),
		cepubsub.WithProjectID("helloworld"),
		cepubsub.AllowCreateTopic(true),
		cepubsub.AllowCreateSubscription(true),
		cepubsub.WithSubscriptionAndTopicID("helloworld", "helloworld"))
	if err != nil {
		log.Fatalf("failed to create pubsub protocol, %s", err.Error())
	}
	c, err := cloudevents.NewClient(t)

	if err != nil {
		log.Fatalf("failed to create client, %s", err.Error())
	}

	log.Println("Created client, listening...")

	if err := c.StartReceiver(ctx, receive); err != nil {
		log.Fatalf("failed to start pubsub receiver, %s", err.Error())
	}
}

Example sender:

package main

import (
	"context"
	"log"
	"os"

	cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

// Example is a basic data struct.
type Example struct {
	Sequence int    `json:"id"`
	Message  string `json:"message"`
}

func main() {
	t, err := cepubsub.New(context.Background(),
		cepubsub.WithProjectID("helloworld"),
		cepubsub.AllowCreateTopic(true),
		cepubsub.AllowCreateSubscription(true),
		cepubsub.WithTopicID("helloworld"))
	if err != nil {
		log.Printf("failed to create pubsub transport, %s", err.Error())
		os.Exit(1)
	}
	c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
	if err != nil {
		log.Printf("failed to create client, %s", err.Error())
		os.Exit(1)
	}

	event := cloudevents.NewEvent()
	event.SetType("com.cloudevents.sample.sent")
	event.SetSource("github.com/cloudevents/sdk-go/samples/pubsub/sender/")
	_ = event.SetData("application/json", &Example{
		Sequence: 0,
		Message:  "HELLO",
	})

	if result := c.Send(context.Background(), event); cloudevents.IsUndelivered(result) {
		log.Printf("failed to send: %v", result)
		os.Exit(1)
	} else {
		log.Printf("sent, accepted: %t", cloudevents.IsACK(result))
	}

	os.Exit(0)
}

Run both while a local gpubsub instance is running. Here's the output from the receiver:

$ go run *.go
2023/02/11 13:59:06 Created client, listening...
{"level":"info","ts":1676116746.776515,"logger":"fallback","caller":"[email protected]/protocol.go:195","msg":"starting subscriber for Topic \"helloworld\", Subscription \"helloworld\""}
{"level":"info","ts":1676116746.777045,"logger":"fallback","caller":"[email protected]/protocol.go:198","msg":"conn is&{true true helloworld 0x1400011e340 helloworld <nil> helloworld <nil> {0 0} <nil> <nil> <nil> false }"}
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
...

presmihaylov, Feb 11 '23 12:02

So you send exactly one event and your receiver is stuck in this receive loop, right? Can you please provide detailed information on the module tag/commit used for all the imports?

embano1, Feb 12 '23 07:02

Returning nil, on the other hand, acknowledges the message, but this behavior feels wonky.

Btw: I wonder why this feels wonky though? The handler is not the transport. Hence the underlying transport typically handles ACK/NACK.

embano1, Feb 12 '23 07:02

The point is that returning protocol.ResultACK doesn't actually acknowledge the message and it is retransmitted forever (as shown in final output).

Here are the versions of the imports I'm using, from go.mod:

github.com/cloudevents/sdk-go/protocol/pubsub/v2 v2.13.0
github.com/cloudevents/sdk-go/v2 v2.13.0

The full go.mod is here.

presmihaylov, Feb 12 '23 07:02

The point is that returning protocol.ResultACK doesn't actually acknowledge the message

Yeah, sorry (restated my question as I got confused with the other issue you have :) )

embano1, Feb 12 '23 07:02

Btw: I wonder why this feels wonky though? The handler is not the transport. Hence the underlying transport typically handles ACK/NACK.

The thing I was referring to as wonky is the fact that protocol.ResultACK results in retry, not that nil results in acknowledgement.

presmihaylov, Feb 12 '23 07:02

The thing I was referring to as wonky is the fact that protocol.ResultACK results in retry, not that nil results in acknowledgement.

IIRC, it is because protocol.ResultACK is also an error value, so the code may just check for a non-nil error and retry. That's why I suggest returning nil if there's no error (and letting the transport handle ACK).

embano1, Feb 12 '23 07:02

Yeah, I changed it to return nil to work around the issue.

It just took me a while to figure out why protocol.ResultACK wasn't working, especially given this func in the library, which I assumed should be used to separate a normal error from the ResultACK error:

// IsACK true means the recipient acknowledged the event.
func IsACK(target Result) bool {
	// special case, nil target also means ACK.
	if target == nil {
		return true
	}

	return ResultIs(target, ResultACK)
}

presmihaylov, Feb 12 '23 07:02


IIRC, IsACK is meant to be used by the underlying protocol (senders/receivers) to make decisions about retries, not by the handler.

embano1, Feb 13 '23 07:02

Still valid. I went down the same path as @preslavmihaylov.

I debugged the code to see where the issue happens:

  1. The invoker returns an error, which is handled by the deferred function: https://github.com/cloudevents/sdk-go/blob/310da9018ea36b7c6faa3c06aa95c8d8f7a834db/v2/client/invoker.go#L57-L60
  2. That function calls the message's Finish method, which checks err != nil and then calls Nack: https://github.com/cloudevents/sdk-go/blob/310da9018ea36b7c6faa3c06aa95c8d8f7a834db/protocol/pubsub/v2/message.go#L125-L132
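The two steps above can be modeled without sdk-go. In this sketch, message, Finish, invoke and errACK are hypothetical stand-ins, not the code at the linked commit: the handler's result lands in a named return value, a deferred function passes it to Finish, and Finish NACKs anything non-nil, so an "ACK" that is still an error ends up NACKed.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a transport message.
type message struct {
	acked, nacked bool
}

// Finish mimics step 2 before the fix: any non-nil error means NACK.
func (m *message) Finish(err error) error {
	if err != nil {
		m.nacked = true // the broker will redeliver
		return err
	}
	m.acked = true
	return nil
}

// errACK stands in for protocol.ResultACK: an "ACK" that is still a non-nil error.
var errACK = errors.New("ACK")

// invoke mimics step 1: the deferred function sees the final value of the
// named return err, i.e. whatever the handler returned.
func invoke(m *message, handler func() error) (err error) {
	defer func() { _ = m.Finish(err) }()
	return handler()
}

func main() {
	m1 := &message{}
	_ = invoke(m1, func() error { return nil })
	fmt.Println("handler returned nil, acked:", m1.acked) // true

	m2 := &message{}
	_ = invoke(m2, func() error { return errACK })
	fmt.Println("handler returned an ACK error, nacked:", m2.nacked) // true
}
```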

arxeiss, Aug 08 '23 13:08

@arxeiss, as discussed above, what are you returning in your receiver handler?

embano1, Aug 09 '23 06:08

I'm returning protocol.ResultACK, just like the issue author.

Yes, I read your message:

IIRC, IsACK is meant to be used by the underlying protocol (senders/receivers) to make decisions about retries, not by the handler.

However, when I check the documentation of StartReceiver, I see this: the return type can be protocol.Result. If we should return nil for ACK, then the function declaration shouldn't allow the protocol.Result return type, because it suggests that we can return protocol.ResultACK.

https://github.com/cloudevents/sdk-go/blob/310da9018ea36b7c6faa3c06aa95c8d8f7a834db/v2/client/client.go#L33-L50

arxeiss, Aug 09 '23 08:08

The return type can be protocol.Result. If we should return nil for ACK, then the function declaration shouldn't allow the protocol.Result return type, because it suggests that we can return protocol.ResultACK.

That signature is not incorrect, but it is a bit misleading. protocol.Result is an interface type defined in terms of error, i.e., it follows Go's standard convention of returning an error from functions (which can be nil).

As described above, protocol.ResultACK and protocol.ResultNACK are concrete but low-level values you shouldn't be returning. Instead, if you want to return a protocol.Result from your handler (instead of nil), use a protocol-specific type such as http.Result (where applicable).

However, coming back to this issue, the Pub/Sub implementation seems to have a bug here: it treats any result, including ACK, as an error. Other protocol implementations use a different error-handling strategy, and I'm not familiar with Pub/Sub, so I'll have to defer a fix to someone who is.

Alternatively, the workaround is to return nil instead of a Result.

embano1, Aug 09 '23 11:08

Yes, you are right. If you dig deeper, you notice that it satisfies the error interface. But if you only check the function's doc comment, and IDE IntelliSense suggests protocol.ResultACK, you might not feel the need to check further.

So in my opinion, there are two things to change:

  1. Fix the Pub/Sub error handling so that protocol.ResultACK is not treated as an error.
  2. Fix the documentation for StartReceiver by replacing protocol.Result with error. Then it is clear that developers should return nil, not protocol.Result.

Do you agree?

arxeiss, Aug 09 '23 11:08

Yes, you are right. If you dig deeper, you notice that it satisfies the error interface. But if you only check the function's doc comment, and IDE IntelliSense suggests protocol.ResultACK, you might not feel the need to check further.

Totally agree that this is misleading in our docs.

Fix the Pub/Sub error handling so that protocol.ResultACK is not treated as an error.

Yes.

Fix the documentation for StartReceiver by replacing protocol.Result with error. Then it is clear that developers should return nil, not protocol.Result.

Yes, with a minor adjustment: include an example such as http.Result as a valid transport response/error in the doc string. We should just be clearer here.

embano1, Aug 11 '23 06:08

Hello, I've encountered the same issue. Pub/Sub does not properly support protocol.Result. Here is a PR that fixes the issue, with related tests: https://github.com/cloudevents/sdk-go/pull/1064

Note the four proposed cases:

Function result                     | Acked/Nacked | Finish result
------------------------------------|--------------|--------------------
nil                                 | ACKED        | nil
any error except cloudevents.Result | NACKED       | the original error
cloudevents.Result (param acked)    | ACKED        | nil
cloudevents.Result (param nacked)   | NACKED       | the protocol.Result

chapurlatn, Jun 10 '24 12:06

This issue should be closed, as #1064 has been merged.

arxeiss, Nov 21 '25 14:11