sqlalchemy-trino
Use "SHOW CREATE TABLE" for more accurate reflection with Trino connectors
Originally filed as https://github.com/sqlalchemy/sqlalchemy/issues/7905
Describe the use case
Here is an issue filed against Pandas in which code that connects to a Trino database through SQLAlchemy yields a bad result:
https://github.com/pandas-dev/pandas/issues/46661
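A minimal sketch of the idea in the title, assuming the reflection step already has the text of a `SHOW CREATE TABLE` result: the connector-specific table properties (such as `partitioning` and `format` in the example below) sit in the statement's trailing `WITH (...)` clause, which can be split into key/value pairs. The function name `parse_with_properties` and the sample DDL are hypothetical and illustrative; the exact text Trino prints may differ, and this is not an existing sqlalchemy-trino API.

```python
import re
from typing import Dict, List


def parse_with_properties(ddl: str) -> Dict[str, str]:
    """Extract the WITH (...) table properties from a CREATE TABLE statement.

    Values are kept as raw SQL text, e.g. "'ORC'" or "ARRAY['bucket(tname, 20)']".
    Assumes the table-level WITH clause is the last one in the statement, as in
    the layout of Trino's SHOW CREATE TABLE output.
    """
    matches = list(re.finditer(r"\bWITH\s*\(", ddl, flags=re.IGNORECASE))
    if not matches:
        return {}
    depth = 1          # we start just inside "WITH ("
    in_quote = False   # inside a '...' string literal
    parts: List[str] = []
    current: List[str] = []
    for ch in ddl[matches[-1].end():]:
        if in_quote:
            current.append(ch)
            if ch == "'":  # a doubled '' simply re-enters the literal on the next char
                in_quote = False
            continue
        if ch == "'":
            in_quote = True
        elif ch in "([":
            depth += 1
        elif ch in ")]":
            depth -= 1
            if depth == 0:  # closing parenthesis of WITH (...)
                break
        elif ch == "," and depth == 1:  # top-level separator between properties
            parts.append("".join(current))
            current = []
            continue
        current.append(ch)
    parts.append("".join(current))

    properties: Dict[str, str] = {}
    for part in parts:
        if "=" in part:
            key, value = part.split("=", 1)
            properties[key.strip()] = value.strip()
    return properties


if __name__ == "__main__":
    # Illustrative DDL in the shape of Trino's SHOW CREATE TABLE output.
    sample = """CREATE TABLE osc_datacommons_dev.sandbox.ticker_test (
   tname varchar,
   cik bigint
)
WITH (
   format = 'ORC',
   partitioning = ARRAY['bucket(tname, 20)']
)"""
    print(parse_with_properties(sample))
    # {'format': "'ORC'", 'partitioning': "ARRAY['bucket(tname, 20)']"}
```

Keeping the values as raw SQL text avoids having to interpret each property's type; a dialect could hand them back verbatim when re-creating a table.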
Databases / Backends / Drivers targeted
Trino Iceberg connector with Hive metastore.
Example Use
Here is the reproducible example (copied from the Pandas issue):
#!/usr/bin/env python
# coding: utf-8

# Note: this script requires connecting to a Trino instance using JWT access tokens.
# Please adapt to your Trino environment to reproduce.

from dotenv import dotenv_values, load_dotenv
import osc_ingest_trino as osc
import os
import pathlib

# Load Environment Variables
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', os.environ.get('PWD', '/opt/app-root/src'))
dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path, override=True)

import trino
from sqlalchemy.engine import create_engine

env_var_prefix = 'TRINO'

sqlstring = 'trino://{user}@{host}:{port}/'.format(
    user=os.environ[f'{env_var_prefix}_USER'],
    host=os.environ[f'{env_var_prefix}_HOST'],
    port=os.environ[f'{env_var_prefix}_PORT'],
)
sqlargs = {
    'auth': trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
    'http_scheme': 'https',
    'catalog': 'osc_datacommons_dev',
}
engine = create_engine(sqlstring, connect_args=sqlargs)
connection = engine.connect()

ingest_catalog = 'osc_datacommons_dev'
ingest_schema = 'sandbox'

import pandas as pd

ticker_df = pd.DataFrame({
    'tname': ['aapl', 'msft', 'goog', 'amzn', 'tsla', 'fb', 'ge', 'brk-a'],
    'cik': [320193, 789019, 1652044, 1018724, 1318605, 1326801, 40545, 1067983],
}).convert_dtypes()

ingest_table = 'ticker_test'
columnschema = osc.create_table_schema_pairs(ticker_df)

qres = engine.execute(f"drop table if exists {ingest_catalog}.{ingest_schema}.{ingest_table}")
qres.fetchall()

# Create the table with Iceberg table properties (bucketed partitioning, ORC format)
tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{ingest_table}(
{columnschema}
) with (
    partitioning = array['bucket(tname,20)'],
    format = 'ORC'
)
"""
print(tabledef)
qres = engine.execute(tabledef)
print(qres.fetchall())

# Load the rows into the table created above
ticker_df.to_sql(ingest_table, con=engine, schema=ingest_schema, if_exists='append', index=False,
                 method=osc.TrinoBatchInsert(batch_size=12000, verbose=True))

# Capture the table definition as created above
qres = engine.execute(f"show create table {ingest_schema}.{ingest_table}")
orc_table = qres.fetchall()
print(orc_table)

# Reload with if_exists='replace'; pandas drops the existing table before inserting
ticker_df.to_sql(ingest_table, con=engine, schema=ingest_schema, if_exists='replace', index=False,
                 method=osc.TrinoBatchInsert(batch_size=12000, verbose=True))

qres = engine.execute(f"show create table {ingest_schema}.{ingest_table}")
replaced_table = qres.fetchall()

# The table definition is expected to be unchanged by the replace
assert orc_table == replaced_table
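The assertion only reports that the two `SHOW CREATE TABLE` results differ. A small sketch using Python's standard `difflib` to show how they differ, assuming each `fetchall()` result is a list holding a single row whose first column is the full CREATE TABLE statement; `ddl_diff` is a hypothetical helper name and the sample rows are illustrative.

```python
import difflib
from typing import List, Sequence


def ddl_diff(before_rows: Sequence[Sequence[str]],
             after_rows: Sequence[Sequence[str]]) -> List[str]:
    """Return unified-diff lines between two SHOW CREATE TABLE results.

    Assumes each argument is a fetchall() result: one row whose first
    column holds the complete CREATE TABLE statement.
    """
    before = before_rows[0][0].splitlines()
    after = after_rows[0][0].splitlines()
    # lineterm="" because splitlines() already removed the newlines.
    return list(difflib.unified_diff(before, after,
                                     fromfile="before replace",
                                     tofile="after replace",
                                     lineterm=""))


if __name__ == "__main__":
    # Illustrative rows only; real input comes from the script's fetchall() calls.
    orc_table = [("CREATE TABLE t (\n   tname varchar\n)\nWITH (\n   format = 'ORC'\n)",)]
    replaced_table = [("CREATE TABLE t (\n   tname varchar\n)",)]
    print("\n".join(ddl_diff(orc_table, replaced_table)))
```

With the script's own variables, `print("\n".join(ddl_diff(orc_table, replaced_table)))` lists the lines that changed, which would include any table property that did not survive the replace; an empty result means the definitions are identical.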
Additional context
We are using Trino version 373, Pandas 1.4.1, and SQLAlchemy 1.4.27. Here are the details reported to the Pandas team:
python : 3.8.8.final.0
python-bits : 64
OS : Linux
OS-release : 4.18.0-305.34.2.el8_4.x86_64
Version : #1 SMP Mon Jan 17 09:42:23 EST 2022
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : en_US.UTF-8
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8
pandas : 1.4.2
numpy : 1.22.3
pytz : 2021.3
dateutil : 2.8.2
pip : 22.0.4
setuptools : 60.9.3
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : 4.6.4
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.0.3
IPython : 8.1.1
pandas_datareader: None
bs4 : 4.6.3
bottleneck : None
brotli : None
fastparquet : 0.8.1
fsspec : 2022.3.0
gcsfs : None
markupsafe : 2.1.0
matplotlib : None
numba : None
numexpr : None
odfpy : None
openpyxl : 3.0.9
pandas_gbq : None
pyarrow : 7.0.0
pyreadstat : None
pyxlsb : None
s3fs : None
scipy : None
snappy : None
sqlalchemy : 1.4.27
tables : None
tabulate : 0.8.9
xarray : None
xlrd : None
xlwt : None
zstandard : None