BUG: read_sql produces duplicates
System information
- OS Platform and Distribution: CentOS Linux release 7.9.2009 (Core)
- Modin version: 0.15.1
- Python version: 3.8
- Code to reproduce: see "Source code / logs" below. Due to an NDA I couldn't share the real code, so I used substitute names instead.
Describe the problem
The `modin.read_sql` function produces spurious duplicate entries.
How I found out:
- I ran `modin.read_sql` to read entries from an MSSQL database (see "Source code / logs").
- I ran a GROUP BY query on the result and it returned duplicates.
- To validate this, I looked up the duplicated entries in the Modin dataframe (with square-bracket indexing) and found that they are indeed present.
- I then did the same with pandas and there were no duplicate entries. I also ran the validation query directly on the MSSQL database, and there were no duplicates there either.
- I tested both the Dask and Ray backends; the behavior is the same.
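The duplicate check described above can be written as a small standalone pandas helper (an illustrative sketch; the `find_false_duplicates` name is mine, not part of any library):

```python
import pandas as pd

def find_false_duplicates(df, unique_cols):
    """Return the key combinations that appear more than once on
    columns that should uniquely identify each row."""
    sizes = df.groupby(unique_cols).size()
    return sizes[sizes > 1]

# Tiny illustration: the second frame has a duplicated key row.
clean = pd.DataFrame({"k1": [1, 2, 3], "k2": ["a", "b", "c"]})
dirty = pd.DataFrame({"k1": [1, 2, 2], "k2": ["a", "b", "b"]})

assert find_false_duplicates(clean, ["k1", "k2"]).empty
assert len(find_false_duplicates(dirty, ["k1", "k2"])) == 1
```

Running the same check against the Modin and pandas results of the same query is what revealed the mismatch.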
Source code / logs
```python
import modin.config as cfg
cfg.Engine.put("dask")
import modin.pandas as pd
import modin
from dask.distributed import Client

client = Client()

# SELECT_QUERY is a simple SELECT with a JOIN and a WHERE clause, nothing more.
# Additional info: SELECT_QUERY was a plain string, not a SQLAlchemy expression.
data = pd.read_sql(SELECT_QUERY, CONNECTION_STRING_MSSQL)

# This assertion passes for pandas but fails for Modin.
assert data.groupby(LIST_OF_UNIQUE_COLS).size()[lambda x: x > 1].pipe(len) == 0
```
Hi @JanPalasek thanks for the report!
@mvashishtha - Could the JOIN in the SQL query be somehow causing duplicate rows to occur in the Modin dataframe?
@devin-petersohn I have a reproducer on my mac.
In this example, we start with two identical dataframes of 512 rows and 13 columns; each column is the same range counting down from 511 to 0. A read_sql query joins the frames on each frame's first two columns. In pandas, the resulting dataframe has the original 512 distinct rows and column colA0 is unique. In Modin, it is not. The bug is sensitive to the setup:
- With 256 rows instead of 512, or 12 columns instead of 13, the bug disappears.
- I need to join on at least 2 columns to trigger it.
- I need to select all 13 columns from both frames; selecting only 12 does not trigger it.
- Reversing the initial data so it increases instead of decreases also makes the bug disappear.
```python
import pandas
import numpy as np
import modin.pandas as pd
from modin.config import MinPartitionSize, NPartitions

def get_query(num_cols):
    # We need to join on at least 2 cols to get the bug.
    query = 'SELECT '
    query += ', '.join("A.colA" + str(i) for i in range(num_cols))
    query += ', '
    query += ', '.join("B.colB" + str(i) for i in range(num_cols))
    query += ' FROM ASmall A INNER JOIN BSmall B ON A.colA0 = B.colB0 AND A.colA1 = B.colB1'
    return query

def test_num_cols(num_cols):
    modin_df = pd.read_sql(get_query(num_cols), con)
    pandas_df = pandas.read_sql(get_query(num_cols), con)
    return modin_df._to_pandas().equals(pandas_df)

MinPartitionSize.put(1)
NPartitions.put(2**4)

con = 'mssql+pymssql://sa:Strong.Pwd-123@localhost:1433/master'
data = np.tile(np.arange(2**9 - 1, -1, -1), (13, 1)).T
A_small = pandas.DataFrame(data).add_prefix('colA')
B_small = pandas.DataFrame(data).add_prefix('colB')
A_small.to_sql("ASmall", con, if_exists='replace', index=False)
B_small.to_sql("BSmall", con, if_exists='replace', index=False)

cols_to_read = 13
pdf = pandas.read_sql(get_query(cols_to_read), con)
assert pdf.colA0.is_unique

df = pd.read_sql(get_query(cols_to_read), con)
# This assertion fails:
assert df.colA0.is_unique

print(df.colA0._to_pandas().compare(pdf.colA0))
```
Digging into the Modin query results for different partitions (from the queries Modin generates here), I see that the first partition query gives the *last* 32 rows in reverse (increasing) order:

```python
pandas.read_sql(f'SELECT * FROM ({get_query(num_cols)}) as _ ORDER BY(SELECT NULL) OFFSET 0 ROWS FETCH NEXT 32 ROWS ONLY', con)
```

The next query gives rows 449 through 480 (as expected) in the correct order:

```python
pandas.read_sql(f'SELECT * FROM ({get_query(num_cols)}) as _ ORDER BY(SELECT NULL) OFFSET 32 ROWS FETCH NEXT 32 ROWS ONLY', con)
```

And the final partition query gives the last 32 rows again, but this time in the correct (decreasing) order:

```python
pandas.read_sql(f'SELECT * FROM ({get_query(num_cols)}) as _ ORDER BY(SELECT NULL) OFFSET 480 ROWS FETCH NEXT 32 ROWS ONLY', con)
```

Those 32 rows are the only duplicates in the Modin result, as we can see with `print(df.colA0._to_pandas().compare(pdf.colA0))`.

So somehow `OFFSET 0 ROWS FETCH NEXT 32 ROWS ONLY` returns the last 32 rows in reverse order instead of the first 32 rows in the expected order, while the rest of the partition queries are correct. I can't explain that behavior. I know the database doesn't guarantee any order when there's no real `ORDER BY`, but empirically we rely on each partition's query seeing the same order.
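To make the paging pattern above concrete, here is a sketch of how such per-partition queries can be built (my own `partition_query` helper, not Modin's actual code). The `ORDER BY (SELECT NULL)` clause satisfies T-SQL's requirement that `OFFSET` follow an `ORDER BY` without imposing any real ordering, which is exactly why different partition queries can, in principle, see different row orders:

```python
def partition_query(query, offset, limit):
    # Wrap the user's query and page through it with OFFSET/FETCH.
    # ORDER BY (SELECT NULL) makes the OFFSET clause syntactically
    # valid in T-SQL but guarantees nothing about row order, so each
    # paging query may see a different ordering of the result set.
    return (
        f"SELECT * FROM ({query}) AS _ "
        f"ORDER BY (SELECT NULL) "
        f"OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY"
    )

# 512 result rows split into 16 partitions of 32 rows each:
queries = [partition_query("SELECT colA0 FROM ASmall", off, 32)
           for off in range(0, 512, 32)]
```

With this scheme, correctness silently depends on the database returning the unordered subquery in a stable order across all 16 queries.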
System Info:
- macOS Monterey 12.4
- Python 3.9.12
- Modin 8efd8f765fffcd7166a78471620a942174ee8086
- Commands to start the SQL server:

```shell
sudo docker pull mcr.microsoft.com/mssql/server:2019-latest
docker run -d --name example_sql_server -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=Strong.Pwd-123' -p 1433:1433 mcr.microsoft.com/mssql/server:2019-latest
```
I talked about this with @batur-ponder and we thought the solution for reading a whole table would be:
- If the table has a primary key or other uniqueness constraint, ORDER BY the key when reading the partitions.
- Otherwise, copy the query result into a temp table with an IDENTITY / ROW_NUMBER column, then let the partitions query the temp table ordering by the identity column.
We'd have to check whether Modin's performance is good in both cases.
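The second option could be sketched roughly as follows (my own illustration of the idea, untested against Modin; the `#modin_tmp` temp-table and `_rn` column names are placeholders). The key point is that `ROW_NUMBER()` is materialized exactly once, so every partition reads against the same frozen ordering:

```python
def rownum_partition_queries(query, num_rows, partition_size):
    # Materialize the query into a temp table with a ROW_NUMBER()
    # column. The ordering ROW_NUMBER() picks is still arbitrary,
    # but it is computed once, so all partition reads agree on it.
    setup = (
        "SELECT ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS _rn, * "
        f"INTO #modin_tmp FROM ({query}) AS _"
    )
    reads = [
        f"SELECT * FROM #modin_tmp "
        f"WHERE _rn > {off} AND _rn <= {off + partition_size}"
        for off in range(0, num_rows, partition_size)
    ]
    return setup, reads
```

The extra copy into the temp table is the performance cost we'd have to measure.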
That said, for a complex query like the one in the reproducer here, it's not easy to tell what the primary keys of the query result are, so I don't know how we can partition results in a way that guarantees correctness.
I have the same problem with SQL Server and `modin.read_sql`.