How to convert a dict generator into a Hugging Face dataset
Description
Hey there, I have used seqio to get a well-distributed mixture of samples from multiple datasets. However, the resulting output from seqio is a Python generator of dicts, which I cannot turn back into a Hugging Face dataset.
The generator contains all the samples needed for training the model, but I cannot convert it into a Hugging Face dataset.
The code looks like this:
for ex in seqio_data:
    print(ex["text"])
I need to convert seqio_data (a generator) into a Hugging Face dataset.
The complete seqio code is here:
import functools

import seqio
import tensorflow as tf
import t5.data
from datasets import load_dataset
from t5.data import postprocessors
from t5.data import preprocessors
from t5.evaluation import metrics
from seqio import FunctionDataSource, utils

TaskRegistry = seqio.TaskRegistry

def gen_dataset(split, shuffle=False, seed=None, column="text", dataset_params=None):
    dataset = load_dataset(**dataset_params)
    if shuffle:
        if seed:
            dataset = dataset.shuffle(seed=seed)
        else:
            dataset = dataset.shuffle()
    while True:
        for item in dataset[str(split)]:
            yield item[column]

def dataset_fn(split, shuffle_files, seed=None, dataset_params=None):
    return tf.data.Dataset.from_generator(
        functools.partial(gen_dataset, split, shuffle_files, seed, dataset_params=dataset_params),
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string, name=dataset_name)
    )

@utils.map_over_dataset
def target_to_key(x, key_map, target_key):
    """Assign the value from the dataset to target_key in key_map"""
    return {**key_map, target_key: x}

dataset_name = 'oscar-corpus/OSCAR-2109'
subset = 'mr'
dataset_params = {"path": dataset_name, "language": subset, "use_auth_token": True}
dataset_shapes = None

TaskRegistry.add(
    "oscar_marathi_corpus",
    source=seqio.FunctionDataSource(
        dataset_fn=functools.partial(dataset_fn, dataset_params=dataset_params),
        splits=("train", "validation"),
        caching_permitted=False,
        num_input_examples=dataset_shapes,
    ),
    preprocessors=[
        functools.partial(
            target_to_key, key_map={
                "targets": None,
            }, target_key="targets")],
    output_features={"targets": seqio.Feature(vocabulary=seqio.PassThroughVocabulary, add_eos=False, dtype=tf.string, rank=0)},
    metric_fns=[]
)

dataset = seqio.get_mixture_or_task("oscar_marathi_corpus").get_dataset(
    sequence_length=None,
    split="train",
    shuffle=True,
    num_epochs=1,
    shard_info=seqio.ShardInfo(index=0, num_shards=10),
    use_cached=False,
    seed=42
)

for _, ex in zip(range(5), dataset):
    print(ex['targets'].numpy().decode())
@albertvillanova @lhoestq, could you please help me with this issue?
Hi! As mentioned on the forum, the simplest option for now would be to define a dataset script that can contain your generator. But we can also explore adding something like ds = Dataset.from_iterable(seqio_dataset).
@lhoestq, hey, I did as you instructed, but sadly I cannot get past the download_manager, as I don't have anything to download. I was skipping the def _split_generators(self, dl_manager) method, but I cannot get around it: I get a NotImplementedError.
The following is my code:
import datasets
import functools
import glob
from datasets import load_from_disk
import seqio
import tensorflow as tf
import t5.data
from datasets import load_dataset
from t5.data import postprocessors
from t5.data import preprocessors
from t5.evaluation import metrics
from seqio import FunctionDataSource, utils

TaskRegistry = seqio.TaskRegistry

data_path = glob.glob("/home/stephen/Desktop/MEGA_CORPUS/COMBINED_CORPUS/*", recursive=False)

def gen_dataset(split, shuffle=False, seed=None, column="text", dataset_path=None):
    dataset = load_from_disk(dataset_path)
    if shuffle:
        if seed:
            dataset = dataset.shuffle(seed=seed)
        else:
            dataset = dataset.shuffle()
    while True:
        for item in dataset[str(split)]:
            yield item[column]

def dataset_fn(split, shuffle_files, seed=None, dataset_path=None):
    return tf.data.Dataset.from_generator(
        functools.partial(gen_dataset, split, shuffle_files, seed, dataset_path=dataset_path),
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string, name=dataset_path)
    )

@utils.map_over_dataset
def target_to_key(x, key_map, target_key):
    """Assign the value from the dataset to target_key in key_map"""
    return {**key_map, target_key: x}

_CITATION = "Not ready yet"
_DESCRIPTION = "a custom seqio based mixed samples on a given temperature value, that again returns a dataset in HF dataset format well samples on the Mixture temperature"
_HOMEPAGE = "ldcil.org"

class CustomSeqio(datasets.GeneratorBasedBuilder):

    def _info(self):
        return datasets.DatasetInfo(
            description=_DESCRIPTION,
            features=datasets.Features(
                {
                    "text": datasets.Value("string"),
                }
            ),
            homepage="https://ldcil.org",
            citation=_CITATION,
        )

def generate_examples(self):
    seqio_train_list = []
    for lang in data_path:
        dataset_name = lang.split("/")[-1]
        dataset_shapes = None

        TaskRegistry.add(
            str(dataset_name),
            source=seqio.FunctionDataSource(
                dataset_fn=functools.partial(dataset_fn, dataset_path=lang),
                splits=("train", "test"),
                caching_permitted=False,
                num_input_examples=dataset_shapes,
            ),
            preprocessors=[
                functools.partial(
                    target_to_key, key_map={
                        "targets": None,
                    }, target_key="targets")],
            output_features={"targets": seqio.Feature(vocabulary=seqio.PassThroughVocabulary, add_eos=False, dtype=tf.string, rank=0)},
            metric_fns=[]
        )

        seqio_train_dataset = seqio.get_mixture_or_task(dataset_name).get_dataset(
            sequence_length=None,
            split="train",
            shuffle=True,
            num_epochs=1,
            shard_info=seqio.ShardInfo(index=0, num_shards=10),
            use_cached=False,
            seed=42)
        seqio_train_list.append(seqio_train_dataset)

    lang_name_list = []
    for lang in data_path:
        lang_name = lang.split("/")[-1]
        lang_name_list.append(lang_name)

    seqio_mixture = seqio.MixtureRegistry.add(
        "seqio_mixture",
        lang_name_list,
        default_rate=0.7)

    seqio_mixture_dataset = seqio.get_mixture_or_task("seqio_mixture").get_dataset(
        sequence_length=None,
        split="train",
        shuffle=True,
        num_epochs=1,
        shard_info=seqio.ShardInfo(index=0, num_shards=10),
        use_cached=False,
        seed=42)

    for id, ex in enumerate(seqio_mixture_dataset):
        yield id, {"text": ex["targets"].numpy().decode()}
and I load it with:
seqio_mixture = load_dataset("seqio_loader")
@lhoestq, just to make things clear...
the following is my original code, which is not in the HF dataset loading script:
import functools
import seqio
import tensorflow as tf
import t5.data
from datasets import load_from_disk
from t5.data import postprocessors
from t5.data import preprocessors
from t5.evaluation import metrics
from seqio import FunctionDataSource, utils
import glob

TaskRegistry = seqio.TaskRegistry

def gen_dataset(split, shuffle=False, seed=None, column="text", dataset_path=None):
    dataset = load_from_disk(dataset_path)
    if shuffle:
        if seed:
            dataset = dataset.shuffle(seed=seed)
        else:
            dataset = dataset.shuffle()
    while True:
        for item in dataset[str(split)]:
            yield item[column]

def dataset_fn(split, shuffle_files, seed=None, dataset_path=None):
    return tf.data.Dataset.from_generator(
        functools.partial(gen_dataset, split, shuffle_files, seed, dataset_path=dataset_path),
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string, name=dataset_path)
    )

@utils.map_over_dataset
def target_to_key(x, key_map, target_key):
    """Assign the value from the dataset to target_key in key_map"""
    return {**key_map, target_key: x}

data_path = glob.glob("/home/stephen/Desktop/MEGA_CORPUS/COMBINED_CORPUS/*", recursive=False)

seqio_train_list = []

for lang in data_path:
    dataset_name = lang.split("/")[-1]
    dataset_shapes = None

    TaskRegistry.add(
        str(dataset_name),
        source=seqio.FunctionDataSource(
            dataset_fn=functools.partial(dataset_fn, dataset_path=lang),
            splits=("train", "test"),
            caching_permitted=False,
            num_input_examples=dataset_shapes,
        ),
        preprocessors=[
            functools.partial(
                target_to_key, key_map={
                    "targets": None,
                }, target_key="targets")],
        output_features={"targets": seqio.Feature(vocabulary=seqio.PassThroughVocabulary, add_eos=False, dtype=tf.string, rank=0)},
        metric_fns=[]
    )

    seqio_train_dataset = seqio.get_mixture_or_task(dataset_name).get_dataset(
        sequence_length=None,
        split="train",
        shuffle=True,
        num_epochs=1,
        shard_info=seqio.ShardInfo(index=0, num_shards=10),
        use_cached=False,
        seed=42)
    seqio_train_list.append(seqio_train_dataset)

lang_name_list = []
for lang in data_path:
    lang_name = lang.split("/")[-1]
    lang_name_list.append(lang_name)

seqio_mixture = seqio.MixtureRegistry.add(
    "seqio_mixture",
    lang_name_list,
    default_rate=0.7
)

seqio_mixture_dataset = seqio.get_mixture_or_task("seqio_mixture").get_dataset(
    sequence_length=None,
    split="train",
    shuffle=True,
    num_epochs=1,
    shard_info=seqio.ShardInfo(index=0, num_shards=10),
    use_cached=False,
    seed=42)

for _, ex in zip(range(15), seqio_mixture_dataset):
    print(ex["targets"].numpy().decode())
where seqio_mixture_dataset is the generator that I want wrapped in an HF dataset.
Additionally, could you please tell me how to expose the default_rate=0.7 argument (where seqio_mixture is defined) as a custom option in the HF load_dataset() method,
maybe like this:
seqio_mixture_dataset = datasets.load_dataset("seqio_loader",temperature=0.5)
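A minimal sketch of how such an option could be wired up through a custom builder config; the SeqioConfig name and temperature attribute below are hypothetical, not part of the posted script:

import datasets

# Hypothetical config that exposes the mixture rate/temperature as a builder option.
class SeqioConfig(datasets.BuilderConfig):
    def __init__(self, temperature=0.7, **kwargs):
        super().__init__(**kwargs)
        self.temperature = temperature

class CustomSeqio(datasets.GeneratorBasedBuilder):
    BUILDER_CONFIG_CLASS = SeqioConfig

    def _generate_examples(self):
        # self.config.temperature would then replace the hard-coded default_rate=0.7
        ...

# Keyword arguments passed to load_dataset are forwarded to the builder config:
# seqio_mixture_dataset = datasets.load_dataset("seqio_loader", temperature=0.5)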
I like the idea of having Dataset.from_iterable(iterable) in the API. The only problem is that we also want to make this part cacheable, which is tricky if iterable is a generator.
Some resources on this issue:
- https://github.com/uqfoundation/dill/issues/311
- https://stackoverflow.com/questions/7180212/why-cant-generators-be-pickled
- https://github.com/tonyroberts/generator_tools - python package for pickling generators; pickles bytecode, so it creates version-specific dumps
For the caching, maybe we can have Dataset.from_generator as in TF and pickle+hash the generator function (not the generator object itself)?
And then keep Dataset.from_iterable for picklable objects like lists.
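A rough sketch of that distinction, using dill and hashlib directly (this only illustrates the idea, it is not how datasets computes its fingerprints):

import hashlib
import dill  # dill can serialize plain functions, unlike generator objects

def my_generator():
    for i in range(3):
        yield {"text": f"example {i}"}

# Hashing the *function* gives a fingerprint that could be used for caching ...
fingerprint = hashlib.sha256(dill.dumps(my_generator)).hexdigest()
print(fingerprint[:16])

# ... whereas the generator *object* produced by calling it cannot be pickled:
try:
    dill.dumps(my_generator())
except Exception as e:
    print("cannot pickle the generator object:", e)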
@lhoestq, @mariosasko, do you have any other examples where the dataset is a generator and needs to be wrapped into an HF dataset?
@lhoestq, following up on my previous question: what could possibly be done in this case (link1, link2)? Do you have any ideas?
@lhoestq +1 for the Dataset.from_generator idea.
Having thought about it, let's avoid adding Dataset.from_iterable to the API since dictionaries are technically iterables ("iterable" is a broad term in Python), and we already provide Dataset.from_dict. And for lists maybe we can add Dataset.from_list, similar to pa.Table.from_pylist. WDYT?
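For reference, a small sketch of the pa.Table.from_pylist semantics that a Dataset.from_list could mirror (the rows below are made up):

import pyarrow as pa

# pa.Table.from_pylist builds a table from a list of row dicts;
# the proposed Dataset.from_list would do the same for a datasets.Dataset.
rows = [
    {"text": "first example", "label": 0},
    {"text": "second example", "label": 1},
]
table = pa.Table.from_pylist(rows)
print(table.column_names)  # ['text', 'label']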
Hi @StephennFernandes!
To fix the issues in the copied code, rename generate_examples to _generate_examples and add one level of indentation, as this is a method of GeneratorBasedBuilder, and define _split_generators as follows (again as a method of GeneratorBasedBuilder):
def _split_generators(self, dl_manager):
    return [
        datasets.SplitGenerator(
            name=datasets.Split.TRAIN,
            gen_kwargs={},
        ),
    ]
And if you are feeling extra adventurous, you can try to use ArrowWriter to directly create a cache file:
from datasets import Dataset
from datasets.arrow_writer import ArrowWriter

writer = ArrowWriter(path="path/to/cache_file.arrow", writer_batch_size=1000)

with writer:
    for ex in generator:
        writer.write(ex)
    writer.finalize()

dset = Dataset.from_file("path/to/cache_file.arrow")
I have a problem which I think is very similar: I would like to "stream" data to an HF Arrow (memory-mapped) Dataset, where the final size of the dataset is unknown, but could be much larger than what fits into memory.
What I want to end up with is an Arrow Dataset which I can open using Dataset.load_from_disk(dataset_path="somename") and use e.g. as the training set.
For this I would have thought there should be an API which allows me to open/create the dataset (and define the features etc.), then write examples to it, but I could not find a way to do this.
I tried the following and it looks like it works, but it feels very hacky, and I am not sure whether it might fail to update some of the fields in the JSON files, which may turn out to be important:
from datasets import Dataset, Features, ClassLabel, Sequence, Value
from datasets.arrow_writer import ArrowWriter

# 1) define the features
features = Features(dict(
    id=Value(dtype="string"),
    tokens=Sequence(feature=Value(dtype="string")),
    ner_tags=Sequence(feature=ClassLabel(names=['O', 'B-corporation', 'I-corporation', 'B-creative-work', 'I-creative-work', 'B-group', 'I-group', 'B-location', 'I-location', 'B-person', 'I-person', 'B-product', 'I-product'])),
))

# 2) create empty dataset for examples with these features and store to disk
empty = dict(
    id=[],
    tokens=[],
    ner_tags=[],
)
ds = Dataset.from_dict(empty, features=features)
ds.save_to_disk(dataset_path="debug_ds1")

# 3) directly write all the examples to the arrow dataset
with ArrowWriter(path="debug_ds1/dataset.arrow") as writer:
    writer.write(dict(id=0, tokens=["a", "b"], ner_tags=[0, 0]))
    writer.write(dict(id=1, tokens=["x", "y"], ner_tags=[1, 0]))
    writer.finalize()

ds2 = Dataset.load_from_disk(dataset_path="debug_ds1")
len(ds2)
Is there a cleaner/proper way to do this?
I like the sound of Dataset.from_iterable or Dataset.from_generator (shouldn't from_iterable be able to handle from_generator too, as all generators are iterables?), but how would I define the features for my examples there?
Hi @johann-petrak! You can pass the features directly to ArrowWriter's initializer, like so: ArrowWriter(..., features=features).
And the reason why I prefer Dataset.from_generator over Dataset.from_iterable is mentioned in one of my previous comments.
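A condensed sketch of that suggestion, reusing the features object defined in the snippet above (the file name here is made up):

from datasets import Dataset
from datasets.arrow_writer import ArrowWriter

# Passing the schema up front avoids the "fake empty dataset" step.
with ArrowWriter(path="dataset_with_features.arrow", features=features) as writer:
    writer.write(dict(id="0", tokens=["a", "b"], ner_tags=[0, 0]))
    writer.write(dict(id="1", tokens=["x", "y"], ner_tags=[1, 0]))
    writer.finalize()

ds = Dataset.from_file("dataset_with_features.arrow")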
@mariosasko so at the moment we still have to create a fake Dataset first and then use ArrowWriter to write an actual dataset? I'm using the latest version of datasets from PyPI, but my final file is always empty. Is there anything wrong with the code below?
total = 0
with ArrowWriter(path=str(final_data_path), features=features) as writer:
    for batch in loader:
        for traj in batch:
            for generator in question_generators:
                for xi in generator(traj):
                    # print(f"Question: {xi.question}, answer: {xi.answer}")
                    total += 1
                    writer.write(
                        {
                            "id": f"qa_{total}",
                            "question": xi.question,
                            "answer": xi.answer,
                        }
                    )
    writer.finalize()
print(f"Total #questions = {total}")  # this prints 402
This works for me if I also close the writer (writer.close()) and then open the Arrow file as a dataset using ds = Dataset.from_file(final_data_path), then ds.save_to_disk(somedir). The Dataset created that way contains the expected examples.
Oh thanks, that did the trick, I believe. Shouldn't ArrowWriter have a context manager that does these operations?
You can just use Dataset.from_file to get your dataset, no need to do an extra save_to_disk somewhere else ;)
I was thinking that save_to_disk is necessary when one wants to re-use that dataset as a proper HF dataset later, no?
At least what I wanted to achieve is to create a dataset that can be opened like any other local or remote dataset.
save_to_disk/load_from_disk is indeed more general, e.g. it supports datasets that consist of several files, and it saves some extra info in a dataset_info.json file (description, citation, split sizes, etc.).
If you have one single file it's fine to simply do .from_file()
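A short sketch of both options, assuming a single cache file written with ArrowWriter as above (the paths are made up):

from datasets import Dataset, load_from_disk

# Option 1: open the single Arrow file directly (memory-mapped, no copy)
ds = Dataset.from_file("path/to/cache_file.arrow")

# Option 2: save it as a dataset directory (adds dataset_info.json, etc.)
# so it can later be reloaded like any other saved dataset
ds.save_to_disk("my_dataset_dir")
reloaded = load_from_disk("my_dataset_dir")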