pglite
pglite copied to clipboard
`electric.syncShapesToTables` does not support inserting custom array types
Schema has:
// Enum whose values populate the `roles` array field below.
enum Role {
admin
assistant
owner
doctor
}
// Appears to be an excerpt of the model: only array-typed fields are listed here.
model UserOrganizationAssociation {
// Array of enum values.
roles Role[]
// Array of strings.
organization_ids String[]
}
-- Enum type used as the element type of the "roles" array column below.
CREATE TYPE "public"."user_organization_roles" AS ENUM ('owner', 'admin', 'doctor', 'assistant');
-- Table with a single-column primary key on "id".
CREATE TABLE "public"."user_organization_associations_synced" (
"id" UUID NOT NULL,
"organization_id" UUID NOT NULL,
"user_id" UUID NOT NULL,
-- Nullable array of the custom enum type; the only array-typed column in this table.
"roles" "public"."user_organization_roles"[],
"created_at" TIMESTAMPTZ(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMPTZ(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"joined_using_invite_id" UUID,
"write_id" UUID,
CONSTRAINT "user_organization_associations_synced_pkey" PRIMARY KEY ("id")
);
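With initialInsertMethod: "json", it seems that a native JS array is passed as one of the arguments to the exec method. The array then gets serialized as "owner,doctor" (without the surrounding braces of a Postgres array literal), and the insert fails with: malformed array literal: "owner,doctor"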
runQuery
INSERT INTO "public"."user_organization_associations_synced"
("created_at", "id", "joined_using_invite_id", "organization_id", "roles", "updated_at", "user_id")
VALUES
($1, $2, $3, $4, $5, $6, $7), ($8, $9, $10, $11, $12, $13, $14), ($15, $16, $17, $18, $19, $20, $21), ($22, $23, $24, $25, $26, $27, $28), ($29, $30, $31, $32, $33, $34, $35), ($36, $37, $38, $39, $40, $41, $42), ($43, $44, $45, $46, $47, $48, $49), ($50, $51, $52, $53, $54, $55, $56), ($57, $58, $59, $60, $61, $62, $63), ($64, $65, $66, $67, $68, $69, $70), ($71, $72, $73, $74, $75, $76, $77)
(77) ['2025-06-06 19:23:56.336166+00', '38e3b884-6637-54a7-95e4-86dfedd67324', null, '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '52ecc26a-3cfd-492b-b3ff-3885dcdc775f', '2025-06-06 19:23:56.336166+00', '37dd52cb-a0f9-5e0b-99e6-0bc68a42a2e7', '8ad4725a-9f9c-5dd6-8627-ada4ccdcd1aa', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '339a4295-0e00-5633-9b16-4c96ca3caa03', '2025-06-06 19:23:56.336166+00', '34aca715-795b-506f-9bfc-e44ba05b9bf0', '72a79d77-2684-5d97-b3ed-4dd7e6584c2c', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '710d1eca-5419-5755-9a58-9a0d96cad729', '2025-06-06 19:23:56.336166+00', 'cd074223-0b83-5d0d-b6a9-f828c0bda787', '45fa0aa0-4d7c-5dac-9147-ea1e1fd6cb41', '56997b12-a42d-4994-b851-f37a5be47acb', Array(3), '2025-06-06 19:23:56.336166+00', '8c13e3d1-4b1a-5315-8146-edb396c9edb5', '2025-06-06 19:23:56.336166+00', '9a0a2739-f183-53aa-8c8c-0b06c4109ba8', 'd59ec517-2dc6-5693-8f29-a4ffedf7da03', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '98bafc3e-2d49-5542-aedc-77f05e915f23', '2025-06-06 19:23:56.336166+00', '920f379d-8b2b-540f-939a-570f2fb68b1f', 'f718a337-7b88-550d-8325-20a71e122b65', '56997b12-a42d-4994-b851-f37a5be47acb', Array(1), '2025-06-06 19:23:56.336166+00', '5defc2fb-3cae-54b3-9e83-7a02b0e9d7eb', '2025-06-06 19:23:56.336166+00', '74ad95c6-3dd0-58e3-bb63-30a4d9fe4819', '78f47e35-03c3-5bd0-871a-1874f3c5184f', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '3d8ac1f0-26a9-50ae-9184-56046efb9797', '2025-06-06 19:23:56.336166+00', '89bb74b4-ad07-5013-abc5-ad23a5ca6123', '7877a627-cd3d-5fd8-a814-2f95766cf6ec', '56997b12-a42d-4994-b851-f37a5be47acb', Array(1), '2025-06-06 19:23:56.336166+00', 'a9b14102-41db-5f90-9e2b-e6df88b6c7b4', '2025-06-06 19:23:56.336166+00', '668b831c-8d38-5480-922d-b01fcae71752', '7ad512a1-1e8a-5a07-a7b6-4c0eed75b918', 
'56997b12-a42d-4994-b851-f37a5be47acb', Array(3), '2025-06-06 19:23:56.336166+00', 'dd3e43ec-444c-5ade-a2ac-56a1ddc21315', '2025-06-06 19:23:56.336166+00', 'b18b4f3e-3047-5cdb-ba41-fbeca3b28783', '78c371a5-9024-5b01-8961-3cc5d8cd27b5', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '69ae54a9-4ba5-5c96-932f-fb6d24bf6749', '2025-06-06 19:23:56.336166+00', '42bdb043-3051-554c-a600-6dc4430c6704', 'ad130f7e-8bec-53d2-bef8-bbae4d85ce9e', '56997b12-a42d-4994-b851-f37a5be47acb', Array(3), '2025-06-06 19:23:56.336166+00', '7f0b6f6e-d5e3-58fd-99cf-74ec53fd2c97']
With initialInsertMethod: "insert": fails with the same error as above
With CSV: Received extra data after last expected column
The weird thing is that the Electric proxy returns these values as the string `{owner,doctor}`, which can be inserted as-is. Somewhere the value is being parsed into a JS array, even though I don't have any custom serializers or parsers anywhere.
/**
 * Boots the worker-backed PGlite database, applies local migrations and views,
 * and starts syncing the configured shapes into their `<table>_synced` tables.
 *
 * @param userId - Used as the sync `key`; any falsy value is passed as `null`.
 * @param dataDir - Forwarded to `PGliteWorker.create` as `dataDir`.
 * @param onSynced - Invoked from the `onInitialSync` callback of the sync.
 * @returns The Kysely instance (`db`), the PGlite worker client (`pg`) and the
 *   value returned by `syncShapesToTables` (`sync`).
 */
export async function init(
  userId: string | null,
  dataDir: string,
  onSynced: () => void
) {
  // Fetch client handed to every shape: forwards the request unchanged apart
  // from attaching the current Supabase access token.
  const fetchWrapper = async (...args: Parameters<typeof fetch>) => {
    const [input, requestInit] = args;
    console.log("[Electric] fetchWrapper", args);
    const headers = new Headers(requestInit?.headers || {});
    // Set authorization token. Skip the header when there is no session so we
    // never send the literal string "Bearer undefined".
    const accessToken = (await supabase.auth.getSession()).data.session
      ?.access_token;
    if (accessToken) {
      headers.set("Authorization", `Bearer ${accessToken}`);
    }
    // Return the response untouched. Re-encoding it through
    // response.json()/JSON.stringify would throw on empty or non-JSON bodies
    // and on null-body statuses, and adds nothing for JSON ones.
    return fetch(input, { ...requestInit, headers });
  };

  console.log("[Electric] initializing worker");
  const pg = await PGliteWorker.create(
    new Worker(new URL("./electric/worker/initWorker.js", location.origin), {
      type: "module",
      // Add name for better debugging
      name: "electric-sql-worker",
    }),
    {
      dataDir,
      debug: 3,
      extensions: {
        electric: electricSync({ debug: true }),
        live,
        // pg_trgm,
      },
    }
  );
  await pg.waitReady;
  await pg.exec(/*sql*/ `
-- Enable pg_trgm extension
CREATE EXTENSION IF NOT EXISTS pg_trgm;
`);
  console.log("[Electric] worker initialized");

  // Kysely runs on top of the worker client; the dialect is typed against
  // PGlite, hence the cast.
  const db = new Kysely<GeneratedDatabaseType>({
    dialect: new PGliteDialect(pg as unknown as PGlite),
    log: (msg) => console.log("[Electric]", msg),
  });
  const migrator = new Migrator({
    migrationTableName: "kysely_migration",
    migrationLockTableName: "kysely_migration_lock",
    provider: new LocalMigrationsProvider(),
    db,
  });
  console.log("[Electric] Migrating to latest");
  const migrationResults = await migrator.migrateToLatest();
  console.log("[Electric] Migrations applied", migrationResults);

  // Print all tables
  const tables = await pg.query(/*sql*/ `
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
`);
  console.log("[Electric] Tables:", tables);

  await pg.exec(createOrReplaceViewStatements.join(";"));
  // Print all views
  const views = await pg.query(/*sql*/ `
SELECT table_name
FROM information_schema.views
WHERE table_schema = 'public'
`);
  console.log("[Electric] Views:", views);
  console.log("[Electric] View Statements:", createOrReplaceViewStatements);

  await setUpFts(pg);
  console.log("[Electric] pg initialized, starting sync...", pg);

  // Builds the sync options for one table: rows of shape `table` are written
  // into the local table `<table>_synced`, keyed by "id".
  const createShape = <TableName extends keyof GeneratedDatabaseType>(
    table: TableName
  ): TableSyncOptions<TableName> => {
    return {
      schema: "public",
      table: `${table}_synced`,
      shape: {
        fetchClient: fetchWrapper,
        url: "http://127.0.0.1:3001/v1/shape",
        params: { table },
      },
      primaryKey: ["id"],
    };
  };

  const syncedTables: (keyof GeneratedDatabaseType)[] = [
    "user_profiles",
    "organizations",
    "user_organization_associations",
    "patient_profiles",
  ];
  const shapes: Partial<Shapes> = Object.fromEntries(
    syncedTables.map((table) => [table, createShape(table)])
  );

  const sync = await pg.electric.syncShapesToTables({
    shapes,
    key: userId || null,
    initialInsertMethod: "json",
    onInitialSync: () => {
      onSynced();
      console.log("[Electric] Initial sync complete");
    },
  });
  return { db, pg, sync };
}
worker:
import { MemoryFS, PGlite } from "@electric-sql/pglite";
import { electricSync } from "@electric-sql/pglite-sync";
import { live } from "@electric-sql/pglite/live";
import { pg_trgm } from "@electric-sql/pglite/contrib/pg_trgm";
import { type PGliteWorkerOptions, worker } from "@electric-sql/pglite/worker";
/**
 * Creates the PGlite instance that runs inside the worker.
 *
 * Fetches the wasm binary and the filesystem bundle in parallel, compiles the
 * wasm module, and creates PGlite with the caller's options plus the fixed
 * settings below (which take precedence over same-named keys in `options`).
 *
 * @param options - Worker options; `extensions` is always replaced by the set
 *   declared here, so it is omitted from the accepted type.
 * @returns The created PGlite instance.
 * @throws Error if either asset request does not return a successful status.
 */
const initPglite = async (
  // `Omit` removes the key from the object type; `Exclude` only filters union
  // members and left the type unchanged here.
  options: Omit<PGliteWorkerOptions, "extensions">
) => {
  console.log("[ELECTRIC] [WORKER] initPglite", options);
  const [fetchedWasm, fetchedFsBundle] = await Promise.all([
    fetch("./pglite.wasm"),
    fetch("./pglite.data"),
  ]);
  // Fail early with a clear message instead of letting an error page reach
  // the wasm compiler or the fs bundle loader.
  for (const asset of [fetchedWasm, fetchedFsBundle]) {
    if (!asset.ok) {
      throw new Error(
        `[ELECTRIC] [WORKER] failed to fetch ${asset.url}: ${asset.status} ${asset.statusText}`
      );
    }
  }
  console.log("[ELECTRIC] [WORKER] fetchedWasm");
  console.log("[ELECTRIC] [WORKER] fetchedFsBundle");
  const wasmModule = await WebAssembly.compileStreaming(fetchedWasm);
  console.log("[ELECTRIC] [WORKER] wasmModule compiled", wasmModule);
  const db = await PGlite.create({
    ...options,
    wasmModule,
    debug: 3,
    // NOTE(review): an in-memory filesystem is always used here; confirm that
    // ignoring any persistence implied by the caller's options is intended.
    fs: new MemoryFS(),
    fsBundle: await fetchedFsBundle.blob(),
    extensions: {
      electric: electricSync({ debug: true }),
      live,
      pg_trgm,
    },
  });
  return db;
};
// Register the worker entry point: `init` receives the worker options and
// resolves with the PGlite instance built by initPglite.
await worker({
  init: (options) => initPglite(options),
});