DaskGeoDataFrame and datashader incompatibility

Open ahnsws opened this issue 9 months ago • 5 comments

Hello, I am running into an issue where using datashader on a DaskGeoDataFrame results in an error. To reproduce, I have the following poetry environment running on Ubuntu 22.04.5 LTS:

python = ">=3.12,<3.13"
spatialpandas = "0.5.0"
dask = "2025.3.0"
datashader = "0.17.0"
numpy = "2.1.3"

I followed this blog post from HoloViz to set up the DaskGeoDataFrame; the code that generates the error is below:

from pathlib import Path

from datashader import Canvas
from spatialpandas.dask import DaskGeoDataFrame
from spatialpandas.io import read_parquet_dask


def run():
    pq_file = Path(__file__).parent / "data" / "test.parq"

    gdf = read_parquet_dask(pq_file)
    assert isinstance(gdf, DaskGeoDataFrame)

    canvas = Canvas()
    canvas.points(gdf, geometry="geometry")


if __name__ == "__main__":
    run()

This gives the following error:

Traceback (most recent call last):
  File "2025-03-27_minimal.py", line 54, in <module>
    run()
  File "2025-03-27_minimal.py", line 50, in run
    canvas.points(gdf, geometry="geometry")
  File "/home/titanium/.cache/pypoetry/virtualenvs/sandbox-datashader2-_RrFaDUd-py3.12/lib/python3.12/site-packages/datashader/core.py", line 229, in points
    return bypixel(source, self, glyph, agg)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/titanium/.cache/pypoetry/virtualenvs/sandbox-datashader2-_RrFaDUd-py3.12/lib/python3.12/site-packages/datashader/core.py", line 1351, in bypixel
    return bypixel.pipeline(source, schema, canvas, glyph, agg, antialias=antialias)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/titanium/.cache/pypoetry/virtualenvs/sandbox-datashader2-_RrFaDUd-py3.12/lib/python3.12/site-packages/datashader/utils.py", line 121, in __call__
    return lk[cls](head, *rest, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/titanium/.cache/pypoetry/virtualenvs/sandbox-datashader2-_RrFaDUd-py3.12/lib/python3.12/site-packages/datashader/data_libraries/dask.py", line 42, in dask_pipeline
    return da.compute(dsk, scheduler=scheduler)[0]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/titanium/.cache/pypoetry/virtualenvs/sandbox-datashader2-_RrFaDUd-py3.12/lib/python3.12/site-packages/dask/base.py", line 656, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/titanium/.cache/pypoetry/virtualenvs/sandbox-datashader2-_RrFaDUd-py3.12/lib/python3.12/site-packages/dask/local.py", line 455, in get_async
    raise ValueError("Found no accessible jobs in dask")
ValueError: Found no accessible jobs in dask

Process finished with exit code 1

To get the code to work, I had to revert the packages to the following:

python = ">=3.12,<3.13"
spatialpandas = "0.4.10"
dask = "2024.12.1"
datashader = "0.17.0"
numpy = "1.26.4"

The only output now is a bunch of warnings:

/home/titanium/.cache/pypoetry/virtualenvs/sandbox-datashader2-_RrFaDUd-py3.12/lib/python3.12/site-packages/dask/dataframe/__init__.py:49: FutureWarning: 
Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.

  warnings.warn(msg, FutureWarning)
/home/titanium/.cache/pypoetry/virtualenvs/sandbox-datashader2-_RrFaDUd-py3.12/lib/python3.12/site-packages/spatialpandas/io/parquet.py:353: FutureWarning: Passing 'use_legacy_dataset' is deprecated as of pyarrow 15.0.0 and will be removed in a future version.
  d = ParquetDataset(
/home/titanium/.cache/pypoetry/virtualenvs/sandbox-datashader2-_RrFaDUd-py3.12/lib/python3.12/site-packages/spatialpandas/io/parquet.py:137: FutureWarning: Passing 'use_legacy_dataset' is deprecated as of pyarrow 15.0.0 and will be removed in a future version.
  dataset = ParquetDataset(
# the same warning is repeated many times

Process finished with exit code 0


I wasn't sure how to create an empty DaskGeoDataFrame, but to generate the parquet file I downloaded one of the CSV files mentioned in the HoloViz blog post above and ran the script below:

from pathlib import Path

import dask.dataframe as dd
import numpy as np
from dask.diagnostics import ProgressBar
from spatialpandas import GeoDataFrame
from spatialpandas.geometry import PointArray


def lon_lat_to_easting_northing(longitude, latitude):
    # Web Mercator (EPSG:3857) forward projection; copied here to avoid a dependency on holoviews
    origin_shift = np.pi * 6378137
    easting = longitude * origin_shift / 180.0
    with np.errstate(divide="ignore", invalid="ignore"):
        northing = (
            np.log(np.tan((90 + latitude) * np.pi / 360.0)) * origin_shift / np.pi
        )
    return easting, northing


def convert_partition(df):
    east, north = lon_lat_to_easting_northing(
        df["LON"].astype("float32"), df["LAT"].astype("float32")
    )
    return GeoDataFrame({"geometry": PointArray((east, north))})


def convert_csv_to_gdf():
    base_dir = Path(__file__).parent / "data"
    csv_files = base_dir / "AIS_2020_01*.csv"

    pq_file = base_dir / "test.parq"
    example = GeoDataFrame({"geometry": PointArray([], dtype="float32")})

    with ProgressBar():
        print("Reading csv files")
        gdf = dd.read_csv(csv_files, assume_missing=True)
        gdf = gdf.map_partitions(convert_partition, meta=example)

        print("Writing parquet file")
        gdf = gdf.pack_partitions_to_parquet(pq_file, npartitions=64)

    return gdf


if __name__ == "__main__":
    convert_csv_to_gdf()

using the following versions:

python = ">=3.12,<3.13"
spatialpandas = "0.4.10"
dask = "2024.12.1"
datashader = "0.17.0"
numpy = "1.26.4"
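
As a quick sanity check of the conversion math (a minimal sketch; the expected values come from the standard EPSG:3857 constants, not from anything specific to this issue), the function above should map the origin to (0, 0) and longitude 180 to the Web Mercator easting bound:

import numpy as np

# lon_lat_to_easting_northing is the function from the script above.
# Longitude 180 should land on the Web Mercator easting bound,
# pi * 6378137 ~= 20037508.34 m; latitude 0 should give northing 0.
east, north = lon_lat_to_easting_northing(
    np.array([0.0, 180.0]), np.array([0.0, 0.0])
)
assert np.allclose(east, [0.0, 20037508.34], atol=1.0)
assert np.allclose(north, [0.0, 0.0])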

This is not exactly breaking, but it would be nice to be able to use updated packages. Thank you!

ahnsws avatar Mar 27 '25 17:03 ahnsws

Seems like something goes wrong in the read_parquet_dask code path:

import dask.dataframe as dd
import spatialpandas as spd
from datashader import Canvas
from spatialpandas.dask import DaskGeoDataFrame
from spatialpandas.io import read_parquet_dask

points = spd.geometry.PointArray([[-17573586.0, 2429270.75], [-13642480.0, 6193684.0]])
sdf = spd.GeoDataFrame({"geometry": points})
sddf = dd.from_pandas(sdf, npartitions=2)
assert isinstance(sddf, DaskGeoDataFrame)

Canvas().points(sddf, geometry="geometry")  # Works

# Save to parquet and run again
sddf.to_parquet("data.parq")
gdf = read_parquet_dask("data.parq")
assert isinstance(gdf, DaskGeoDataFrame)
Canvas().points(gdf, geometry="geometry")  # Fails

FWIW, the latest dask releases use a complete rewrite of dask.DataFrame (the expression-based dask-expr implementation), so I don't know how easy it will be to track down the underlying issue.
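
If it helps, one way to confirm which DataFrame implementation is active (a minimal sketch; the module path is an internal detail and differs between dask versions):

import dask
import dask.dataframe as dd

print(dask.__version__)
df = dd.from_dict({"a": [1, 2]}, npartitions=1)
# Under the expression-based rewrite the collection class comes from the
# dask_expr / dask.dataframe.dask_expr modules; the legacy implementation
# lived in dask.dataframe.core.
print(type(df).__module__, type(df).__name__)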

hoxbro avatar Mar 28 '25 08:03 hoxbro

Thanks for the much more concise example! After some more testing, the above code snippet runs without failing with the following versions:

spatialpandas = "0.4.10"
dask = "2024.11.2"
datashader = "0.17.0"
numpy = "1.26.4"

I suppose it'll do for now.

ahnsws avatar Mar 28 '25 18:03 ahnsws

I am hitting the same error when using read_parquet_dask, as described in the comment above.

With dask 2025.1 I get the ValueError: Found no accessible jobs in dask, as in this issue's initial report. With the more recent dask 2025.5.1 I get the traceback below:

In [1]: cvs = ds.Canvas()

In [2]: agg = cvs.points(df, geometry='loc')
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[2], line 1
----> 1 agg = cvs.points(df, geometry='loc')

File ~/code/vraoum/venv-vraoum/lib/python3.12/site-packages/datashader/core.py:229, in Canvas.points(self, source, x, y, agg, geometry)
    222     else:
    223         raise ValueError(
    224             "source must be an instance of spatialpandas.GeoDataFrame, "
    225             "spatialpandas.dask.DaskGeoDataFrame, geopandas.GeoDataFrame, or "
    226             "dask_geopandas.GeoDataFrame. Received objects of type {typ}".format(
    227                 typ=type(source)))
--> 229 return bypixel(source, self, glyph, agg)

File ~/code/vraoum/venv-vraoum/lib/python3.12/site-packages/datashader/core.py:1351, in bypixel(source, canvas, glyph, agg, antialias)
   1349 with warnings.catch_warnings():
   1350     warnings.filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
-> 1351     return bypixel.pipeline(source, schema, canvas, glyph, agg, antialias=antialias)

File ~/code/vraoum/venv-vraoum/lib/python3.12/site-packages/datashader/utils.py:121, in Dispatcher.__call__(self, head, *rest, **kwargs)
    119 for cls in getmro(typ)[1:]:
    120     if cls in lk:
--> 121         return lk[cls](head, *rest, **kwargs)
    122 raise TypeError("No dispatch for {0} type".format(typ))

File ~/code/vraoum/venv-vraoum/lib/python3.12/site-packages/datashader/data_libraries/dask.py:42, in dask_pipeline(df, schema, canvas, glyph, summary, antialias, cuda)
     39 scheduler = dask.base.get_scheduler() or df.__dask_scheduler__
     41 if isinstance(dsk, da.Array):
---> 42     return da.compute(dsk, scheduler=scheduler)[0]
     44 df = _dask_compat(df)
     45 keys = df.__dask_keys__()

File ~/code/vraoum/venv-vraoum/lib/python3.12/site-packages/dask/base.py:681, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    678     expr = expr.optimize()
    679     keys = list(flatten(expr.__dask_keys__()))
--> 681     results = schedule(expr, keys, **kwargs)
    683 return repack(results)

File ~/code/vraoum/venv-vraoum/lib/python3.12/site-packages/dask/local.py:191, in start_state_from_dask(dsk, cache, sortkey, keys)
    189 if task is None:
    190     if dependents[key] and not cache.get(key, None):
--> 191         raise ValueError(
    192             "Missing dependency {} for dependents {}".format(
    193                 key, dependents[key]
    194             )
    195         )
    196     continue
    197 elif isinstance(task, DataNode):

ValueError: Missing dependency ('fromdelayed-29d473b8b8dfafdf92db02e71c19e586', 19) for dependents {('delayed-container-read_parquet-chunk-3e2b8d8064fde4a7412916050f0ba458', 19)}

For now I have not found a combination of downgraded package versions that makes this work without breaking other parts of my code.

looran avatar Jun 11 '25 19:06 looran

Running compute() on the read_parquet_dask result fixes the issue in my code, and in the issue reproducer code too; updated version below.

import dask.dataframe as dd
import spatialpandas as spd
from datashader import Canvas
from spatialpandas.dask import DaskGeoDataFrame
from spatialpandas.io import read_parquet_dask

points = spd.geometry.PointArray([[-17573586.0, 2429270.75], [-13642480.0, 6193684.0]])
sdf = spd.GeoDataFrame({"geometry": points})
sddf = dd.from_pandas(sdf, npartitions=2)
assert isinstance(sddf, DaskGeoDataFrame)

c1 = Canvas().points(sddf, geometry="geometry")  # Works

# Save to parquet and run again
sddf.to_parquet("data.parq")
gdf = read_parquet_dask("data.parq")
assert isinstance(gdf, DaskGeoDataFrame)
# Canvas().points(gdf, geometry="geometry")  # Fails
c2 = Canvas().points(gdf.compute(), geometry="geometry")  # Works

assert c1.equals(c2)

looran avatar Jun 16 '25 22:06 looran

@looran, that is somewhat expected: compute() converts the dask DataFrame into an in-memory pandas DataFrame, so datashader no longer goes through the dask code path. The underlying problem is related to dask.
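
To illustrate (a minimal sketch, reusing gdf from the reproducer above), materializing the frame changes which code path datashader dispatches to:

from spatialpandas import GeoDataFrame

# compute() pulls all partitions into a single in-memory
# spatialpandas.GeoDataFrame (a pandas.DataFrame subclass), so
# datashader's Dispatcher selects the pandas pipeline instead of the
# dask one that currently fails.
materialized = gdf.compute()
assert isinstance(materialized, GeoDataFrame)

This also means the aggregation no longer runs out of core, so the workaround only helps when the dataset fits in memory.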

hoxbro avatar Jun 17 '25 09:06 hoxbro