
BUG: parquet IO fails if one or more partitions are empty

Open martinfleis opened this issue 3 years ago • 6 comments

In some cases, like after a spatial join, some of your partitions may end up being empty. If that happens, dask_geopandas raises a RuntimeError when writing to Parquet.

import geopandas
import dask_geopandas

import warnings

warnings.filterwarnings('ignore', message='.*initial implementation of Parquet.*')

world = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
cities = geopandas.read_file(geopandas.datasets.get_path("naturalearth_cities"))
africa = world.loc[world.continent == "Africa"]

ddf_cities = dask_geopandas.from_geopandas(cities, npartitions=12)

hilbert = ddf_cities.hilbert_distance()
ddf_cities = ddf_cities.set_index(hilbert, shuffle="tasks")

african_cities = dask_geopandas.sjoin(ddf_cities, africa)

african_cities.to_parquet("tst/")
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
     51     try:
---> 52         metadata.append_row_groups(md)
     53     except RuntimeError as err:

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.FileMetaData.append_row_groups()

RuntimeError: AppendRowGroups requires equal schemas.

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
/var/folders/2f/fhks6w_d0k556plcv3rfmshw0000gn/T/ipykernel_60072/2656726537.py in <module>
     17 african_cities = dask_geopandas.sjoin(ddf_cities, africa)
     18 
---> 19 african_cities.to_parquet("tst/")

~/Git/dask-geopandas/dask_geopandas/core.py in to_parquet(self, path, *args, **kwargs)
    584             index=data.index,
    585         )
--> 586 
    587     return df.map_partitions(
    588         func, x, y, z, meta=geopandas.GeoSeries(), token="points_from_xy"

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, name_function, **kwargs)
    772 
    773     if compute:
--> 774         return compute_as_if_collection(
    775             Scalar, graph, [(final_name, 0)], **compute_kwargs
    776         )

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    313     schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    314     dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 315     return schedule(dsk2, keys, **kwargs)
    316 
    317 

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     77             pool = MultiprocessingPoolExecutor(pool)
     78 
---> 79     results = get_async(
     80         pool.submit,
     81         pool._max_workers,

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    505                             _execute_task(task, data)  # Re-execute locally
    506                         else:
--> 507                             raise_exception(exc, tb)
    508                     res, worker_id = loads(res_info)
    509                     state["cache"][key] = res

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/local.py in reraise(exc, tb)
    313     if exc.__traceback__ is not tb:
    314         raise exc.with_traceback(tb)
--> 315     raise exc
    316 
    317 

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218     try:
    219         task, data = loads(task_info)
--> 220         result = _execute_task(task, data)
    221         id = get_id()
    222         result = dumps((result, id))

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/utils.py in apply(func, args, kwargs)
     37 def apply(func, args, kwargs=None):
     38     if kwargs:
---> 39         return func(*args, **kwargs)
     40     else:
     41         return func(*args)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in write_metadata(parts, fmd, fs, path, append, **kwargs)
    744                 i_start = 1
    745             for i in range(i_start, len(parts)):
--> 746                 _append_row_groups(_meta, parts[i][0]["meta"])
    747             with fs.open(metadata_path, "wb") as fil:
    748                 _meta.write_metadata_file(fil)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
     53     except RuntimeError as err:
     54         if "requires equal schemas" in str(err):
---> 55             raise RuntimeError(
     56                 "Schemas are inconsistent, try using "
     57                 '`to_parquet(..., schema="infer")`, or pass an explicit '

RuntimeError: Schemas are inconsistent, try using `to_parquet(..., schema="infer")`, or pass an explicit pyarrow schema. Such as `to_parquet(..., schema={"column1": pa.string()})`

However, it does save the file, seemingly correctly. When trying to read the parquet back, dask.dataframe reads it with no issues but dask_geopandas complains about missing bounds.
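For reference, reading the same output back with plain dask.dataframe (a minimal sketch; since the default engine is not geometry-aware, the geometry column simply comes back as raw WKB bytes rather than geometries):

import dask.dataframe as dd

# Plain dask.dataframe reads the written dataset without error; the geometry
# column is returned as raw WKB bytes instead of actual geometries.
ddf = dd.read_parquet("tst/")
ddf.head()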

dask_geopandas.read_parquet("tst/")
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/var/folders/2f/fhks6w_d0k556plcv3rfmshw0000gn/T/ipykernel_60072/3763839778.py in <module>
----> 1 dask_geopandas.read_parquet("tst/")

~/Git/dask-geopandas/dask_geopandas/io/parquet.py in read_parquet(*args, **kwargs)
    155 
    156 def read_parquet(*args, **kwargs):
--> 157     result = dd.read_parquet(*args, engine=GeoArrowEngine, **kwargs)
    158     # check if spatial partitioning information was stored
    159     spatial_partitions = result._meta.attrs.get("spatial_partitions", None)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, ignore_metadata_file, metadata_task_size, split_row_groups, chunksize, aggregate_files, **kwargs)
    323         gather_statistics = True
    324 
--> 325     read_metadata_result = engine.read_metadata(
    326         fs,
    327         paths,

~/Git/dask-geopandas/dask_geopandas/io/parquet.py in read_metadata(cls, fs, paths, **kwargs)
     62         # get spatial partitions if available
     63         regions = geopandas.GeoSeries(
---> 64             [_get_partition_bounds(part, fs) for part in parts], crs=meta.crs
     65         )
     66         if regions.notna().all():

~/Git/dask-geopandas/dask_geopandas/io/parquet.py in <listcomp>(.0)
     62         # get spatial partitions if available
     63         regions = geopandas.GeoSeries(
---> 64             [_get_partition_bounds(part, fs) for part in parts], crs=meta.crs
     65         )
     66         if regions.notna().all():

~/Git/dask-geopandas/dask_geopandas/io/parquet.py in _get_partition_bounds(part, fs)
     52     if bbox is None:
     53         return None
---> 54     return shapely.geometry.box(*bbox)
     55 
     56 

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/shapely/geometry/geo.py in box(minx, miny, maxx, maxy, ccw)
     62     if not ccw:
     63         coords = coords[::-1]
---> 64     return Polygon(coords)
     65 
     66 

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/shapely/geometry/polygon.py in __init__(self, shell, holes)
    259 
    260         if shell is not None:
--> 261             ret = geos_polygon_from_py(shell, holes)
    262             if ret is not None:
    263                 geom, n = ret

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/shapely/geometry/polygon.py in geos_polygon_from_py(shell, holes)
    537 
    538     if shell is not None:
--> 539         ret = geos_linearring_from_py(shell)
    540         if ret is None:
    541             return None

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/shapely/speedups/_speedups.pyx in shapely.speedups._speedups.geos_linearring_from_py()

ValueError: GEOSGeom_createLinearRing_r returned a NULL pointer

martinfleis avatar Jan 24 '22 14:01 martinfleis

However, it does save the file, seemingly correctly.

It's only failing when writing the common _metadata file at the end, which is why it appears to (and in fact does) write the actual data files.
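A possible user-side workaround, sketched on the assumption that the shared metadata file can simply be skipped: write_metadata_file is a keyword of dask's to_parquet (visible in its signature in the traceback above), and setting it to False avoids the step that fails.

# Sketch: skip writing the shared _metadata file, which is the only step
# that fails; the per-partition data files are written exactly as before.
african_cities.to_parquet("tst/", write_metadata_file=False)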

jorisvandenbossche avatar Jan 28 '22 14:01 jorisvandenbossche

But I suppose we should simply skip writing those partitions? Otherwise they clutter the dataset with many small files, and the empty partitions get propagated to reading the dataset back in as well.

Although I see that also dask's to_parquet writes such empty partitions (and runs into the same error)

Sidenote: the error from the metadata specifically happens when there are string columns, because for an empty column (which will be object dtype in pandas) pyarrow cannot infer that they are strings, giving inconsistent schemas for the different files. This is a typical gotcha with all-NA or empty object dtype columns (https://github.com/dask/dask/issues/6243), and for that reason dask added the schema keyword, as explained in the error message you get. Passing schema="infer" should infer it from the first non-empty partition.
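For illustration, a sketch of the two forms the error message suggests ("name" is just an illustrative column name here; as noted further down in the thread, schema inference currently still trips over the geometry column):

import pyarrow as pa

# Let dask infer the schema from the first non-empty partition...
african_cities.to_parquet("tst/", schema="infer")

# ...or spell out the type of the problematic empty/object-dtype column explicitly.
african_cities.to_parquet("tst/", schema={"name": pa.string()})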

jorisvandenbossche avatar Jan 28 '22 15:01 jorisvandenbossche

But I suppose we should simply skip writing those partitions?

That sounds like a very sensible solution. If that happens, I would maybe just warn about it so the user is not surprised that the number of parquet files does not equal the number of partitions.
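In the meantime, a user-side sketch of that idea: compute per-partition lengths and keep only the non-empty partitions before writing (this assumes the .partitions accessor accepts a list of integer indices, which recent dask versions do).

import numpy as np

# Count rows per partition, then select only the non-empty partitions.
lengths = african_cities.map_partitions(len).compute()
non_empty = african_cities.partitions[np.flatnonzero(lengths.to_numpy()).tolist()]
non_empty.to_parquet("tst/")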

martinfleis avatar Jan 28 '22 15:01 martinfleis

Yeah, I am only now wondering if there is a reason that dask doesn't do this ..

jorisvandenbossche avatar Jan 28 '22 15:01 jorisvandenbossche

Passing schema="infer" should infer it from the first non-empty partition.

Ah, and that doesn't work because pyarrow cannot (yet) infer the type from a geometry dtype-column ..

jorisvandenbossche avatar Jan 28 '22 15:01 jorisvandenbossche

Ah, and that doesn't work because pyarrow cannot (yet) infer the type from a geometry dtype-column

Shall we then wait for a pyarrow update?

martinfleis avatar Feb 15 '22 12:02 martinfleis