
Defining a sampling distribution

GuyPozner opened this issue 1 year ago · 10 comments

I have parquet files divided into sub-populations, and I want to define a sampling distribution over the dataset so that I can sample from that distribution while training with PyTorch.

In TensorFlow I have implemented it with:

ds = tf.data.experimental.sample_from_datasets(
    [ds1, ds2], [0.7, 0.3]
)

In PyTorch I have implemented it with:

from itertools import cycle
from typing import List

import numpy as np
from torch.utils.data import Dataset, IterableDataset


class WeightedDataset(IterableDataset):
    def __init__(self, datasets: List[Dataset], weights: List[float]):
        self.datasets = datasets
        self.weights = weights

    def __iter__(self):
        # Cycle every dataset forever, then repeatedly pick one of the
        # iterators according to the weights and yield its next sample.
        ds_iters = [cycle(dataset) for dataset in self.datasets]
        while True:
            ds = np.random.choice(ds_iters, p=self.weights)
            yield next(ds)


ds = WeightedDataset([ds1, ds2], [0.7, 0.3])

Currently I am using the second method with Daft, which is prone to OOM as the number of sub-populations grows. Is there currently an option to define this sampling logic within Daft based on a column, or on Hive partitioning? If not, is it possible to implement this logic within Daft in a way that would be less resource intensive?

Ideally I would like to do:

df = df.sample_with(col="colname", weights={"value1": 0.7, "value2": 0.3})

Or,

df = daft.from_dataframes([df1, df2], weights=[0.7, 0.3])

What do you say?

GuyPozner avatar Jun 02 '24 11:06 GuyPozner

We currently have a df.sample method that samples a fraction of a dataframe; you could do something like:

df1 = df1.sample(0.7)
df2 = df2.sample(0.3)
df = df1.concat(df2)

Would this work for your use case?

colin-ho avatar Jun 05 '24 16:06 colin-ho

No, since the number of samples in each dataframe is different, and I want df1's samples to represent 0.3 of the combined distribution. We use this feature to deal with imbalanced data, to increase the probability of sampling a class that would otherwise have a low sampling probability.
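
For concreteness, an illustrative calculation with made-up dataframe sizes (not numbers from this issue):

# Illustrative only: fraction-based sampling preserves the original imbalance.
len_df1, len_df2 = 1_000, 100_000           # hypothetical dataframe sizes
sampled = [0.7 * len_df1, 0.3 * len_df2]    # 700 and 30,000 rows
print(sampled[0] / sum(sampled))            # ~0.023 of the result, not 0.7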

Following is a toy example of how we currently sample from our dataset:

import tensorflow as tf

# Minority stream: a single 1, repeated forever.
ds1 = tf.data.Dataset.from_tensor_slices(tf.ones([1])).repeat()
# Majority stream: one hundred 0s, repeated forever.
ds2 = tf.data.Dataset.from_tensor_slices(tf.zeros([100])).repeat()

n_iters = 100
batch_size = 100
ds = tf.data.experimental.sample_from_datasets([ds1, ds2],
                                               weights=[0.3, 0.7]).batch(batch_size)
iter_ds = iter(ds)
s = 0
for _ in range(n_iters):
    s += sum(next(iter_ds))

print(s / (n_iters * batch_size))  # prints roughly 0.3

Does this make sense?

GuyPozner avatar Jun 06 '24 14:06 GuyPozner

Ah I see now. One more question: could you use batch_size and weights to calculate the number of rows returned from each dataframe? For example:

batch_size = 100
weights = [0.3, 0.7]
rows_to_sample = [batch_size * w for w in weights] # [30, 70]

If that's possible for your use case, we can add an option in the sample method to specify an exact number of rows, e.g. df.sample(n=30), which would sample 30 rows from df.
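
Hypothetically, assuming such an n= option were added (it is only a proposal at this point), a batch could then be assembled along these lines:

# Hypothetical sketch: df.sample(n=...) does not exist yet, it is only proposed above.
batch = df1.sample(n=30).concat(df2.sample(n=70))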

colin-ho avatar Jun 06 '24 22:06 colin-ho

No, that is not possible either, since in some cases we have something like the following, where the per-batch row counts would come out fractional (0.2, 0.2 and 1.6 rows):

batch_size = 2
weights = [0.1, 0.1, 0.8]

GuyPozner avatar Jun 08 '24 09:06 GuyPozner

Gotcha. Going back to the proposed API: df = daft.from_dataframes([df1, df2], weights=[0.7, 0.3]), would it be correct to say that the output of from_dataframes is a new dataframe with a total number of rows equal to the sum of df1 and df2 rows, i.e. len(df) = len(df1) + len(df2), but the distribution of df is 70% df1 and 30% df2?

colin-ho avatar Jun 11 '24 19:06 colin-ho

No, in our current process we do something like this:

ds = tf.data.experimental.sample_from_datasets([ds1, ds2, ..., dsn],
                                               weights=[w1, w2, ..., wn])

Thus the sampling is a random process, and the number of samples drawn from each dataset differs. That makes the size of the resulting dataset (the total number of samples drawn until a StopIteration is reached) depend on whichever of ds1, ds2, ..., dsn is the first to run empty.

GuyPozner avatar Jun 17 '24 06:06 GuyPozner

Ok, just wanna confirm:

We're gonna try to reproduce this. I believe the issue with the OOM is that we are materializing too many partitions.

colin-ho avatar Jun 17 '24 23:06 colin-ho

Yes to both questions.

Regarding the OOM statement (I believe you are referring to #2383?), I think you are right :)

GuyPozner avatar Jun 18 '24 05:06 GuyPozner

Hi @colin-ho, did you manage to reproduce the OOM with multiple datasets and the above sampling? Do you think we could by any chance control the memory usage of the datasets with the newly added daft.set_execution_config(parquet_inflation_factor=...) from #2383?
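
(For reference, a minimal sketch of that config call; the value used here is just a placeholder, not a recommendation:)

import daft

daft.set_execution_config(parquet_inflation_factor=3.0)  # 3.0 is a placeholder value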

michaelvay avatar Jul 09 '24 07:07 michaelvay

From an offline conversation today:

  1. Fixing #2493 should help a lot with Daft being overly-aggressive with scheduling when streaming data out of multiple dataframes.
  2. Adding a new multi-stream concat operator (e.g. daft.sampled_concat([df1, df2, ...], [0.5, 0.4, ...])) would allow the Daft execution engine even better control around "muxing" streams of data coming out of each dataframe (see the sketch below).
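
For illustration only, a rough plain-Python sketch of the muxing semantics such an operator could have; the name sampled_concat and the stopping behavior are taken from the discussion above, but this is not Daft (or TensorFlow) code:

import random
from typing import Iterable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")

def sampled_concat(streams: Sequence[Iterable[T]], weights: Sequence[float],
                   seed: int = 0) -> Iterator[T]:
    """Repeatedly pick one input stream with the given probabilities and yield
    its next item; stop once a chosen stream has been exhausted."""
    rng = random.Random(seed)
    iters: List[Iterator[T]] = [iter(s) for s in streams]
    indices = range(len(iters))
    while True:
        (i,) = rng.choices(indices, weights=weights)
        try:
            yield next(iters[i])
        except StopIteration:
            return  # ends once a drawn stream has run dry, as described above

mixed = sampled_concat([range(5), range(100, 105)], [0.7, 0.3])  # mix two toy streams
print(list(mixed))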

cc @samster25

jaychia avatar Jul 09 '24 23:07 jaychia