IODataset.from_parquet very slow when reading from GCS
I have constructed a test case that uses tfio.IODataset.from_parquet to read some data, and it is much slower when reading from GCS than when reading local files. That holds even when the local-file timing includes downloading the files to local storage with "gsutil".
Time to download files using gsutil
Downloading the files from GCS using gsutil takes less than 5 seconds. So why is IODataset.from_parquet so slow when reading from GCS?
> time gsutil -m cp gs://$REMOTE/* test/
real 0m2.795s
user 0m4.080s
sys 0m1.509s
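For comparison, the download-then-read baseline described above can be timed end to end from Python as well. This is only a sketch: the bucket path and local directory are placeholders, and the column spec mirrors the one used in the Code section below.

import os
import subprocess
import time
import tensorflow as tf
import tensorflow_io as tfio

REMOTE_GLOB = "gs://bucket/folder/test_1/*"  # placeholder bucket path
LOCAL_DIR = "test"
# Same column layout as the files generated in the Code section below.
COLUMN_SPEC = {
    f"pred_{i}": tf.TensorSpec(tf.TensorShape((None)), tf.float64) for i in range(10)
}

os.makedirs(LOCAL_DIR, exist_ok=True)
start = time.perf_counter()
# Download the Parquet files with gsutil first (the step timed above)...
subprocess.run(["gsutil", "-m", "cp", REMOTE_GLOB, LOCAL_DIR], check=True)
# ...then read them from local disk with the same tfio pipeline as in the Code section.
ds = tf.data.Dataset.list_files(f"{LOCAL_DIR}/*")
ds = ds.interleave(
    lambda f: tfio.IODataset.from_parquet(f, columns=COLUMN_SPEC).batch(200)
)
batches = ds.reduce(0, lambda count, _: count + 1).numpy()
print(f"download + local read: {time.perf_counter() - start:.2f}s ({batches} batches)")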
Optimization
I tried many different combinations of things to optimize the read time. Some highlights:
- prefetch - Adding prefetch in various places (after the from_parquet or after the interleave) does not help.
- num_parallel_calls - Increasing the parallelization via interleave makes things worse when reading from GCS, but better when reading from local files.
- batch - Adding "batch" after the from_parquet improves both cases, but it doesn't close the gap between the two. A sketch of these variants follows this list.
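For reference, here is where these variants sit in the pipeline from the Code section below. The parallelism value and batch size are just the ones used in the benchmark; the file path and column spec are placeholders.

import tensorflow as tf
import tensorflow_io as tfio

# Placeholders mirroring the Code section below.
file_path = "test_1/*"
num_parallel_calls = 4
COLUMN_SPEC = {
    f"pred_{i}": tf.TensorSpec(tf.TensorShape((None)), tf.float64) for i in range(10)
}

ds = tf.data.Dataset.list_files(file_path)
ds = ds.interleave(
    # batch(200) right after from_parquet speeds up both local and GCS reads,
    # but does not close the gap between them.
    lambda f: tfio.IODataset.from_parquet(f, columns=COLUMN_SPEC).batch(200),
    # Raising these helps local reads but makes GCS reads slower.
    cycle_length=num_parallel_calls,
    num_parallel_calls=num_parallel_calls,
    deterministic=False,
)
# prefetch after the interleave (or after from_parquet) made no measurable difference.
ds = ds.prefetch(tf.data.AUTOTUNE)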
Results
| num_parallel_calls | read_method | execution_time (s) |
|---|---|---|
| 1 | gfs | 21.92511743 |
| 1 | gfs | 21.32676973 |
| 1 | gfs | 19.28669586 |
| 1 | gfs | 19.11115026 |
| 4 | gfs | 142.9781562 |
| 4 | gfs | 82.91326238 |
| 4 | gfs | 83.15559933 |
| 4 | gfs | 77.25469493 |
| 8 | gfs | 122.6821029 |
| 8 | gfs | 107.7929496 |
| 8 | gfs | 100.1373496 |
| 8 | gfs | 96.95652105 |
| 16 | gfs | 100.9381117 |
| 16 | gfs | 96.3136123 |
| 16 | gfs | 81.40217685 |
| 16 | gfs | 74.11826929 |
| 1 | local | 11.45320377 |
| 1 | local | 7.349745569 |
| 1 | local | 11.23507389 |
| 1 | local | 7.508471486 |
| 4 | local | 4.817599988 |
| 4 | local | 3.924400457 |
| 4 | local | 4.212198135 |
| 4 | local | 4.115252805 |
| 8 | local | 4.005491037 |
| 8 | local | 3.916457508 |
| 8 | local | 3.997095786 |
| 8 | local | 4.219035345 |
| 16 | local | 3.899692797 |
| 16 | local | 3.965225237 |
| 16 | local | 3.955027787 |
| 16 | local | 3.973490917 |
Code
import time
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_io as tfio
import subprocess
# Parquet file specs
COLUMNS = 10
TOTAL_ROWS = 1000000
FILE_COUNT = 40
ROWS_PER_FILE = TOTAL_ROWS // FILE_COUNT
COLUMN_SPEC = {
    f"pred_{i}": tf.TensorSpec(tf.TensorShape((None)), tf.float64)
    for i in range(COLUMNS)
}
# Directories
TEST_RUN_ID = "test_1"
LOCAL_PREFIX = TEST_RUN_ID
GFS_PREFIX = "gs://bucket/folder/" + TEST_RUN_ID
# Create files
subprocess.run(["mkdir", LOCAL_PREFIX]).check_returncode()
for i in range(FILE_COUNT):
    data = np.random.rand(ROWS_PER_FILE, COLUMNS)
    df = pd.DataFrame(data, columns=COLUMN_SPEC.keys())
    df.to_parquet(f"{LOCAL_PREFIX}/test-{i}.parquet")
# Upload to GCS
subprocess.run(
    ["gsutil", "-m", "cp", f"{LOCAL_PREFIX}/*", GFS_PREFIX]
).check_returncode()
READ_METHODS = {"gfs": f"{GFS_PREFIX}/*", "local": f"{LOCAL_PREFIX}/*"}
for read_method, file_path in READ_METHODS.items():
    for num_parallel_calls in [1, 4, 8, 16]:
        for run in range(4):
            start_time = time.perf_counter()
            ds = tf.data.Dataset.list_files(file_path)
            ds = ds.interleave(
                lambda f: tfio.IODataset.from_parquet(f, columns=COLUMN_SPEC).batch(
                    200
                ),
                cycle_length=num_parallel_calls,
                num_parallel_calls=num_parallel_calls,
                deterministic=False,
            )
            # Force TF to read all the data.
            observations = ds.reduce(np.int64(0), lambda x, _: x + 1).numpy()
            execution_time = time.perf_counter() - start_time
            print(f"{observations} {num_parallel_calls} {read_method} {execution_time}")
Environment
Running on a Google Cloud VM. Instance type: e2-standard-4
> pip list | grep tensor
tensorboard 2.9.1
tensorboard-data-server 0.6.1
tensorboard-plugin-profile 2.8.0
tensorboard-plugin-wit 1.8.1
tensorflow 2.9.2
tensorflow-estimator 2.9.0
tensorflow-io 0.26.0
tensorflow-io-gcs-filesystem 0.26.0