RxPY icon indicating copy to clipboard operation
RxPY copied to clipboard

[Feature] call_latest for more versatile combine_latest behavior

Open Ivorforce opened this issue 3 years ago • 0 comments

Is your feature request related to a problem? Please describe. No - this is about a convenience function.

Describe the solution you'd like A function that allows for calling a stream of functions with a stream of args, kwargs.

rx.call_latest(rx.just(dict))(
    a=a,
    b=b,
    c=c,
)

Describe alternatives you've considered There is an alternative using combine_latest and map:

rx.combine_latest(a, b, c).pipe(
    ops.map(lambda *args: dict(a=args[0], b=args[1], c=args[2])
)

The downside to this is the loss of context between argument order and name. This is error-prone. Especially, inconvenient code constructs often invite bad habits and can lead to even more bugs or less readable code.

Implementation / Solution I happen to have an implementation ready as I use it myself. The interface is makes it pipeable (for the function, if it is a stream).

def call_latest(*args, **kwargs):
    def map(fn):
        kwargs_keys, kwargs_vals = zip(*kwargs.items()) if kwargs else (tuple(), tuple())
        combined_args = [fn, *kwargs_vals, *args]

        def caller(input: tuple):
            iterator = iter(input)

            fun = next(iterator)
            kwargs = dict(zip(kwargs_keys, iterator))

            return fun(*iterator, **kwargs)

        return rxo.map(caller)(rx.combine_latest(*combined_args))

    return map

Let me know if anyone else is interested in this. It's not, as far as I know, part of any specifications - but I have had many uses for it over the months.

Ivorforce avatar Apr 28 '22 15:04 Ivorforce