watermill icon indicating copy to clipboard operation
watermill copied to clipboard

Messages get swapped after some time

Open dramirezp opened this issue 1 year ago • 1 comments

I am using Watermill to develop software where I send a message, and it goes through service1, service2, and the last service. I use a slice to control the order of the messages (FIFO, as GoChannel should respect FIFO). After several runs, I am encountering an issue where Watermill is swapping messages. For example, I send message A and message B, but in the last service, message B arrives first and then message A. Attached is a small script where this problem is reflected. It seems to be a race condition because it doesn't always happen, but when the script is run and the issue occurs, it shows a message like this: Slice value 68ad9d74-c0eb-476f-9cc0-5da98d947b61 value in message f01fff7d-1fb6-45a4-bda6-07e021511d3f.

/*
This application is a test of Watermill, a Go library for working efficiently with message streams.
Sending and receiving messages from a channel.
*/

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var pubSub1 *gochannel.GoChannel
var safeSlice *SafeSlice

// SafeSlice is a slice of strings whose every access goes through a mutex.
// It exists only to keep track of the messages flowing through the program.
type SafeSlice struct {
	mu    sync.Mutex
	slice []string
}

// inBounds reports whether index addresses an existing element.
// The caller must already hold s.mu.
func (s *SafeSlice) inBounds(index int) bool {
	return index >= 0 && index < len(s.slice)
}

// Append adds value to the end of the slice.
func (s *SafeSlice) Append(value string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.slice = append(s.slice, value)
}

// Get returns the element stored at index and true. When index is negative
// or past the end, it returns the placeholder text "Index out of scope" and
// false.
func (s *SafeSlice) Get(index int) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.inBounds(index) {
		return s.slice[index], true
	}
	return "Index out of scope", false
}

// Remove deletes the element at index, shifting the following elements one
// position to the left. It reports whether an element was removed; an index
// outside the slice leaves it untouched and yields false.
func (s *SafeSlice) Remove(index int) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.inBounds(index) {
		return false
	}

	// Slide the tail over the removed slot, then drop the duplicated last item.
	shortened := len(s.slice) - 1
	copy(s.slice[index:], s.slice[index+1:])
	s.slice = s.slice[:shortened]
	return true
}

// service1 handles messages for the "service-1" handler. It first records the
// message UUID in the shared safe slice and then forwards the same message to
// the "service-2-input" topic. A publish failure panics; otherwise the handler
// always returns nil.
func service1(msg *message.Message) error {
	safeSlice.Append(msg.UUID)

	if err := pubSub1.Publish("service-2-input", msg); err != nil {
		panic(err)
	}
	return nil
}

// service2 handles messages for the "service-2" handler. It prints the
// incoming message and hands it back unchanged as the only produced message;
// the error result is always nil.
func service2(msg *message.Message) ([]*message.Message, error) {
	fmt.Printf("Message in service 2 %v\n", msg)

	// Add some logic

	produced := []*message.Message{msg}
	return produced, nil
}

// service_last handles messages for the "service_last" handler. It checks that
// the incoming message UUID equals the first UUID held in the safe slice.
// On a match it prints "OK" and drops that first entry; on a mismatch it
// prints both values and terminates the process with exit code 0.
func service_last(msg *message.Message) error {
	// The ok flag is deliberately ignored: for an empty slice Get yields the
	// placeholder "Index out of scope", which takes the mismatch path below
	// unless a message UUID happens to equal that text.
	expected, _ := safeSlice.Get(0)

	fmt.Printf("service_last %v\n", msg)

	if msg.UUID != expected {
		fmt.Printf("Slice value %s value in message %s\n", expected, msg.UUID)
		os.Exit(0)
	}

	fmt.Println("OK")
	safeSlice.Remove(0)
	return nil
}

// main wires three router handlers onto one GoChannel pub/sub:
//
//	"service-1-input"    -> service1     (publishes to "service-2-input" itself)
//	"service-2-input"    -> service2     -> "service_last-input"
//	"service_last-input" -> service_last
//
// It then publishes new messages to "service-1-input" in an endless loop.
// The process ends only through log.Fatalf here or the os.Exit call in
// service_last.
func main() {
	logger := watermill.NewStdLogger(true, true)
	safeSlice = &SafeSlice{}

	// NOTE(review): the GoChannel is created with an all-default Config. Whether
	// delivery order is preserved in that mode (as opposed to with
	// BlockPublishUntilSubscriberAck enabled) should be confirmed against the
	// Watermill GoChannel documentation.
	pubSub1 = gochannel.NewGoChannel(gochannel.Config{}, logger)

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Fatalf("could not create router: %v", err)
	}

	// Create handlers for each service
	router.AddNoPublisherHandler("service-1", "service-1-input", pubSub1, service1)
	router.AddHandler("service-2", "service-2-input", pubSub1, "service_last-input", pubSub1, service2)
	router.AddNoPublisherHandler("service_last", "service_last-input", pubSub1, service_last)

	// Start the router in its own goroutine so main can go on to publish.
	go func() {
		if err := router.Run(context.Background()); err != nil {
			log.Fatalf("could not run router: %v", err)
		}
	}()

	// Presumably gives the router time to start before the first publish.
	time.Sleep(1 * time.Second)

	// This loop has no exit, so no statement placed after it would ever run.
	for {
		// Publish a message to start the pipeline
		msg := message.NewMessage(watermill.NewUUID(), []byte{})
		if err := pubSub1.Publish("service-1-input", msg); err != nil {
			log.Fatalf("could not publish message: %v", err)
		}

		//time.Sleep(1000 * time.Millisecond)
	}
}

dramirezp avatar Jul 24 '24 15:07 dramirezp

The GoChannel implementation uses no goroutines internally unless you explicitly add a Multiplier.

I tried running the code, and there does indeed seem to be a problem.

yashb042 avatar Sep 13 '24 09:09 yashb042