support postgres xmin replication
Feature description
We have a number of postgres sources that cannot be configured for CDC, and many tables are large and don't have a suitable incremental/cursor column (e.g. last_changed).
I'm currently using Airbyte for most of our streams as it supports CDC, xmin and user-supplied cursor columns. It would be nice if the postgres source supported all three as well (if I understand correctly it only supports CDC, you have to use sql_database for a user-supplied cursor, and xmin isn't supported).
I think I can use query_adapter_callback to implement xmin (I know xmin can wrap around, but it's a work-around until I can get last_changed implemented on our source tables). It would still be nice if the postgres source supported all three, or maybe I've misread the documentation, in which case perhaps it's not as clear as it could be?
Are you a dlt user?
Yes, I run dlt in production.
Use case
I'm currently using a mix of airbyte and dlt as neither cover all my use cases.
Proposed solution
No response
Related issues
No response
@waterworthd-cim please take a look at https://github.com/dlt-hub/verified-sources where we have a postgres replication source.
Correct me if I'm wrong, but according to the documentation that source requires CDC, not xmin? I'm replicating from a Postgres replica, not a primary, so CDC isn't available.
@david-waterworth OK, so there's something I'm not aware of :) Could you point me to some docs that explain what xmin replication is? We have a regular CDC source and one currently in a PR that replicates from the WAL (for old postgres versions): https://github.com/dlt-hub/verified-sources/pull/589
If there's a trick Airbyte does, it would be good to know.
OK, I've found it. It seems you just need to add xmin to the list of selected columns: use the query_adapter_callback to do it and define incremental loading on it. See our docs for examples.
Yeah, I've done that with sql_table / psycopg2 / sqlalchemy (although I couldn't just use query_adapter_callback; I also had to use type_adapter_callback or dlt would error). Are you saying this is also supported with the postgres source?
For anyone else, this is more or less what I did:
import sqlalchemy as sa
from sqlalchemy.sql import sqltypes

def type_adapter_callback(table) -> None:
    # append the postgres system column xmin to the reflected table so dlt knows about it
    required_columns = [("xmin", sqltypes.BigInteger, {"nullable": True})]
    for col_name, col_type, col_kwargs in required_columns:
        if col_name not in table.c:
            table.append_column(sa.Column(col_name, col_type, **col_kwargs))
def query_adapter_callback(query, table, incremental=None, _engine=None) -> sa.TextClause:
    if incremental is None:
        # no cursor configured: fall back to the default query
        return query
    # select all columns plus xmin (cast to bigint) and filter on the incremental cursor
    return sa.text(
        f"SELECT {table.fullname}.*,"
        f" xmin::text::bigint as xmin FROM {table.fullname}"
        " WHERE"
        f" {incremental.cursor_path}::text::bigint >= ({incremental.start_value}::int8)"
    )
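And roughly how it wires together (a sketch rather than verified code: the import path and keyword arguments are as in the sql_database docs for recent dlt versions, and the connection string, schema and table names are placeholders):

import dlt
from dlt.sources.sql_database import sql_table

# the column-appending callback above has the reflected-Table signature,
# so it is passed as the table adapter; incremental loading is defined on xmin
resource = sql_table(
    credentials="postgresql://user:pass@replica-host/dbname",  # placeholder
    schema="my_schema",                                         # placeholder
    table="my_table",                                           # placeholder
    table_adapter_callback=type_adapter_callback,
    query_adapter_callback=query_adapter_callback,
    incremental=dlt.sources.incremental("xmin", initial_value=0),
)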
If by postgres source you mean pg_replication, then no, xmin is just a regular query. Your code is cool! We'll add it to our docs.
btw.
f" {incremental.cursor_path}::text::bigint >= ({incremental.start_value}::int8)"
Aren't you getting duplicates in your data? I think you are re-taking the last row with >=.
Do you still have the exception you got? dlt should be able to generate types automatically for the missing columns.
@rudolfix the >= was deliberate, although possibly not required. I based it on an example I found somewhere, where there was a discussion around > vs >= and whether it was better to risk duplicates (which get fixed by merge, at an additional cost) or gaps. I don't think gaps are very likely with xmin, but if you're using a low-resolution timestamp column you could have multiple rows on the source with the same timestamp that weren't generated at the exact same instant, and dlt could run in between those updates and only pick up some of them. So I figured better safe than sorry.
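As a sketch of how the duplicate taken by >= gets absorbed (assuming the resource from the earlier sketch and a placeholder key column "id"): loading with a merge write disposition keyed on the primary key means the re-taken row simply overwrites the existing one.

import dlt

pipeline = dlt.pipeline(pipeline_name="pg_xmin", destination="duckdb", dataset_name="raw")

# the row whose xmin equals the last start_value is selected again by the >= filter;
# merging on a primary key deduplicates it at the destination
pipeline.run(
    resource,                    # the sql_table resource sketched above
    write_disposition="merge",
    primary_key="id",            # placeholder: the source table's key column
)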
I don't have the exception, but I can probably recreate it, and if I manage to I'll post it here. It worked fine when I used connectorx (based on this example: https://dlthub.com/blog/dlt-arrow-loading). When I migrated to sql_table (specifically doing a cast in the SQL in the query_adapter_callback) it didn't work. I assumed it was using the underlying table rather than the SQL query to generate the schema, but that was a guess I never verified. My actual code is less general than the above, i.e. when I had to convert NUMERIC to FLOAT I used
def query_adapter_callback(query, table, incremental=None, _engine=None) -> sa.TextClause:
    if incremental is None:
        return query
    # cast the NUMERIC "data" column to FLOAT in the query and add xmin as the cursor column
    return sa.text(
        f"SELECT data::FLOAT AS data,"
        f" xmin::text::bigint as xmin FROM {table.fullname}"
        " WHERE"
        f" {incremental.cursor_path}::text::bigint >= ({incremental.start_value}::int8)"
    )
and then added:
def type_adapter_callback(sql_type):
    # map NUMERIC source columns to FLOAT in the derived schema
    if isinstance(sql_type, sa.NUMERIC):
        return sa.FLOAT
    return sql_type
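These then go into sql_table the same way (again just a sketch with the same placeholder names and dlt-version caveats as the earlier snippet): the type adapter fixes the reflected schema while the query adapter does the cast in the query itself.

# continuing the earlier sketch: the NUMERIC -> FLOAT variant
resource = sql_table(
    credentials="postgresql://user:pass@replica-host/dbname",  # placeholder
    table="my_table",                                           # placeholder
    type_adapter_callback=type_adapter_callback,    # NUMERIC columns become FLOAT in the schema
    query_adapter_callback=query_adapter_callback,  # data::FLOAT cast in the query
    incremental=dlt.sources.incremental("xmin", initial_value=0),
)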