ibis

Cross-backend table creation and insertion

Status: Open · NickCrews opened this issue 1 year ago · 5 comments

Is your feature request related to a problem?

Inspired by https://github.com/ibis-project/ibis/issues/8110

I use duckdb for local analysis and processing and a hosted postgres to house our production data. I sometimes want to move data between them.

It would be great if I could do

# Desired cross-backend API (a proposal: not all of these calls work today).
# URI is a placeholder for a PostgreSQL connection string; it is not defined here.
duckdb = ibis.duckdb.connect()
sqlite = ibis.sqlite.connect()
pg = ibis.postgres.connect(URI)

# Currently I have something monkeypatched into the PG backend so I can do
pg.create_table("new_table", duckdb.table("t"))

# It would be neat if this even worked transitively (with duckdb in the middle, hidden)
pg.create_table("new_table", sqlite.table("t"))

# The full DDL APIs would be ideal:
# NOTE: later in this thread it was agreed that a cross-backend create_view
# should raise an error instead; only create_table is in scope.
pg.create_view("new_table", duckdb.table("t"))

Describe the solution you'd like

My current implementation is below (it is not quite runnable as-is); I can make it runnable if you want.

"""Utils for interacting with PostgreSQL databases."""
import re
from dataclasses import dataclass, field
from urllib.parse import quote, unquote, urlsplit

import ibis
from ibis.backends.postgres import Backend as PGBackend
from ibis.expr.types import Table
from noatak.ibis import get_DuckDBPyConnection

from atlas.env import read_env


def connect(uri: str | None = None) -> PGBackend:
    """Connect to a PostgreSQL database.

    This is a wrapper around ``ibis.connect()`` for PostgreSQL URIs that adds
    support for importing duckdb tables into PostgreSQL.

    The returned backend's ``create_table`` is replaced on this instance only.
    When ``obj`` is an Ibis table whose backend is DuckDB, the data is copied
    into PostgreSQL through DuckDB (see ``_import_duckdb_table``). Every other
    call is forwarded unchanged, including ``overwrite``, to
    ``PGBackend.create_table``.

    Args:
        uri: PostgreSQL connection URI. If None, ``read_env()["PG_URI"]`` is used.

    Returns:
        The connected Ibis PostgreSQL backend.
    """
    if uri is None:
        uri = read_env()["PG_URI"]
    conn_info = _PGConnInfo.from_uri(uri)
    # NOTE(review): to_uri() does not carry query parameters such as
    # ``sslmode=require`` over from the original URI -- confirm this is OK.
    backend = ibis.connect(conn_info.to_uri())

    def create_table(
        self, name, obj=None, *, schema=None, database=None, temp=False, overwrite=False
    ) -> Table | None:
        is_duckdb_table = (
            isinstance(obj, Table)
            and obj._find_backend(use_default=True).name == "duckdb"
        )
        if is_duckdb_table:
            if schema is not None:
                raise ValueError("Cannot specify schema for duckdb table")
            if database is not None:
                raise ValueError("Cannot specify database for duckdb table")
            return _import_duckdb_table(
                table=obj,
                pg_connection_info=conn_info,
                new_name=name,
                temp=temp,
                overwrite=overwrite,
            )
        # Call PGBackend's implementation explicitly: ``super(PGBackend, self)``
        # would begin the method lookup *after* PGBackend in the MRO and could
        # bypass the PostgreSQL-specific create_table.
        return PGBackend.create_table(
            self,
            name,
            obj=obj,
            schema=schema,
            database=database,
            temp=temp,
            overwrite=overwrite,
        )

    create_table.__doc__ = PGBackend.create_table.__doc__
    # Bind the function as a method of this backend instance only.
    backend.create_table = create_table.__get__(backend)
    return backend


@dataclass
class _PGConnInfo:
    dbname: str
    host: str
    user: str
    password: str

    @classmethod
    def from_uri(cls, uri: str):
        """Parse a connection string into a PGConnInfo object.

        eg
        postgresql://NickCrews:[email protected]/scg-db?sslmode=require
        """
        uri = uri.removesuffix("?sslmode=require")
        pattern = r"postgresql://(?P<user>[^:]+):(?P<password>[^@]+)@(?P<host>[^/]+)/(?P<dbname>[^?]+)"
        match = re.match(pattern, uri)
        if match is None:
            raise ValueError(f"Could not parse connection string: {uri}")
        return cls(**match.groupdict())

    def to_uri(self) -> str:
        return f"postgresql://{self.user}:{self.password}@{self.host}/{self.dbname}"


