Inserting Data into Databricks via the databricks-sql-python library (Leveraging SQLAlchemy)
Error Message:
DatabaseError: (databricks.sql.exc.ServerOperationError) Column id is not specified in INSERT
[SQL: INSERT INTO model_integrated (name) VALUES (%(name)s)]
[parameters: {'name': 'Loadsheetname'}]
(Background on this error at: https://sqlalche.me/e/14/4xp6)
Overview: I'm encountering an issue when attempting to insert records into a Databricks database using SQLAlchemy. The error suggests that the id column is not specified in the INSERT statement, leading to a ServerOperationError.
For what it's worth, this works perfectly fine when inserting into a PostgreSQL database.
Steps to Reproduce:
- Connect to Databricks using SQLAlchemy.
- Define SQLAlchemy models, including an auto-incrementing primary key (id) column.
- Attempt to insert records into the model_integrated table.
- Encounter the mentioned error.
Expected Behavior: I expect the records to be inserted successfully into the Databricks database, with the auto-incrementing id column being generated by the database.
Environment:
Python Version: 3.11.4
Databricks-sql-python: 3.0.1
I have verified that a similar approach works for a PostgreSQL database but fails in Databricks. The issue seems to be related to the auto-incrementing primary key behavior.
Code Snippet:
import pandas as pd
import sqlalchemy.orm
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import Column, Integer, String, ForeignKey, Numeric, DateTime
from databricks import sql
connection = sql.connect(
    server_hostname=HOST,
    http_path=HTTP_PATH,
    access_token=ACCESS_TOKEN)
print("Connection established")
# SQLAlchemy setup
Base = declarative_base()
# Model class for "model" table
class ModelIntegrated(Base):
    __tablename__ = 'model_integrated'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String)
    periods = relationship('PeriodIntegrated', backref=('model_integrated'))

# Model class for "period" table
class PeriodIntegrated(Base):
    __tablename__ = 'period_integrated'
    id = Column(Integer, primary_key=True, autoincrement=True)
    model_id = Column(Integer, ForeignKey('model_integrated.id'))
    name = Column(String)
    solution = Column(Numeric)
    start = Column(DateTime)
    end = Column(DateTime)
    period_order = Column(Integer)

catalog = "<catalog>"
conn_string = (
    "databricks://token:{token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}".format(
        token=ACCESS_TOKEN,
        host=HOST,
        port=<port>,
        catalog=catalog,
        http_path=HTTP_PATH,
        schema="<schema>"
    )
)
print("this is the conn_string", conn_string)
engine = create_engine(conn_string, echo=True)
print("engine executed")
Session = sessionmaker(bind=engine)
session = Session()
excel_file_path = "loadsheet.xlsx"
xls = pd.ExcelFile(excel_file_path)
for tab_name in xls.sheet_names:
    print(f"Processing tab: {tab_name}")
    df = pd.read_excel(excel_file_path, sheet_name=tab_name)
    # Convert column names to lowercase
    df.columns = df.columns.str.lower()
    print("Data in the tab:")
    print(df)
    if tab_name == 'Model':
        print("Processing Model data")
        for _, row in df.iterrows():
            # PostgreSQL solution
            model = ModelIntegrated(name=row['name'])
            session.add(model)
            session.commit()  # Commit the transaction
            # Retrieve the generated ID using a separate query
            model_id = session.query(ModelIntegrated.id).filter_by(name=row['name']).scalar()
            # session.flush()  # Get the auto-generated ID
            # model_id = model.id  # Retrieve the ID
            print(f"Inserted Model with name: {model.name}, ID: {model_id}")
    elif tab_name == 'Period':
        print("Processing Period data")
        # Sort the DataFrame by "start" dates in ascending order
        df_sorted = df.sort_values(by='start')
        # Add a new column "period_order" with ascending integer values
        df_sorted['period_order'] = range(1, len(df_sorted) + 1)
        for _, row in df_sorted.iterrows():
            period = PeriodIntegrated(
                model_id=model_id,
                name=row['name'],
                solution=row['solution'],
                start=datetime.strptime(row['start'], '%Y-%m-%d %I:%M:%S %p'),  # Convert to datetime
                end=datetime.strptime(row['end'], '%Y-%m-%d %I:%M:%S %p')  # Convert to datetime
            )
            # Set the "period_order" attribute with the value from the DataFrame
            period.period_order = row['period_order']
            session.add(period)
            print(f"Inserted Period with name: {period.name}, Period Order: {period.period_order}")
# Commit the changes
session.commit()
session.close()
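A side note on the `strptime` calls in the Period branch: they assume the `start` and `end` cells arrive as strings such as `2023-10-05 02:30:00 PM`. If the spreadsheet stores them as real date cells, `pd.read_excel` may hand back timestamp objects instead, and `datetime.strptime` raises a `TypeError` on anything that is not a string. A minimal sketch of a tolerant conversion follows; the helper name `to_datetime` and the constant `PERIOD_FORMAT` are hypothetical and not part of the original snippet.

```python
from datetime import datetime

# Format used by the strptime calls in the snippet above.
PERIOD_FORMAT = "%Y-%m-%d %I:%M:%S %p"


def to_datetime(value):
    """Return a datetime for either a formatted string or an existing datetime.

    Hypothetical helper: values that are already datetime instances are
    passed through unchanged; strings are parsed with PERIOD_FORMAT.
    """
    if isinstance(value, datetime):
        return value
    return datetime.strptime(value, PERIOD_FORMAT)


if __name__ == "__main__":
    print(to_datetime("2023-10-05 02:30:00 PM"))
```

With a helper like this, `start=to_datetime(row['start'])` would accept both representations.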
Note: I have also reached out to the Databricks community for assistance.
Thank you, Brent
Thanks for the detailed write-up. There are a few things happening here.
First up, you cannot use autoincrement=True with the Databricks dialect. This is discussed in our dialect README here. Support for it will come in a future release.
Second, it looks like you're using the old SQLAlchemy 1.x syntax for your model code. The dialect included in databricks-sql-connector>=3.0.0 is built for SQLAlchemy 2.x exclusively. It may work but we can't guarantee it. You can see an example of the new syntax in our e2e tests here.
Third, the actual exception is a syntax error. SQLAlchemy is writing an invalid SQL statement by omitting the column names in the INSERT. I'm not clear how this is happening as it's not something we observe in the 1000+ INSERT test cases that we run during development. But I wonder if you may be using an older SQLAlchemy version below 2.0.0.
Which SQLAlchemy version do you have installed?
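One quick way to answer the version question is to read the installed package metadata from the same interpreter that runs the script. This is only a sketch: the helper names `installed_version` and `major_version` are made up for illustration, and it assumes the packages were installed under the distribution names `SQLAlchemy` and `databricks-sql-connector`.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def major_version(version: str) -> int:
    """Extract the leading major component from a string such as '2.0.22'."""
    return int(version.split(".")[0])


if __name__ == "__main__":
    for name in ("SQLAlchemy", "databricks-sql-connector"):
        print(name, installed_version(name))
```

A SQLAlchemy major version below 2 would mean the environment is outside what the dialect in databricks-sql-connector>=3.0.0 is built for.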
I tried reproducing this error locally with:
- Python 3.11.4
- SQLAlchemy==2.0.22
- databricks-sql-connector==3.0.1
and am not able to make SQLAlchemy emit an INSERT statement that omits the column names.
Can you provide a reproducible example? FWIW: I don't think the Excel file has any bearing on this behaviour. In my attempts to reproduce I used both an Excel file as input and a randomly generated pandas dataframe. It worked as expected in both cases.
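On the first point (no autoincrement=True support yet), one possible stopgap is to assign the primary key on the client before adding each row, so that every INSERT carries an explicit id. The sketch below is plain Python and only illustrates the bookkeeping; `id_allocator` and `with_explicit_ids` are hypothetical helpers, `current_max` is assumed to come from something like a `max(id)` query run beforehand, and the approach is not safe when several writers insert concurrently.

```python
from itertools import count
from typing import Dict, Iterable, Iterator, List, Optional


def id_allocator(current_max: Optional[int]) -> Iterator[int]:
    """Yield ids starting one above current_max, or at 1 for an empty table."""
    start = 1 if current_max is None else current_max + 1
    return count(start)


def with_explicit_ids(
    names: Iterable[str], current_max: Optional[int]
) -> List[Dict[str, object]]:
    """Pair each name with a client-assigned id (assumes a single writer)."""
    ids = id_allocator(current_max)
    return [{"id": next(ids), "name": name} for name in names]


if __name__ == "__main__":
    # Each dict could then be passed to the model, e.g. ModelIntegrated(**row).
    for row in with_explicit_ids(["Loadsheetname"], current_max=None):
        print(row)
```

Because the id is known before the insert, it can be reused directly as the foreign key for the period rows without a follow-up query.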
Thanks @susodapop. I was running SQLAlchemy==1.4.49.
I upgraded to 2.0.22, but I'm still facing the same issue...
I think the issue is that there is no "id" column in the Excel file.
I want SQLAlchemy to add the "Model" to Databricks. Have Databricks + SQLAlchemy issue an id (Primary Key), and then return that id for all the future tables which I can use as a Foreign Key.
This works for a PostgreSQL database with the exact same data.
'''
for tab_name in xls.sheet_names:
    print(f"Processing tab: {tab_name}")
    df = pd.read_excel(excel_file_path, sheet_name=tab_name)
    # Convert column names to lowercase
    df.columns = df.columns.str.lower()
    print("Data in the tab:")
    print(df)
    if tab_name == 'Model':
        print("Processing Model data")
        for _, row in df.iterrows():
            model = ModelIntegrated(name=row['name'])
            session.add(model)
            session.flush()  # Get the auto-generated ID
            model_id = model.id  # Retrieve the ID
            print(f"Inserted Model with name: {model.name}, ID: {model_id}")
'''
Thank you, Brent
Just getting back to this after traveling.
Can you please provide a runnable reproduction? I have attempted to reproduce the issue going off the code you provided but the code works. I don't have access to your excel file (and as indicated above, I don't think the excel file is really the problem). Without reproduction steps we're blocked on implementing a fix.
Hi @susodapop, here is a runnable, reproducible example. You'll just need to fill in your Databricks placeholders and the location of the Excel file (attached).
import pandas as pd
from datetime import datetime
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Numeric, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
def process_excel_data(excel_file_path, host, http_path, access_token, catalog, schema):
    # Replace placeholders with actual values
    HOST = host
    HTTP_PATH = http_path
    ACCESS_TOKEN = access_token

    # SQLAlchemy setup
    Base = declarative_base()

    # Model class for "model" table
    class ModelIntegrated(Base):
        __tablename__ = 'model_integrated'
        id = Column(Integer, primary_key=True)
        name = Column(String)
        periods = relationship('PeriodIntegrated', backref=('model_integrated'))

    # Model class for "period" table
    class PeriodIntegrated(Base):
        __tablename__ = 'period_integrated'
        id = Column(Integer, primary_key=True)
        model_id = Column(Integer, ForeignKey('model_integrated.id'))
        name = Column(String)
        solution = Column(Numeric)
        start = Column(DateTime)
        end = Column(DateTime)
        period_order = Column(Integer)

    conn_string = (
        f"databricks://token:{ACCESS_TOKEN}@{HOST}?http_path={HTTP_PATH}&catalog={catalog}&schema={schema}"
    )
    engine = create_engine(conn_string, echo=True)
    Session = sessionmaker(bind=engine)
    session = Session()

    xls = pd.ExcelFile(excel_file_path)
    for tab_name in xls.sheet_names:
        print(f"Processing tab: {tab_name}")
        df = pd.read_excel(excel_file_path, sheet_name=tab_name)
        # Convert column names to lowercase
        df.columns = df.columns.str.lower()
        print("Data in the tab:")
        print(df)
        if tab_name == 'Model':
            print("Processing Model data")
            for _, row in df.iterrows():
                # PostgreSQL solution
                model = ModelIntegrated(name=row['name'])
                session.add(model)
                session.commit()  # Commit the transaction
                # Retrieve the generated ID using a separate query
                model_id = session.query(ModelIntegrated.id).filter_by(name=row['name']).scalar()
                print(f"Inserted Model with name: {model.name}, ID: {model_id}")
        elif tab_name == 'Period':
            print("Processing Period data")
            # Sort the DataFrame by "start" dates in ascending order
            df_sorted = df.sort_values(by='start')
            # Add a new column "period_order" with ascending integer values
            df_sorted['period_order'] = range(1, len(df_sorted) + 1)
            for _, row in df_sorted.iterrows():
                period = PeriodIntegrated(
                    model_id=model_id,
                    name=row['name'],
                    solution=row['solution'],
                    start=datetime.strptime(row['start'], '%Y-%m-%d %I:%M:%S %p'),  # Convert to datetime
                    end=datetime.strptime(row['end'], '%Y-%m-%d %I:%M:%S %p')  # Convert to datetime
                )
                # Set the "period_order" attribute with the value from the DataFrame
                period.period_order = row['period_order']
                session.add(period)
                print(f"Inserted Period with name: {period.name}, Period Order: {period.period_order}")
    # Commit the changes
    session.commit()
    session.close()

# Example usage
excel_path = "<location>/githubtest.xlsx"
process_excel_data(excel_path, "<your_host>", "<your_http_path>", "<your_access_token>", "<your_catalog>", "<your_schema>")