trino-python-client
Trino-python-client not returning/capturing External Errors
The Trino Python client does not surface external errors and exits successfully. It appears not to wait until the end of the transaction, nor to check the transaction status.
Connector: Hive
Trino version: 356
trino-python-client: 0.305.0
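What seems to be happening can be illustrated with a self-contained sketch (the class and names below are illustrative, not the actual client internals): Trino streams results in pages, and a commit-time external error only surfaces when the client polls the final page, so a client that stops reading after the first buffered row never sees it.

```python
class PagedQuery:
    """Simulates a query whose error only surfaces after the last page."""

    def __init__(self, pages, final_error=None):
        self._pages = iter(pages)
        self._final_error = final_error
        self._buffer = []
        self.finished = False

    def _fetch_next_page(self):
        try:
            self._buffer.extend(next(self._pages))
        except StopIteration:
            self.finished = True
            if self._final_error:
                raise RuntimeError(self._final_error)

    def fetchone(self):
        # Returns as soon as any buffered row exists -- it never polls
        # the final page, so the deferred error is silently skipped.
        while not self._buffer and not self.finished:
            self._fetch_next_page()
        return self._buffer.pop(0) if self._buffer else None

    def fetchall(self):
        # Drains every page, so the commit-time error is raised.
        while not self.finished:
            self._fetch_next_page()
        rows, self._buffer = self._buffer, []
        return rows


# The INSERT reports its row count ([2]) in the first page, then fails
# at commit time with "target directory already exists".
q = PagedQuery(pages=[[[2]]], final_error="target directory already exists")
print(q.fetchone())  # [2] -- looks like success

q2 = PagedQuery(pages=[[[2]]], final_error="target directory already exists")
try:
    q2.fetchall()
except RuntimeError as exc:
    print("fetchall surfaced:", exc)
```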
Steps to Reproduce:
- Create a staging table without partitions
CREATE TABLE dl_prod.prod_tmp.trino_python_issue_stg (
id int,
c_name varchar,
file_date varchar
)
WITH (
format = 'ORC',
external_location='hdfs://datalakeprod/data/dl/test/trino_python_issue_stg/'
)
- Insert data into the staging table
INSERT INTO dl_prod.prod_tmp.trino_python_issue_stg
(id, c_name, file_date)
VALUES (1, 'subba', '20210811'),
       (2, 'subba', '20210812')
- Create another table with partitions
CREATE TABLE dl_prod.prod_tmp.trino_python_issue (
id int,
c_name varchar,
file_date varchar
)
WITH (
format = 'ORC',
external_location='hdfs://datalakeprod/data/dl/test/trino_python_issue/',
partitioned_by = ARRAY['file_date']
)
- Enable partition overwrite
SET SESSION dl_prod.insert_existing_partitions_behavior = 'overwrite'
- Insert data into the final table
INSERT INTO dl_prod.prod_tmp.trino_python_issue
SELECT * FROM dl_prod.prod_tmp.trino_python_issue_stg
- Drop the final table
DROP TABLE prod_tmp.trino_python_issue
- Re-create the final table
CREATE TABLE dl_prod.prod_tmp.trino_python_issue (
id int,
c_name varchar,
file_date varchar
)
WITH (
format = 'ORC',
external_location='hdfs://datalakeprod/data/dl/test/trino_python_issue/',
partitioned_by = ARRAY['file_date']
)
- Run the INSERT statement below. It fails with an external error:
INSERT INTO dl_prod.prod_tmp.trino_python_issue
SELECT * FROM dl_prod.prod_tmp.trino_python_issue_stg
Error in DBeaver:
SQL Error [16777231]: Query failed (#20210812_000114_00000_q5wrz): Unable to rename from hdfs://datalakeprod/tmp/presto/914b6fd7-f658-4a27-a69d-918c74a1bf28/file_date=20210812 to hdfs://datalakeprod/data/dl/test/trino_python_issue/file_date=20210812: target directory already exists
Trino UI:
io.trino.spi.TrinoException: Unable to rename from hdfs://datalakeprod/tmp/presto/36f2a894-77db-4abd-ac01-aa9e9cb2b5da/file_date=20210812 to hdfs://datalakeprod/data/dl/test/trino_python_issue/file_date=20210812: target directory already exists
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.renameDirectory(SemiTransactionalHiveMetastore.java:2530)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore$Committer.prepareAddPartition(SemiTransactionalHiveMetastore.java:1858)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commitShared(SemiTransactionalHiveMetastore.java:1384)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commit(SemiTransactionalHiveMetastore.java:1162)
at io.trino.plugin.hive.HiveMetadata.commit(HiveMetadata.java:2988)
at io.trino.plugin.hive.HiveConnector.commit(HiveConnector.java:225)
at io.trino.transaction.InMemoryTransactionManager$TransactionMetadata$ConnectorTransactionMetadata.commit(InMemoryTransactionManager.java:594)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:80)
at io.trino.$gen.Trino_356____20210811_064245_2.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
- Run the same INSERT statement using the Python script below. It prints [2] and exits successfully, but the Trino UI shows the external error and no data is loaded into the final table.
Output:
[2]
Load Time: --- 6.178645610809326 seconds ---
Trino UI:
io.trino.spi.TrinoException: Unable to rename from hdfs://datalakeprod/tmp/presto/c0a35086-3750-4b4d-9f28-d3b92734303d/file_date=20210812 to hdfs://datalakeprod/data/dl/test/trino_python_issue/file_date=20210812: target directory already exists
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.renameDirectory(SemiTransactionalHiveMetastore.java:2530)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore$Committer.prepareAddPartition(SemiTransactionalHiveMetastore.java:1858)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commitShared(SemiTransactionalHiveMetastore.java:1384)
at io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.commit(SemiTransactionalHiveMetastore.java:1162)
at io.trino.plugin.hive.HiveMetadata.commit(HiveMetadata.java:2988)
at io.trino.plugin.hive.HiveConnector.commit(HiveConnector.java:225)
at io.trino.transaction.InMemoryTransactionManager$TransactionMetadata$ConnectorTransactionMetadata.commit(InMemoryTransactionManager.java:594)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:80)
at io.trino.$gen.Trino_356____20210811_064245_2.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Python Code:
import time

import requests.packages.urllib3 as urllib3
urllib3.disable_warnings()

import trino


def trino_query_runner(query):
    TRINO_CONN._http_session.verify = False
    start_time = time.time()
    cur_trino = TRINO_CONN.cursor()
    cur_trino.execute("set session dl_prod.insert_existing_partitions_behavior = 'overwrite'")
    rows = cur_trino.fetchone()
    cur_trino.execute(query)
    rows = cur_trino.fetchone()
    print(rows)
    print("Load Time: --- %s seconds ---" % (time.time() - start_time))


def trino_python_connector_issue():
    INST_SQL = """
    INSERT INTO dl_prod.prod_tmp.trino_python_issue
    SELECT * FROM dl_prod.prod_tmp.trino_python_issue_stg
    """
    trino_query_runner(INST_SQL)


if __name__ == "__main__":
    # Create the connection up front so TRINO_CONN is defined on every
    # platform (the original script only defined it in the Windows branch).
    TRINO_CONN = trino.dbapi.connect(
        host='trino.dl.corp.test.com',
        port=443,
        user='svc-dl-db-full',
        catalog='dl_prod',
        schema='dbo',
        http_scheme='https',
        http_headers={'X-Trino-Time-Zone': 'America/New_York'},
        auth=trino.auth.BasicAuthentication("svc-dl-db-full", ""),
    )
    trino_python_connector_issue()
As far as I tried locally, fetchone doesn't capture the external error in this case, but fetchall does. Could you try fetchall as a workaround?
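For illustration, the workaround applied to the runner from the script above might look like the sketch below. It is self-contained: the cursor is duck-typed, so a stub stands in for a real trino.dbapi cursor; with the real client you would pass TRINO_CONN.cursor() instead.

```python
import time


def trino_query_runner_fixed(cursor, query):
    # Same shape as the reporter's trino_query_runner, but fetchall()
    # drains every result page, so a commit-time error propagates
    # instead of being silently skipped after the first row.
    start_time = time.time()
    cursor.execute(query)
    rows = cursor.fetchall()
    print(rows)
    print("Load Time: --- %s seconds ---" % (time.time() - start_time))
    return rows


# Stub standing in for a cursor whose query fails at commit time, as in
# this issue; the real client raises an exception from trino.exceptions.
class FailingCursor:
    def execute(self, query):
        pass

    def fetchall(self):
        raise RuntimeError("Unable to rename: target directory already exists")


try:
    trino_query_runner_fixed(FailingCursor(), "INSERT INTO dl_prod.prod_tmp.trino_python_issue SELECT * FROM dl_prod.prod_tmp.trino_python_issue_stg")
except RuntimeError as exc:
    print("Query failed:", exc)
```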
Yes, fetchall worked. Looking forward to the fix. It would also be great to have a status code property alongside the data, so that we can check the query status.
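A hypothetical sketch of that idea (not an existing trino-python-client API): a small wrapper that drains all rows and returns an explicit status alongside them, so callers can branch on the status instead of relying on exceptions alone. The cursor is duck-typed, with a stub standing in for a real one.

```python
def execute_with_status(cursor, query):
    """Run a query, drain all rows, and report (rows, status, error)."""
    try:
        cursor.execute(query)
        return cursor.fetchall(), "FINISHED", None
    except Exception as exc:  # e.g. a commit-time external error
        return None, "FAILED", exc


# Stub cursor that fails at commit time, as in this issue.
class FailingCursor:
    def execute(self, query):
        pass

    def fetchall(self):
        raise RuntimeError("target directory already exists")


rows, status, error = execute_with_status(FailingCursor(), "INSERT INTO t SELECT * FROM s")
print(status)  # FAILED
```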
@subbareddydagumati I cannot reproduce this issue. Could you please check whether you still encounter it with the newest Trino and trino-python-client versions? Could you also guide me through reproducing it? I followed the steps you described.
Fixed by #220.
@mdesmet Do you think it's possible to add a test to cover this? I couldn't find any existing test to verify failed queries actually appear failed to the client.