def _import_duckdb_table(
    *,
    table: Table,
    pg_connection_info: str | _PGConnInfo,
    new_name: str,
    temp: bool = False,
    overwrite: bool = False,
) -> None:
    """Create a table in PostgreSQL from a (duckdb-based) Ibis table.

    The PostgreSQL database is ATTACHed to the table's DuckDB connection under
    the alias ``pgdb``, but only if no database with that alias is already
    attached. The data is then copied by running
    ``CREATE TABLE pgdb."<new_name>" AS <query>`` inside DuckDB.

    Args:
        table: Ibis table backed by DuckDB (or a memtable).
        pg_connection_info: Target database, as a URI or a parsed _PGConnInfo.
        new_name: Name of the new PostgreSQL table. It is quoted as a single
            identifier, so a dotted name is not split into ``schema.table``.
        temp: If True, emit ``CREATE TEMPORARY TABLE``.
            NOTE(review): DuckDB may not allow temporary tables in an attached
            catalog -- confirm.
        overwrite: If True, drop any existing table named ``new_name`` before
            creating it. Otherwise a plain ``CREATE TABLE`` is issued.
    """
    if not isinstance(pg_connection_info, _PGConnInfo):
        pg_connection_info = _PGConnInfo.from_uri(pg_connection_info)
    # we need to cache the table so that:
    # 1. the table name is qualified with main.ibis_cache_rwe632 (or whatever)
    #    so that we can load it into the pgdb database
    # 2. In case it is a memtable, so that it is actually materialized into
    #    a backend, so that ._find_backend() doesn't barf.
    table = table.cache()
    ddb_conn = get_DuckDBPyConnection(table._find_backend())

    attached = {row[0] for row in ddb_conn.execute("SHOW DATABASES").fetchall()}
    # NOTE(review): an already-attached "pgdb" is reused as-is, even if it
    # points at a different PostgreSQL database than pg_connection_info.
    if "pgdb" not in attached:
        conninfo = _libpq_conninfo(pg_connection_info)
        ddb_conn.sql(f"ATTACH {_sql_string_literal(conninfo)} AS pgdb (TYPE postgres);")

    # Render the source query before any destructive DDL runs.
    src_sql = str(ibis.to_sql(table, dialect="duckdb"))
    target = f"pgdb.{_sql_identifier(new_name)}"
    if overwrite:
        # Not atomic: if the CREATE below fails, the old table is already gone.
        ddb_conn.sql(f"DROP TABLE IF EXISTS {target}")
    temp_kw = "TEMPORARY " if temp else ""
    ddb_conn.sql(f"CREATE {temp_kw}TABLE {target} AS {src_sql}")


def _libpq_conninfo(info: _PGConnInfo) -> str:
    """Build a libpq keyword/value connection string with every value quoted."""
    params = {
        "dbname": info.dbname,
        "host": info.host,
        "user": info.user,
        "password": info.password,
    }
    # ``port`` is optional; when it is absent or None, no port keyword is
    # emitted and libpq uses its default.
    port = getattr(info, "port", None)
    if port is not None:
        params["port"] = str(port)
    return " ".join(f"{key}={_libpq_quote(value)}" for key, value in params.items())


def _libpq_quote(value: str) -> str:
    """Single-quote *value* for libpq, backslash-escaping backslashes and quotes."""
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _sql_string_literal(value: str) -> str:
    """Render *value* as a SQL string literal (embedded single quotes doubled)."""
    return "'" + value.replace("'", "''") + "'"


def _sql_identifier(name: str) -> str:
    """Render *name* as a double-quoted SQL identifier (embedded quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'

What version of ibis are you running?

main

What backend(s) are you using, if any?

No response

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

— NickCrews, Jan 27 '24 02:01

It'd be great if this also worked with `insert`, in addition to `create_table`.

— ianmcook, Jan 30 '24 21:01

xref: #4800

— lostmygithubaccount, Mar 04 '24 17:03

What would `pg.create_view("my_view", duckdb_con.table("my_table"))` do? Views are defined by queries against tables in the same database, so I'm not sure what it would mean to define a view in Postgres based on tables in DuckDB.

— cpcloud, Jul 23 '24 18:07

Scoping it down to tables seems fine, I think?

— lostmygithubaccount, Jul 23 '24 18:07

Hmm, you are definitely right: `pg.create_view("my_view", duckdb_con.table("my_table"))` should raise an error. Limiting this to `.create_table()` would be totally adequate.

— NickCrews, Jul 23 '24 20:07