
[Python] Explode array column

Open asfimport opened this issue 4 years ago • 18 comments

In Apache Spark, explode separates the elements of an array column (or expression) into multiple rows.

Note that each explode works at the top-level only (not recursively).

This would also work with the existing flatten method to allow fully unnesting a pyarrow.StructArray.

Reporter: Malthe Borch

Note: This issue was originally created as ARROW-12099. Please see the migration documentation for further details.

asfimport avatar Mar 26 '21 16:03 asfimport

Micah Kornfield / @emkornfield: Note there is a flatten on list array which sounds very similar to explode?

asfimport avatar Mar 26 '21 21:03 asfimport

Malthe Borch: @emkornfield that looks helpful but what's very useful about Spark's explode is that it works directly on the table level. I'm not sure how easy it would be to join this flattened list array back with the original table?

asfimport avatar Mar 26 '21 21:03 asfimport

Micah Kornfield / @emkornfield: OK, does explode do so recursively or only for top-level types? It would be good to formally specify this.

asfimport avatar Mar 26 '21 22:03 asfimport

Malthe Borch: @emkornfield added; there is no support for recursively exploding a structure. I think that is a reasonable position to take here as well.

asfimport avatar Mar 26 '21 22:03 asfimport

Joris Van den Bossche / @jorisvandenbossche: In the pyarrow.compute module, we also have list_parent_indices which can be used to join the flattened list column with repeated rows of the original table.

So it's already possible to write an explode function with this functionality in python:


import pyarrow as pa
import pyarrow.compute as pc

def explode_table(table, column):
    other_columns = list(table.schema.names)
    other_columns.remove(column)
    indices = pc.list_parent_indices(table[column])
    result = table.select(other_columns).take(indices)
    result = result.append_column(
        pa.field(column, table.schema.field(column).type.value_type),
        pc.list_flatten(table[column]),
    )
    return result

In [80]: table = pa.table({'a': range(3), 'b': [[1, 2], None, [3, 4, 5]]})

In [81]: explode_table(table, 'b')
Out[81]: 
pyarrow.Table
a: int64
b: int64

In [82]: explode_table(table, 'b').to_pandas()
Out[82]: 
   a  b
0  0  1
1  0  2
2  2  3
3  2  4
4  2  5

That said, I think it could be nice to provide this functionality in pyarrow itself.

asfimport avatar Mar 29 '21 15:03 asfimport

Malthe Borch: @jorisvandenbossche in Spark, explode does not actually "zip" arrays in different columns – it copies the entire row for each value in the exploded array column, so a row whose array has N values is replaced by N rows. Rinse and repeat for all rows in the original dataframe.

asfimport avatar Mar 29 '21 16:03 asfimport

Joris Van den Bossche / @jorisvandenbossche: Your description to me sounds more or less what I showed. Can you eg show an example of input / expected result? (as you would expect it from spark)

asfimport avatar Mar 29 '21 16:03 asfimport

Malthe Borch:  

So Spark cannot actually explode (or "generate") more than one expression per select statement (that is simply not allowed), but I suppose sometimes you want to "zip" the arrays (like you have shown) and other times you would want to form the cartesian product from them:


spark-sql> SELECT a, explode(b) FROM (SELECT explode(sequence(0, 2)) a, sequence(4, 6) b);
0	4
0	5
0	6
1	4
1	5
1	6
2	4
2	5
2	6
Time taken: 0.187 seconds, Fetched 9 row(s)

In your explode_table function, what role does the column parameter play exactly? Why does it touch the 'a' column when you pass 'b'?
asfimport avatar Mar 29 '21 16:03 asfimport

Joris Van den Bossche / @jorisvandenbossche: Thanks. Could you also post what the FROM clause looks like? (SELECT explode(sequence(0, 2)) a, sequence(4, 6) b) (to be sure what the input data looks like)

asfimport avatar Mar 29 '21 16:03 asfimport

