
IODataset.from_parquet very slow when reading from GCS


I have constructed a test case that uses tfio.IODataset.from_parquet to read some data, and it is much slower when reading from GCS than when reading local files, even when I account for the time it takes to download the files to local storage with "gsutil".

Time to download files using gsutil

Downloading the files from GCS using gsutil takes less than 5 seconds. So why is IODataset.from_parquet so slow when reading from GCS?

> time gsutil -m cp  gs://$REMOTE/* test/
real	0m2.795s
user	0m4.080s
sys 0m1.509s
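
For comparison, the "download first, then read locally" path that the timing above refers to looks roughly like this. This is only a sketch: the bucket glob, the local directory, and the inlined COLUMN_SPEC are placeholders standing in for the values used in the full script further down.

import subprocess
import tensorflow as tf
import tensorflow_io as tfio

REMOTE_GLOB = "gs://bucket/folder/test_1/*"  # placeholder GCS glob
LOCAL_DIR = "test"                           # placeholder local directory
COLUMN_SPEC = {                              # same column spec as in the full script below
    f"pred_{i}": tf.TensorSpec(tf.TensorShape((None)), tf.float64) for i in range(10)
}

# Copy the Parquet files to local disk first; this is the ~3 second gsutil step.
subprocess.run(["gsutil", "-m", "cp", REMOTE_GLOB, LOCAL_DIR]).check_returncode()

# Then run the same IODataset pipeline, but against the local copies.
ds = tf.data.Dataset.list_files(f"{LOCAL_DIR}/*")
ds = ds.interleave(
    lambda f: tfio.IODataset.from_parquet(f, columns=COLUMN_SPEC).batch(200),
    cycle_length=4,
    num_parallel_calls=4,
    deterministic=False,
)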

Optimization

I tried many different combinations of things to optimize the read time. Some highlights (sketched in code after this list):

  • prefetch - Adding prefetch in various places (after the from_parquet or after the interleave) does not help.
  • num_parallel_calls - Increasing the parallelization via interleave makes things worse when reading from GCS, but better when reading from local files.
  • batch - Adding "batch" after the from_parquet improves both cases, but it doesn't close the gap between the two.
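
In code, the variants above look roughly like this. This is only a sketch: FILE_GLOB stands in for either the GCS or the local pattern, and COLUMN_SPEC is inlined here but matches the full script below.

import tensorflow as tf
import tensorflow_io as tfio

# Placeholders: FILE_GLOB is either the GCS or the local pattern; COLUMN_SPEC
# is the same dict of TensorSpecs used in the full script below.
FILE_GLOB = "gs://bucket/folder/test_1/*"
COLUMN_SPEC = {
    f"pred_{i}": tf.TensorSpec(tf.TensorShape((None)), tf.float64) for i in range(10)
}

files = tf.data.Dataset.list_files(FILE_GLOB)

def read_one_file(f):
    # "batch": batching right after from_parquet helps both GCS and local reads.
    return tfio.IODataset.from_parquet(f, columns=COLUMN_SPEC).batch(200)

ds = files.interleave(
    read_one_file,
    cycle_length=4,        # "num_parallel_calls": raising this makes GCS reads
    num_parallel_calls=4,  # worse but local reads better
    deterministic=False,
)

# "prefetch": adding prefetch here (or after from_parquet inside read_one_file)
# did not make a measurable difference in either case.
ds = ds.prefetch(tf.data.AUTOTUNE)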

Results

num_parallel_calls  read_method  execution_time (s)
1                   gfs          21.92511743
1                   gfs          21.32676973
1                   gfs          19.28669586
1                   gfs          19.11115026
4                   gfs          142.9781562
4                   gfs          82.91326238
4                   gfs          83.15559933
4                   gfs          77.25469493
8                   gfs          122.6821029
8                   gfs          107.7929496
8                   gfs          100.1373496
8                   gfs          96.95652105
16                  gfs          100.9381117
16                  gfs          96.3136123
16                  gfs          81.40217685
16                  gfs          74.11826929
1                   local        11.45320377
1                   local        7.349745569
1                   local        11.23507389
1                   local        7.508471486
4                   local        4.817599988
4                   local        3.924400457
4                   local        4.212198135
4                   local        4.115252805
8                   local        4.005491037
8                   local        3.916457508
8                   local        3.997095786
8                   local        4.219035345
16                  local        3.899692797
16                  local        3.965225237
16                  local        3.955027787
16                  local        3.973490917

Code

import time
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_io as tfio
import subprocess

# Parquet file specs
COLUMNS = 10
TOTAL_ROWS = 1000000
FILE_COUNT = 40
ROWS_PER_FILE = TOTAL_ROWS // FILE_COUNT
COLUMN_SPEC = {
    f"pred_{i}": tf.TensorSpec(tf.TensorShape((None)), tf.float64)
    for i in range(COLUMNS)
}

# Directories
TEST_RUN_ID = "test_1"
LOCAL_PREFIX = TEST_RUN_ID
GFS_PREFIX = "gs://bucket/folder" + TEST_RUN_ID

# Create files
subprocess.run(["mkdir", LOCAL_PREFIX]).check_returncode()
for i in range(FILE_COUNT):
    data = np.random.rand(ROWS_PER_FILE, COLUMNS)
    df = pd.DataFrame(data, columns=COLUMN_SPEC.keys())
    df.to_parquet(f"{LOCAL_PREFIX}/test-{i}.parquet")

# Upload to GCS
subprocess.run(
    ["gsutil", "-m", "cp", f"{LOCAL_PREFIX}/*", GFS_PREFIX]
).check_returncode()

READ_METHODS = {"gfs": f"{GFS_PREFIX}/*", "local": f"{LOCAL_PREFIX}/*"}

for read_method, file_path in READ_METHODS.items():
    for num_parallel_calls in [1, 4, 8, 16]:
        for run in range(4):
            start_time = time.perf_counter()
            ds = tf.data.Dataset.list_files(file_path)
            ds = ds.interleave(
                lambda f: tfio.IODataset.from_parquet(f, columns=COLUMN_SPEC).batch(
                    200
                ),
                cycle_length=num_parallel_calls,
                num_parallel_calls=num_parallel_calls,
                deterministic=False,
            )
            # Force TF to read all the data; this counts the 200-row batches produced above.
            observations = ds.reduce(np.int64(0), lambda x, _: x + 1).numpy()
            execution_time = time.perf_counter() - start_time
            print(f"{observations} {num_parallel_calls} {read_method} {execution_time}")


Environment

Running on Google Cloud VM. Instance type: e2-standard-4

> pip list | grep tensor
tensorboard                  2.9.1
tensorboard-data-server      0.6.1
tensorboard-plugin-profile   2.8.0
tensorboard-plugin-wit       1.8.1
tensorflow                   2.9.2
tensorflow-estimator         2.9.0
tensorflow-io                0.26.0
tensorflow-io-gcs-filesystem 0.26.0
