Defining a sampling distribution
I have parquet files divided into sub-populations, and I want to define a sampling distribution over the dataset so that I can sample from that distribution while training with PyTorch.
In TensorFlow I have implemented it with:
ds = tf.data.experimental.sample_from_datasets(
[ds1, ds2], [0.7, 0.3]
)
In PyTorch I have implemented it with:
from itertools import cycle
from typing import List

import numpy as np
from torch.utils.data import Dataset, IterableDataset

class WeightedDataset(IterableDataset):
    def __init__(self, datasets: List[Dataset], weights: List[float]):
        self.datasets = datasets
        self.weights = weights

    def __iter__(self):
        # Cycle each dataset forever and draw from them according to the weights.
        ds_iters = [cycle(dataset) for dataset in self.datasets]
        while True:
            ds = np.random.choice(ds_iters, p=self.weights)
            yield next(ds)

ds = WeightedDataset([ds1, ds2], [0.7, 0.3])
Currently I am using the second method with Daft, which is prone to OOM as the number of subpopulations grows. Is there currently an option to define this sampling logic within Daft based on a column, or on Hive partitioning? If not, is it possible to implement this logic within Daft in a way that would be less resource intensive?
Ideally I would like to do:
df = df.sample_with(col="colname", weights={"value1": 0.7, "value2": 0.3})
Or,
df = daft.from_dataframes([df1, df2], weights=[0.7, 0.3])
What do you say?
We currently have a df.sample method that samples a fraction of a dataframe; you could do something like:
df1 = df1.sample(0.7)
df2 = df2.sample(0.3)
df = df1.concat(df2)
Would this work for your use case?
No, since the number of samples in each dataframe is different and I want df1 samples to represent 0.3 of the df distribution. We use this feature to deal with imbalanced data, i.e. to increase the probability of sampling a class that otherwise has a low sampling probability.
Following is a toy example of how we currently sample from our dataset:
import tensorflow as tf

ds1 = tf.data.Dataset.from_tensor_slices(tf.ones([1])).repeat()
ds2 = tf.data.Dataset.from_tensor_slices(tf.zeros([100])).repeat()

n_iters = 100
batch_size = 100

ds = tf.data.experimental.sample_from_datasets(
    [ds1, ds2], weights=[0.3, 0.7]
).batch(batch_size)

iter_ds = iter(ds)
s = 0
for _ in range(n_iters):
    s += sum(next(iter_ds))
print(s / (n_iters * batch_size))
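For comparison, here is a roughly equivalent check using the WeightedDataset class above, sketched in plain PyTorch (illustrative only, not our production pipeline):

import torch

ds1 = [torch.ones(1)]                        # the over-sampled subpopulation
ds2 = [torch.zeros(1) for _ in range(100)]   # the common subpopulation
ds = WeightedDataset([ds1, ds2], [0.3, 0.7])

n_iters = 100
batch_size = 100
it = iter(ds)
s = sum(next(it).item() for _ in range(n_iters * batch_size))
print(s / (n_iters * batch_size))  # should come out close to 0.3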
Does this make sense?
Ah I see now. One more question: could you use batch_size and weights to calculate the number of rows returned from each dataframe? For example:
batch_size = 100
weights = [0.3, 0.7]
rows_to_sample = [batch_size * w for w in weights] # [30, 70]
If that's possible for your use case, we can add an option to the sample method to specify an exact number of rows, e.g. df.sample(n=30), which would sample 30 rows from df.
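So per batch, the flow would be something like this (where the n= argument is only the proposed addition, not something Daft supports today):

batch_size = 100
weights = [0.3, 0.7]
rows_to_sample = [int(batch_size * w) for w in weights]  # [30, 70]

# Hypothetical: df.sample(n=...) is the proposed API; df.concat exists today.
batch_df = df1.sample(n=rows_to_sample[0]).concat(df2.sample(n=rows_to_sample[1]))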
No, that is not possible either, since in some cases we have something like this:
batch_size = 2
weights = [0.1, 0.1, 0.8]
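Concretely, reusing the batch_size * w calculation from above, the per-batch counts would not come out as whole numbers of rows:

batch_size = 2
weights = [0.1, 0.1, 0.8]
rows_to_sample = [batch_size * w for w in weights]  # ~[0.2, 0.2, 1.6], not whole rows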
Gotcha. Going back to the proposed API df = daft.from_dataframes([df1, df2], weights=[0.7, 0.3]): would it be correct to say that the output of from_dataframes is a new dataframe whose total number of rows equals the sum of df1 and df2 rows, i.e. len(df) = len(df1) + len(df2), but whose distribution is 70% df1 and 30% df2?
No, in our current process we do something like this:
ds = tf.data.experimental.sample_from_datasets(
    [ds1, ds2, ..., dsn], weights=[w1, w2, ..., wn]
)
Thus the sampling is a random process and the number of samples drawn from each dataset differs. That makes the size of the resulting dataset (the total number of samples drawn until a StopIteration is reached) depend on whichever of ds1, ds2, ..., dsn is exhausted first.
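In other words, the behaviour we rely on is roughly this (a minimal pure-Python sketch of the semantics described above, not our actual code):

import numpy as np

def sampled_stream(iterators, weights):
    # Draw from the iterators according to `weights`; stop as soon as any one of them is exhausted.
    while True:
        idx = np.random.choice(len(iterators), p=weights)
        try:
            yield next(iterators[idx])
        except StopIteration:
            return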
Ok, just want to confirm:
- Are you using the to_torch_iter_dataset method?
- Are you using PyRunner?
We're going to try to reproduce this; I believe the issue with the OOM is that we are materializing too many partitions.
Yes to both questions.
Regarding the OOM statement (I believe you are referring to #2383?), I think you are right :)
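For context, our consumption path looks roughly like this (the paths and dataframe names here are illustrative):

import daft

daft.context.set_runner_py()  # PyRunner

# One dataframe per subpopulation (illustrative paths)
df1 = daft.read_parquet("data/subpop1/*.parquet")
df2 = daft.read_parquet("data/subpop2/*.parquet")

ds = WeightedDataset(
    [df1.to_torch_iter_dataset(), df2.to_torch_iter_dataset()],
    [0.7, 0.3],
)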
Hi @colin-ho,
Did you manage to reproduce the OOM with multiple datasets and the above sampling?
Do you think, by any chance, that we can control the memory usage of the datasets with the newly added daft.set_execution_config(parquet_inflation_factor=...) from #2383?
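For reference, I mean something along these lines (the value here is arbitrary, just to show the knob):

import daft

# Arbitrary value, purely for illustration; we would tune it against observed memory usage.
daft.set_execution_config(parquet_inflation_factor=1.5)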
From an offline conversation today:
- Fixing #2493 should help a lot with Daft being overly aggressive with scheduling when streaming data out of multiple dataframes.
- Adding a new multi-stream concat operator (e.g. daft.sampled_concat([df1, df2, ...], [0.5, 0.4, ...]), sketched below) would give the Daft execution engine even better control over "muxing" the streams of data coming out of each dataframe.
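A sketch of how the proposed operator might be used end to end (the API is hypothetical, per the bullet above):

# Hypothetical usage of the proposed (not yet implemented) operator:
df = daft.sampled_concat([df1, df2, df3], [0.5, 0.4, 0.1])
ds = df.to_torch_iter_dataset()  # stream samples into PyTorch as today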
cc @samster25