
SQL Query Hangs When Using scan_parquet off S3

Open stephenskory opened this issue 1 year ago • 5 comments

Checks

  • [X] I have checked that this issue has not already been reported.
  • [X] I have confirmed this bug exists on the latest version of Polars.

Reproducible example

This version works, giving the output supplied below:

import polars as pl
df0 = pl.DataFrame({"foo": ["a", "b", "c"], "bar": [0, 1, 2]}).lazy()
df1 = pl.DataFrame({"muh": ["x", "y", "z"], "bar": [0, 1, 2]}).lazy()
ctx = pl.SQLContext(register_globals=True)
query = """SELECT df0.foo, df0.bar, df1.muh
FROM df0
JOIN df1 ON df0.bar = df1.bar
WHERE df0.bar IN (0, 1)"""
out = ctx.execute(query).collect()
print(out)

I saved the dataframes to disk (e.g. df0.write_parquet("/tmp/df0.parquet")), and this works:

import polars as pl
df0 = pl.read_parquet("/tmp/df0.parquet")
df1 = pl.read_parquet("/tmp/df1.parquet")
ctx = pl.SQLContext(register_globals=True)
# snip...

Lazy loading locally works as well:

import polars as pl
df0 = pl.scan_parquet("/tmp/df0.parquet")
df1 = pl.scan_parquet("/tmp/df1.parquet")
ctx = pl.SQLContext(register_globals=True)
# snip...

I then uploaded those two files to S3 for the next two tries. This first version works, giving the same output as before (all examples above and below run on an EC2 instance):

import polars as pl
import boto3
session = boto3.session.Session()
credentials = session.get_credentials().get_frozen_credentials()
storage_options = {
    "aws_access_key_id": credentials.access_key,
    "aws_secret_access_key": credentials.secret_key,
    "aws_session_token": credentials.token,
    "aws_region": session.region_name,
}
df0 = pl.read_parquet("s3://bucket/df0.parquet",
                    storage_options=storage_options)
df1 = pl.read_parquet("s3://bucket/df1.parquet",
                    storage_options=storage_options)
ctx = pl.SQLContext(register_globals=True)
# snip...

If I take the above example and only swap the two instances of read_parquet for scan_parquet, the program hangs, and the only way to regain the command line is to kill -9 the process.

# snip...
df0 = pl.scan_parquet("s3://bucket/df0.parquet",
                    storage_options=storage_options)
df1 = pl.scan_parquet("s3://bucket/df1.parquet",
                    storage_options=storage_options)
# snip...
# hang...

Log output

join parallel: true
POLARS PREFETCH_SIZE: 16
querying metadata of 1/1 files...
reading of 1/1 file...
POLARS PREFETCH_SIZE: 16
querying metadata of 1/1 files...
parquet file must be read, statistics not sufficient for predicate.
POLARS ROW_GROUP PREFETCH_SIZE: 128
reading of 1/1 file...
parquet file must be read, statistics not sufficient for predicate.
POLARS ROW_GROUP PREFETCH_SIZE: 128

Issue description

I think the code examples demonstrate the problem. The JOIN works in-memory, it works locally both eager and lazy, and it works eagerly with read_parquet off S3. However, the same query with lazy loading off S3 results in a hard hang.

When I run an operationally equivalent test with real data on S3 (~100 MB), the same cases work and hang, respectively. With lazy loading off S3 I see an initial spike in network traffic and CPU load, but then no output and the same hang; after that initial spike there is no further network or CPU activity while the program hangs.

Expected behavior

shape: (2, 3)
┌─────┬─────┬─────┐
│ foo ┆ bar ┆ muh │
│ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ str │
╞═════╪═════╪═════╡
│ a   ┆ 0   ┆ x   │
│ b   ┆ 1   ┆ y   │
└─────┴─────┴─────┘

Installed versions

--------Version info---------
Polars:               0.20.16
Index type:           UInt32
Platform:             Linux-5.15.0-1055-aws-aarch64-with-glibc2.31
Python:               3.11.5 (main, Sep 11 2023, 13:14:08) [GCC 11.2.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          2.2.1
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               2024.2.0
gevent:               <not installed>
hvplot:               0.9.0
matplotlib:           3.8.0
numpy:                1.24.3
openpyxl:             3.0.10
pandas:               2.1.4
pyarrow:              11.0.0
pydantic:             1.10.12
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           2.0.21
xlsx2csv:             <not installed>
xlsxwriter:           3.1.9

stephenskory avatar Mar 19 '24 21:03 stephenskory

FWIW I just ran the examples above on my laptop (M1 Mac) and all of them, including the lazy-S3 version, worked. This indicates that something is amiss with the Python/Polars stack on the EC2 instance, but I'm still stuck. Any ideas or suggestions? Just re-install everything? Is there a way to get any kind of useful error message?
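If it helps, Polars can emit extra plan and IO diagnostics via the POLARS_VERBOSE environment variable (a minimal sketch; it can equally be exported in the shell before launching Python):

```python
import os

# Enable Polars' verbose diagnostics for this process; Polars reads this
# environment variable at query time, so set it before running the query.
os.environ["POLARS_VERBOSE"] = "1"
```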

stephenskory avatar Mar 19 '24 22:03 stephenskory

Does this occur in the dataframe API as well?

ritchie46 avatar Mar 20 '24 10:03 ritchie46

Yes, the example below hangs, but with read_parquet (and no .collect()) it works fine.

import polars as pl
import boto3
session = boto3.session.Session()
credentials = session.get_credentials().get_frozen_credentials()
storage_options = {
    "aws_access_key_id": credentials.access_key,
    "aws_secret_access_key": credentials.secret_key,
    "aws_session_token": credentials.token,
    "aws_region": session.region_name,
}
df0 = pl.scan_parquet("s3://bucket/df0.parquet",
                    storage_options=storage_options)
df1 = pl.scan_parquet("s3://bucket/df1.parquet",
                    storage_options=storage_options)
dfboth = df0.join(df1, on="bar", how="inner")
dfboth = dfboth.filter(pl.col("bar").is_in((0, 1))).collect()
print(dfboth)

I tried upgrading a number of the packages related to Polars but it didn't change anything:

--------Version info---------
Polars:               0.20.16
Index type:           UInt32
Platform:             Linux-5.15.0-1055-aws-aarch64-with-glibc2.31
Python:               3.11.5 (main, Sep 11 2023, 13:14:08) [GCC 11.2.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          3.0.0 # upgraded
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               2024.3.1 # upgraded
gevent:               <not installed>
hvplot:               0.9.0
matplotlib:           3.8.0
numpy:                1.24.3
openpyxl:             3.0.10
pandas:               2.1.4
pyarrow:              15.0.2 # upgraded
pydantic:             2.6.4 # upgraded
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           2.0.21
xlsx2csv:             <not installed>
xlsxwriter:           3.1.9

stephenskory avatar Mar 20 '24 16:03 stephenskory

I believe this is a regression from https://github.com/pola-rs/polars/pull/15083.

Local MRE:

import os

os.environ["POLARS_FORCE_ASYNC"] = "1"
os.environ["POLARS_MAX_THREADS"] = "1"

import polars as pl

df = pl.Series("x", [1]).to_frame()

p = ".env/x.parquet"
df.write_parquet(p)

print(pl.collect_all([pl.scan_parquet(p)]))

Basically, every thread in the rayon pool blocks on an async task, and then one of those async tasks ends up spawning a new task on the rayon pool:

https://github.com/pola-rs/polars/blob/474ac3478f8117f6bdc4992fae21f57c3bfdd86d/crates/polars-io/src/parquet/read_impl.rs#L673

and then blocks on that task, which never executes since all the rayon threads are blocked, so we deadlock.
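The shape of the deadlock can be illustrated in miniature (a Python sketch, not the actual Rust code): a fixed-size pool whose only worker blocks on a task submitted to the same pool can never make progress.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

pool = ThreadPoolExecutor(max_workers=1)  # stand-in for a saturated rayon pool

def outer():
    inner = pool.submit(lambda: "done")  # spawn a task onto the same pool...
    return inner.result()                # ...then block waiting for it

f = pool.submit(outer)  # occupies the only worker
try:
    f.result(timeout=1.0)
    outcome = "completed"
except FutureTimeout:
    outcome = "deadlocked"  # the inner task never got a worker thread

print(outcome)
pool.shutdown(wait=False, cancel_futures=True)  # unstick the worker so the demo exits
```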

nameexhaustion avatar Mar 22 '24 11:03 nameexhaustion

I am running into this issue with 0.20.16 when reading Parquet files from Google Cloud Storage buckets with code like this:

df_pl = pl.concat(
    (pl.scan_parquet(c.url, storage_options=storage_options) for c in results),
    how="diagonal",
    # Ideally this reduces the memory usage
    rechunk=False,
).collect()

aberres avatar Mar 25 '24 12:03 aberres