watermill-sql
watermill-sql copied to clipboard
Adding `comfylite3`
Adding comfylite3, a wrapper around the famous go-sqlite3 driver which compensates for the lack of goroutine support by giving the illusion of it.
Most other libraries that re-implement the entire sqlite3 driver won't support the latest features, like the recent JSON datatype; sometimes you just want it all!
No dependency is added, and the code is very similar to the MySQL implementation. Eventually someone else may want to add a separate SQLite implementation built on a pure-Go driver, since go-sqlite3 might require CGO!
At least it gives more options! Using sqlite3 with watermill for small side projects is very cool, especially when using an adapter pattern to switch between a real MySQL database and sqlite3 with the same implementation!
Nothing fancy in the code! 😄
Here is an example as well:
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/davidroman0O/comfylite3"
)
// main is the example's entry point. It opens a comfylite3 database using the
// WithMemory option, builds a watermill SQL subscriber and publisher on top of
// it with the SQLite3 schema/offsets adapters, starts a goroutine that
// consumes "example_topic", and then publishes to that topic forever.
//
// Any setup error aborts the program with a panic.
func main() {
	conn, err := comfylite3.Comfy(comfylite3.WithMemory())
	if err != nil {
		panic(err)
	}

	wmLogger := watermill.NewStdLogger(false, false)

	// The subscriber is created (and asked to initialize the schema) before
	// the publisher, so keep this ordering.
	subCfg := sql.SubscriberConfig{
		SchemaAdapter:    sql.DefaultSQLite3Schema{},
		OffsetsAdapter:   sql.DefaultSQLite3OffsetsAdapter{},
		InitializeSchema: true,
	}
	sub, err := sql.NewSubscriber(conn, subCfg, wmLogger)
	if err != nil {
		panic(err)
	}

	inbox, err := sub.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	// Consume in the background; the main goroutine is taken over by the
	// publishing loop below.
	go process(inbox)

	pubCfg := sql.PublisherConfig{
		SchemaAdapter: sql.DefaultSQLite3Schema{},
	}
	pub, err := sql.NewPublisher(conn, pubCfg, wmLogger)
	if err != nil {
		panic(err)
	}

	// publishMessages loops forever, so main never returns normally.
	publishMessages(pub)
}
// publishMessages publishes one message per second to "example_topic" using
// the given publisher. Each message gets an ID from watermill.NewUUID and a
// fixed JSON payload.
//
// The function never returns; it panics if Publish reports an error.
func publishMessages(publisher message.Publisher) {
	const topic = "example_topic"

	for {
		id := watermill.NewUUID()
		// Allocate the payload per iteration so no two messages share bytes.
		body := []byte(`{"message": "Hello, world!"}`)
		outgoing := message.NewMessage(id, body)

		err := publisher.Publish(topic, outgoing)
		if err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
	}
}
// process drains the messages channel, logging the UUID and payload of every
// message and then acknowledging it. It returns once the channel is closed.
func process(messages <-chan *message.Message) {
	for {
		incoming, open := <-messages
		if !open {
			return
		}

		payload := string(incoming.Payload)
		log.Printf("received message: %s, payload: %s", incoming.UUID, payload)

		// The message must be acknowledged after it has been received and
		// processed; otherwise it will be resent over and over again.
		incoming.Ack()
	}
}