User flow kernels
Below is a tentative (rough) design for the API of user flow kernels in Python, which could in principle translate in C++ and in Numba jitted functions (hopefully!). Please note that this is really open for discussion!
The kernel could be defined by a simple function:
def basic_kernel(
inputs_node: dict[str, float],
inputs_receivers: dict[str, list[float]],
outputs_node: dict[str, float],
receivers: dict[Literal["distance", "weight"], list[float]],
outlet_or_pit: bool,
):
"""Basic flow kernel example.
The kernel computes an output erosion value at the current node,
which is half the difference between the elevation at the
current node and the elevation at the lowest flow receiver node.
Parameters
----------
inputs_node : dict
Values of the input fields at the current node.
inputs_receivers : dict
Values of the input fields at each of the flow receiver nodes.
outputs_node : dict
Values of the output fields at the current node.
receivers : dict
Receiver properties (distance and weight from current node to receiver node).
outlet_or_pit : bool
True if the current node is an outlet or a pit
"""
# note: receiver properties (distance, weight) are not needed in this example
if outlet_or_pit:
# no erosion at domain boundaries
outputs_node["erosion"] = 0.0
else:
outputs_node["erosion"] = (inputs_node["elevation"] - min(inputs_receivers["elevation"])) / 2
Applying the kernel could be done for example by calling a method of the FlowGraph class where we pass input and output fields as arrays (each array element correspond to a grid/graph node):
import fastscapelib as fs
# grid/graph setup
grid = fs.RasterGrid(...)
graph = flow_graph(grid, [fs.SingleFlowRouter()])
# init inputs
rng = np.random.Generator(np.random.PCG64(1234))
elevation = rng.uniform(0, 1, size=grid.shape)
# allocate outputs
erosion = np.empty_like(elevation)
# update flow paths
graph.update_routes(elevation)
# apply kernel here
graph.apply_kernel(
basic_kernel,
inputs={"elevation": elevation},
outputs={"erosion": erosion},
direction="bottomup",
)
graph.apply_kernel(...) would:
- check that the shapes / sizes of the input and output arrays match the shape of the grid / number of flow graph nodes
- create and maybe pre-allocate the containers that will be passed to the kernel function as arguments
- traverse the graph in the "bottomup" direction (could also be "topdown") and at each node:
- fill and maybe resize the input/output containers to pass to the kernel function
- call the user kernel function
- collect the output value(s) and fill the output array(s)
A few comments / questions:
- In the majority of cases the input and output fields are both scalar fields defined on the grid / graph nodes. It may be nice in the future to somehow use similar kernels for computing values on the edges of the graph, but let's stick with the case of graph nodes for now.
- I expect that all scalar fields have floating point values, but maybe there will be exceptions (integers, etc.)?
- In the basic example above, there is only one input "elevation" and one output field "erosion". We have real cases with multiple inputs (e.g., SPL eroder with "elevation" and "drainage_area"). Multiple outputs might be possible but less likely.
- We should allow an input field and an output field be both references / pointers to the same array or buffer (e.g., in case we want in the kernel function to update the "elevation" in place.
- The basic example above doesn't require any arbitrary (scalar) parameter, but we will need that as well in real cases.
- There may be cases where a kernel needs to iterate over the flow donors of the current node, such that we would also need to add
inputs_donorsanddonorsarguments to the kernel function signature (maybe optional?). - User flow kernels introduce a convenient layer of abstraction at the cost of some overhead (filling / resizing the temporary containers passed to each kernel call), but I hope we'll manage to keep the latter acceptable.
I've been experimenting a bit with Numba and I've written a basic proof-of-concept (details below). The main idea is to first jit-compile the user-defined kernel function and then wrap it in a C callback (via numba.cfunc) + rely on C-like structures, so that we can provide a clean signature for the user kernel function.
pros:
- I really like the simplicity and readability of the user-defined kernel function. I think it is very user-friendly.
cons:
- A major limitation is that the C-like structures are static ("elevation" and "erosion" fields are hard-coded in the example below)
- I haven't checked the performance cost (in C++) of filling (arrays of) C-like structures before passing them to the callback
- We have to use fixed-size arrays, which are set to an arbitrary maximum number of elements for storing the field values at each of the flow receivers, but I think this is not a big deal (a little waste of memory)
- There is no safe-guards regarding indexing the C-like arrays within the user-defined kernel function, but I think that's OK (such kernel functions should be handled by users that know what they are doing)
In the example below I omitted a few things from the description in the top comment (for simplicity). Also, I think that both inputs and outputs can be merged as one argument.
- Define the (hard-coded) structure for storing field values and implement the Python function decorator:
import numba as nb
fields_type = nb.types.Record.make_c_struct([
("elevation", nb.types.float64),
("erosion", nb.types.float64),
])
def jit_flow_kernel(func):
"""Decorator for creating a Fastscapelib flow kernel from a Python
function.
The input function ``func`` is just-time-compiled and then wrapped in a
C/C++ callback so that is can be executed efficiently while travesing
the flow graph.
The input function must have the following signature:
``func(n_receivers, fields_node, fields_receivers) -> None``
Where
- ``n_receivers`` is the number of flow receivers
- ``fields_node`` are field values at the current node
- ``fields_receivers`` are field values at each of the receiver nodes
"""
jitted_func = nb.njit(func)
signature = nb.types.void(
nb.types.intc,
nb.types.CPointer(fields_type),
nb.types.CPointer(fields_type),
)
# use fixed and arbitrary max. number of receivers
n_receivers_max = 15
@nb.cfunc(signature)
def wrapped(n_receivers, fields_node_ptr, fields_receivers_ptr):
fields_node = nb.carray(fields_node_ptr, 1)
fields_receivers = nb.carray(fields_receivers_ptr, n_receivers_max)
jitted_func(n_receivers, fields_node[0], fields_receivers)
return wrapped.ctypes
- Example of a decorated (jitted) user flow kernel function:
@jit_flow_kernel
def test(n_receivers, fields_node, fields_receivers):
lowest_rec_elevation = fields_node.elevation
for i in range(n_receivers):
rec_elevation = fields_receivers[i].elevation
if rec_elevation < lowest_rec_elevation:
lowest_rec_elevation = rec_elevation
fields_node.erosion = (fields_node.elevation - lowest_rec_elevation) / 2.0
- Emulate kernel function execution from C (C++) using CFFI:
from cffi import FFI
# define the fields C structure
src = """
typedef struct fields_type {
double elevation;
double erosion;
} fields_type;
"""
ffi = FFI()
ffi.cdef(src)
# create and fill test data
data_node = ffi.new('fields_type[1]')
data_node[0].elevation = 10.
data_node_ptr = ffi.cast('fields_type*', data_node)
data_node_addr = int(ffi.cast('size_t', data_node_ptr))
data_receivers = ffi.new('fields_type[15]')
data_receivers[0].elevation = 7.
data_receivers[1].elevation = 8.
data_receivers[2].elevation = 6.
data_receivers_ptr = ffi.cast('fields_type*', data_receivers)
data_receivers_addr = int(ffi.cast('size_t', data_receivers_ptr))
n_receivers = 3
# call the C callback
test(n_receivers, data_node_addr, data_receivers_addr)
# check that erosion has been assigned with the right value
# (using the test kernel and data above, this should return 2.0)
data_node[0].erosion
A major limitation is that the C-like structures are static ("elevation" and "erosion" fields are hard-coded in the example below)
To overcome this limitation, one solution could be to use fixed-size nested c-style arrays instead of c-style structures, i.e., fields_node[1] (fields_receivers[1][x]) corresponds to the value of the 2nd field at the current node (at the x-th receiver).
This would limit both the max number of receivers and max number of fields allowed, but I think we could set those numbers (e.g., 20x20) that are high enough for 99% of cases while still having a small memory footprint. The overall cost in performance (filling those arrays before executing the callback) would be minimized too for such simple data structures.
For convenience, we can allow users setting and passing the field names to the decorator and decorated kernel function, e.g.,
@jit_flow_kernel(field_names=("elevation", "erosion"))
def test(n_receivers, fields_node, fields_receivers, field_names):
# not sure how this would impact performance
elevation_idx = field_names.index("elevation")
erosion_idx = field_names.index("erosion")
lowest_rec_elevation = fields_node[elevation_idx]
...
Closed in #157.