sdk-go

PubSub subsequent messages not properly ordered by key

rriolobos opened this issue 3 years ago • 2 comments

https://github.com/cloudevents/sdk-go/blob/7601392008ad787e9f27cf1d7898ff14fef1b236/protocol/pubsub/v2/protocol.go#L233

I tested the PubSub ordering key with this library and found an issue. When you send 3 or more messages with the same ordering key, the receiver only waits for the first one; the second and third messages are consumed concurrently. I added a 5-second sleep in the message handler to check that the total time was 15 seconds, as expected, but it was 10 seconds. This can also be seen in the logs. When I tried the same thing invoking the PubSub receiver directly, it worked as expected (@skymeyer).

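package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
	cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
	pscontext "github.com/cloudevents/sdk-go/protocol/pubsub/v2/context"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
)

// envConfig and Example were not shown in the issue; the fields below are
// assumptions modeled on the sdk-go Pub/Sub samples.
type envConfig struct {
	ProjectID      string `envconfig:"GOOGLE_CLOUD_PROJECT"`
	TopicID        string `envconfig:"PUBSUB_TOPIC" required:"true"`
	SubscriptionID string `envconfig:"PUBSUB_SUBSCRIPTION" required:"true"`
}

type Example struct {
	Sequence int    `json:"id"`
	Message  string `json:"message"`
}

// receive logs each event, then sleeps to simulate slow processing so that
// concurrent handling of same-key messages shows up in the logs and timing.
func receive(ctx context.Context, event cloudevents.Event) error {
	fmt.Printf("Event Context: %+v\n", event.Context)

	fmt.Printf("Protocol Context: %+v\n", pscontext.ProtocolContextFrom(ctx))

	data := &Example{}
	if err := event.DataAs(data); err != nil {
		fmt.Printf("Got Data Error: %s\n", err.Error())
	}

	time.Sleep(8 * time.Second)

	fmt.Printf("Data processed: %+v\n", data)

	fmt.Printf("----------------------------\n")

	return nil
}

func main() {
	ctx := context.Background()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	t, err := cepubsub.New(ctx,
		cepubsub.WithProjectID(env.ProjectID),
		cepubsub.WithTopicID(env.TopicID),
		cepubsub.WithReceiveSettings(&pubsub.ReceiveSettings{}),
		cepubsub.WithSubscriptionID(env.SubscriptionID))
	if err != nil {
		log.Fatalf("failed to create pubsub protocol, %s", err.Error())
	}

	// A single receive goroutine; as reported, this alone does not
	// serialize messages that share an ordering key.
	t.ReceiveSettings.NumGoroutines = 1

	c, err := cloudevents.NewClient(t)
	if err != nil {
		log.Fatalf("failed to create client, %s", err.Error())
	}

	log.Println("Created client, listening...")

	if err := c.StartReceiver(ctx, receive); err != nil {
		log.Fatalf("failed to start pubsub receiver, %s", err.Error())
	}
}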

rriolobos, Feb 08 '22

@rriolobos I have updated your sample code a bit to try to reproduce the issue and created two samples:

Ordering seems to work as expected on my end. Can you check a couple of things (ranked from most to least likely):

I hope that helps shed some light. I have not tested letting the CloudEvents SDK create the topic/subscription dynamically; I had them preconfigured. Are you relying on auto-creation?

skymeyer, Feb 08 '22

Hi @skymeyer,

First of all, thanks for your answer.

You are right: when I set MaxOutstandingMessages=1, messages are properly ordered. In fact, all messages are then processed in order, both those with the same key and those with different keys. Let me illustrate with an example. Suppose every message takes 5 seconds to process and prints two logs: "start message key X", then, 5 s later, "end message key X". Now suppose we send the following sequence without setting MaxOutstandingMessages=1:

A,B,C,A,A

I expect the following output: StartA StartB StartC EndA StartA EndB EndC EndA StartA EndA

That is, I expect messages with different keys to be processed concurrently and messages with the same key to be processed sequentially.

The output I actually get: StartA StartB StartC EndA StartA StartA EndB EndC EndA EndA

After the first A is acked, ordering for the remaining A messages is no longer kept. Moreover, if I send B,B after the previous sequence, they are also processed concurrently instead of sequentially as expected.

Is it possible to get the expected behavior with some specific settings?

When I test this scenario with a receiver that calls the PubSub Receive() method directly, I get the expected output. Here is the code:

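package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/kelseyhightower/envconfig"
)

// envConfig was not shown in the issue; these fields are assumptions.
type envConfig struct {
	ProjectID      string `envconfig:"GOOGLE_CLOUD_PROJECT"`
	SubscriptionID string `envconfig:"PUBSUB_SUBSCRIPTION" required:"true"`
}

func main() {

	ctx := context.Background()

	var env envConfig

	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	client, err := pubsub.NewClient(ctx, env.ProjectID)
	if err != nil {
		// fmt.Errorf only builds an error; log it and stop instead of ignoring it.
		log.Fatalf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(env.SubscriptionID)
	cctx, cancel := context.WithCancel(ctx)
	defer cancel() // release the context instead of discarding cancel
	// With this receiver, same-key messages were observed to run one at a time.
	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Printf("Got message: %q\n", string(msg.Data))
		time.Sleep(6 * time.Second)
		msg.Ack()
		fmt.Printf("Done: %q\n", string(msg.Data))
	})
	if err != nil {
		log.Printf("Receive: %v", err)
	}

}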

Best regards.

rriolobos, Feb 08 '22

I believe the linked PR fixed this issue, provided the corresponding config is set on the transport instance. Closing; please reopen if this is not solved.

n3wscott, Aug 30 '22