
How to convert a dict generator into a Hugging Face dataset

Open StephennFernandes opened this issue 3 years ago • 18 comments


Description

Hey there, I have used seqio to get a well-distributed mixture of samples from multiple datasets. However, the resultant output from seqio is a Python generator of dicts, which I cannot convert back into a Hugging Face dataset.

The generator contains all the samples needed for training the model, but I cannot convert it into a Hugging Face dataset.

The code looks like this:

for ex in seqio_data:
    print(ex["text"])

I need to convert seqio_data (the generator) into a Hugging Face dataset.

The complete seqio code is as follows:

import functools

import seqio
import tensorflow as tf
import t5.data
from datasets import load_dataset
from t5.data import postprocessors
from t5.data import preprocessors
from t5.evaluation import metrics
from seqio import FunctionDataSource, utils

TaskRegistry = seqio.TaskRegistry



def gen_dataset(split, shuffle=False, seed=None, column="text", dataset_params=None):
    dataset = load_dataset(**dataset_params)
    if shuffle:
        if seed:
            dataset = dataset.shuffle(seed=seed)
        else:
            dataset = dataset.shuffle()
    while True:
        for item in dataset[str(split)]:
            yield item[column]


def dataset_fn(split, shuffle_files, seed=None, dataset_params=None):
    return tf.data.Dataset.from_generator(
        functools.partial(gen_dataset, split, shuffle_files, seed, dataset_params=dataset_params),
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string, name=dataset_name)
    )


@utils.map_over_dataset
def target_to_key(x, key_map, target_key):
    """Assign the value from the dataset to target_key in key_map"""
    return {**key_map, target_key: x}



dataset_name = 'oscar-corpus/OSCAR-2109'
subset= 'mr'
dataset_params = {"path": dataset_name, "language":subset, "use_auth_token":True}
dataset_shapes = None

TaskRegistry.add(
    "oscar_marathi_corpus",
    source=seqio.FunctionDataSource(
        dataset_fn=functools.partial(dataset_fn, dataset_params=dataset_params),
        splits=("train", "validation"),
        caching_permitted=False,
        num_input_examples=dataset_shapes,
    ),
    preprocessors=[
        functools.partial(
            target_to_key,
            key_map={"targets": None},
            target_key="targets",
        )
    ],
    output_features={"targets": seqio.Feature(vocabulary=seqio.PassThroughVocabulary, add_eos=False, dtype=tf.string, rank=0)},
    metric_fns=[]
)

dataset = seqio.get_mixture_or_task("oscar_marathi_corpus").get_dataset(
    sequence_length=None,
    split="train",
    shuffle=True,
    num_epochs=1,
    shard_info=seqio.ShardInfo(index=0, num_shards=10),
    use_cached=False,
    seed=42
)
for _, ex in zip(range(5), dataset):
     print(ex['targets'].numpy().decode())


StephennFernandes avatar May 29 '22 16:05 StephennFernandes

@albertvillanova @lhoestq, could you please help me with this issue?

StephennFernandes avatar May 29 '22 16:05 StephennFernandes

Hi ! As mentioned on the forum, the simplest for now would be to define a dataset script which can contain your generator. But we can also explore adding something like ds = Dataset.from_iterable(seqio_dataset)

lhoestq avatar May 30 '22 14:05 lhoestq

@lhoestq, hey, I did as you instructed, but sadly I cannot get past the download manager, as I don't have anything to download. I was skipping the def _split_generators(self, dl_manager): method, but I cannot get around it: I get a NotImplementedError.

The following is my code:

import datasets 
import functools
import glob 
from datasets import load_from_disk
import seqio
import tensorflow as tf
import t5.data
from datasets import load_dataset
from t5.data import postprocessors
from t5.data import preprocessors
from t5.evaluation import metrics
from seqio import FunctionDataSource, utils

TaskRegistry = seqio.TaskRegistry

data_path = glob.glob("/home/stephen/Desktop/MEGA_CORPUS/COMBINED_CORPUS/*", recursive=False)


def gen_dataset(split, shuffle=False, seed=None, column="text", dataset_path=None):
    dataset = load_from_disk(dataset_path)
    if shuffle:
        if seed:
            dataset = dataset.shuffle(seed=seed)
        else:
            dataset = dataset.shuffle()
    while True:
        for item in dataset[str(split)]:
            yield item[column]


def dataset_fn(split, shuffle_files, seed=None, dataset_path=None):
    return tf.data.Dataset.from_generator(
        functools.partial(gen_dataset, split, shuffle_files, seed, dataset_path=dataset_path),
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string, name=dataset_path)
    )

@utils.map_over_dataset
def target_to_key(x, key_map, target_key):
    """Assign the value from the dataset to target_key in key_map"""
    return {**key_map, target_key: x}


_CITATION = "Not ready yet"
_DESCRIPTION = "a custom seqio based mixed samples on a given temperature value, that again returns a dataset in HF dataset format well samples on the Mixture temperature"
_HOMEPAGE = "ldcil.org"

class CustomSeqio(datasets.GeneratorBasedBuilder):

    def _info(self):
        return datasets.DatasetInfo(
            description=_DESCRIPTION,
            features=datasets.Features(
                {
                    "text": datasets.Value("string"),
                }
            ),
            homepage="https://ldcil.org",
            citation=_CITATION,)

def generate_examples(self):
    seqio_train_list = []
    for lang in data_path:
        dataset_name = lang.split("/")[-1]
        dataset_shapes = None 

        TaskRegistry.add(
            str(dataset_name),
            source=seqio.FunctionDataSource(
                dataset_fn=functools.partial(dataset_fn, dataset_path=lang),
                splits=("train", "test"),
                caching_permitted=False,
                num_input_examples=dataset_shapes,
            ),
            preprocessors=[
                functools.partial(
                    target_to_key,
                    key_map={"targets": None},
                    target_key="targets",
                )
            ],
            output_features={"targets": seqio.Feature(vocabulary=seqio.PassThroughVocabulary, add_eos=False, dtype=tf.string, rank=0)},
            metric_fns=[],
        )

        seqio_train_dataset = seqio.get_mixture_or_task(dataset_name).get_dataset(
        sequence_length=None,
        split="train",
        shuffle=True,
        num_epochs=1,
        shard_info=seqio.ShardInfo(index=0, num_shards=10),
        use_cached=False,
        seed=42)
        seqio_train_list.append(seqio_train_dataset)
    
    lang_name_list = []
    for lang in data_path:
        lang_name = lang.split("/")[-1]
        lang_name_list.append(lang_name)

    seqio_mixture = seqio.MixtureRegistry.add(
        "seqio_mixture",
        lang_name_list,
        default_rate=0.7)
    
    seqio_mixture_dataset = seqio.get_mixture_or_task("seqio_mixture").get_dataset(
    sequence_length=None,
    split="train",
    shuffle=True,
    num_epochs=1,
    shard_info=seqio.ShardInfo(index=0, num_shards=10),
    use_cached=False,
    seed=42)

    for id, ex in  enumerate(seqio_mixture_dataset):
        yield id, {"text": ex["targets"].numpy().decode()}

And I load it with:

seqio_mixture = load_dataset("seqio_loader")

StephennFernandes avatar Jun 04 '22 14:06 StephennFernandes

@lhoestq, just to make things clear...

The following is my original code, which is not in the HF dataset loading script:

import functools
import seqio
import tensorflow as tf
import t5.data
from datasets import load_from_disk
from t5.data import postprocessors
from t5.data import preprocessors
from t5.evaluation import metrics
from seqio import FunctionDataSource, utils
import glob 

TaskRegistry = seqio.TaskRegistry



def gen_dataset(split, shuffle=False, seed=None, column="text", dataset_path=None):
    dataset = load_from_disk(dataset_path)
    if shuffle:
        if seed:
            dataset = dataset.shuffle(seed=seed)
        else:
            dataset = dataset.shuffle()
    while True:
        for item in dataset[str(split)]:
            yield item[column]


def dataset_fn(split, shuffle_files, seed=None, dataset_path=None):
    return tf.data.Dataset.from_generator(
        functools.partial(gen_dataset, split, shuffle_files, seed, dataset_path=dataset_path),
        output_signature=tf.TensorSpec(shape=(), dtype=tf.string, name=dataset_path)
    )


@utils.map_over_dataset
def target_to_key(x, key_map, target_key):
    """Assign the value from the dataset to target_key in key_map"""
    return {**key_map, target_key: x}

data_path = glob.glob("/home/stephen/Desktop/MEGA_CORPUS/COMBINED_CORPUS/*", recursive=False)

seqio_train_list = []

for lang in data_path:
    dataset_name = lang.split("/")[-1]
    dataset_shapes = None 

    TaskRegistry.add(
        str(dataset_name),
        source=seqio.FunctionDataSource(
            dataset_fn=functools.partial(dataset_fn, dataset_path=lang),
            splits=("train", "test"),
            caching_permitted=False,
            num_input_examples=dataset_shapes,
        ),
        preprocessors=[
            functools.partial(
                target_to_key,
                key_map={"targets": None},
                target_key="targets",
            )
        ],
        output_features={"targets": seqio.Feature(vocabulary=seqio.PassThroughVocabulary, add_eos=False, dtype=tf.string, rank=0)},
        metric_fns=[],
    )

    seqio_train_dataset = seqio.get_mixture_or_task(dataset_name).get_dataset(
    sequence_length=None,
    split="train",
    shuffle=True,
    num_epochs=1,
    shard_info=seqio.ShardInfo(index=0, num_shards=10),
    use_cached=False,
    seed=42)
    seqio_train_list.append(seqio_train_dataset)

lang_name_list = []
for lang in data_path:
    lang_name = lang.split("/")[-1]
    lang_name_list.append(lang_name)

seqio_mixture = seqio.MixtureRegistry.add(
  "seqio_mixture",
  lang_name_list,
  default_rate=0.7
)

seqio_mixture_dataset = seqio.get_mixture_or_task("seqio_mixture").get_dataset(
    sequence_length=None,
    split="train",
    shuffle=True,
    num_epochs=1,
    shard_info=seqio.ShardInfo(index=0, num_shards=10),
    use_cached=False,
    seed=42)

for _, ex in zip(range(15), seqio_mixture_dataset):
    print(ex["targets"].numpy().decode())

where seqio_mixture_dataset is the generator that I want wrapped in an HF dataset.

Additionally, could you please tell me how to expose the default_rate=0.7 argument (where seqio_mixture is defined) as a custom option in the HF load_dataset() method,

maybe like this: seqio_mixture_dataset = datasets.load_dataset("seqio_loader", temperature=0.5)

StephennFernandes avatar Jun 04 '22 14:06 StephennFernandes

I like the idea of having Dataset.from_iterable(iterable) in the API. The only problem is that we also want to make this part cacheable, which is tricky if iterable is a generator.

Some resources on this issue:

  • https://github.com/uqfoundation/dill/issues/311
  • https://stackoverflow.com/questions/7180212/why-cant-generators-be-pickled
  • https://github.com/tonyroberts/generator_tools - python package for pickling generators; pickles bytecode, so it creates version-specific dumps
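
For a quick illustration of the core problem (standard pickle shown here; dill runs into the same limitation):

import pickle

gen = (i for i in range(3))

try:
    pickle.dumps(gen)
except TypeError as err:
    # on recent Python versions: "cannot pickle 'generator' object"
    print(err)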

mariosasko avatar Jun 05 '22 16:06 mariosasko

For the caching, maybe we can have Dataset.from_generator as in TF, and pickle+hash the generator function (not the generator object itself)?

And then keep Dataset.from_iterable for picklable objects like lists.
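
A rough sketch of that idea, just to illustrate the fingerprinting (not an actual implementation):

import hashlib

import dill  # already used by `datasets` for fingerprinting


def my_generator():
    yield {"text": "hello"}
    yield {"text": "world"}


# The generator *function* can be pickled and hashed into a cache fingerprint,
# even though the generator object returned by calling it cannot be pickled.
fingerprint = hashlib.sha256(dill.dumps(my_generator)).hexdigest()
print(fingerprint[:16])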

lhoestq avatar Jun 06 '22 15:06 lhoestq

@lhoestq, @mariosasko, do you have any examples where the dataset is a generator and needs to be wrapped into an HF dataset?

StephennFernandes avatar Jun 06 '22 16:06 StephennFernandes

@lhoestq, following up on my previous question: what could possibly be done in this case (link1, link2)? Do you have any ideas?

StephennFernandes avatar Jun 06 '22 16:06 StephennFernandes

@lhoestq +1 for the Dataset.from_generator idea.

Having thought about it, let's avoid adding Dataset.from_iterable to the API since dictionaries are technically iterables ("iterable" is a broad term in Python), and we already provide Dataset.from_dict. And for lists maybe we can add Dataset.from_list, similar to pa.Table.from_pylist. WDYT?
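
For reference, pa.Table.from_pylist (available in recent PyArrow versions) builds a table from a list of row dicts:

import pyarrow as pa

# each row is a dict mapping column name -> value
table = pa.Table.from_pylist([{"text": "hello"}, {"text": "world"}])
print(table.column_names)  # ['text']
print(table.num_rows)      # 2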

mariosasko avatar Jun 06 '22 16:06 mariosasko

Hi @StephennFernandes!

To fix the issues in the copied code, rename generate_examples to _generate_examples and add one level of indentation, as this is a method of GeneratorBasedBuilder, and define _split_generators as follows (again as a method of GeneratorBasedBuilder):

def _split_generators(self, dl_manager):
    return [
        datasets.SplitGenerator(
            name=datasets.Split.TRAIN,
            gen_kwargs={},
        ),
    ]
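
Putting it together, a minimal builder skeleton would look roughly like this (a sketch only: the placeholder constants and the stand-in generator in _generate_examples should be replaced with the seqio mixture setup from your script):

import datasets

_DESCRIPTION = "seqio mixture exported as an HF dataset"  # placeholder
_CITATION = ""  # placeholder
_HOMEPAGE = ""  # placeholder


class CustomSeqio(datasets.GeneratorBasedBuilder):

    def _info(self):
        return datasets.DatasetInfo(
            description=_DESCRIPTION,
            features=datasets.Features({"text": datasets.Value("string")}),
            homepage=_HOMEPAGE,
            citation=_CITATION,
        )

    def _split_generators(self, dl_manager):
        # nothing to download, so just declare the split(s)
        return [
            datasets.SplitGenerator(name=datasets.Split.TRAIN, gen_kwargs={}),
        ]

    def _generate_examples(self):
        # build the seqio mixture here, as in your script, and yield (key, example) pairs;
        # the simple list below is only a stand-in
        for idx, text in enumerate(["example 1", "example 2"]):
            yield idx, {"text": text}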

And if you are feeling extra adventurous, you can try to use ArrowWriter to directly create a cache file:

from datasets import Dataset
from datasets.arrow_writer import ArrowWriter

writer = ArrowWriter(path="path/to/cache_file.arrow", writer_batch_size=1000)

with writer:
    for ex in generator:
        writer.write(ex) 
    writer.finalize()

dset = Dataset.from_file("path/to/cache_file.arrow")

mariosasko avatar Jun 06 '22 16:06 mariosasko

I have a problem which I think is very similar: I would like to "stream" data to an HF Arrow (memory-mapped) Dataset, where the final size of the dataset is unknown but could be much larger than what fits into memory. What I want to end up with is an Arrow Dataset which I can open using Dataset.load_from_disk(dataset_path="somename") and use e.g. as the training set.

For this I would have thought there should be an API which allows me to open/create the dataset (and define the features etc), then write examples to the dataset, but I could not find a way to do this.

I tried doing this and it looks like it works, but it feels very hacky and I am not sure if this might fail to update some of the fields in the json files which may turn out to be important:

from datasets import Dataset, Features, ClassLabel, Sequence, Value
from datasets.arrow_writer import ArrowWriter                                                                                                                                                   
# 1) define the features
features = Features(dict(
    id=Value(dtype="string"),
    tokens=Sequence(feature=Value(dtype="string")),
    ner_tags=Sequence(feature=ClassLabel(names=['O', 'B-corporation', 'I-corporation', 'B-creative-work', 'I-creative-work', 'B-group', 'I-group', 'B-location', 'I-location', 'B-person', 'I-person', 'B-product', 'I-product'])),
))
# 2) create empty dataset for examples with these features and store to disk
empty = dict(
    id = [],
    tokens = [],
    ner_tags = [],
)
ds = Dataset.from_dict(empty, features=features)
ds.save_to_disk(dataset_path="debug_ds1")

# 3) directly write all the examples to the arrow dataset 
with ArrowWriter(path="debug_ds1/dataset.arrow") as writer: 
    writer.write(dict(id=0, tokens=["a", "b"], ner_tags=[0, 0])) 
    writer.write(dict(id=1, tokens=["x", "y"], ner_tags=[1, 0])) 
    writer.finalize() 
 
ds2 = Dataset.load_from_disk(dataset_path="debug_ds1")
len(ds2)

Is there a cleaner/proper way to do this?

I like the sound of Dataset.from_iterable or Dataset.from_generator (shouldn't from_iterable be able to handle from_generator too, since all generators are iterables?), but how would I define the features for my examples there?

johann-petrak avatar Jun 29 '22 10:06 johann-petrak

Hi @johann-petrak! You can pass the features directly to ArrowWriter's initializer, like so: ArrowWriter(..., features=features).
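
Applied to your example above (reusing the features object you defined there), that would look roughly like this, so the Arrow file carries the schema directly:

from datasets import Dataset
from datasets.arrow_writer import ArrowWriter

# `features` is the Features object defined in your snippet above;
# note the `id` values are strings here to match the `id` feature
with ArrowWriter(path="debug_ds1/dataset.arrow", features=features) as writer:
    writer.write(dict(id="0", tokens=["a", "b"], ner_tags=[0, 0]))
    writer.write(dict(id="1", tokens=["x", "y"], ner_tags=[1, 0]))
    writer.finalize()

ds = Dataset.from_file("debug_ds1/dataset.arrow")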

And the reason why I prefer Dataset.from_generator over Dataset.from_iterable is mentioned in one of my previous comments.

mariosasko avatar Jul 05 '22 12:07 mariosasko

@mariosasko so at the moment we still have to create a fake Dataset first and then use ArrowWriter to write an actual dataset? I'm using the latest version of datasets on pypi but my final file is always empty. Is there anything wrong with the code below?

    total = 0
    with ArrowWriter(path=str(final_data_path), features=features) as writer:
        for batch in loader:
            for traj in batch:
                for generator in question_generators:
                    for xi in generator(traj):
                        # print(f"Question: {xi.question}, answer: {xi.answer}")
                        total += 1
                        writer.write(
                            {
                                "id": f"qa_{total}",
                                "question": xi.question,
                                "answer": xi.answer,
                            }
                        )
        writer.finalize()
    print(f"Total #questions = {total}") # this prints 402

aleSuglia avatar Jul 14 '22 18:07 aleSuglia

This works for me if I then open the Arrow file as a dataset using ds = Dataset.from_file(final_data_path) and then ds.save_to_disk(somedir) (I also close the writer first with writer.close()). The Dataset created that way contains the expected examples.
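
In full, the pattern looks roughly like this (paths and the examples iterable are placeholders):

from datasets import Dataset
from datasets.arrow_writer import ArrowWriter

writer = ArrowWriter(path=str(final_data_path), features=features)
for ex in examples:
    writer.write(ex)
writer.finalize()
writer.close()

ds = Dataset.from_file(str(final_data_path))
ds.save_to_disk("somedir")  # optional: lets you reload later with load_from_disk("somedir")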

johann-petrak avatar Jul 14 '22 18:07 johann-petrak

Oh thanks. That did the trick I believe. Shouldn't ArrowWriter have a context manager that does these operations?

aleSuglia avatar Jul 15 '22 07:07 aleSuglia

You can just use Dataset.from_file to get your dataset, no need to do an extra save_to_disk somewhere else ;)

lhoestq avatar Jul 18 '22 09:07 lhoestq

I was thinking that save_to_disk is necessary when one wants to re-use that dataset as a proper HF dataset later, no? At least what I wanted to achieve is to create a dataset that can be opened like any other local or remote dataset.

johann-petrak avatar Jul 18 '22 12:07 johann-petrak

save_to_disk/load_from_disk is indeed more general, e.g. it supports datasets that consist of several files, and it saves some extra info in a dataset_info.json file (description, citation, split sizes, etc.)

If you have one single file, it's fine to simply do .from_file()
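
In code, the two options look roughly like this (paths are placeholders):

from datasets import Dataset, load_from_disk

# single Arrow file: open it directly
ds = Dataset.from_file("path/to/cache_file.arrow")

# or save it with the extra metadata (dataset_info.json etc.) and reload later
ds.save_to_disk("path/to/dataset_dir")
ds2 = load_from_disk("path/to/dataset_dir")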

lhoestq avatar Jul 18 '22 12:07 lhoestq