[Python] Explode array column
In Apache Spark, explode separates the elements of an array column (or expression) into multiple rows.
Note that each explode works at the top-level only (not recursively).
This would also work with the existing flatten method to allow fully unnesting a pyarrow.StructArray.
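The requested semantics can be sketched in plain Python over rows represented as dicts. This explode_rows helper is hypothetical (not part of any API), assuming Spark-like behaviour where a null array produces no rows and nested lists are left intact (no recursion):

```python
def explode_rows(rows, column):
    """Unroll a list-valued column: one output row per list element,
    with the other column values repeated. Top-level only."""
    out = []
    for row in rows:
        values = row[column]
        if values is None:
            continue  # assumed Spark-like behaviour: null arrays yield no rows
        for value in values:
            new_row = dict(row)
            new_row[column] = value  # inner lists are kept as-is (no recursion)
            out.append(new_row)
    return out

rows = [{"a": 0, "b": [1, 2]}, {"a": 1, "b": None}, {"a": 2, "b": [3, [4, 5]]}]
explode_rows(rows, "b")
# -> [{'a': 0, 'b': 1}, {'a': 0, 'b': 2},
#     {'a': 2, 'b': 3}, {'a': 2, 'b': [4, 5]}]
```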
Reporter: Malthe Borch
Note: This issue was originally created as ARROW-12099. Please see the migration documentation for further details.
Micah Kornfield / @emkornfield: Note there is a flatten on list array which sounds very similar to explode?
Malthe Borch: @emkornfield that looks helpful but what's very useful about Spark's explode is that it works directly on the table level. I'm not sure how easy it would be to join this flattened list array back with the original table?
Micah Kornfield / @emkornfield: OK, does explode do so recursively, or only for top-level types? It would be good to formally specify this.
Malthe Borch: @emkornfield added; there is no support for recursively exploding a structure. I think that is a reasonable position to take here as well.
Joris Van den Bossche / @jorisvandenbossche:
In the pyarrow.compute module, we also have list_parent_indices which can be used to join the flattened list column with repeated rows of the original table.
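For intuition, what list_parent_indices computes can be sketched in plain Python (assumed semantics, based on the behaviour described here: each non-null list element emits the index of its parent row, and null lists emit nothing):

```python
def list_parent_indices(lists):
    """Pure-Python sketch of pc.list_parent_indices: for each element of
    each non-null list, emit the row index that element came from."""
    indices = []
    for i, values in enumerate(lists):
        if values is not None:
            indices.extend([i] * len(values))
    return indices

list_parent_indices([[1, 2], None, [3, 4, 5]])
# -> [0, 0, 2, 2, 2]
```

Taking the original table's other columns at these indices repeats each row once per list element, which is what lines it up with the flattened values.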
So it's already possible to write an explode function with this functionality in python:
import pyarrow as pa
import pyarrow.compute as pc

def explode_table(table, column):
    other_columns = list(table.schema.names)
    other_columns.remove(column)
    indices = pc.list_parent_indices(table[column])
    result = table.select(other_columns).take(indices)
    result = result.append_column(
        pa.field(column, table.schema.field(column).type.value_type),
        pc.list_flatten(table[column]),
    )
    return result
In [80]: table = pa.table({'a': range(3), 'b': [[1, 2], None, [3, 4, 5]]})
In [81]: explode_table(table, 'b')
Out[81]:
pyarrow.Table
a: int64
b: int64
In [82]: explode_table(table, 'b').to_pandas()
Out[82]:
a b
0 0 1
1 0 2
2 2 3
3 2 4
4 2 5
That said, I think it could be nice to provide this functionality in pyarrow itself.
Malthe Borch: @jorisvandenbossche in Spark, explode does not "zip" arrays in different columns actually – it just copies the entire row for each value in the exploded column (which is originally an array) such that if the array had N values, there would now be N rows in place of the original row. Rinse and repeat for all rows in the original dataframe.
Joris Van den Bossche / @jorisvandenbossche: Your description sounds more or less like what I showed. Can you e.g. show an example of input / expected result? (as you would expect it from Spark)
Malthe Borch: So Spark cannot actually explode (or "generate") more than one expression per select statement (that is simply not allowed), but I suppose sometimes you want to "zip" the arrays (like you have shown) and other times you would want to form the cartesian product from them:
spark-sql> SELECT a, explode(b) FROM (SELECT explode(sequence(0, 2)) a, sequence(4, 6) b);
0 4
0 5
0 6
1 4
1 5
1 6
2 4
2 5
2 6
Time taken: 0.187 seconds, Fetched 9 row(s)
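For reference, the 9-row cartesian result above can be reproduced in plain Python with itertools.product ("a" is exploded first into three rows, then "b" is exploded per row, pairing every element of a with every element of b):

```python
from itertools import product

a = [0, 1, 2]
b = [4, 5, 6]
# Each exploded row of "a" is paired with each element of "b".
rows = list(product(a, b))
# -> [(0, 4), (0, 5), (0, 6), (1, 4), (1, 5), (1, 6), (2, 4), (2, 5), (2, 6)]
```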
In your explode_table function, what role does the column parameter have exactly? Why does it touch the 'a' column if you mention 'b'?
Joris Van den Bossche / @jorisvandenbossche:
Thanks. Could you also post what the FROM clause looks like? (SELECT explode(sequence(0, 2)) a, sequence(4, 6) b) (to be sure what the input data looks like)
Malthe Borch: What I can perhaps add is that I imagine an API such as:
table.explode("a") # This would unroll the "a" array, but leave any other array as it is.
table.explode("a", "b") # This would unroll the provided columns, "zipping" them.
That is, the interface would work strictly with already existing array columns.
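The proposed multi-column ("zipping") variant might look as follows in plain Python; the explode_zip name and the requirement that the zipped lists have equal lengths within each row are assumptions, not a settled API:

```python
def explode_zip(rows, *columns):
    """Unroll several list-valued columns in lockstep ("zipping" them);
    within each row, the named lists are assumed to be equally long."""
    out = []
    for row in rows:
        lists = [row[c] for c in columns]
        if len({len(values) for values in lists}) != 1:
            raise ValueError("zipped list columns must have equal lengths per row")
        for values in zip(*lists):
            new_row = dict(row)
            new_row.update(dict(zip(columns, values)))
            out.append(new_row)
    return out

rows = [{"a": [0, 1], "b": [4, 5], "c": "x"}]
explode_zip(rows, "a", "b")
# -> [{'a': 0, 'b': 4, 'c': 'x'}, {'a': 1, 'b': 5, 'c': 'x'}]
```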
Joris Van den Bossche / @jorisvandenbossche:
[~malthe] can you show small example input/output of what you imagine? (as has been clear above, describing it in words doesn't seem sufficient to be sure we understand it the same way ;))
Joris Van den Bossche / @jorisvandenbossche: I assume your example starts with a table like the following?
In [100]: table = pa.table({'a': [0, 1, 2], 'b': [[4, 5, 6]]*3})
In [101]: table.to_pandas()
Out[101]:
a b
0 0 [4, 5, 6]
1 1 [4, 5, 6]
2 2 [4, 5, 6]
The function I wrote above to explode a list column in such a table gives:
In [102]: explode_table(table, 'b').to_pandas()
Out[102]:
a b
0 0 4
1 0 5
2 0 6
3 1 4
4 1 5
5 1 6
6 2 4
7 2 5
8 2 6
which seems the same output as you showed above?
Ian Cook / @ianmcook:
+1 Hive also has an explode function that works like this, but it is very difficult to use at a table level—you need to use something called a lateral view to do that, and the API is very unintuitive.
@jorisvandenbossche I think your example in the previous comment is exactly correct. It would be very nice to have an explode_table kernel like that in the Arrow C++ library, exposed to Python and R through bindings.
In addition to working on ListArrays like in this example, this should also work on MapArrays. When called on a MapArray, it should return two exploded columns—one with the keys, one with the values.
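The map case might be sketched in plain Python over rows with dict-valued columns; the "_key"/"_value" column naming is hypothetical, not a settled API:

```python
def explode_map(rows, column):
    """Unroll a map-valued column: one output row per key/value pair,
    producing separate key and value columns (hypothetical naming)."""
    out = []
    for row in rows:
        mapping = row[column]
        if mapping is None:
            continue  # assumption: null maps yield no rows, as with lists
        for key, value in mapping.items():
            new_row = {k: v for k, v in row.items() if k != column}
            new_row[column + "_key"] = key
            new_row[column + "_value"] = value
            out.append(new_row)
    return out

rows = [{"a": 0, "m": {"x": 1, "y": 2}}, {"a": 1, "m": None}]
explode_map(rows, "m")
# -> [{'a': 0, 'm_key': 'x', 'm_value': 1},
#     {'a': 0, 'm_key': 'y', 'm_value': 2}]
```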
Ian Cook / @ianmcook: I think for the initial implementation, we should limit this to explode only a single ListArray or MapArray column, like:
explode_table(table, 'b')
The case of exploding multiple columns, like
explode_table(table, 'b', 'c')
seems quite complicated and is probably best to consider separately.
SHIMA Tatsuya / @eitsupi: It seems related to ARROW-8813. tidyr::unnest in R works very well, and it would be great to have this feature available in Arrow.
Guido Muscioni: (sorry for continuing this old thread, I can open a new issue if necessary)
I was working on @jorisvandenbossche implementation and noticed that it skips the null values. I slightly modified the code to handle missing values in a list:
def explode_table_include_null(table, column):
    other_columns = list(table.schema.names)
    other_columns.remove(column)
    indices = pc.list_parent_indices(pc.fill_null(table[column], [None]))
    result = table.select(other_columns).take(indices)
    result = result.append_column(
        pa.field(column, table.schema.field(column).type.value_type),
        pc.list_flatten(pc.fill_null(table[column], [None])),
    )
    return result
For example, given this table:
table = pa.table({'a': range(3), 'b': [[1,2], None, [1,2,3]]})
pandas in this case keeps the row with the None value:
In [14]: table.to_pandas().explode('b')
Out[14]:
a b
0 0 1
0 0 2
1 1 None
2 2 1
2 2 2
2 2 3
@jorisvandenbossche implementation will not return the None row:
In [16]: explode_table(table, 'b').to_pandas()
Out[16]:
a b
0 0 1
1 0 2
2 2 1
3 2 2
4 2 3
Using the function above:
In [18]: explode_table_include_null(table, 'b').to_pandas()
Out[18]:
a b
0 0 1.0
1 0 2.0
2 1 NaN
3 2 1.0
4 2 2.0
5 2 3.0
I am not sure whether blindly filling nulls with a single-element [None] list is correct, but it seems to work well across multiple examples (I am referring to the typing requirements of the pc.fill_null function):
In [19]: table = pa.table({'a': range(3), 'b': [['1','2'],None, ['1','2','3']]})
In [20]: explode_table_include_null(table, 'b').to_pandas()
Out[20]:
a b
0 0 1
1 0 2
2 1 None
3 2 1
4 2 2
5 2 3
In [22]: table = pa.table({'a': range(3), 'b': [[{'a':1}],None, [{'a':1}]]})
In [23]: explode_table_include_null(table, 'b').to_pandas()
Out[23]:
a b
0 0 {'a': 1.0}
1 1 None
2 2 {'a': 1.0}
I did a very small benchmark of this function against pandas and the speedup is remarkable:
df = pd.DataFrame({'a':list(range(10000)),'b':[['a','b','c'] for i in range(10000)]})
table = pa.Table.from_pandas(df)
In [25]: %timeit explode_table_include_null(table, 'b')
131 µs ± 1.9 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [26]: %timeit df.explode('b')
3.78 ms ± 69.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Nick Crews: Small tweak to Guido's implementation (thank you for this!): If the table only has the one ListArray or MapArray column, then it crashes.
This handles that case:
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow as pa
import pyarrow.compute as pc

def explode_table(table, column):
    null_filled = pc.fill_null(table[column], [None])
    flattened = pc.list_flatten(null_filled)
    other_columns = list(table.schema.names)
    other_columns.remove(column)
    if len(other_columns) == 0:
        return pa.table({column: flattened})
    else:
        indices = pc.list_parent_indices(null_filled)
        result = table.select(other_columns).take(indices)
        result = result.append_column(
            pa.field(column, table.schema.field(column).type.value_type),
            flattened,
        )
        return result
+1 to implementing this feature in Arrow.
In the meantime, I appreciate the workaround. Here's a version that optionally retains information about offsets of values in the original lists:
def explode_table(table, column, list_ix_column: str = None):
    """
    :param table: Input table to explode
    :param column: Name of column (must be unique and list-valued) to explode
    :param list_ix_column: name of new column in which to store offsets
        into the original lists; or ``None`` to skip generating that
        column.
    """
    null_filled = pc.fill_null(table[column], [None])
    flattened = pc.list_flatten(null_filled)
    other_columns = list(table.schema.names)
    other_columns.remove(column)
    if len(other_columns) == 0:
        result = pa.table({column: flattened})
    else:
        indices = pc.list_parent_indices(null_filled)
        result = table.select(other_columns).take(indices)
        result = result.append_column(
            pa.field(column, table.schema.field(column).type.value_type),
            flattened,
        )
    if list_ix_column is not None:
        # Generate indices into original lists, aligned with the unrolled
        # lists generated above.
        # We do this by mapping the list length to a categorical type, where
        # value k of the categorical type is a list from 0 to k.
        # Use the null-filled column so that rows that were null (and were
        # unrolled to a single null element above) still contribute an offset;
        # otherwise the offsets column would be shorter than the values column.
        list_lengths = pc.list_value_length(null_filled)
        max_list_len = pc.max(list_lengths).as_py()
        dictionary = [
            pa.scalar(list(range(l)), type=pa.list_(pa.int32()))
            for l in range(max_list_len + 1)
        ]
        id_lists = (
            pa.DictionaryArray.from_arrays(list_lengths, dictionary)
            .dictionary_decode()
        )
        list_ix = pc.list_flatten(id_lists)
        # Add column to the left of the unrolled list elements.
        result = result.add_column(
            result.num_columns - 1,
            pa.field(list_ix_column, pa.int32()),
            list_ix,
        )
    return result
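For intuition, one way to compute such per-value offsets can be sketched in plain Python: each list contributes the offsets 0..len-1, and a null list is treated as a single null element (matching the fill_null(..., [None]) step), so it contributes a single offset 0:

```python
def list_offsets(lists):
    """Pure-Python sketch of per-value offsets into the original lists;
    a null list counts as one element (offset 0), mirroring fill_null."""
    offsets = []
    for values in lists:
        n = 1 if values is None else len(values)
        offsets.extend(range(n))
    return offsets

list_offsets([[1, 2], None, [3, 4, 5]])
# -> [0, 1, 0, 0, 1, 2]
```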