PyAthena

SQLAlchemy + Pandas very slow when compared to AWS Wrangler

Open avibrazil opened this issue 4 years ago • 17 comments

I tested the same query, which returns more than 2 million rows, with both stacks.

AWS Wrangler takes 1m34s to return a DataFrame. SQLAlchemy + PyAthena takes 16m37s to return a DataFrame.

See attached notebook for proof and methods.

Also, PyAthena apparently returns an object almost 9% bigger, but this may be due to Pandas, SQLAlchemy, data types, and other minor things that I won't worry about right now.

avibrazil · Nov 25 '21

https://github.com/laughingman7743/PyAthena/issues/46
https://github.com/laughingman7743/PyAthena/tree/master/benchmarks
https://github.com/laughingman7743/PyAthena#pandascursor

laughingman7743 · Nov 25 '21

Thank you. I’ve seen all these links and methods before.

But they don't clarify how to use this with SQLAlchemy. And I need to use SQLAlchemy.

So, is there any way to use PyAthena with SQLAlchemy and Pandas that delivers acceptable performance?

Maybe a flag in the URL or something.

avibrazil · Nov 25 '21

I do not have an implementation that uses a combination of SQLAlchemy and Pandas. What is the use case for that?

laughingman7743 · Nov 26 '21

I have a Pandas application that is database-independent. SQLAlchemy provides a single unified interface, so the code doesn't need to change when switching database backends, only the URL.
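
For example (placeholder URLs and table name; only the connection string changes when switching backends):

import pandas as pd
import sqlalchemy

query = "SELECT * FROM some_table"  # placeholder query

# Development backend:
engine = sqlalchemy.create_engine("postgresql://user:pass@host:5432/mydb")
# Athena backend; the rest of the code stays exactly the same:
# engine = sqlalchemy.create_engine(
#     "awsathena+rest://:@athena.us-east-1.amazonaws.com:443/mydb?work_group=primary"
# )

df = pd.read_sql_query(query, con=engine)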

avibrazil · Nov 26 '21

For now, the current implementation does not allow PandasCursor and SQLAlchemy to be used in combination. It would need some modification, but by implementing a way to switch the cursor via the connection URL, we could possibly speed up the retrieval of query results.
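
Hypothetically, such URL-based switching might look like this (the cursor_class URL parameter below is only an illustration of the idea; it does not exist in the current implementation):

# Hypothetical URL flag for illustration only, not implemented:
conn_str = (
    "awsathena+rest://:@athena.{region_name}.amazonaws.com:443/{schema_name}"
    "?work_group={work_group}&cursor_class=PandasCursor"
)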

laughingman7743 · Nov 26 '21

Are there any guides on how to use it? I'm using the SQLAlchemy query builder to query Athena through pandas' read_sql_query(), or simply the use case shown in the code in the notebook attached to this bug report.

Also, I'd suggest being more explicit about this problem when using PyAthena with SQLAlchemy and Pandas. Simply suggesting not to use SQLAlchemy, i.e. to bypass it, is not a solution.

avibrazil · Nov 26 '21

What do you need a guide for, a guide for PandasCursor? It has a DB-API 2.0 interface, which is the same as the default cursor. Fetching data works the same way, too.
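
For example, with a cursor created via cursor_class=PandasCursor (as in the connect() snippet later in this thread), both styles work ("some_table" is a placeholder):

# Standard DB-API 2.0 usage, identical to the default cursor:
cursor.execute("SELECT * FROM some_table")
rows = cursor.fetchall()

# Or skip the tuple round-trip and get the DataFrame directly:
df = cursor.execute("SELECT * FROM some_table").as_pandas()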

laughingman7743 · Nov 26 '21

I don't know how to use it in this situation:

import pandas as pd
import sqlalchemy

conn_str = "awsathena+rest://:@athena.{region_name}.amazonaws.com:443/{schema_name}?work_group={work_group}&compression=snappy"

pyathena_sqlalchemy_conn = sqlalchemy.create_engine(
    url = conn_str.format(
        region_name  = region_name,
        schema_name  = schema_name,
        work_group   = work_group
    ),
    echo_pool = True,
)

pyathena_df = pd.read_sql_query(query, con=pyathena_sqlalchemy_conn)

What should I pass to pandas.read_sql_query() in the con argument in order to use PandasCursor? And how do I get it with SQLAlchemy?

Thank you in advance

avibrazil · Nov 26 '21

In the current implementation, SQLAlchemy and PandasCursor cannot be used in combination. As long as you use SQLAlchemy, you cannot change the default cursor.

If you do not use SQLAlchemy, you can use PandasCursor as follows:

from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                 region_name="REGION_NAME",
                 schema_name="SCHEMA_NAME",
                 work_group="WORK_GROUP",
                 cursor_class=PandasCursor).cursor()
df = cursor.execute(query).as_pandas()

laughingman7743 · Nov 26 '21

If you want to use PandasCursor in combination with SQLAlchemy, you would probably need to modify the part that assembles the connection object's arguments from the URL:
https://github.com/laughingman7743/PyAthena/blob/master/pyathena/sqlalchemy_athena.py#L302-L332

There is no guarantee that SQLAlchemy will work correctly after switching to PandasCursor.

laughingman7743 · Nov 26 '21

This new method apparently reduced the query time from about 16 minutes to 6m35s, but it is still far from beating AWS Wrangler's 1m15s (admittedly, without SQLAlchemy):

import pandas as pd
import sqlalchemy

conn_str = "awsathena+rest://:@athena.{region_name}.amazonaws.com:443/{schema_name}?work_group={work_group}&compression=snappy"

# Pass PandasCursor to the underlying PyAthena connection via connect_args
sqlalchemy_connect_args = {}
if "awsathena+rest" in conn_str:
    from pyathena.pandas.cursor import PandasCursor
    sqlalchemy_connect_args.update(
        dict(
            cursor_class=PandasCursor
        )
    )

pyathena_sqlalchemy_conn = sqlalchemy.create_engine(
    url = conn_str.format(
        region_name  = region_name,
        schema_name  = schema_name,
        work_group   = work_group
    ),
    connect_args=sqlalchemy_connect_args,
    echo_pool = True,
)

# Note: raw_connection() returns the underlying DB-API connection, not a cursor
cursor = pyathena_sqlalchemy_conn.raw_connection()

pyathena_df = pd.read_sql_query(query, con=cursor)

pyathena_df.info()

It works by getting SQLAlchemy's raw_connection(), whose cursors are PandasCursor instances, and passing it to read_sql_query().

But I don't know yet whether I can reuse the cursor or should get a new one for each query.

avibrazil · Nov 26 '21

Here is a simplification: there is no need to get a raw_connection(). Simply using the engine constructed with a PandasCursor gives me a 7m14s query time for my 2-million-row query, instead of 17m.

import pandas as pd
import sqlalchemy

conn_str = "awsathena+rest://:@athena.{region_name}.amazonaws.com:443/{schema_name}?work_group={work_group}&compression=snappy"

sqlalchemy_connect_args = {}
if "awsathena+rest" in conn_str:
    from pyathena.pandas.cursor import PandasCursor
    sqlalchemy_connect_args.update(
        dict(
            cursor_class=PandasCursor
        )
    )

pyathena_sqlalchemy_conn = sqlalchemy.create_engine(
    url = conn_str.format(
        region_name  = region_name,
        schema_name  = schema_name,
        work_group   = work_group
    ),
    connect_args=sqlalchemy_connect_args,
    echo_pool = True,
)

pyathena_df = pd.read_sql_query(query, con=pyathena_sqlalchemy_conn)

Still, PyAthena alone or AWS Wrangler (both without SQLAlchemy) run the same query in 1m15s for the same 2-million-row result.

The villain here is SQLAlchemy, or the integration between SQLAlchemy and PyAthena.

I hope this can be solved in the future so that PyAthena + SQLAlchemy delivers the same performance as PyAthena alone. SQLAlchemy is desirable because it makes code more elegant and succinct, and keeps the application independent of the database engine.

avibrazil · Nov 26 '21

For reference, here is the notebook used in this experiment: AWS Wrangler × PyAthena × SQLAlchemy.zip.

avibrazil · Nov 26 '21

The PandasCursor itself already holds a DataFrame, but it wraps fetching of that DataFrame to comply with the DB-API interface:
https://github.com/laughingman7743/PyAthena/blob/master/pyathena/pandas/result_set.py#L100-L133

Pandas' read_sql_query method calls the cursor's fetchall method to create the DataFrame:
https://github.com/pandas-dev/pandas/blob/945c9ed766a61c7d2c0a7cbb251b6edebf9cb7d5/pandas/io/sql.py#L1521-L1602

So the process generates an array of tuples from the existing DataFrame and then re-creates the DataFrame, which is slow in general.
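
A sketch of how to avoid that round-trip while still using the SQLAlchemy engine built with connect_args above (with the same caveat that SQLAlchemy is not guaranteed to work correctly with PandasCursor):

# Bypass pd.read_sql_query: get the underlying PyAthena connection and return
# the DataFrame the PandasCursor already holds, skipping the tuple round-trip.
raw_conn = pyathena_sqlalchemy_conn.raw_connection()
cursor = raw_conn.cursor()  # a PandasCursor, thanks to connect_args above
df = cursor.execute(query).as_pandas()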

laughingman7743 · Nov 26 '21

So all this abstraction from SQLAlchemy does indeed come at a cost. Anyway, although not optimal, I'm happy to have a performance improvement with PandasCursor.

avibrazil · Nov 26 '21

https://towardsdatascience.com/heres-the-most-efficient-way-to-iterate-through-your-pandas-dataframe-4dad88ac92ee

laughingman7743 · Jan 25 '22

I have implemented various performance improvements.

  • https://github.com/laughingman7743/PyAthena/pull/284
  • https://github.com/laughingman7743/PyAthena/pull/325
  • https://github.com/laughingman7743/PyAthena/pull/370

I think the Pandas cursor can now fetch a large result set at the same speed as awswrangler (aws-sdk-pandas) by combining the unload option with our own S3 filesystem implementation (implemented in v2.18.0).
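
A minimal sketch of that combination, assuming PyAthena v2.18.0 or later (bucket, region, and workgroup values are placeholders):

from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

# unload=True makes Athena UNLOAD the result to S3 as Parquet, which is then
# read back through PyAthena's own S3 filesystem implementation instead of
# parsing the CSV result file.
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                 region_name="REGION_NAME",
                 work_group="WORK_GROUP",
                 cursor_class=PandasCursor).cursor(unload=True)
df = cursor.execute(query).as_pandas()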

laughingman7743 · Dec 18 '22