Malthe Borch: What I can perhaps add is that I imagine an API such as:


table.explode("a")       # Unrolls the "a" array column, leaving any other array column as-is.
table.explode("a", "b")  # Unrolls the given columns together, "zipping" them.

That is, the interface would work strictly with already existing array columns.

asfimport avatar Mar 29 '21 16:03 asfimport

Joris Van den Bossche / @jorisvandenbossche: [~malthe] can you show small example input/output of what you imagine? (as has been clear above, describing it in words doesn't seem sufficient to be sure we understand it the same way ;))

asfimport avatar Mar 31 '21 12:03 asfimport

Joris Van den Bossche / @jorisvandenbossche: I assume your example starts with a table like the following?


In [100]: table = pa.table({'a': [0, 1, 2], 'b': [[4, 5, 6]]*3})

In [101]: table.to_pandas()
Out[101]: 
   a          b
0  0  [4, 5, 6]
1  1  [4, 5, 6]
2  2  [4, 5, 6]

The function I wrote above to explode a list column in such a table gives:


In [102]: explode_table(table, 'b').to_pandas()
Out[102]: 
   a  b
0  0  4
1  0  5
2  0  6
3  1  4
4  1  5
5  1  6
6  2  4
7  2  5
8  2  6

which seems the same output as you showed above?

asfimport avatar Mar 31 '21 12:03 asfimport

Ian Cook / @ianmcook: +1 Hive also has an explode function that works like this, but it is very difficult to use at a table level—you need to use something called a lateral view to do that, and the API is very unintuitive.

@jorisvandenbossche  I think your example in the previous comment is exactly correct. It would be very nice to have an explode_table kernel like that in the Arrow C++ library, exposed to Python and R through bindings.

In addition to working on ListArrays like in this example, this should also work on MapArrays. When called on a MapArray, it should return two exploded columns—one with the keys, one with the values.

asfimport avatar Aug 26 '21 17:08 asfimport

Ian Cook / @ianmcook: I think for the initial implementation, we should limit this to explode only a single ListArray or MapArray column, like:

explode_table(table, 'b')

The case of exploding multiple columns, like

explode_table(table, 'b', 'c')

seems quite complicated and is probably best to consider separately.

asfimport avatar Aug 26 '21 17:08 asfimport

SHIMA Tatsuya / @eitsupi: It seems related to ARROW-8813. tidyr::unnest in R works very well, and it would be great to have this feature available in Arrow.

asfimport avatar Nov 22 '21 11:11 asfimport

Guido Muscioni: (sorry for continuing this old thread, I can open a new issue if necessary)

I was working with @jorisvandenbossche's implementation and noticed that it skips null values. I slightly modified the code to handle missing values in a list column:


import pyarrow as pa
import pyarrow.compute as pc

def explode_table_include_null(table, column):
    other_columns = list(table.schema.names)
    other_columns.remove(column)
    indices = pc.list_parent_indices(pc.fill_null(table[column], [None]))
    result = table.select(other_columns).take(indices)
    result = result.append_column(
        pa.field(column, table.schema.field(column).type.value_type),
        pc.list_flatten(pc.fill_null(table[column], [None])),
    )
    return result

For example, given this table:


table = pa.table({'a': range(3), 'b': [[1,2], None, [1,2,3]]})

pandas in this case will leave the line with the None value:


In [14]: table.to_pandas().explode('b')
Out[14]: 
   a     b
0  0     1
0  0     2
1  1  None
2  2     1
2  2     2
2  2     3

@jorisvandenbossche's implementation does not return the None row:


In [16]: explode_table(table, 'b').to_pandas()
Out[16]: 
   a  b
0  0  1
1  0  2
2  2  1
3  2  2
4  2  3

Using the function above:


In [18]: explode_table_include_null(table, 'b').to_pandas()
Out[18]: 
   a    b
0  0  1.0
1  0  2.0
2  1  NaN
3  2  1.0
4  2  2.0
5  2  3.0

I am not sure whether filling nulls with a one-element list [None] is correct in general, but it seems to work well across multiple examples (I am referring to the typing requirements of the pc.fill_null function):


In [19]: table = pa.table({'a': range(3), 'b': [['1','2'],None, ['1','2','3']]})

In [20]: explode_table_include_null(table, 'b').to_pandas()
Out[20]: 
   a     b
0  0     1
1  0     2
2  1  None
3  2     1
4  2     2
5  2     3

In [22]: table = pa.table({'a': range(3), 'b': [[{'a':1}],None, [{'a':1}]]})

In [23]: explode_table_include_null(table, 'b').to_pandas()
Out[23]: 
   a           b
0  0  {'a': 1.0}
1  1        None
2  2  {'a': 1.0}

I ran a very small benchmark of this function against pandas and the speedup is remarkable:


df = pd.DataFrame({'a':list(range(10000)),'b':[['a','b','c'] for i in range(10000)]})
table = pa.Table.from_pandas(df)

In [25]: %timeit explode_table_include_null(table, 'b')
131 µs ± 1.9 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [26]: %timeit df.explode('b')
3.78 ms ± 69.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

asfimport avatar May 23 '22 22:05 asfimport

Nick Crews: Small tweak to Guido's implementation (thank you for this!): if the table only has the one ListArray or MapArray column, it crashes.

This handles that case:


import pyarrow as pa
import pyarrow.compute as pc
def explode_table(table, column):
    null_filled = pc.fill_null(table[column], [None])
    flattened = pc.list_flatten(null_filled)
    other_columns = list(table.schema.names)
    other_columns.remove(column)
    if len(other_columns) == 0:
        return pa.table({column: flattened})
    else:
        indices = pc.list_parent_indices(null_filled)
        result = table.select(other_columns).take(indices)
        result = result.append_column(
            pa.field(column, table.schema.field(column).type.value_type),
            flattened,
        )
        return result 

asfimport avatar Jun 17 '22 00:06 asfimport

+1 to implementing this feature in Arrow.

In the meantime, I appreciate the workaround. Here's a version that optionally retains information about offsets of values in the original lists:

def explode_table(table, column, list_ix_column: str = None):
    """
    :param table: Input table to explode
    :param column: Name of column (must be unique and list-valued) to explode
    :param list_ix_column: Name of a new column in which to store offsets
             into the original lists, or ``None`` to skip generating that
             column.
    """
    null_filled = pc.fill_null(table[column], [None])
    flattened = pc.list_flatten(null_filled)
    other_columns = list(table.schema.names)
    other_columns.remove(column)
    if len(other_columns) == 0:
        result = pa.table({column: flattened})
    else:
        indices = pc.list_parent_indices(null_filled)
        result = table.select(other_columns).take(indices)
        result = result.append_column(
            pa.field(column, table.schema.field(column).type.value_type),
            flattened,
        )

    if list_ix_column is not None:
        # Generate indices into original lists, aligned with the unrolled
        # lists generated above.
        # We do this by mapping the list length to a categorical type, where
        # value k of the categorical type is a list from 0 to k.
        list_lengths = pc.list_value_length(table.column(column))
        max_list_len = pc.max(list_lengths).as_py()
        dictionary = [
            pa.scalar(list(range(l)), type=pa.list_(pa.int32()))
            for l in range(max_list_len + 1)
        ]
        id_lists = (
            pa.DictionaryArray.from_arrays(list_lengths, dictionary)
            .dictionary_decode())
        list_ix = pc.list_flatten(id_lists)

        # Add column to the left of the unrolled list elements.
        result = result.add_column(
            result.num_columns - 1,
            pa.field(list_ix_column, pa.int32()), list_ix
        )
        
    return result 

frreiss avatar Mar 11 '24 21:03 frreiss