Query errors with riverdatabasesql, Bun, and pgdriver
I'm completely new to River and immediately ran into the following issue when playing with the examples against Pg 16.4:
msg="Elector: Error attempting to elect" err="ERROR: cannot cast type bigint to interval (SQLSTATE=42846)"
The offending LeaderAttemptElect SQL attempts to cast an integer time.Duration parameter to a Pg interval, which is not allowed. Weird that nobody else appears to have hit the same issue; I wonder where I've gone wrong!?
Anyway, I'm currently working around this by patching all 4 instances of now() + $2 in the river source with now() + make_interval(secs => $2), as recommended in this issue: https://github.com/sqlc-dev/sqlc/issues/429#issuecomment-2144399011
@NathanBaulch I’m not sure how this could have slipped past our testing 🤔 can you confirm that you’re using the dbsql driver, as well as the underlying db driver you’re using (pgx or other)?
OK, that was a bit of a rabbit hole, but it turns out the Bun SQL driver my project uses doesn't prepare statements before execution (for alleged perf reasons). That step is required in order to infer that the @ttl parameter should be interpreted as an interval (oid 1186); otherwise the underlying int64 is used.
Is there any interest in supporting drivers that don't prepare statements? I'd be happy to send a PR that replaces ::interval with make_interval(...).
Ah, it seems like you're using Bun's own pgdriver which I don't think we've tried, so it seems likely to be an issue specific to that. We have a fair bit of test coverage for the drivers including the LeaderAttemptElect query, but that uses pgx under the hood via its stdlib database/sql package.
Have you tried Bun with pgx, or is that not an option for some reason?
I've switched to pgx for now, hopefully no side effects in my project! I guess this issue can serve as a warning to other Bun users.
Exact same issue with riverdatabasesql.
time=2025-01-01T16:08:07.106+05:00 level=ERROR msg="Elector: Error attempting to elect" attempt=190 client_id=MacBook-Pro_local_2024_12_31T14_54_23_558092 err="ERROR: cannot cast type bigint to interval (SQLSTATE=42846)" sleep_duration=8m51.567188925s
Any help or guidance is highly appreciated
Note: I am using gorm as my ORM in most of the project
Are either of you guys able to provide some kind of basic repro that demonstrates the error?
If all we need to do is replace an ::interval with a make_interval, that seems super plausible to do, but it would be nice if we had something to check that it's actually working in the test suite because otherwise this'll probably regress fast.
Sure thing, sorry about the delay. Here's a standalone unit test that uses https://github.com/testcontainers/testcontainers-go to provision a clean DB.
func Test_Bun_Driver_LeaderAttemptElect(t *testing.T) {
	ctx := context.Background()

	pg := must(postgres.Run(ctx, "postgres:alpine", postgres.BasicWaitStrategies()))
	defer testcontainers.TerminateContainer(pg)

	c := must(pgdriver.NewDriver().OpenConnector(pg.MustConnectionString(ctx, "sslmode=disable")))
	db := sql.OpenDB(c)
	defer db.Close()

	driver := riverdatabasesql.New(db)
	migrator := must(rivermigrate.New(driver, nil))
	must(migrator.Migrate(ctx, rivermigrate.DirectionUp, nil))

	must(driver.GetExecutor().LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{LeaderID: "foo", TTL: time.Second}))
	// BOOM!
}

