dask-image
dask-image copied to clipboard
Implement support for ndimage.map_coordinates
This PR suggests a dask-image
implementation for ndimage.map_coordinates
which covers the case in which the provided coordinates fit into memory and the input
array consists of a (potentially large) dask array. See https://github.com/dask/dask-image/issues/235.
Implementation details:
- for every coordinate the corresponding chunk of the input array and the required input slice are determined
- coordinates are grouped based on associated chunks
- a task is defined for each group and minimum required input array slice to apply
ndimage.map_coordinates
- output intensities are rearranged to match input order
Some tests are included, plan is to add some more following feedback on the choices of the implementation.
A good test would be to see how it performs for your use case @jni.
A good test would be to see how it performs for your use case @jni.
Yes, this would be very interesting to see
First, my apologies - I keep sitting down to look at this, and then deciding I need to come back later when I feel I can give it enough attention. Then the same thing happens again later.
Tests
I like the structure of the test validation, comparing the results from scipy.ndimage.map_coordinates with the new Dask implementation. I'd prefer each test case have an explicit assert statement at the end of it, the two actual test functions are not immediately clear with what they're checking.
numpy specific code
There is lots of numpy specific code here. That might cause a separate headache later on, with our goal to implement support for other, non-numpy array libraries (eg: cupy). I don't think that's a reason not to do it, especially if there are groups who would use a map_coordinates feature (Juan, possibly Adam Ginsburg). But I would prefer if we could minimise or eliminate calls to np.array
/np.asarray
. Again, this doesn't have to be your headache now.
Readability
If you could add comments to the main code function, indicating which parts handle each of the four steps you outline in the first comment on this PR thread, I think that would improve readability.
Also on readability, the docstring will need to be fleshed out a bit (but perhaps a bit later, this might be putting the cart before the horse right now).
... I still don't have thoughtful comments on your actual implementation (mostly I haven't fully understood it yet). Will try to sit down and give it another go soon-ish, but no promises!
Just wanted to say that I would find this feature very useful for making https://github.com/astropy/reproject more compatible with dask inputs!
Hey @GenevieveBuckley, first of all I really appreciate you having a look at this PR and giving your feedback and comments! Second of all apologies back for letting this sit still for quite a while now. I had worked on modifications some months back and am only pushing them now.
Tests I like the structure of the test validation, comparing the results from scipy.ndimage.map_coordinates with the new Dask implementation. I'd prefer each test case have an explicit assert statement at the end of it, the two actual test functions are not immediately clear with what they're checking.
Regarding the assert statements, it's probably not super clear that they're performed within validate_map_coordinates_general
, which is called from within a test function. In test_map_coordinates_large_input
, I added a comment to clarify that its testing functionality consists in finishing before timeout. Not really sure though whether it makes sense to test large inputs in this way (or at all?), I guess because 'large' is relative and also one would probably avoid heavy lifting during testing.
numpy specific code There is lots of numpy specific code here. That might cause a separate headache later on, with our goal to implement support for other, non-numpy array libraries (eg: cupy). I don't think that's a reason not to do it, especially if there are groups who would use a map_coordinates feature (Juan, possibly Adam Ginsburg). But I would prefer if we could minimise or eliminate calls to np.array/np.asarray. Again, this doesn't have to be your headache now.
That's a very good point to keep in mind. I eliminated some explicit conversions to numpy arrays, but we'd probably need to rewrite some things when implementing more general support.
Readability If you could add comments to the main code function, indicating which parts handle each of the four steps you outline in the first comment on this PR thread, I think that would improve readability.
Great idea, I added comments explaining the concrete steps that are performed in the code. That should hopefully make things clearer.
Also on readability, the docstring will need to be fleshed out a bit (but perhaps a bit later, this might be putting the cart before the horse right now).
Definitely, I'm open for suggestions. Actually, would there currently be a way to use @utils._update_wrapper
(that copies the original docstring), and add some additional comments to e.g. give additional details about the dask-image wrapping? If not, that might be useful.
your actual implementation (mostly I haven't fully understood it yet)
Hope the comments and explanations in the code make it clearer. I'm also linking a part of the previous discussion we had here: https://github.com/dask/dask-image/issues/235#issuecomment-848825188
Hey @astrofrog, thanks for your comment :) Yes you're right that this implementation doesn't work with chunked coordinate arrays. Allowing both the image and coordinate arrays to be dask arrays makes the problem much more complex. I guess this is because before the coordinates are present in memory, it's not known which chunks of the image array they need to be mapped onto. See this discussion https://github.com/dask/dask-image/issues/235.
I'd tend to think that while it's not necessarily safe to assume the coordinate array to be small in all use cases, it's reasonable to assume the coordinate array is at least smaller than the image array. Because in the contrary case, it's probably more efficient to arrange the coordinates on a regular grid or in some other sensible way. Also, this is true for most of my use cases (all?) of map_coordinates
.
What would be your thoughts on this?
For my use case (https://github.com/astropy/reproject), the coordinates might have a similar number of elements as the input array - we are essentially transforming images to different coordinate systems which can involve arbitrary distortions etc. We are currently able to deal with large arrays of coordinates in map_coordinates
by just calling map_coordinates
several times for different chunks of coordinates and then putting the results together at the end. Since we've already coded this up, it's not critical to me that map_coordinates
be able to deal with large arrays of coordinates, but just wanted to bring it up in case it might matter to others, and in case it turns out to be simple here.
Interesting. In the case of having regular coordinates I was thinking that sth like scipy.interpolate.RegularGridInterpolator
might be more appropriate than scipy.ndimage.map_coordinates
, but of course that's not the case for nonlinear transformations. And here coordinate arrays can be arbitrarily large.
What's also true though is that as you mention it's relatively straight forward to split up the coordinate array and call map_coordinates
several times. Therefore maybe it makes sense to use the current approach applied separately to each chunk of the coordinate array.
I guess this could in principle be done using the dask.distributed
functionality of launching tasks from within tasks. For a less brute-force approach, we could also have a look at whether the problem could be expressed as a shuffling one, as suggested by @jakirkham https://github.com/dask/dask-image/issues/235#issuecomment-850599585.
What's also true though is that as you mention it's relatively straight forward to split up the coordinate array and call map_coordinates several times. Therefore maybe it makes sense to use the current approach applied separately to each chunk of the coordinate array.
This makes a lot of sense to me. I can't see why it wouldn't work.
Thanks for your comment @jni! I'll try to implement this in the next week(s) - extending this PR's map_coordinates
implementation by applying it to each chunk of the coordinate array.
Happy New Year!
We are currently able to deal with large arrays of coordinates in map_coordinates by just calling map_coordinates several times for different chunks of coordinates and then putting the results together at the end.
What's also true though is that as you mention it's relatively straight forward to split up the coordinate array and call map_coordinates several times. Therefore maybe it makes sense to use the current approach applied separately to each chunk of the coordinate array.
Finally I pushed an implementation of this, which allows also the coordinates
to be given as a (potentially large) dask array.
I faced a little difficulty worth mentioning here. Following the approach we discussed above, when both input
and coordinates
are dask arrays, two rounds of compute are required. For each chunk of the coordinate array
- the coordinate values are computed
- and are then mapped onto the input array using the discussed approach.
To achieve this I used da.map_blocks
in combination with a little hack. Normally, the function used with map_blocks(func..)
receives precomputed data chunks. However, in this case, only the coordinate data chunk should be precomputed, while the input
array should remain a dask array. So I found the following way to avoid triggering the computation:
output = da.map_blocks(
_map_single_coordinates_array_chunk,
delayed(da.Array)(input.dask, input.name, input.chunks, input.dtype),
coordinates,
...)
(Edit: I posted a question regarding this on the dask discourse forum).
Apart from that, I adapted the tests and the implementation is consistent with the scipy
output for all combinations of numpy or dask arrays as inputs (without prefiltering). What the tests don't assess though is performance.
@astrofrog Would you be able to test dask_image.ndinterp.map_coordinates
with your use case of large coordinate arrays?
Actually, would there currently be a way to use @utils._update_wrapper (that copies the original docstring), and add some additional comments to e.g. give additional details about the dask-image wrapping? If not, that might be useful.
Just adding here that I just came across
dask.utils.derived_from(original_klass, version=None, ua_args=None, skipblocks=0, inconsistencies=None)
which could be useful to both automatically copy original docstrings and add comments about inconsistencies between dask-image and original functions.
Happy new year!
I quite like your little hack, it seems like a pretty elegant sidestep around the problem to me. I commented in more detail on the discourse thread.
Just dropping in to say that I am in awe. 😍 Happy New Year indeed! 😃
As a side note the RTD action failure appears to be unrelated to this PR, probably to do with some sphinx/jinja2 update...
File "/home/docs/checkouts/readthedocs.org/user_builds/dask-image/conda/237/lib/python3.7/site-packages/sphinx/util/rst.py", line 22, in <module>
from jinja2 import environmentfilter
ImportError: cannot import name 'environmentfilter' from 'jinja2' (/home/docs/checkouts/readthedocs.org/user_builds/dask-image/conda/237/lib/python3.7/site-packages/jinja2/__init__.py)
Thanks for having a closer look at the hack and your comments on the dask discourse @GenevieveBuckley :) So far it seems to be an okay solution.
As a side note the RTD action failure appears to be unrelated to this PR, probably to do with some sphinx/jinja2 update...
You're right, pinning jinja2<3.1
seems to fix the problem: https://github.com/readthedocs/readthedocs.org/issues/9038.
While exploring the performance of dask_image.ndinterp.map_coordinates
I found that associating the coordinates to the input array chunks was very inefficient, and optimized this a bit.
Here are some first performance comparisons (run on a macbook m2).
When everything fits into memory:
import dask.array as da
from dask_image import ndinterp
from scipy import ndimage
import numpy as np
from time import perf_counter
class catchtime:
def __enter__(self):
self.time = perf_counter()
return self
def __exit__(self, type, value, traceback):
self.time = perf_counter() - self.time
self.readout = f'Time: {self.time:.3f} seconds'
print(self.readout)
if __name__ == "__main__":
interp_order = 3
dim = 3
input_extent = 5*10**2
input_chunk_size = 100
coordinates_size = 10**7
coordinates_chunk_size = 10**6
input_np = np.random.randint(0, 100, size=[input_extent] * dim)
coordinates_np = np.random.randint(0, input_extent, size=(dim, coordinates_size))
input = da.from_array(input_np, chunks=[input_chunk_size] * dim)
coordinates = da.from_array(coordinates_np, chunks=(dim, coordinates_chunk_size))
print(input_extent, input_chunk_size, coordinates_size, coordinates_chunk_size)
with catchtime() as t:
rnp = ndimage.map_coordinates(input_np, coordinates=coordinates_np, order=interp_order) # 13.0 seconds
with catchtime() as t:
r = ndinterp.map_coordinates(input, coordinates=coordinates, order=interp_order) # 0.001 seconds
with catchtime() as t:
rc = r.compute(scheduler='threads') # 5.7 seconds
So here there seems to be a performance gain.
When things don't fit into memory anymore:
interp_order = 3
dim = 3
input_extent = 5 * 10**3
input_chunk_size = 10**2
coordinates_size = 10**10
coordinates_chunk_size = 10**4
input = da.random.randint(0, 100, size=[input_extent] * dim, chunks=[input_chunk_size] * dim)
coordinates = da.random.randint(0, input_extent, size=(dim, coordinates_size), chunks=(dim, coordinates_chunk_size))
with catchtime() as t:
r = ndinterp.map_coordinates(input, coordinates=coordinates, order=interp_order) # 0.364 seconds
with catchtime() as t:
r[0].compute(scheduler='threads') # 125.381 seconds
The last computation is really slow, but it finishes 😅
@m-albert I guess the dask diagnostics tools can help figure out where most of the time is going in that situation? (ie constantly spilling to disk, or something else?) I don't have much experience here though.
@jni Great idea. This is what comes out of profiling the two cases above: medium-sized arrays and larger-than-memory arrays.
The first case looks okay, on average 400% CPU are used by 8 workers, and memory looks fine I guess. The output array is divided into 10 chunks and interestingly, after finishing to compute the first 8 result chunks, a lot of caching seems to be done. This is probably the input array which is required by all tasks.
The second case is a bit more cryptic to me. Interestingly, for some reason only one worker is used.
When trying to use some of the distributed diagnostics options (as opposed to the local ones) I've noticed that the current implementation doesn't work using a distributed scheduler. I'll have a look into that next.
Maybe it is worth looking at the recent changes to rechunk
ing in Distributed ( https://github.com/dask/distributed/pull/7534 )
https://github.com/dask/dask-image/pull/278 should fix that readthedocs CI failure. Let us know if merging the main branch does not fix that particular problem.
Thanks for bringing the branch up to date @jakirkham.
Maybe it is worth looking at the recent changes to rechunking in Distributed ( https://github.com/dask/distributed/pull/7534 )
I was reading a bit into shuffling, its problem with dask's centralized scheduler and the recent changes. And think I understand a bit better now the relevance for map_coordinates
: In the general case in which each output coordinate chunk depends on many input array chunks, it'd be much more efficient if data flowed directly between workers instead of needing to go through the scheduler, which acts as a bottleneck.
Something I don't quite understand yet is how the new shuttling mechanism (that bypasses the scheduler) would be triggered in the context of arrays. For instance, I was assuming that some degree of direct communication between workers would already be happening by default when using a distributed backend (see here).
Does that mean it's worth trying to organise a conversation with Hendrik (author of https://github.com/dask/distributed/pull/7534)?
I don't know Hendrik, but perhaps we can work out a way to get your questions answered, particularly since this is still quite a new change in distributed.
That's interesting. Not entirely sure whether my questions would be super concrete at this point, but a conversation might clarify things a bit!
Something I don't quite understand yet is how the new shuttling mechanism (that bypasses the scheduler) would be triggered in the context of arrays. For instance, I was assuming that some degree of direct communication between workers would already be happening by default when using a distributed backend (see here).
Hm as far as I understand now, direct worker communication might be triggered by the scheduler sometimes, but the new peer to peer mechanism is explicitely implemented for
- array rechunking and
- dataframe shuffling (which rearranges a dataframe according to column entries).
Maybe there'd be a (very naive) workflow in which a map_coordinates
implementation could be mapped onto these two operations, with sth like this:
- convert the coordinate dask array into a dask dataframe
- determine the input chunk each coordinate maps onto
- shuffle the dataframe such that all rows (coordinates) that map onto the same input chunk land in the same partition
- apply
dask.array.overlap
to the input array to make sure coordinates close to chunk borders can be interpolated.overlap
can benefit from rechunking, and maybe here p2p could also help? - apply
ndimage.map_coordinates
on pairs of coordinate partitions and input (overlap) chunks in a hybrid ofmap_blocks
andmap_partitions