
read_csv slow (single threaded) on large delimited file

cmartin1968 opened this issue 3 years ago • 9 comments

Polars version checks

  • [X] I have checked that this issue has not already been reported.

  • [X] I have confirmed this bug exists on the latest version of polars.

Issue Description

I have a large tab-delimited file of approx 50M rows and 138 columns. I have used polars read_csv on similar and even larger files with no issues: it uses all CPU cores and reads the file into a DataFrame in minutes or even seconds. With this particular file, however, CPU usage never tops 100% (i.e. it stays single threaded) and the read takes orders of magnitude longer. It's not terrible, but it's not the lightning-fast performance I've come to expect from Polars.

I have played with options, specified the correct dtype for every column, and fiddled with the file encoding. Nothing seems to help. I'm just curious what might have triggered polars to use only a single core when reading this file.

I'm running on a server with 16 cores and 512GB RAM. Memory usage tops out at about 120GB so resources are not an issue.
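As a hypothetical aside (not part of the original report), one quick sanity check is to confirm the size of the thread pool Polars has detected; polars.threadpool_size() reports it, and the POLARS_MAX_THREADS environment variable (read at import time) can cap it:

import polars as pl

# Hypothetical check: how many threads does Polars think it can use?
# On the 16-core server described above this should print 16 unless
# POLARS_MAX_THREADS was set before polars was imported.
print(pl.threadpool_size())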

Reproducible Example

import polars as pl

mydtypes = {
  'mbrid' : pl.Int64,
  'actdate' : pl.Date,
  'compid' : pl.Int64,
  'ext_pcp_id' : pl.Int64,
  <...truncated...>
}

df = pl.read_csv("/Data/claims.txt", sep="\t", dtypes=mydtypes)
df

(50000000, 138)

real	4m58.214s
user	4m36.504s
sys	0m23.034s

Expected Behavior

read_csv() should use all available CPU cores and complete much faster.

Installed Versions

---Version info---
Polars: 0.14.2
Index type: UInt32
Platform: Linux-4.19.0-21-amd64-x86_64-with-debian-10.12
Python: 3.7.3 (default, Jan 22 2021, 20:04:44) [GCC 8.3.0]
---Optional dependencies---
pyarrow: 9.0.0
pandas: 1.3.5
numpy: 1.21.6
fsspec:
connectorx:
xlsx2csv:

cmartin1968 avatar Aug 20 '22 17:08 cmartin1968

Have you got a sample of the file?

ritchie46 avatar Aug 20 '22 17:08 ritchie46

The file contains personal healthcare information so I'd have to de-identify it first. How many lines would you like to see in a sample?

cmartin1968 avatar Aug 20 '22 17:08 cmartin1968

Something manageable that reproduces the behavior. We could concatenate the file a few times if needed.

ritchie46 avatar Aug 20 '22 17:08 ritchie46

I'll work on producing a de-identified version and upload it when ready. May take a few days but I'll come back when ready.

cmartin1968 avatar Aug 20 '22 18:08 cmartin1968

Ok, thanks!

ritchie46 avatar Aug 20 '22 18:08 ritchie46

Does the file have quoted fields or columns with a lot of text (with embedded newlines)?

Could you print the dtypes for all columns?

ghuls avatar Aug 20 '22 18:08 ghuls

No, it's a pretty clean file. No quoted fields, no lengthy text fields, and no embedded newlines. It does have a fair amount of missing data for many of the columns though. I'm working on prepping a sample data file to upload once I remove all personal identifying information.

dtypes are as follows:

mydtypes = {
    'mbrid': pl.Utf8, 'actdate': pl.Date, 'compid': pl.Int64, 'ext_pcp_id': pl.Utf8,
    'group_code': pl.Utf8, 'center_code': pl.Utf8, 'product': pl.Utf8, 'subproduct': pl.Utf8,
    'payorcode': pl.Utf8, 'authnumber': pl.Utf8, 'billtype': pl.Utf8, 'ahr': pl.Utf8,
    'dhr': pl.Utf8, 'dstatus': pl.Utf8, 'chkdate': pl.Date, 'claimnum': pl.Utf8,
    'claimlinenum': pl.Int64, 'claimstatus': pl.Utf8, 'claimtype': pl.Utf8, 'claimtypesub': pl.Utf8,
    'admittype': pl.Utf8, 'admittingdiag': pl.Utf8, 'principaldiag': pl.Utf8,
    'diag1': pl.Utf8, 'diag2': pl.Utf8, 'diag3': pl.Utf8, 'diag4': pl.Utf8, 'diag5': pl.Utf8,
    'diag6': pl.Utf8, 'diag7': pl.Utf8, 'diag8': pl.Utf8, 'diag9': pl.Utf8, 'diag10': pl.Utf8,
    'diag11': pl.Utf8, 'diag12': pl.Utf8, 'diag13': pl.Utf8, 'diag14': pl.Utf8, 'diag15': pl.Utf8,
    'diag16': pl.Utf8, 'diag17': pl.Utf8, 'diag18': pl.Utf8, 'diag19': pl.Utf8, 'diag20': pl.Utf8,
    'diag21': pl.Utf8, 'diag22': pl.Utf8, 'diag23': pl.Utf8, 'diag24': pl.Utf8, 'diag25': pl.Utf8,
    'admittingdiag_icdversion': pl.Int8, 'principaldiag_icdversion': pl.Int8,
    'diag1_icdversion': pl.Int8, 'diag2_icdversion': pl.Int8, 'diag3_icdversion': pl.Int8,
    'diag4_icdversion': pl.Int8, 'diag5_icdversion': pl.Int8, 'diag6_icdversion': pl.Int8,
    'diag7_icdversion': pl.Int8, 'diag8_icdversion': pl.Int8, 'diag9_icdversion': pl.Int8,
    'diag10_icdversion': pl.Int8, 'diag11_icdversion': pl.Int8, 'diag12_icdversion': pl.Int8,
    'diag13_icdversion': pl.Int8, 'diag14_icdversion': pl.Int8, 'diag15_icdversion': pl.Int8,
    'diag16_icdversion': pl.Int8, 'diag17_icdversion': pl.Int8, 'diag18_icdversion': pl.Int8,
    'diag19_icdversion': pl.Int8, 'diag20_icdversion': pl.Int8, 'diag21_icdversion': pl.Int8,
    'diag22_icdversion': pl.Int8, 'diag23_icdversion': pl.Int8, 'diag24_icdversion': pl.Int8,
    'diag25_icdversion': pl.Int8,
    'drgcode': pl.Utf8, 'patientstatus': pl.Utf8, 'pos': pl.Utf8, 'prinproccode': pl.Utf8,
    'proccode1': pl.Utf8, 'proccode2': pl.Utf8, 'proccode3': pl.Utf8, 'proccode4': pl.Utf8,
    'proccode5': pl.Utf8, 'revenuecode': pl.Utf8, 'hcpcscode': pl.Utf8, 'proccode': pl.Utf8,
    'modifier1': pl.Utf8, 'modifier2': pl.Utf8, 'modifier3': pl.Utf8, 'modifier4': pl.Utf8,
    'qty': pl.Float64, 'billed_amt': pl.Float64, 'allowed_amt': pl.Float64, 'cob_amt': pl.Float64,
    'copay_amt': pl.Float64, 'denied_amt': pl.Float64, 'netpaid_amt': pl.Float64,
    'svcproviderid': pl.Utf8, 'svcprovidername': pl.Utf8, 'svcproviderspec': pl.Utf8,
    'svcproviderparstatus': pl.Utf8, 'servicestartdate': pl.Date, 'serviceenddate': pl.Date,
    'pcpclaim': pl.Int8, 'mid': pl.Int64, 'provid': pl.Int64, 'pcpid': pl.Int64,
    'groupid': pl.Int64, 'centerid': pl.Int64, 'lobid': pl.Int16, 'lobsubid': pl.Int16,
    'payorid': pl.Int16,
    'DIAGNOSIS_1': pl.Utf8, 'DIAGNOSIS_2': pl.Utf8, 'DIAGNOSIS_3': pl.Utf8, 'DIAGNOSIS_4': pl.Utf8,
    'DIAGNOSIS_5': pl.Utf8, 'DIAGNOSIS_6': pl.Utf8, 'DIAGNOSIS_7': pl.Utf8, 'DIAGNOSIS_8': pl.Utf8,
    'DIAGNOSIS_9': pl.Utf8, 'DIAGNOSIS_10': pl.Utf8, 'DIAGNOSIS_11': pl.Utf8, 'DIAGNOSIS_12': pl.Utf8,
    'DIAGNOSIS_13': pl.Utf8, 'DIAGNOSIS_14': pl.Utf8, 'DIAGNOSIS_15': pl.Utf8, 'DIAGNOSIS_16': pl.Utf8,
    'DIAGNOSIS_17': pl.Utf8, 'DIAGNOSIS_18': pl.Utf8, 'DIAGNOSIS_19': pl.Utf8, 'DIAGNOSIS_20': pl.Utf8,
    'DIAGNOSIS_21': pl.Utf8, 'DIAGNOSIS_22': pl.Utf8, 'DIAGNOSIS_23': pl.Utf8, 'DIAGNOSIS_24': pl.Utf8,
    'DIAGNOSIS_25': pl.Utf8,
}

cmartin1968 avatar Aug 20 '22 19:08 cmartin1968

Is reading also slow with use_pyarrow=True (with the dtypes option removed)?

ghuls avatar Aug 22 '22 13:08 ghuls
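A minimal sketch of what that suggestion might look like for the file from this issue (hypothetical and untested; it assumes the sep argument is honored on the pyarrow path and lets pyarrow infer the column types):

import polars as pl

# Hypothetical comparison run: hand parsing off to pyarrow's multithreaded CSV reader
# instead of passing the explicit dtypes mapping to the native reader.
df = pl.read_csv("/Data/claims.txt", sep="\t", use_pyarrow=True)
print(df.shape)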

Just chiming in to say that I, too, am struggling with this issue. Despite extensive attempts, I have not yet been able to create a sample dataset that reproduces the problem.

Edit: This is a bit lengthy, but it might illustrate the point. The gap between the two methods seems to grow exponentially with size, i.e. a 100M-row dataset takes days to read with read_csv and dtypes vs. 3 minutes with the cast approach below:

from pathlib import Path

import polars as pl


def lazy_cast_csv_columns(df, schema):
    # cast all except Boolean and Datetime
    df = df.with_columns(
        [
            pl.col(c).cast(t)
            for c, t in schema.items()
            if t not in [pl.Datetime("us"), pl.Boolean]
        ]
    )
    # cast datetimes
    df = df.with_columns(
        [
            pl.col(c).str.strptime(pl.Datetime("us"))
            for c, t in schema.items()
            if t == pl.Datetime("us")
        ]
    )
    # parse bool columns
    df = df.with_columns(
        [
            pl.when(pl.col(c) == "true")
            .then(True)
            .when(pl.col(c) == "false")
            .then(False)
            .otherwise(None)
            .alias(c)
            for c, t in schema.items()
            if t == pl.Boolean
        ]
    )
    return df

# generate fake schema that corresponds to real dataset
fake_schema = {
    "a": pl.Int64,
    "b": pl.Utf8,
    "c": pl.Boolean,
    "d": pl.Float64,
    "e": pl.Int64,
    "f": pl.Float64,
    "g": pl.Boolean,
    "h": pl.Int64,
    "i": pl.Float64,
    "j": pl.Int64,
    "k": pl.Float64,
    "l": pl.Int64,
    "m": pl.Float64,
    "n": pl.Datetime('us'),
    "o": pl.Datetime('us'),
    "p": pl.Utf8,
    "q": pl.Utf8,
    "r": pl.Utf8,
    "s": pl.Utf8,
    "t": pl.Utf8,
    "u": pl.Utf8,
    "v": pl.Int64,
    "w": pl.Utf8,
    "x": pl.Utf8,
    "y": pl.Utf8,
    "z": pl.Utf8,
    "A": pl.Utf8,
    "B": pl.Utf8,
    "C": pl.Utf8,
    "D": pl.Float64,
    "E": pl.Float64,
    "F": pl.Utf8,
    "G": pl.Utf8,
    "H": pl.Utf8,
    "I": pl.Utf8,
}

# generate some variety of fake data for each dtype
fake_data = {
    pl.Boolean: ["true", "false", "", "true"],
    pl.Float64: ["1516.64", "", "345.6", "745667.345"],
    pl.Int64: ["38456", "634", "", "753"],
    pl.Utf8: ["", "lorem ipsum", "sit amat", "after work"],
    pl.Datetime("us"): [
        "2021-07-22T04:07:40.938Z",
        "2000-07-22T04:07:40.938Z",
        "",
        "2015-07-22T04:07:40.938Z",
    ],
}

# generate rows from fake_schema and fake_data
rows = (
    "\n".join([",".join(x) for x in zip(*[fake_data[t] for c, t in fake_schema.items()])])
    + "\n"
)

# get headers from fake schema
headers = ",".join(fake_schema.keys()) + "\n"

# generate a csv string
csv_str = headers + (rows * 1_000_000)

# generate a test path
test_path = Path.home() / "test.csv"

# write csv
with test_path.open("w") as f:
    f.write(csv_str)

%%time
# test with standard csv reader
df1 = pl.read_csv(test_path, dtypes=fake_schema)
# CPU times: user 2min 2s, sys: 3.77 s, total: 2min 6s
# Wall time: 2.22 s

%%time
# test with lazy_cast_csv_columns - faster
df2 = pl.scan_csv(test_path, infer_schema_length=0)
df2 = lazy_cast_csv_columns(df2, fake_schema)
df2 = df2.collect()
# CPU times: user 10.7 s, sys: 4.82 s, total: 15.5 s
# Wall time: 1.27 s
# check if result is equal
df1.frame_equal(df2)
# True

DrMaphuse avatar Sep 08 '22 10:09 DrMaphuse
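For the original claims file, a hypothetical adaptation of this workaround could look as follows (untested sketch; it reuses the mydtypes mapping and tab-separated path from earlier in the thread and assumes the date columns use a %Y-%m-%d layout):

import polars as pl

# Read every column as Utf8 first, then cast afterwards, mirroring lazy_cast_csv_columns above.
lf = pl.scan_csv("/Data/claims.txt", sep="\t", infer_schema_length=0)
lf = lf.with_columns(
    # plain casts for everything that is not a date column
    [pl.col(c).cast(t) for c, t in mydtypes.items() if t != pl.Date]
    # explicit parsing for the pl.Date columns (the format string is an assumption)
    + [
        pl.col(c).str.strptime(pl.Date, fmt="%Y-%m-%d")
        for c, t in mydtypes.items()
        if t == pl.Date
    ]
)
df = lf.collect()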

I think it is due to the parsing of the datetime columns:

In [108]: %%time
     ...: # test with standard csv reader
     ...: df1 = pl.read_csv(test_path, infer_schema_length=0, dtypes=fake_schema)
     ...: 
     ...: 
CPU times: user 1min 29s, sys: 1.32 s, total: 1min 30s
Wall time: 7.75 s

In [109]: %%time
     ...: # test with lazy_cast_csv_columns - faster
     ...: df2 = pl.scan_csv(test_path, infer_schema_length=0)
     ...: df2 = lazy_cast_csv_columns(df2, fake_schema)
     ...: df2 = df2.collect()
     ...: 
     ...: 
CPU times: user 7.6 s, sys: 2.25 s, total: 9.86 s
Wall time: 1.34 s

In [111]: %%time
     ...: # test with standard csv reader (reading everything as pl.Utf8 columns and running the same casting function afterwards).
     ...: df3 = lazy_cast_csv_columns(pl.read_csv(test_path, infer_schema_length=0), fake_schema)
     ...: 
     ...: 
CPU times: user 8.23 s, sys: 2.22 s, total: 10.4 s
Wall time: 1.44 s

In [112]: %%time
     ...: # test with standard csv reader with type inference and run the same casting function afterwards).
     ...: df4 = lazy_cast_csv_columns(pl.read_csv(test_path), fake_schema)
     ...: 
     ...: 
CPU times: user 7.8 s, sys: 1.78 s, total: 9.58 s
Wall time: 1.29 s

ghuls avatar Oct 20 '22 14:10 ghuls
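A hypothetical follow-up check (not from the thread) that would isolate the suspected cost: give only the two datetime columns, n and o, an explicit dtype and compare the timing against a run without any dtypes at all.

import polars as pl

# Only the datetime columns get an explicit dtype; everything else is inferred.
# If datetime parsing inside the reader is the bottleneck, this run alone should be slow.
df_dt_only = pl.read_csv(
    test_path,
    dtypes={"n": pl.Datetime("us"), "o": pl.Datetime("us")},
)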

Writing the CSV with Polars first and then reading that file back solves it:

df1 = pl.read_csv(test_path, infer_schema_length=0, dtypes=fake_schema)
In [156]: df1.write_csv("~/test.written_by_polars.csv")
In [155]: %%time
     ...: df1_csv_from_polars = pl.read_csv("~/test.written_by_polars.csv", infer_schema_length=0, dtypes=fake_schema)
     ...: 
     ...: 
CPU times: user 10.7 s, sys: 1.96 s, total: 12.7 s
Wall time: 1.18 s

The original file has timestamps like 2021-07-22T04:07:40.938Z, while the CSV file written by Polars has 2021-07-22T04:07:40.938000.


ghuls avatar Oct 20 '22 14:10 ghuls
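Based on that observation, one hypothetical workaround (not something proposed in the thread) is to read the two timestamp columns as Utf8 and parse them afterwards with an explicit format, so the reader never has to work out the Z-suffixed layout value by value; the chrono format string below is an assumption matched to the sample data:

import polars as pl

# Swap the datetime entries for Utf8 during the read itself...
utf8_schema = {
    c: (pl.Utf8 if t == pl.Datetime("us") else t) for c, t in fake_schema.items()
}
df = pl.read_csv(test_path, dtypes=utf8_schema)

# ...then parse the Z-suffixed timestamps with an explicit format (assumed here).
df = df.with_columns(
    [
        pl.col(c).str.strptime(pl.Datetime("us"), fmt="%Y-%m-%dT%H:%M:%S%.3fZ")
        for c, t in fake_schema.items()
        if t == pl.Datetime("us")
    ]
)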

closing as stale & unable to properly reproduce

universalmind303 avatar Dec 11 '23 06:12 universalmind303