`write_database` is significantly slower than `.to_pandas().to_sql()`
Polars version checks
- [x] I have checked that this issue has not already been reported.
- [x] I have confirmed this bug exists on the latest version of Polars.
Issue description
I have been using `.to_pandas().to_sql()` for a while now, and with the latest Polars version it is now possible to use `.write_database()`. However, I have found it to be much slower than the previous option.
Having looked at the code, it seems that the only difference is the usage of `use_pyarrow_extension_array=True`. This might be an issue on the pandas side, as I also tested `.to_pandas(use_pyarrow_extension_array=True).to_sql()` and, for the test cases, it more than doubled the exporting time (on my use cases, around 2.5x slower).
Reproducible example
```python
try:
    from sqlalchemy import create_engine
except ImportError as exc:
    raise ImportError(
        "'sqlalchemy' not found. Install polars with 'pip install polars[sqlalchemy]'."
    ) from exc

engine = create_engine(connection_uri)

# this conversion to pandas is zero-copy
# so we can utilize their sql utils for free
self.to_pandas(use_pyarrow_extension_array=True).to_sql(
    name=table_name, con=engine, if_exists=if_exists, index=False
)
```
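For anyone who wants to time the two conversion paths directly, below is a minimal, hedged benchmark sketch. It is not the original benchmark: it targets an in-memory SQLite database purely so it runs anywhere (the report above was against a real server, so the relative gap may differ) and assumes pandas >= 1.5 with pyarrow installed.

```python
import time

import numpy as np
import polars as pl
from sqlalchemy import create_engine

# Synthetic frame: 10 float columns x 100k rows.
df = pl.DataFrame({f"col_{i}": np.random.rand(100_000) for i in range(10)})
engine = create_engine("sqlite:///:memory:")

for label, pdf in [
    ("plain to_pandas()", df.to_pandas()),
    ("pyarrow-backed to_pandas()", df.to_pandas(use_pyarrow_extension_array=True)),
]:
    t0 = time.perf_counter()
    pdf.to_sql(name="bench_table", con=engine, if_exists="replace", index=False)
    print(f"{label}: {time.perf_counter() - t0:.2f}s")
```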
Expected behavior
So, while `.to_pandas(use_pyarrow_extension_array=True)` is indeed faster on its own, it seems `.to_pandas().to_sql()` is actually faster overall. If you can confirm this on your side, maybe it would be good to change `.to_pandas(use_pyarrow_extension_array=True).to_sql()` to `.to_pandas().to_sql()`.
In addition to this, it seems that not being able to set `fast_executemany=True` when creating the `sqlalchemy` engine further slows down the export; overall it is around 10x slower than the following code:
```python
from sqlalchemy import create_engine

engine = create_engine(connection_uri, fast_executemany=True)
df.to_pandas().to_sql(
    table_name,
    con=engine,
    schema="dbo",
    chunksize=None,
    if_exists="replace",
    index=False,
)
```
Below is a suggestion of how the proposed changes could look:
```python
try:
    from sqlalchemy import create_engine
except ImportError as exc:
    raise ImportError(
        "'sqlalchemy' not found. Install polars with 'pip install polars[sqlalchemy]'."
    ) from exc

# add fast_executemany=True as a parameter
engine = create_engine(connection_uri, fast_executemany=True)

# this conversion to pandas is zero-copy
# so we can utilize their sql utils for free
# remove use_pyarrow_extension_array=True
self.to_pandas().to_sql(
    name=table_name, con=engine, if_exists=if_exists, index=False
)
```
Installed versions
```
---Version info---
Polars: 0.16.13
Index type: UInt32
Platform: Windows-10-10.0.17763-SP0
Python: 3.10.5 (tags/v3.10.5:f377153, Jun 6 2022, 16:14:13) [MSC v.1929 64 bit (AMD64)]
---Optional dependencies---
numpy: 1.23.1
pandas: 1.5.3
pyarrow: 8.0.0
connectorx: 0.3.1
deltalake: <not installed>
fsspec: 2023.3.0
matplotlib: 3.5.2
xlsx2csv: 0.8
xlsxwriter: <not installed>
```
`use_pyarrow_extension_array` is only supported from pandas 1.5.x, if I am not mistaken.
Yes, you are right! I copied the dependencies from the wrong environment. Just edited it now, and the pandas version I am using is 1.5.3
@alexander-beedie interested in doing similar magic (similar to excel) on sqlalchemy one day? ;)
> @alexander-beedie interested in doing similar magic (similar to excel) on sqlalchemy one day? ;)
@ritchie46: Lol.... This is right up my alley actually; I can write to various backends -anything with a postgres flavour, for example- from a Polars frame faster than SQLAlchemy or Pandas can[^1] (typically leveraging bulk insert via a `COPY FROM` construct against a sliding BytesIO buffer), so it's just a question of how much work we want to do on our side to take backend-specific fast-paths (would help if we can get access to a range of test databases too) 🤔
[^1]: I already wrote this into our data API at work ;)
> (typically leveraging bulk insert via a `COPY FROM` construct against a sliding BytesIO buffer)
Wow. What do you serialize it into then? CSV with a fixed schema? Or specific bytes dump per backend?
> so it's just a question of how much work we want to do on our side to take backend-specific fast-paths
That's a good question. I think some complexity is worth it here. Dumping to databases is great functionality to have, and it's worth making it performant as well.
> Wow. What do you serialize it into then? CSV with a fixed schema? Or specific bytes dump per backend?
For PostgreSQL-compatible targets I target the "TEXT" format for `COPY FROM` (which is TSV with a custom NULL repr to distinguish nulls from the empty string), mapping the frame schema to the end-state table schema; the `null_value` param I added into `write_csv` was specifically in support of streamlining this... 😁
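A minimal sketch of that kind of fast path (not the actual implementation described above), assuming psycopg2 and a reasonably recent Polars (write_csv parameter names have changed between versions). The target table name is a placeholder and must already exist with a schema matching the frame; the sketch also assumes values that don't require CSV quoting/escaping, since COPY's TEXT format does not understand quotes.

```python
import io

import polars as pl
import psycopg2


def copy_frame(df: pl.DataFrame, table: str, dsn: str) -> None:
    buf = io.BytesIO()
    # TSV with \N for nulls matches PostgreSQL's COPY ... FROM STDIN "TEXT" format.
    df.write_csv(buf, separator="\t", include_header=False, null_value=r"\N")
    buf.seek(0)
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.copy_expert(f"COPY {table} FROM STDIN WITH (FORMAT text)", buf)
```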
Sounds good. Everything will be in cache and probably mostly compute bound. I shall ensure that bound is higher with: https://github.com/pola-rs/polars/issues/6155
This sounds like what I do for MSSQL at my company: Microsoft has a bulk copy utility, `bcp`, which allows for character- and native-format bulk uploads. The native format wasn't hard to figure out, and I now use numpy to convert each pandas column's data to the exact byte specification, do a `df = b''.join(df.to_numpy().ravel())` to serialize the entire df, and write that to a flat file.
The result is a 300x speedup over `pd.to_sql()`. I have tested it on tons of huge dataframes and it completely crushes pandas.
> Microsoft has a bulk copy utility, `bcp`, which allows for character- and native-format bulk uploads
@mcrumiller Do you have to call `bcp`, or is there an available interface via the driver (like PostgreSQL COPY FROM)? Thinking about how to better generalise/abstract the concept... 💭
@alexander-beedie I dump a `.xml` (format file) and a `.dat` (data file) to a temp directory, use subprocess to run the `bcp` system command, then delete the files. One thing that's nice, if you don't care about deleting, is that subprocess doesn't have to wait for the system command to finish, so with huge uploads your Python process can keep chugging along while your machine is uploading in the background.
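A hypothetical sketch of that workflow (file names, the table/server arguments and the example `bcp` options are placeholders; the native-format bytes and the XML format file are assumed to come from the encoding described elsewhere in this thread):

```python
import subprocess
import tempfile
from pathlib import Path


def bcp_upload(native_rows: bytes, format_xml: str, table: str, server: str, database: str) -> subprocess.Popen:
    tmp = Path(tempfile.mkdtemp())
    dat_path = tmp / "upload.dat"   # native-format data produced by the numpy encoding
    xml_path = tmp / "upload.xml"   # bcp XML format file describing the columns
    dat_path.write_bytes(native_rows)
    xml_path.write_text(format_xml)
    # -f points bcp at the format file, -S/-d select server/database, -T uses a trusted connection.
    cmd = ["bcp", table, "in", str(dat_path), "-f", str(xml_path),
           "-S", server, "-d", database, "-T"]
    # Popen returns immediately, so the Python process keeps running while the upload
    # proceeds in the background (clean up the temp files once it has finished).
    return subprocess.Popen(cmd)
```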
I hadn't considered driver interfaces, as I'm not too experienced on that front, but a quick Google search gives me SQL Server Driver Extensions - Bulk Copy Functions, which looks promising.
I was hoping to implement this in polars as well, which is why I've been asking around a bit about whether `pl.Binary` could be used in this case, but it looks like it probably can't: I don't see how to concatenate binary columns, and the binary representation doesn't quite seem to be what I'd expect. For example, casting `pl.UInt64` and `pl.UInt8` columns to binary seems to give the same result:

```python
pl.Series([1], dtype=pl.UInt8).cast(pl.Binary).item()
pl.Series([1], dtype=pl.UInt64).cast(pl.Binary).item()
```

both give `b'1'`.
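For what it's worth, if the goal is the raw machine representation of each value rather than a stringified cast, one workaround (not a Polars-native solution, and assuming a non-null column on a little-endian platform) is to go through NumPy:

```python
import polars as pl

s8 = pl.Series([1], dtype=pl.UInt8)
s64 = pl.Series([1], dtype=pl.UInt64)
print(s8.to_numpy().tobytes())   # b'\x01'
print(s64.to_numpy().tobytes())  # b'\x01\x00\x00\x00\x00\x00\x00\x00'
```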
There is also a Rust/Python project which converts an Arrow table into the Postgres binary format and then does the bulk insert. This should be faster than TSV bulk insert (I have measured 4x on speed and 1.7x on memory): https://github.com/adriangb/pgpq
For example, here's a snippet of how to convert pandas `datetime` data to bcp format. `data` is a timestamped `pd.Series`:
```python
import numpy as np

# See https://bornsql.ca/blog/how-sql-server-stores-data-types-datetime-date-time-and-datetime2/
# days since 0001-01-01, then grab only 3 bytes
null = data.isna()
data = (data.to_numpy().astype("datetime64[D]") - np.datetime64("0001-01-01")) \
    .astype(np.uint32).astype("V3")

# `nullable` is True if the SQL column we're uploading to allows nulls;
# `N_rows` is the number of rows and `length` is the fixed byte-length of each
# encoded value (3 for this date type).
if nullable:
    prefix_type = np.uint8
    prefix = np.full(N_rows, length, dtype=prefix_type)
    null_prefix = prefix_type(np.iinfo(prefix_type).max)
    data = prefix.view(np.void) + data.astype(object)
    data[null] = null_prefix
```
This encodes each value in the Series as a 3-byte value, plus (optionally) the null prefix that specifies whether the value is null or not.
Each column is processed similarly (ints/datetimes/booleans/strings/etc.), and at the end you can perform a horizontal sum, which simply concatenates the byte streams:

```python
df = b''.join(df.to_numpy().ravel())
```

This can be written directly to disk. It then also needs an accompanying `xml` file to specify to `bcp` the columns, formats, etc. It's not that simple, but it's way faster than any other upload method I've seen.
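As a quick sanity check of the day-count arithmetic above (the values shown assume a little-endian 3-byte representation, which is what the linked article describes for SQL Server's date storage):

```python
import numpy as np

days = int(np.datetime64("2023-01-01", "D") - np.datetime64("0001-01-01", "D"))
print(days)                        # 738520 days since 0001-01-01
print(days.to_bytes(3, "little"))  # b'\xd8D\x0b' - the 3 bytes kept for this date
```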
Hi @alexander-beedie, any chance of getting this picked up over the coming weeks/months? Most specifically your mention of postgres-specific fast paths.
I have some further thoughts, but in summary, the performance benefit for anyone writing to postgres would be massive!
Happy to create a new feature request for that specifically if you think it is a better place to continue.
Thanks in advance
Do you have a full example of going from pandas to the dat and xml files that you want to share? 😇 @mcrumiller. And what does your `bcp` command look like?
I found this open issue after testing polars `write_database` with the 'adbc' engine. AWS Postgres destination, 6K-row dataframe. It took over 5 minutes with polars vs. 2.3 seconds with pandas + sqlalchemy.
> I found this open issue after testing polars `write_database` with the 'adbc' engine. AWS Postgres destination, 6K-row dataframe. It took over 5 minutes with polars vs. 2.3 seconds with pandas + sqlalchemy.
@bobcolner: Got an example of how you called it? That sounds waaaaaay out 😅 Probably worth raising a bug report about it actually; if you do that I'll pick it up and see what's going on.
> Hi @alexander-beedie, any chance of getting this picked up over the coming weeks/months? Most specifically your mention of postgres-specific fast paths.
@henryharbeck: I'll get there in the end 😅 Have been focused on the `read` side of things in recent weeks, and it feels like that has come along significantly. Once that settles more I will return to the `write` half of the equation again (will assign myself as a reminder ✌️)
> I found this open issue after testing polars `write_database` with the 'adbc' engine. AWS Postgres destination, 6K-row dataframe. It took over 5 minutes with polars vs. 2.3 seconds with pandas + sqlalchemy.

> @bobcolner: Got an example of how you called it? That sounds waaaaaay out 😅 Probably worth raising a bug report about it actually; if you do that I'll pick it up and see what's going on.
sure, I used this call for polars:
```python
df_polars.write_database(table_name='polars_table', connection=DB_URL, engine='adbc', if_exists='replace')
```
and this for pandas:
```python
import sqlalchemy

sqa_engine = sqlalchemy.create_engine(
    f"postgresql+psycopg2{DB_URL.split('postgresql')[1]}"
)
df_pandas.to_sql(
    con=sqa_engine, name='pandas_table', schema="public", if_exists="replace", index=False
)
```
FYI, I was using AWS Aurora Serverless v2 as the Postgres destination; see: https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/aurora-serverless-v2.html
Thanks for looking into this!
PS - it would be great to add a `schema` parameter to the polars `write_database` method.
> @henryharbeck: I'll get there in the end 😅 Have been focused on the `read` side of things in recent weeks, and it feels like that has come along significantly. Once that settles more I will return to the `write` half of the equation again (will assign myself as a reminder ✌️)
Thank you very much @alexander-beedie, I certainly agree! Many of those PRs have directly benefited my day-to-day work, not to mention the speed at which they have been rolled out. As someone whose IO is mostly databases rather than files, personal thanks from me!
Just wanted to ensure that it was still on the cards for the future.
@alexander-beedie I have used pandas for a long time and created a custom bulk copy approach. The bulk insert Python statement looks like this (please contact me if you would like the full code base):
```python
import sys

rowterminator = '0x0a' if 'linux' in sys.platform else '\r\n'
query = "BULK INSERT " + table_name + " FROM '" + file_path + \
        "' WITH (FIELDTERMINATOR = ';', ROWTERMINATOR = '" + rowterminator + "', KEEPNULLS)"
cursor = conn.cursor()
cursor.execute(query)
conn.commit()
cursor.close()
```
where `table_name` and `file_path` are the obvious parameters.
Before using the bulk insert statement, you will have to create the SQL table `table_name` first (just an empty table) with the correct schema. Although polars has stricter column types than pandas, the code below is what I used to determine the MS SQL column types based on the pandas dataframe `df`.
```python
from collections import OrderedDict as __OrderedDict  # module-private aliases, as used in the original code
import numpy as __np
import pandas as pd
import pandas as __pd


def determine_column_types(df: pd.DataFrame):
    """Based on the provided dataframe df, determine for each of the columns what the type of the items in the
    column is. Unfortunately, when NaN-values are present, pandas will cast all numeric types to 'float' and
    other types to 'object'. This function will try to determine a more specific type for each of the columns
    (you could run it on a random sample of rows from the df, say 1_000).

    Five different database types are considered: boolean, int, float, datetime and varchar. This is the order
    in which they will be considered, as it goes from most-specific to least-specific. For example, an int or
    float can be an instance of a datetime or a varchar as well, but the most-specific type is best.

    :param df: the pandas DataFrame whose column types are to be inferred from the values in each column
    :return: a dictionary mapping column names to their respective database types"""
    column_types = __OrderedDict()
    for col in df.columns:
        curtype = None
        if len(df[col].dropna().index) == 0:
            curtype = 'VARCHAR'
        elif df[col].dtype == 'datetime64[ns]':
            curtype = 'DATETIME'
        else:
            try:
                # If the column is numeric, it is either an int (or boolean), a float, or possibly a datetime.
                # Use dfcol_dropna to ensure proper conversion from float to int in case of NaN's/NULLs.
                dfcol_dropna = __pd.to_numeric(df[col].dropna(), downcast='integer')
                df.loc[:, col] = __pd.to_numeric(df[col], downcast='integer')
                if dfcol_dropna.dtype in (__np.int8, __np.int16, __np.int32, __np.int64,
                                          __np.uint8, __np.uint16, __np.uint32, __np.uint64, __np.bool_):
                    # Check if only 1's and 0's are present. If so, it is a boolean, else an integer.
                    if __pd.Series(dfcol_dropna.unique()).isin([0, 1, True, False]).all():
                        df[col].replace(to_replace=[True, False], value=[1, 0], inplace=True)
                        curtype = 'BIT'
                    else:
                        curtype = 'INT'
                elif df[col].dtype in (__np.float16, __np.float32, __np.float64):
                    curtype = 'FLOAT'
                else:
                    # If it is neither an int (boolean or not) nor a float, then it is a datetime or varchar.
                    try:
                        __pd.to_datetime(df[col], infer_datetime_format=True)
                        curtype = 'DATETIME'
                    except ValueError:
                        curtype = 'VARCHAR'
            except (ValueError, TypeError):
                # If it is not a boolean, an int or a float, consider datetime.
                try:
                    __pd.to_datetime(df[col], infer_datetime_format=True)
                    curtype = 'DATETIME'
                except ValueError:
                    # If this fails as well, its type will be set to the least-specific type VARCHAR.
                    curtype = 'VARCHAR'
        # If one way or another curtype was not set, simply go for the least-specific type.
        if not curtype:
            curtype = 'VARCHAR'
        column_types[df[col].name] = curtype
    return column_types
```
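For illustration, a small hypothetical usage with a frame that contains NaNs (the column names and values here are made up):

```python
example = pd.DataFrame({
    "id": [1, 2, None],                                          # floats only because of the NaN
    "score": [0.5, None, 2.25],
    "when": pd.to_datetime(["2023-01-01", None, "2023-03-01"]),
})
print(determine_column_types(example))
# OrderedDict mapping 'id' -> 'INT', 'score' -> 'FLOAT', 'when' -> 'DATETIME'
```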
Now, this code is a few years old and can be optimized/sanitized. What I found was a drastic improvement in insert speed using this bulk insert approach. However, it would be even quicker if we could somehow keep the CSV as bytes in memory and push it to the database, as the CSV writing is quite a heavy task by itself. This, however, poses the problem of keeping it all in memory, so writing a temporary CSV file and deleting it after the bulk insert is a safer bet. Nonetheless, it should be way quicker than any other method for MS SQL.
Of course, this is pandas to MS SQL, so not at all generalizable. But I found it worth sharing.
I've since updated my bcp packaging and it's now mainly performed with zero-copy `.to_numpy()` calls, unless the column is nullable/has nulls in it. I don't believe the BCP shell command allows in-memory files, but if someone could get around that it would be great. As-is, it's much faster to compact the data into a native format as opposed to using CSVs.
Here is an interesting article, also covering how to upload to an Azure SQL database, with some tips and tricks on the bcp command: https://www.sqlshack.com/how-to-handle-100-million-rows-with-sql-server-bcp/
@alexander-beedie I've done the approach you mentioned of converting to a CSV IO object and then using COPY, but one wrinkle: if, let's say, you have a counter column that generates values, you can't bulk copy directly. So I do the Postgres COPY into a temp table and then insert from the temp table into the real table, so that the df to be inserted doesn't have to have the same columns as the destination table. I just mention that to preempt the "I need to insert a 5-column df into a 6-column table in the DB" issue.
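A hypothetical sketch of that temp-table pattern with psycopg2: COPY into a temporary staging table holding only the frame's columns, then INSERT ... SELECT into the real table so server-generated columns (e.g. a serial counter) are filled in by the database. Table and column names, the DSN and the TSV buffer are placeholders.

```python
import io

import psycopg2


def insert_via_staging(dsn: str, tsv_buffer: io.BytesIO) -> None:
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        # Staging table matches the frame's columns only.
        cur.execute("CREATE TEMP TABLE staging_events (ts timestamptz, payload text)")
        cur.copy_expert(
            "COPY staging_events (ts, payload) FROM STDIN WITH (FORMAT text)", tsv_buffer
        )
        # The target table's generated columns are filled in here.
        cur.execute(
            "INSERT INTO target_events (ts, payload) SELECT ts, payload FROM staging_events"
        )
```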
For clarification, to the Polars developers: what is the intended objective of Polars `write_database`?
a) to be a "fastest-possible loading" tool, using any means available from each RDBMS implementation (the PostgreSQL `COPY` command, the Microsoft SQL Server `bcp` tool)? or
b) to be an "as general as possible" command that can pump data into any SQL database (like pandas `to_sql` tries to)?
After reading this thread, I have a strong feeling that the answer is a). In that case, I cannot help very much.
Otherwise, I came here from #13513. My wish was (initially) just to remove the pandas dependency (a large package currently used just to add a thin wrapper over SQLAlchemy to insert data). In that case, I'm interested in helping with some PRs (not neglecting performance, of course).
Disclaimer: I have a strong interest in working with data from several RDBMS, but not enough to go into the specifics of each one (COPY, BCP, etc.). I understand they can be orders of magnitude faster, but the lack of standardization is a very strong problem for me (e.g., the cited tools need the source data to be accessible from the server's local filesystem, which is not available to the user in many cases).
Hi, is this issue still ongoing where `polars.write_database()` is way slower than `pandas.to_sql()` with a sqlalchemy engine? Or have improvements already been made?
> Hi, is this issue still ongoing where `polars.write_database()` is way slower than `pandas.to_sql()` with a sqlalchemy engine? Or have improvements already been made?
We haven't done the big `write_database` rewrite yet, but I have just landed a stop-gap PR that will enable quite a few opt-in speedups; you will be able to supply an instantiated connection object and also pass down all the associated insert parameters that the underlying methods would be able to use - examples in the PR: #16099.
This means that, in the cases where we were previously slower, we should now be able to get the same speed as pandas `to_sql` with sqlalchemy, assuming you establish the connection/options in the same way (and adbc may be faster in some cases). So, we no longer have to be slower here :v:
Coming in the `0.20.26` release.
(Update: released).
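For example, a hedged sketch of how the MSSQL case from the top of this issue could now look (parameter names vary between Polars versions, so check PR #16099 and the current docs; `connection_uri` and `df` are assumed to exist, and `fast_executemany` applies specifically to the mssql+pyodbc dialect):

```python
from sqlalchemy import create_engine

# Build the tuned engine yourself and hand the instantiated object to write_database
# instead of a bare URI string.
engine = create_engine(connection_uri, fast_executemany=True)
df.write_database(
    table_name="my_table",
    connection=engine,
    if_table_exists="replace",
)
```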
I am a noob so you may not be satisfied by the code quality, but this is a lot faster than `pandas.to_sql` or polars `write_database`: https://gist.github.com/Tapanhaz/9c53e47a95f7759e6f506410cea0b3c3
```python
import time
from datetime import datetime

import numpy as np
import polars as pl
import polars_sql  # from the gist above; registers the `sql_writer` namespace
from dateutil.rrule import DAILY, rrule

total_rows = 1000000
total_cols = 15

column_names = [f"column_{i+1}" for i in range(total_cols)]
date_list = list(rrule(freq=DAILY, dtstart=datetime(1000, 1, 1), count=total_rows))

data = {col: np.random.rand(total_rows) for col in column_names}
data = {"date": date_list, **data}

df = pl.DataFrame(data)
print(f"Dataframe shape :: {df.shape}")

start_1 = time.perf_counter()
df.sql_writer.to_sql(table_name="test_table_1", connection="sqlite:///test.db", if_table_exists="append")
print(f"Time Required to insert using sql_writer:: {time.perf_counter() - start_1} secs.")

start_2 = time.perf_counter()
df.write_database(table_name="test_table_2", connection="sqlite:///test.db", if_table_exists="append")
print(f"Time Required to insert using polars write database:: {time.perf_counter() - start_2} secs.")
```