
Streaming interleaved parquet files

angusl-gr opened this issue 3 years ago · 3 comments

Hi,

I'm trying to stream multiple Parquet files into a Keras model in an interleaved streaming fashion: e.g. stream rows from, say, ["file_1.parquet", ..., "file_4.parquet"] and interleave them, and when one file is exhausted, open file_5.parquet in its place, and so on. The best I can do is effectively the final comment in this similar issue, but the performance is extremely slow compared to just writing my own generator, which reads each whole file in and produces batches. I have also tried the tensorflow_io.arrow integration, but I get a lot of warnings, the performance seems poor, and I can't find a straightforward way to do interleaving. Am I missing something, or is this the best I can do?

Thanks!

My code for reference:

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio


def convert_ordered_dict_to_tensor(d):
    vals = list(d.values())
    return tf.transpose(tf.convert_to_tensor(vals))


if __name__ == "__main__":
    for i in range(10):
        data = np.random.rand(10**6, 100).astype(np.float32)
        df = pd.DataFrame(data)
        df.columns = [str(col_name) for col_name in df.columns]
        df.to_parquet(f"file_{i}.parquet")

    batch_size = 1024
    num_workers = 2
    columns = {f"{i}": tf.TensorSpec(tf.TensorShape([]), tf.float32) for i in range(100)}
    ds = tf.data.Dataset.list_files("file_*.parquet")
    ds = (
        ds.interleave(
            lambda f: tfio.IODataset.from_parquet(f, columns=columns),
            cycle_length=2,
            block_length=2,
            num_parallel_calls=num_workers,
        )
        .batch(batch_size)
        .map(convert_ordered_dict_to_tensor)
        .prefetch(2)
    )
    for batch in ds:
        pass
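For reference, the "read the whole file, then produce batches" baseline I'm comparing against looks roughly like this (a sketch, with the file-reading call stubbed out as a `read_file` parameter, e.g. `lambda p: pd.read_parquet(p).to_numpy()`; the helper name is hypothetical):

```python
def batches_from_files(read_file, paths, batch_size):
    """Read each file fully, then slice it into batches.

    Files are consumed one after another, so there is no
    interleaving across files -- that's the trade-off versus
    the tf.data pipeline above.
    """
    for path in paths:
        rows = read_file(path)  # whole file in memory at once
        for start in range(0, len(rows), batch_size):
            yield rows[start:start + batch_size]
```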

angusl-gr avatar Jun 20 '22 09:06 angusl-gr

Just bumping, as I still haven't managed to find a resolution to this.

angusl-gr avatar Jul 13 '22 15:07 angusl-gr

I have the same issue. Using tfio.IODataset.from_parquet is extremely slow when used with interleave, but without interleave it seems to work fine.

dsiegel avatar Sep 02 '22 22:09 dsiegel

I found that moving the `batch` inside the `interleave` speeds things up a lot. Setting `deterministic=False` might also help. `cycle_length` should be >= `num_workers`.

ds = (
    ds.interleave(
        lambda f: tfio.IODataset.from_parquet(f, columns=columns).batch(2000),
        cycle_length=num_workers,
        block_length=1,
        num_parallel_calls=num_workers,
        deterministic=False,
    )
    .map(convert_ordered_dict_to_tensor)
    .prefetch(2)
)
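Conceptually, `interleave` with `block_length=1` pulls one element from each open input in turn, which is why batching inside it yields whole batches round-robin across files instead of single rows. A toy plain-Python illustration of that ordering (not tf.data itself):

```python
def round_robin(iterators):
    """Yield one item from each iterator in turn, dropping
    exhausted ones -- roughly what interleave does with
    block_length=1."""
    iterators = list(iterators)
    while iterators:
        alive = []
        for it in iterators:
            try:
                yield next(it)
                alive.append(it)  # still has items; keep cycling it
            except StopIteration:
                pass  # exhausted; a real pipeline would open the next file here
        iterators = alive
```

For example, `round_robin([iter("AB"), iter("CDE")])` yields A, C, B, D, E.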

Maybe related: #1709

dsiegel avatar Sep 06 '22 14:09 dsiegel