watermill-sql icon indicating copy to clipboard operation
watermill-sql copied to clipboard

Adding `comfylite3`

Open davidroman0O opened this issue 10 months ago • 1 comments

Adding the comfylite3 wrapper of the famous go-sqlite3 driver which compensate the lack of goroutine support by giving the illusion of it.

Most other libraries that re-implement the entire sqlite3 driver won't support the latest features, like the recent JSON datatype, sometimes you just want it all!

No dependency added, very similar to the mysql implementation, eventually someone else would want to add a specific sqlite implementation for a pure golang version since go-sqlite3 might require CGO!

At least, it gives more options! Using sqlite3 with watermill for small side projects is very cool, especially when using an adapter pattern to switch between a real mysql to sqlite3 with the same implementation!

Nothing fancy in the code! 😄

davidroman0O avatar Apr 19 '24 17:04 davidroman0O

Also here an example


package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
         "github.com/davidroman0O/comfylite3"
)

func main() {

	db, err := comfylite3.Comfy(
		comfylite3.WithMemory(),
	)
	if err != nil {
		panic(err)
	}

	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultSQLite3Schema{},
			OffsetsAdapter:   sql.DefaultSQLite3OffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultSQLite3Schema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
	}
}

func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// we need to Acknowledge that we received and processed the message,
		// otherwise, it will be resent over and over again.
		msg.Ack()
	}
}

davidroman0O avatar Apr 19 '24 18:04 davidroman0O