
Add a GROUP BY operator

Open felix-schneider opened this issue 3 years ago • 11 comments

Is your feature request related to a problem? Please describe.
Using batch mapping, we can easily split examples. However, we lack an appropriate option for merging them back together by some key. Consider this example:

import datasets

# features:
# {
#    "example_id": datasets.Value("int32"),
#    "text": datasets.Value("string")
# }

# placeholder for a dataset with the features above
ds = datasets.Dataset()


def split(examples):
    sentences = [text.split(".") for text in examples["text"]]
    return {
        "example_id": [
            example_id
            for example_id, sents in zip(examples["example_id"], sentences)
            for _ in sents
        ],
        "sentence": [sent for sents in sentences for sent in sents],
        "sentence_id": [i for sents in sentences for i in range(len(sents))],
    }


# "text" must be removed because the batches returned by split have more rows than the input
split_ds = ds.map(split, batched=True, remove_columns=["text"])


def process(examples):
    outputs = some_neural_network_that_works_on_sentences(examples["sentence"])
    return {"outputs": outputs}


split_ds = split_ds.map(process, batched=True)

I have a dataset consisting of texts that I would like to process sentence by sentence in a batched way. Afterwards, I would like to put it back together as it was, merging the outputs together.

Describe the solution you'd like
Ideally, it would look something like this:

import numpy as np


def join(examples):
    # reassemble one original example from its sentences, in order
    order = np.argsort(examples["sentence_id"])
    text = ".".join(examples["sentence"][i] for i in order)
    outputs = [examples["outputs"][i] for i in order]
    return {"text": text, "outputs": outputs}


ds = split_ds.group_by("example_id", join)

Describe alternatives you've considered
Right now, we can do this:

def merge(example):
    # gather all sentences belonging to this example and restore their order
    example_id = example["example_id"]
    parts = split_ds.filter(lambda x: x["example_id"] == example_id).sort("sentence_id")
    return {"outputs": list(parts["outputs"])}

ds = ds.map(merge)

Of course, we could process the dataset like this:

def process(example):
    # the network's batch is whatever number of sentences this one example happens to have
    outputs = some_neural_network_that_works_on_sentences(example["text"].split("."))
    return {"outputs": outputs}

ds = ds.map(process)

However, that does not allow choosing an arbitrary batch size for the model, and it can use resources very inefficiently if the model's optimal batch size is much larger than the number of sentences in one example.

I would very much appreciate some kind of group by operator to merge examples based on the value of one column.

felix-schneider avatar Jan 27 '22 16:01 felix-schneider

Hi! At the moment you can use to_pandas() to get a pandas DataFrame that supports groupby operations (make sure your dataset fits in memory, though).
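
For instance, a rough sketch of that route on the split_ds from the example above (column names taken from that example):

df = split_ds.to_pandas()
merged = (
    df.sort_values(["example_id", "sentence_id"])    # restore sentence order within each example
      .groupby("example_id")
      .agg({"sentence": ".".join, "outputs": list})  # one row per original example
      .reset_index()
)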

We use Arrow as a back-end for datasets and it doesn't have native group by (see https://github.com/apache/arrow/issues/2189) unfortunately.

I just drafted what it could look like to have group_by in datasets:

from datasets import Dataset, concatenate_datasets

def group_by(d, col, join): 
    """from: https://github.com/huggingface/datasets/issues/3644"""
    # Get the indices of each group
    groups = {key: [] for key in d.unique(col)} 
    def create_groups_indices(key, i): 
        groups[key].append(i) 
    d.map(create_groups_indices, with_indices=True, input_columns=col) 
    # Get one dataset object per group
    groups = {key: d.select(indices) for key, indices in groups.items()} 
    # Apply join function
    groups = {
        key: dataset_group.map(join, batched=True, batch_size=len(dataset_group), remove_columns=d.column_names)
        for key, dataset_group in groups.items()
    } 
    # Return concatenation of all the joined groups
    return concatenate_datasets(list(groups.values()))

example of usage:


def join(batch): 
    # take the batch of all the examples of a group, and return a batch with one aggregated example
    # (we could aggregate examples into several rows instead of one, if you want)
    return {"total": [batch["i"]]} 

d = Dataset.from_dict({
    "i": [i for i in range(50)],
    "group_key": [i % 4 for i in range(50)],
})
print(group_by(d, "group_key", join).to_pandas())
#                                                total
# 0  [0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48]
# 1  [1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45, 49]
# 2     [2, 6, 10, 14, 18, 22, 26, 30, 34, 38, 42, 46]
# 3     [3, 7, 11, 15, 19, 23, 27, 31, 35, 39, 43, 47]

Let me know if that helps !

cc @albertvillanova @mariosasko for visibility

lhoestq avatar Feb 04 '22 14:02 lhoestq

@lhoestq As of PyArrow 7.0.0, pa.Table has the group_by method, so we should also consider using that function for grouping.
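
For reference, a toy sketch of that PyArrow API on its own (standalone pyarrow >= 7.0.0, not wired into datasets):

import pyarrow as pa

table = pa.table({"group_key": [0, 1, 0, 1, 0], "i": [1, 2, 3, 4, 5]})
result = table.group_by("group_key").aggregate([("i", "list"), ("i", "sum")])
# result has the columns "i_list", "i_sum" and "group_key", one row per group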

mariosasko avatar Feb 08 '22 15:02 mariosasko

Any update on this?

felix-schneider avatar Jun 24 '22 09:06 felix-schneider

You can use https://github.com/mariosasko/datasets_sql by @mariosasko to do group by operations using SQL queries
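
Usage looks roughly like this (a sketch based on the project's README; the query() entry point and the lookup of the dataset by its variable name are assumptions, so check the README for the current API):

from datasets import Dataset
from datasets_sql import query  # assumed entry point

ds = Dataset.from_dict({"i": list(range(50)), "group_key": [i % 4 for i in range(50)]})
# the dataset is referenced by its Python variable name inside the SQL string
grouped = query("SELECT group_key, count(*) AS n FROM ds GROUP BY group_key")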

lhoestq avatar Jun 28 '22 17:06 lhoestq

Hi, I have a similar issue as OP but the suggested solutions do not work for my case. Basically, I process documents through a model to extract the last_hidden_state, using the "map" method on a Dataset object, but would like to average the result over a categorical column at the end (i.e. groupby this column).

  • to_pandas() saturates the memory, although it gives me the desired result through a .groupby().apply(np.mean, axis=0) on a smaller use case,
  • the solution posted on Feb 4 is much too slow,
  • datasets_sql seems to not like the fact that I'm averaging np.arrays.

So I'm kinda out of "non brute force" options... Any help appreciated!

jeremylhour avatar Jan 18 '23 17:01 jeremylhour

Hi, I have a similar issue as OP but the suggested solutions do not work for my case. Basically, I process documents through a model to extract the last_hidden_state, using the "map" method on a Dataset object, but would like to average the result over a categorical column at the end (i.e. groupby this column).

If you haven't yet, you could explore using Polars for this. It's a new DataFrame library written in Rust with Python bindings. It is Pandas-like in many ways, but has some biggish differences in syntax/approach, so it's definitely not a drop-in replacement.

Polars also uses Arrow as a backend but supports out-of-memory operations; in this case, it's probably easiest to write your dataset out to Parquet and then use Polars' scan_parquet method (this will lazily read from the Parquet file). What you get back from that is a LazyFrame, i.e. nothing is loaded into memory until you specify a query and call a collect method.

Example below of doing a groupby on a dataset which definitely wouldn't fit into memory on my machine:

from datasets import load_dataset
import polars as pl

ds = load_dataset("blbooks")
ds['train'].to_parquet("test.parquet")            # write the split to disk
df = pl.scan_parquet("test.parquet")              # lazy scan, nothing is loaded yet
df.groupby('date').agg([pl.count()]).collect()    # the query only runs on collect()

datasets_sql seems to not like the fact that I'm averaging np.arrays.

I am not certain how Polars will handle this either. It does have NumPy support (https://pola-rs.github.io/polars-book/user-guide/howcani/interop/numpy.html), but I assume Polars will need at least enough memory to hold each group you want to average over, so you may still end up needing more memory depending on the size of your dataset/groups.

davanstrien avatar Jan 19 '23 09:01 davanstrien

Hi @davanstrien, thanks a lot, I didn't know about this library and the answer works! I need to try it on the full dataset now, but I'm hopeful. Here's what my code looks like:

list_size = 768
df.groupby("date").agg(
    pl.concat_list(
        [
            # mean of the n-th element of the hidden_state list, within each group
            pl.col("hidden_state")
            .arr.slice(n, 1)
            .arr.first()
            .mean()
            for n in range(0, list_size)
        ]
    )
).collect()

For some reason, the following code was giving me a "mean() got unexpected argument 'axis'" error:

df2 = df.groupby('date').agg(
    pl.col("hidden_state").map(np.mean).alias("average_hidden_state")
).collect()

EDIT: The solution works on my large dataset: memory holds up and the runtime is reasonable. Thanks a lot again!

jeremylhour avatar Jan 19 '23 12:01 jeremylhour

@jeremylhour glad this worked for you :)

davanstrien avatar Jan 19 '23 14:01 davanstrien

I find this functionality missing in my workflow as well and the workarounds with SQL and Polars unsatisfying. Since PyArrow has exposed this functionality, I hope this soon makes it into a release. (:

kuchenrolle avatar Mar 14 '23 14:03 kuchenrolle

Any update on this feature?

zudi-lin avatar Mar 13 '24 19:03 zudi-lin

We added a proper Polars integration in #3334, in case it helps:

>>> from datasets import load_dataset
>>> ds = load_dataset("TheBritishLibrary/blbooks", "1700_1799", split="train")
>>> ds.to_polars().groupby('date').len()
┌─────────────────────┬──────┐
│ date                ┆ len  │
│ ---                 ┆ ---  │
│ datetime[ms]        ┆ u32  │
╞═════════════════════╪══════╡
│ 1796-01-01 00:00:00 ┆ 5831 │
│ 1775-01-01 00:00:00 ┆ 4697 │
│ 1749-01-01 00:00:00 ┆ 1118 │
│ 1740-01-01 00:00:00 ┆ 713  │
│ 1714-01-01 00:00:00 ┆ 865  │
│ …                   ┆ …    │
│ 1795-01-01 00:00:00 ┆ 5930 │
│ 1754-01-01 00:00:00 ┆ 1373 │
│ 1780-01-01 00:00:00 ┆ 1970 │
│ 1734-01-01 00:00:00 ┆ 1047 │
│ 1719-01-01 00:00:00 ┆ 1235 │
└─────────────────────┴──────┘
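
If the aggregated frame needs to go back into a Dataset, something along these lines should work (assuming a datasets version recent enough to ship from_polars alongside to_polars):

from datasets import Dataset

counts = ds.to_polars().group_by("date").len()  # group_by is the newer Polars spelling of groupby
ds_counts = Dataset.from_polars(counts)         # back to a datasets.Dataset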

lhoestq avatar Mar 14 '24 13:03 lhoestq