ergo icon indicating copy to clipboard operation
ergo copied to clipboard

Define an abstraction over buntdb

Open progval opened this issue 2 years ago • 0 comments

This will allow implementing other backends in the future, such as SQL databases.

This is a WIP, but comments are welcome (especially as this is the first time I actually wrote code in Go)

progval avatar Jun 11 '22 13:06 progval

This has been sitting idly for a while, and I don't think I will have time to work on it in the near future; so I am closing this PR. Anyone interested in it, feel free to resurrect it.

Here is a dump of my WIP code to add a sqlite backend
diff --git a/irc/config.go b/irc/config.go
index 75fd1f27..78363020 100644
--- a/irc/config.go
+++ b/irc/config.go
@@ -300,6 +300,13 @@ func (t *ThrottleConfig) UnmarshalYAML(unmarshal func(interface{}) error) (err e
 	return
 }
 
+type DatastoreConfig struct {
+	Type        string
+	Path        string
+	AutoUpgrade bool
+	MySQL       mysql.Config
+}
+
 type AccountConfig struct {
 	Registration          AccountRegistrationConfig
 	AuthenticationEnabled bool `yaml:"authentication-enabled"`
@@ -623,11 +630,7 @@ type Config struct {
 
 	LockFile string `yaml:"lock-file"`
 
-	Datastore struct {
-		Path        string
-		AutoUpgrade bool
-		MySQL       mysql.Config
-	}
+	Datastore DatastoreConfig
 
 	Accounts AccountConfig
 
diff --git a/irc/database.go b/irc/database.go
index 47989b7d..85ed89e2 100644
--- a/irc/database.go
+++ b/irc/database.go
@@ -36,36 +36,41 @@ type SchemaChange struct {
 	Changer        SchemaChanger
 }
 
-func checkDBReadyForInit(path string) error {
-	_, err := os.Stat(path)
+func checkDBReadyForInit(config DatastoreConfig) error {
+	_, err := os.Stat(config.Path)
 	if err == nil {
-		return fmt.Errorf("Datastore already exists (delete it manually to continue): %s", path)
+		return fmt.Errorf("Datastore already exists (delete it manually to continue): %s", config.Path)
 	} else if !os.IsNotExist(err) {
-		return fmt.Errorf("Datastore path %s is inaccessible: %w", path, err)
+		return fmt.Errorf("Datastore path %s is inaccessible: %w", config.Path, err)
 	}
 	return nil
 }
 
 // InitDB creates the database, implementing the `oragono initdb` command.
-func InitDB(path string) error {
-	if err := checkDBReadyForInit(path); err != nil {
+func InitDB(config DatastoreConfig) error {
+	if err := checkDBReadyForInit(config); err != nil {
 		return err
 	}
 
-	if err := initializeDB(path); err != nil {
+	if err := initializeDB(config); err != nil {
 		return fmt.Errorf("Could not save datastore: %w", err)
 	}
 	return nil
 }
 
 // internal database initialization code
-func initializeDB(path string) error {
-	store, err := kv.BuntdbOpen(path)
+func initializeDB(config DatastoreConfig) error {
+	store, err := kv.Open(kv.Config{Type: config.Type, Path: config.Path})
 	if err != nil {
 		return err
 	}
+	err = store.Initialize()
 	defer store.Close()
 
+	if err != nil {
+		return err
+	}
+
 	err = store.Update(func(tx kv.Tx) error {
 		// set schema version
 		tx.Set(keySchemaVersion, strconv.Itoa(latestDbSchema), nil)
@@ -83,7 +88,7 @@ func OpenDatabase(config *Config) (kv.Store, error) {
 
 // open the database, giving it at most one chance to auto-upgrade the schema
 func openDatabaseInternal(config *Config, allowAutoupgrade bool) (db kv.Store, err error) {
-	db, err = kv.BuntdbOpen(config.Datastore.Path)
+	db, err = kv.Open(kv.Config{Type: config.Datastore.Type, Path: config.Datastore.Path})
 	if err != nil {
 		return
 	}
diff --git a/irc/import.go b/irc/import.go
index 0bf252f0..cd4df8c8 100644
--- a/irc/import.go
+++ b/irc/import.go
@@ -227,12 +227,12 @@ func ImportDB(config *Config, infile string) (err error) {
 		return err
 	}
 
-	err = checkDBReadyForInit(config.Datastore.Path)
+	err = checkDBReadyForInit(config.Datastore)
 	if err != nil {
 		return err
 	}
 
-	db, err := kv.BuntdbOpen(config.Datastore.Path)
+	db, err := kv.Open(kv.Config{Type: config.Datastore.Type, Path: config.Datastore.Path})
 	if err != nil {
 		return err
 	}
diff --git a/irc/kv/buntdb.go b/irc/kv/buntdb.go
index 46752f78..43497bc8 100644
--- a/irc/kv/buntdb.go
+++ b/irc/kv/buntdb.go
@@ -16,6 +16,8 @@ import (
 func convertError(err error) error {
 	if err == buntdb.ErrNotFound {
 		return ErrNotFound
+	} else if err == buntdb.ErrTxNotWritable {
+		return ErrTxNotWritable
 	} else {
 		return err
 	}
@@ -71,6 +73,11 @@ func BuntdbOpen(path string) (Store, error) {
 	return BuntdbStore{store}, err
 }
 
+func (db BuntdbStore) Initialize() error {
+	// Nothing to do
+	return nil
+}
+
 func (db BuntdbStore) Close() error {
 	return db.db.Close()
 }
diff --git a/irc/kv/interface.go b/irc/kv/interface.go
index b2cccc6c..9ded206f 100644
--- a/irc/kv/interface.go
+++ b/irc/kv/interface.go
@@ -12,7 +12,8 @@ import (
 )
 
 var (
-	ErrNotFound = errors.New("not found")
+	ErrNotFound      = errors.New("not foundbbb")
+	ErrTxNotWritable = errors.New("tx not writable")
 )
 
 type SetOptions struct {
@@ -32,6 +33,7 @@ type Tx interface {
 }
 
 type Store interface {
+	Initialize() error
 	Close() error
 	Update(fn func(tx Tx) error) error
 	View(fn func(tx Tx) error) error
diff --git a/irc/server.go b/irc/server.go
index d6dc0890..18da25bc 100644
--- a/irc/server.go
+++ b/irc/server.go
@@ -594,6 +594,8 @@ func (server *Server) applyConfig(config *Config) (err error) {
 		// enforce configs that can't be changed after launch:
 		if server.name != config.Server.Name {
 			return fmt.Errorf("Server name cannot be changed after launching the server, rehash aborted")
+		} else if oldConfig.Datastore.Type != config.Datastore.Type {
+			return fmt.Errorf("Datastore type cannot be changed after launching the server, rehash aborted")
 		} else if oldConfig.Datastore.Path != config.Datastore.Path {
 			return fmt.Errorf("Datastore path cannot be changed after launching the server, rehash aborted")
 		} else if globalCasemappingSetting != config.Server.Casemapping {
@@ -813,14 +815,16 @@ func (server *Server) loadDatastore(config *Config) error {
 	// open the datastore and load server state for which it (rather than config)
 	// is the source of truth
 
-	_, err := os.Stat(config.Datastore.Path)
-	if os.IsNotExist(err) {
+	if !kv.Exists(kv.Config{Type: config.Datastore.Type, Path: config.Datastore.Path}) {
 		server.logger.Warning("server", "database does not exist, creating it", config.Datastore.Path)
-		err = initializeDB(config.Datastore.Path)
+		err := initializeDB(config.Datastore)
 		if err != nil {
 			return err
 		}
 	}
+	if !kv.Exists(kv.Config{Type: config.Datastore.Type, Path: config.Datastore.Path}) {
+		server.logger.Warning("server", "still does not exist", config.Datastore.Path)
+	}
 
 	db, err := OpenDatabase(config)
 	if err == nil {
 dev-irc@particle  ~/ergo   kv-backends ●  cat irc/kv/init.go
// Copyright (c) 2022 Valentin Lorentz
// released under the MIT license

package kv

import (
	"fmt"
	"os"
)

type Config struct {
	Type string
	// Path is a DSN when using MySQL
	// TODO: Make the config more user-friendly/consistent than a DSN
	Path string
}

func Open(config Config) (Store, error) {
	if config.Type == "" || config.Type == "buntdb" {
		return BuntdbOpen(config.Path)
	} else if config.Type == "sqlite3" {
		return SqliteOpen(config.Path)
	} else if config.Type == "mysql" {
		return MysqlOpen(config.Path)
	} else {
		return nil, fmt.Errorf("invalid datastore type: %s", config.Type)
	}
}

func Exists(config Config) bool {
	if config.Type == "" || config.Type == "buntdb" || config.Type == "sqlite3" {
		_, err := os.Stat(config.Path)
		if os.IsNotExist(err) {
			return false
		}
		return true
	} else if config.Type == "mysql" {
		return true // TODO
	} else {
		return true
	}
}
 dev-irc@particle  ~/ergo   kv-backends ●  cat irc/kv/mysql.go
// Copyright (c) 2022 Valentin Lorentz
// released under the MIT license

// This file implements the Store abstraction using sqlite3.

package kv

import (
	"database/sql"
)

/**********************
 * Database
 */

type MysqlStore struct {
	db *sql.DB
}

func MysqlOpen(path string) (Store, error) {
	db, err := sql.Open("mysql", path)
	return MysqlStore{db}, err
}

func (db MysqlStore) Initialize() error {
	err := db.db.Ping()
	if err != nil {
		return err
	}
	// TODO
	return nil
}

func (db MysqlStore) Close() error {
	return db.db.Close()
}

func (kv MysqlStore) Update(fn func(tx Tx) error) error {
	return nil
}

func (kv MysqlStore) View(fn func(tx Tx) error) error {
	return nil
}
 dev-irc@particle  ~/ergo   kv-backends ●  cat irc/kv/sqlite.go
// Copyright (c) 2022 Valentin Lorentz
// released under the MIT license

// This file implements the Store abstraction using sqlite3.

package kv

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	_ "github.com/mattn/go-sqlite3"
)

/**********************
 * Transactions
 */
type SqliteTx struct {
	tx       *sql.Tx
	ctx      context.Context
	readOnly bool
}

func (tx SqliteTx) AscendPrefix(prefix string, iterator func(key, value string) bool) error {
	fmt.Println("sqlite AscendPrefix", prefix)
	now := time.Now().Unix()

	rows, err := tx.tx.QueryContext(tx.ctx, `
        select key, value from kv
        where key >= ? and (expiry is null or expiry > ?)
        order by key
    `, prefix, now)

	if err != nil {
		return err
	}

	for rows.Next() {
		var key string
		var value string

		err = rows.Scan(&key, &value)
		if err != nil {
			return err
		}
		if !strings.HasPrefix(key, prefix) {
			// TODO: do this directly in SQL
			return nil
		}
		fmt.Println("sqlite AscendPrefix result", key, value)
		if !iterator(key, value) {
			return nil
		}
	}

	return nil
}

func (tx SqliteTx) Delete(key string) (val string, err error) {
	fmt.Println("sqlite Delete", key)
	if tx.readOnly {
		return "", ErrTxNotWritable
	}
	return "TODO", nil // TODO
}

func (tx SqliteTx) Get(key string, ignoreExpired ...bool) (val string, err error) {
	now := time.Now().Unix()
	err = tx.tx.QueryRowContext(tx.ctx, `
        select value from kv
        where key = ? and (expiry is null or expiry > ?)
    `, key, now).Scan(&val)
	if err == sql.ErrNoRows {
		fmt.Println("sqlite Get not found", key)
		return "", ErrNotFound
	} else if err != nil {
		fmt.Println("sqlite Get err", err)
		return "", err
	}
	fmt.Println("sqlite Get end", key, val)

	return val, nil
}

func (tx SqliteTx) Set(key string, value string, opts *SetOptions) (previousValue string, replaced bool, err error) {
	fmt.Println("sqlite Set", key, value)
	if tx.readOnly {
		return "", false, ErrTxNotWritable
	}
	var expiry sql.NullInt64
	if opts != nil && opts.Expires {
		expiry = sql.NullInt64{Int64: time.Now().Add(opts.TTL).Unix(), Valid: true}
	} else {
		expiry = sql.NullInt64{Valid: false}
	}
	err = tx.tx.QueryRowContext(tx.ctx, `
        insert into kv (key, expiry, value) values (?, ?, ?)
        on conflict(key) do update set
            expiry=excluded.expiry,
            value=excluded.value
        returning kv.value
    `, key, expiry, value).Scan(&previousValue)

	fmt.Println("sqlite Set end", key, err)
	if err == sql.ErrNoRows {
		return "", false, nil
	} else if err != nil {
		return "", false, err
	} else {
		return previousValue, true, nil
	}
}

/**********************
 * Database
 */

type SqliteStore struct {
	db *sql.DB
}

func SqliteOpen(path string) (Store, error) {
	db, err := sql.Open("sqlite3", path)
	fmt.Println("sqlite open", path)
	return SqliteStore{db}, err
}
func (db SqliteStore) Initialize() error {
	fmt.Println("sqlite init")
	err := db.db.Ping()
	if err != nil {
		return err
	}
	ctx := context.TODO()
	tx, err := db.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: false})
	if err != nil {
		return err
	}
	_, err = tx.Exec(`
        CREATE TABLE kv (
            key         text        PRIMARY KEY,
            expiry      integer,                    -- UNIX timestamp
            value       blob        NOT NULL
        );
    `)
	if err != nil {
		return err
	}
	err = tx.Commit()
	fmt.Println("sqlite init end", err)
	return err
}

func (db SqliteStore) Close() error {
	fmt.Println("sqlite close")
	return db.db.Close()
}

func (db SqliteStore) Update(fn func(tx Tx) error) error {
	fmt.Println("sqlite update")
	ctx := context.TODO()
	tx, err := db.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: false})
	if err != nil {
		return err
	}

	err = fn(SqliteTx{tx: tx, ctx: ctx})

	if err != nil {
		tx.Rollback()
		return err
	}

	fmt.Println("sqlite commit")
	err = tx.Commit()
	fmt.Println("sqlite committed", err)

	if err != nil {
		fmt.Println("sqlite rollback")
		tx.Rollback()
		return err
	}

	return nil
}

func (db SqliteStore) View(fn func(tx Tx) error) error {
	fmt.Println("sqlite view")

	ctx := context.TODO()
	tx, err := db.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: false})
	if err != nil {
		return err
	}

	// Defer a rollback in case anything fails.
	defer tx.Rollback()

	return fn(SqliteTx{tx: tx, ctx: ctx})
}

progval avatar Aug 28 '22 13:08 progval