func must[T any](val T, err error) T {
	if err != nil {
		panic(err)
	}
	return val
}
FWIW, go.mod:
require (
	github.com/riverqueue/river v0.15.0
	github.com/riverqueue/river/riverdriver v0.15.0
	github.com/riverqueue/river/riverdriver/riverdatabasesql v0.15.0
	github.com/testcontainers/testcontainers-go v0.35.0
	github.com/testcontainers/testcontainers-go/modules/postgres v0.35.0
	github.com/uptrace/bun/driver/pgdriver v1.2.9
)
Note that a fix for this might also address #566.
Thanks!
Okay so I spent some time on this, and I was able to get it working for a single SQL operation (QueueCreateOrSetUpdatedAt in this case), which demonstrates that this is at least probably possible.
Partial diff:
diff --git a/driver_test.go b/driver_test.go
index c9e0203..4c28144 100644
--- a/driver_test.go
+++ b/driver_test.go
@@ -9,6 +9,7 @@ import (
"time"
"github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/stretchr/testify/require"
@@ -67,6 +68,45 @@ func TestDriverRiverPgxV5(t *testing.T) {
})
}
+func TestDriverRiverPgxV5_QueryExecModeSimpleProtocol(t *testing.T) {
+ t.Parallel()
+
+ ctx := context.Background()
+
+ dbPool := riverinternaltest.TestDB(ctx, t)
+
+ config := dbPool.Config().Copy()
+ config.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
+
+ dbPool, err := pgxpool.NewWithConfig(ctx, config)
+ require.NoError(t, err)
+
+ riverdrivertest.Exercise(ctx, t,
+ func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
+ t.Helper()
+
+ // TODO: Can this be merged with the pool above?
+ dbPool := riverinternaltest.TestDB(ctx, t)
+
+ config := dbPool.Config().Copy()
+ config.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
+
+ dbPool, err := pgxpool.NewWithConfig(ctx, config)
+ require.NoError(t, err)
+
+ return riverpgxv5.New(dbPool)
+ },
+ func(ctx context.Context, t *testing.T) riverdriver.Executor {
+ t.Helper()
+
+ tx, err := dbPool.Begin(ctx)
+ require.NoError(t, err)
+ t.Cleanup(func() { _ = tx.Rollback(ctx) })
+
+ return riverpgxv5.New(nil).UnwrapExecutor(tx)
+ })
+}
+
func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
const (
clientID = "test-client-id"
diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go
index ae179c3..186a324 100644
--- a/riverdriver/river_driver_interface.go
+++ b/riverdriver/river_driver_interface.go
@@ -442,7 +442,7 @@ type NotifyManyParams struct {
}
type QueueCreateOrSetUpdatedAtParams struct {
- Metadata []byte
+ Metadata *string
Name string
PausedAt *time.Time
UpdatedAt *time.Time
diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go
index 1b02010..bfd512f 100644
--- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go
+++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go
@@ -20,7 +20,7 @@ INSERT INTO river_queue(
updated_at
) VALUES (
now(),
- coalesce($1::jsonb, '{}'::jsonb),
+ jsonb(coalesce($1::text, '{}')),
$2::text,
coalesce($3::timestamptz, NULL),
coalesce($4::timestamptz, now())
@@ -31,7 +31,7 @@ RETURNING name, created_at, metadata, paused_at, updated_at
`
type QueueCreateOrSetUpdatedAtParams struct {
- Metadata string
+ Metadata *string
Name string
PausedAt *time.Time
UpdatedAt *time.Time
diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml b/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml
index f8ebbee..67f48af 100644
--- a/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml
+++ b/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml
@@ -45,6 +45,12 @@ sql:
- db_type: "pg_catalog.interval"
go_type: "time.Duration"
+ - db_type: "text"
+ go_type:
+ type: "string"
+ pointer: true
+ nullable: true
+
- db_type: "timestamptz"
go_type: "time.Time"
diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go
index 9d70913..1213337 100644
--- a/riverdriver/riverdatabasesql/river_database_sql_driver.go
+++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go
@@ -770,7 +770,7 @@ func (e *Executor) PGAdvisoryXactLock(ctx context.Context, key int64) (*struct{}
func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverdriver.QueueCreateOrSetUpdatedAtParams) (*rivertype.Queue, error) {
queue, err := dbsqlc.New().QueueCreateOrSetUpdatedAt(ctx, e.dbtx, &dbsqlc.QueueCreateOrSetUpdatedAtParams{
- Metadata: valutil.ValOrDefault(string(params.Metadata), "{}"),
+ Metadata: params.Metadata,
Name: params.Name,
PausedAt: params.PausedAt,
UpdatedAt: params.UpdatedAt,
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
index 0c4da1a..a7ae7ea 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
@@ -15,7 +15,7 @@ INSERT INTO river_queue(
updated_at
) VALUES (
now(),
- coalesce(@metadata::jsonb, '{}'::jsonb),
+ jsonb(coalesce(sqlc.narg('metadata')::text, '{}')),
@name::text,
coalesce(sqlc.narg('paused_at')::timestamptz, NULL),
coalesce(sqlc.narg('updated_at')::timestamptz, now())
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go
index 6351e85..94ae63e 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go
@@ -21,7 +21,7 @@ INSERT INTO river_queue(
updated_at
) VALUES (
now(),
- coalesce($1::jsonb, '{}'::jsonb),
+ jsonb(coalesce($1::text, '{}')),
$2::text,
coalesce($3::timestamptz, NULL),
coalesce($4::timestamptz, now())
@@ -32,7 +32,7 @@ RETURNING name, created_at, metadata, paused_at, updated_at
`
type QueueCreateOrSetUpdatedAtParams struct {
- Metadata []byte
+ Metadata *string
Name string
PausedAt *time.Time
UpdatedAt *time.Time
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml b/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml
index 17ff029..8c6cd75 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml
@@ -36,6 +36,12 @@ sql:
- db_type: "pg_catalog.interval"
go_type: "time.Duration"
+ - db_type: "text"
+ go_type:
+ type: "string"
+ pointer: true
+ nullable: true
+
- db_type: "timestamptz"
go_type: "time.Time"
Probably the most important part is the change to the SQL operation:
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
@@ -15,7 +15,7 @@ INSERT INTO river_queue(
     updated_at
 ) VALUES (
     now(),
-    coalesce(@metadata::jsonb, '{}'::jsonb),
+    jsonb(coalesce(sqlc.narg('metadata')::text, '{}')),
     @name::text,
     coalesce(sqlc.narg('paused_at')::timestamptz, NULL),
     coalesce(sqlc.narg('updated_at')::timestamptz, now())
I'll tell you though, getting this working in a way that both the Pgx and database/sql drivers will accept was like pulling teeth, and took quite a few iterations. The pattern would be similar for other SQL operations, but there's probably 50 other callsites that need fixup, so this would be a pretty substantial project.
I'd never touched QueryExecModeSimpleProtocol before, but given how limited its capabilities are, I'd hazard a guess that it was never meant to be used substantially or for anything too complex.
Another important issue with switching from the Bun driver to pgx is that my app can no longer use pgdriver.NewListener for its own internal pub/sub needs, since this function casts the provided DB driver to bun.Driver. I basically have no choice but to create two DB drivers in parallel, each with its own separate connection pool.
On a related note, using riverdatabasesql (with underlying pgx stdlib wrapper) means that the River client doesn't support listeners. If I could stick with the Bun driver then I'd be able to easily create a custom River driver that supports listeners, something like:
func NewBunDriver(db *bun.DB) riverdriver.Driver[bun.IDB] {
	return &bunDriver{Driver: riverdatabasesql.New(db.DB), db: db}
}

type bunDriver struct {
	*riverdatabasesql.Driver
	db *bun.DB
}

func (b *bunDriver) SupportsListener() bool { return true }

func (b *bunDriver) GetListener() riverdriver.Listener {
	return &bunListener{Listener: pgdriver.NewListener(b.db)}
}

func (b *bunDriver) UnwrapExecutor(tx bun.IDB) riverdriver.ExecutorTx {
	return b.Driver.UnwrapExecutor(tx.(bun.Tx).Tx)
}

type bunListener struct{ *pgdriver.Listener }

func (b *bunListener) Connect(ctx context.Context) error {
	// bun will connect on first listen, even with no channels
	return b.Listener.Listen(ctx)
}

func (b *bunListener) WaitForNotification(ctx context.Context) (*riverdriver.Notification, error) {
	channel, payload, err := b.Receive(ctx)
	if err != nil {
		return nil, err
	}
	return &riverdriver.Notification{Topic: channel, Payload: payload}, nil
}

func (b *bunListener) Listen(ctx context.Context, topic string) error { return b.Listener.Listen(ctx, topic) }
func (b *bunListener) Unlisten(ctx context.Context, topic string) error { return b.Listener.Unlisten(ctx, topic) }
func (b *bunListener) Ping(context.Context) error { return nil } // bun runs its own ping loop
func (b *bunListener) Close(context.Context) error { return b.Listener.Close() }
This driver also extracts the *sql.Tx from bun.IDB so I can create a river.Client[bun.IDB] that seamlessly works with Bun transactions.
Sorry about the delay on this one, but it should be fixed now. Check out v0.22.0.
https://github.com/riverqueue/river/releases/tag/v0.22.0
Regarding the listener: we'll continue work on this one and should have an answer soon-ish. I've been working on a SQLite driver that also needs some sort of answer to listen/notify.
@brandur we can probably remove the part of the note on this doc page that links to this issue, right? https://riverqueue.com/docs/database-drivers#known-limitations-of-riverdatabasesql
Yeah that can go. I opened a docs PR taking it out.