dask-geopandas
BUG: parquet IO fails if one or more partitions are empty
In some cases, such as after a spatial join, some of your partitions may end up empty. If that happens, dask_geopandas raises a RuntimeError.
import geopandas
import dask_geopandas
import warnings

warnings.filterwarnings('ignore', message='.*initial implementation of Parquet.*')

world = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
cities = geopandas.read_file(geopandas.datasets.get_path("naturalearth_cities"))
africa = world.loc[world.continent == "Africa"]

# spatially partition the cities, so the join with Africa leaves some partitions empty
ddf_cities = dask_geopandas.from_geopandas(cities, npartitions=12)
hilbert = ddf_cities.hilbert_distance()
ddf_cities = ddf_cities.set_index(hilbert, shuffle="tasks")

african_cities = dask_geopandas.sjoin(ddf_cities, africa)
african_cities.to_parquet("tst/")
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
51 try:
---> 52 metadata.append_row_groups(md)
53 except RuntimeError as err:
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.FileMetaData.append_row_groups()
RuntimeError: AppendRowGroups requires equal schemas.
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
/var/folders/2f/fhks6w_d0k556plcv3rfmshw0000gn/T/ipykernel_60072/2656726537.py in <module>
17 african_cities = dask_geopandas.sjoin(ddf_cities, africa)
18
---> 19 african_cities.to_parquet("tst/")
~/Git/dask-geopandas/dask_geopandas/core.py in to_parquet(self, path, *args, **kwargs)
584 index=data.index,
585 )
--> 586
587 return df.map_partitions(
588 func, x, y, z, meta=geopandas.GeoSeries(), token="points_from_xy"
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, name_function, **kwargs)
772
773 if compute:
--> 774 return compute_as_if_collection(
775 Scalar, graph, [(final_name, 0)], **compute_kwargs
776 )
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
313 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
314 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 315 return schedule(dsk2, keys, **kwargs)
316
317
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
77 pool = MultiprocessingPoolExecutor(pool)
78
---> 79 results = get_async(
80 pool.submit,
81 pool._max_workers,
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
505 _execute_task(task, data) # Re-execute locally
506 else:
--> 507 raise_exception(exc, tb)
508 res, worker_id = loads(res_info)
509 state["cache"][key] = res
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/local.py in reraise(exc, tb)
313 if exc.__traceback__ is not tb:
314 raise exc.with_traceback(tb)
--> 315 raise exc
316
317
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
218 try:
219 task, data = loads(task_info)
--> 220 result = _execute_task(task, data)
221 id = get_id()
222 result = dumps((result, id))
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/utils.py in apply(func, args, kwargs)
37 def apply(func, args, kwargs=None):
38 if kwargs:
---> 39 return func(*args, **kwargs)
40 else:
41 return func(*args)
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in write_metadata(parts, fmd, fs, path, append, **kwargs)
744 i_start = 1
745 for i in range(i_start, len(parts)):
--> 746 _append_row_groups(_meta, parts[i][0]["meta"])
747 with fs.open(metadata_path, "wb") as fil:
748 _meta.write_metadata_file(fil)
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
53 except RuntimeError as err:
54 if "requires equal schemas" in str(err):
---> 55 raise RuntimeError(
56 "Schemas are inconsistent, try using "
57 '`to_parquet(..., schema="infer")`, or pass an explicit '
RuntimeError: Schemas are inconsistent, try using `to_parquet(..., schema="infer")`, or pass an explicit pyarrow schema. Such as `to_parquet(..., schema={"column1": pa.string()})`
However, it does save the data files, seemingly correctly. When trying to read the parquet back, dask.dataframe reads it with no issues, but dask_geopandas complains about missing bounds.
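For comparison, reading the same directory back with plain dask.dataframe (a minimal sketch; assuming the data files from the failed write above are in tst/, the geometry column just comes back as an ordinary, non-geometry column):

import dask.dataframe as dd

# plain dask ignores the geo metadata and reads the data files as-is
ddf = dd.read_parquet("tst/")
ddf.head()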
dask_geopandas.read_parquet("tst/")
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/var/folders/2f/fhks6w_d0k556plcv3rfmshw0000gn/T/ipykernel_60072/3763839778.py in <module>
----> 1 dask_geopandas.read_parquet("tst/")
~/Git/dask-geopandas/dask_geopandas/io/parquet.py in read_parquet(*args, **kwargs)
155
156 def read_parquet(*args, **kwargs):
--> 157 result = dd.read_parquet(*args, engine=GeoArrowEngine, **kwargs)
158 # check if spatial partitioning information was stored
159 spatial_partitions = result._meta.attrs.get("spatial_partitions", None)
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, ignore_metadata_file, metadata_task_size, split_row_groups, chunksize, aggregate_files, **kwargs)
323 gather_statistics = True
324
--> 325 read_metadata_result = engine.read_metadata(
326 fs,
327 paths,
~/Git/dask-geopandas/dask_geopandas/io/parquet.py in read_metadata(cls, fs, paths, **kwargs)
62 # get spatial partitions if available
63 regions = geopandas.GeoSeries(
---> 64 [_get_partition_bounds(part, fs) for part in parts], crs=meta.crs
65 )
66 if regions.notna().all():
~/Git/dask-geopandas/dask_geopandas/io/parquet.py in <listcomp>(.0)
62 # get spatial partitions if available
63 regions = geopandas.GeoSeries(
---> 64 [_get_partition_bounds(part, fs) for part in parts], crs=meta.crs
65 )
66 if regions.notna().all():
~/Git/dask-geopandas/dask_geopandas/io/parquet.py in _get_partition_bounds(part, fs)
52 if bbox is None:
53 return None
---> 54 return shapely.geometry.box(*bbox)
55
56
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/shapely/geometry/geo.py in box(minx, miny, maxx, maxy, ccw)
62 if not ccw:
63 coords = coords[::-1]
---> 64 return Polygon(coords)
65
66
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/shapely/geometry/polygon.py in __init__(self, shell, holes)
259
260 if shell is not None:
--> 261 ret = geos_polygon_from_py(shell, holes)
262 if ret is not None:
263 geom, n = ret
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/shapely/geometry/polygon.py in geos_polygon_from_py(shell, holes)
537
538 if shell is not None:
--> 539 ret = geos_linearring_from_py(shell)
540 if ret is None:
541 return None
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/shapely/speedups/_speedups.pyx in shapely.speedups._speedups.geos_linearring_from_py()
ValueError: GEOSGeom_createLinearRing_r returned a NULL pointer
However, it does save the file, seemingly correctly.
It's only failing when writing the common _metadata file at the end, which is why it appears to (and in fact does) write the actual data files.
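(A write-side workaround sketch: since only the consolidated _metadata step fails, the write_metadata_file keyword of dask's to_parquet, visible in the signature in the traceback above, can be used to skip it. The empty data files are still written, so the read-side problem reported above remains.)

# sketch: skip the consolidated _metadata file so the failing append never runs
african_cities.to_parquet("tst/", write_metadata_file=False)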
But I suppose we should simply skip writing those partitions? Otherwise they clutter the dataset with many small files, and the empty partitions also propagate to reading the dataset back in.
Although I see that dask's own to_parquet also writes such empty partitions (and runs into the same error).
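A rough user-side sketch of that idea (drop_empty_partitions is a hypothetical helper, not something in dask-geopandas) would be to filter out the empty partitions before writing:

# hypothetical helper: keep only the non-empty partitions
def drop_empty_partitions(ddf):
    lengths = ddf.map_partitions(len).compute()  # number of rows per partition
    keep = [i for i, n in enumerate(lengths) if n > 0]
    return ddf.partitions[keep]

african_cities = drop_empty_partitions(african_cities)
african_cities.to_parquet("tst/")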
Sidenote: the error from the metadata specifically happens when there are string columns, because for an empty column (which will be object dtype in pandas) pyarrow cannot infer that it contains strings, giving inconsistent schemas for the different files. This is a typical gotcha with all-NA or empty object dtype columns (https://github.com/dask/dask/issues/6243), and for that reason dask added the schema keyword, as explained in the error message you get. Passing schema="infer" should infer it from the first non-empty partition.
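To illustrate that gotcha with plain dask (a standalone sketch without geometry columns; the paths and the "name" column are made up): an object column that holds only missing values in one partition is inferred by pyarrow as a null type, while the partition with actual strings gets a string type, so the per-file schemas disagree unless a schema is supplied.

import pandas as pd
import pyarrow as pa
import dask.dataframe as dd

# first partition holds strings, second only NA values (object dtype)
df = pd.DataFrame({"name": ["a", "b", None, None]})
ddf = dd.from_pandas(df, npartitions=2)

ddf.to_parquet("plain/", schema="infer")                 # infer from non-empty data
ddf.to_parquet("plain2/", schema={"name": pa.string()})  # or pass an explicit type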
But I suppose we should simply skip writing those partitions?
That sounds like a very sensible solution. If we do that, I would maybe just warn about it so the user is not surprised that the number of parquet files does not equal the number of partitions.
Yeah, I am only now wondering if there is a reason that dask doesn't do this ..
Passing schema="infer" should infer it from the first non-empty partition.
Ah, and that doesn't work because pyarrow cannot (yet) infer the type from a geometry-dtype column ..
Ah, and that doesn't work because pyarrow cannot (yet) infer the type from a geometry-dtype column
Shall we then wait for a pyarrow update?