pglite icon indicating copy to clipboard operation
pglite copied to clipboard

`electric.syncShapesToTables` does not support inserting custom array types

Open forabi opened this issue 6 months ago • 0 comments

Schema has:

// Roles a user can hold; used as a list-typed field (`Role[]`) on
// UserOrganizationAssociation.
enum Role {
  admin
  assistant
  owner
  doctor
}

// Excerpt of the association model showing only its two list-typed fields.
// NOTE(review): the synced table DDL shown in this issue has a single
// `organization_id` UUID column and no `organization_ids` column — presumably
// this excerpt is abridged or comes from a different revision; confirm.
model UserOrganizationAssociation {
  // List of enum values; this is the column whose sync insert fails.
  roles Role[]
  // List of plain strings.
  organization_ids String[]
}

-- Enum type backing the "roles" array column of the synced table below.
CREATE TYPE "public"."user_organization_roles" AS ENUM ('owner', 'admin', 'doctor', 'assistant');


-- Local table that receives the synced rows. "roles" is an array of the custom
-- enum type above, so a text value for it must be an array literal such as
-- {owner,doctor}.
CREATE TABLE "public"."user_organization_associations_synced" (
    "id" UUID NOT NULL,
    "organization_id" UUID NOT NULL,
    "user_id" UUID NOT NULL,
    "roles" "public"."user_organization_roles"[],
    "created_at" TIMESTAMPTZ(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "updated_at" TIMESTAMPTZ(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "joined_using_invite_id" UUID,
    "write_id" UUID,

    CONSTRAINT "user_organization_associations_synced_pkey" PRIMARY KEY ("id")
);

With initialInsertMethod: "json", a native JS array appears to be passed as one of the arguments to the exec method. It ends up serialized as "owner,doctor" instead of the array literal {owner,doctor}, and the insert fails with: malformed array literal: "owner,doctor"

runQuery 
      INSERT INTO "public"."user_organization_associations_synced"
      ("created_at", "id", "joined_using_invite_id", "organization_id", "roles", "updated_at", "user_id")
      VALUES
      ($1, $2, $3, $4, $5, $6, $7), ($8, $9, $10, $11, $12, $13, $14), ($15, $16, $17, $18, $19, $20, $21), ($22, $23, $24, $25, $26, $27, $28), ($29, $30, $31, $32, $33, $34, $35), ($36, $37, $38, $39, $40, $41, $42), ($43, $44, $45, $46, $47, $48, $49), ($50, $51, $52, $53, $54, $55, $56), ($57, $58, $59, $60, $61, $62, $63), ($64, $65, $66, $67, $68, $69, $70), ($71, $72, $73, $74, $75, $76, $77)
     (77) ['2025-06-06 19:23:56.336166+00', '38e3b884-6637-54a7-95e4-86dfedd67324', null, '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '52ecc26a-3cfd-492b-b3ff-3885dcdc775f', '2025-06-06 19:23:56.336166+00', '37dd52cb-a0f9-5e0b-99e6-0bc68a42a2e7', '8ad4725a-9f9c-5dd6-8627-ada4ccdcd1aa', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '339a4295-0e00-5633-9b16-4c96ca3caa03', '2025-06-06 19:23:56.336166+00', '34aca715-795b-506f-9bfc-e44ba05b9bf0', '72a79d77-2684-5d97-b3ed-4dd7e6584c2c', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '710d1eca-5419-5755-9a58-9a0d96cad729', '2025-06-06 19:23:56.336166+00', 'cd074223-0b83-5d0d-b6a9-f828c0bda787', '45fa0aa0-4d7c-5dac-9147-ea1e1fd6cb41', '56997b12-a42d-4994-b851-f37a5be47acb', Array(3), '2025-06-06 19:23:56.336166+00', '8c13e3d1-4b1a-5315-8146-edb396c9edb5', '2025-06-06 19:23:56.336166+00', '9a0a2739-f183-53aa-8c8c-0b06c4109ba8', 'd59ec517-2dc6-5693-8f29-a4ffedf7da03', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '98bafc3e-2d49-5542-aedc-77f05e915f23', '2025-06-06 19:23:56.336166+00', '920f379d-8b2b-540f-939a-570f2fb68b1f', 'f718a337-7b88-550d-8325-20a71e122b65', '56997b12-a42d-4994-b851-f37a5be47acb', Array(1), '2025-06-06 19:23:56.336166+00', '5defc2fb-3cae-54b3-9e83-7a02b0e9d7eb', '2025-06-06 19:23:56.336166+00', '74ad95c6-3dd0-58e3-bb63-30a4d9fe4819', '78f47e35-03c3-5bd0-871a-1874f3c5184f', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '3d8ac1f0-26a9-50ae-9184-56046efb9797', '2025-06-06 19:23:56.336166+00', '89bb74b4-ad07-5013-abc5-ad23a5ca6123', '7877a627-cd3d-5fd8-a814-2f95766cf6ec', '56997b12-a42d-4994-b851-f37a5be47acb', Array(1), '2025-06-06 19:23:56.336166+00', 'a9b14102-41db-5f90-9e2b-e6df88b6c7b4', '2025-06-06 19:23:56.336166+00', '668b831c-8d38-5480-922d-b01fcae71752', '7ad512a1-1e8a-5a07-a7b6-4c0eed75b918', 
'56997b12-a42d-4994-b851-f37a5be47acb', Array(3), '2025-06-06 19:23:56.336166+00', 'dd3e43ec-444c-5ade-a2ac-56a1ddc21315', '2025-06-06 19:23:56.336166+00', 'b18b4f3e-3047-5cdb-ba41-fbeca3b28783', '78c371a5-9024-5b01-8961-3cc5d8cd27b5', '56997b12-a42d-4994-b851-f37a5be47acb', Array(2), '2025-06-06 19:23:56.336166+00', '69ae54a9-4ba5-5c96-932f-fb6d24bf6749', '2025-06-06 19:23:56.336166+00', '42bdb043-3051-554c-a600-6dc4430c6704', 'ad130f7e-8bec-53d2-bef8-bbae4d85ce9e', '56997b12-a42d-4994-b851-f37a5be47acb', Array(3), '2025-06-06 19:23:56.336166+00', '7f0b6f6e-d5e3-58fd-99cf-74ec53fd2c97']

With initialInsertMethod: "insert": fails with the same error as above

With initialInsertMethod: "csv": fails with "Received extra data after last expected column"

The weird thing is that the Electric proxy returns these values as the string {owner,doctor}, which can be inserted as-is. Somewhere along the way the value is being parsed into a JS array, even though I don't have any custom serializers or parsers anywhere.


/**
 * Starts the PGlite worker, applies the local Kysely migrations, recreates the
 * views, sets up full-text search and then starts syncing the Electric shapes
 * into the local `<table>_synced` tables.
 *
 * @param userId - Used as the sync `key`; a falsy value is passed as `null`.
 * @param dataDir - Data directory handed to the PGlite worker.
 * @param onSynced - Invoked when the initial sync reports completion.
 * @returns The Kysely instance (`db`), the PGlite worker client (`pg`) and the
 *   handle returned by `syncShapesToTables` (`sync`).
 */
export async function init(
  userId: string | null,
  dataDir: string,
  onSynced: () => void
) {
  // Fetch client handed to Electric: forwards the request unchanged except for
  // the Supabase bearer token.
  const fetchWrapper = async (...args: Parameters<typeof fetch>) => {
    const [input, requestInit] = args;
    console.log("[Electric] fetchWrapper", args);

    const headers = new Headers(requestInit?.headers);

    // Only attach the header when a session exists; interpolating a missing
    // token would send the literal string "Bearer undefined".
    const accessToken = (await supabase.auth.getSession()).data.session
      ?.access_token;
    if (accessToken) {
      headers.set("Authorization", `Bearer ${accessToken}`);
    }

    // Return the upstream response untouched. Re-encoding the body through
    // response.json()/JSON.stringify throws on empty or non-JSON bodies and
    // reuses headers (Content-Length, Content-Encoding) that describe the
    // original payload rather than the re-serialized one.
    return fetch(input, { ...requestInit, headers });
  };

  console.log("[Electric] initializing worker");
  const pg = await PGliteWorker.create(
    new Worker(new URL("./electric/worker/initWorker.js", location.origin), {
      type: "module",
      // Add name for better debugging
      name: "electric-sql-worker",
    }),
    {
      dataDir,
      debug: 3,
      extensions: {
        electric: electricSync({ debug: true }),
        live,
        // pg_trgm,
      },
    }
  );

  await pg.waitReady;

  await pg.exec(/*sql*/ `
    -- Enable pg_trgm extension
    CREATE EXTENSION IF NOT EXISTS pg_trgm;
  `);

  console.log("[Electric] worker initialized");

  // Kysely is pointed at the worker client; the cast is needed because the
  // dialect is typed against PGlite rather than PGliteWorker.
  const db = new Kysely<GeneratedDatabaseType>({
    dialect: new PGliteDialect(pg as unknown as PGlite),
    log: (msg) => console.log("[Electric]", msg),
  });

  const migrator = new Migrator({
    migrationTableName: "kysely_migration",
    migrationLockTableName: "kysely_migration_lock",
    provider: new LocalMigrationsProvider(),
    db,
  });
  console.log("[Electric] Migrating to latest");
  const migrationResults = await migrator.migrateToLatest();
  console.log("[Electric] Migrations applied", migrationResults);

  // Print all tables
  const tables = await pg.query(/*sql*/ `
    SELECT table_name 
    FROM information_schema.tables 
    WHERE table_schema = 'public'
  `);
  console.log("[Electric] Tables:", tables);

  await pg.exec(createOrReplaceViewStatements.join(";"));

  // Print all views
  const views = await pg.query(/*sql*/ `
    SELECT table_name 
    FROM information_schema.views 
    WHERE table_schema = 'public'
  `);
  console.log("[Electric] Views:", views);
  console.log("[Electric] View Statements:", createOrReplaceViewStatements);

  await setUpFts(pg);
  console.log("[Electric] pg initialized, starting sync...", pg);

  // Builds the sync options for one upstream table: rows of `table` are
  // written into the local `${table}_synced` table, keyed by "id".
  const createShape = <TableName extends keyof GeneratedDatabaseType>(
    table: TableName
  ): TableSyncOptions<TableName> => {
    return {
      schema: "public",
      table: `${table}_synced`,
      shape: {
        fetchClient: fetchWrapper,
        url: "http://127.0.0.1:3001/v1/shape",
        params: { table },
      },
      primaryKey: ["id"],
    };
  };

  const syncedTables: (keyof GeneratedDatabaseType)[] = [
    "user_profiles",
    "organizations",
    "user_organization_associations",
    "patient_profiles",
  ];

  const shapes: Partial<Shapes> = Object.fromEntries(
    syncedTables.map((table) => [table, createShape(table)])
  );

  const sync = await pg.electric.syncShapesToTables({
    shapes,
    key: userId || null,
    initialInsertMethod: "json",
    onInitialSync: () => {
      onSynced();
      console.log("[Electric] Initial sync complete");
    },
  });

  return { db, pg, sync };
}

worker:

import { MemoryFS, PGlite } from "@electric-sql/pglite";
import { electricSync } from "@electric-sql/pglite-sync";
import { live } from "@electric-sql/pglite/live";
import { pg_trgm } from "@electric-sql/pglite/contrib/pg_trgm";
import { type PGliteWorkerOptions, worker } from "@electric-sql/pglite/worker";

/**
 * Fetches the PGlite WASM module and filesystem bundle, then creates an
 * in-memory PGlite instance with the electric, live and pg_trgm extensions.
 *
 * @param options - Worker options forwarded to `PGlite.create`. `extensions`
 *   is excluded from the type because this function always supplies its own.
 * @returns The created PGlite instance.
 * @throws Error when either asset request does not return a successful status.
 */
const initPglite = async (
  // `Omit` removes the property; `Exclude` only filters union members and left
  // this object type unchanged.
  options: Omit<PGliteWorkerOptions, "extensions">
) => {
  console.log("[ELECTRIC] [WORKER] initPglite", options);
  const [fetchedWasm, fetchedFsBundle] = await Promise.all([
    fetch("./pglite.wasm"),
    fetch("./pglite.data"),
  ]);

  // Fail with a clear message instead of letting an error page reach the WASM
  // compiler or be used as the filesystem bundle.
  for (const asset of [fetchedWasm, fetchedFsBundle]) {
    if (!asset.ok) {
      throw new Error(
        `[ELECTRIC] [WORKER] failed to fetch ${asset.url}: ${asset.status} ${asset.statusText}`
      );
    }
  }
  console.log("[ELECTRIC] [WORKER] fetchedWasm");
  console.log("[ELECTRIC] [WORKER] fetchedFsBundle");

  const wasmModule = await WebAssembly.compileStreaming(fetchedWasm);
  console.log("[ELECTRIC] [WORKER] wasmModule compiled", wasmModule);
  const db = await PGlite.create({
    ...options,
    wasmModule,
    debug: 3,
    fs: new MemoryFS(),
    fsBundle: await fetchedFsBundle.blob(),
    // Listed after the spread so these always win over anything in `options`.
    extensions: {
      electric: electricSync({ debug: true }),
      live,
      pg_trgm,
    },
  });

  return db;
};

// Register with the PGlite worker helper; its init callback builds the PGlite
// instance from the options it receives.
await worker({
  init: async (workerOptions) => initPglite(workerOptions),
});

forabi avatar Jun 11 '25 22:06 forabi