Flow & bind functions
What do you think about adding some helper functions like flow & bind, just like returns already has?
I've tried using them and IMHO it looks better than a chain of .and_then().and_then().
result: Result[int, ValueError] = flow(
    "1",
    bind(str_to_float),
    bind(float_to_int),
)

def str_to_float(s: str) -> Result[float, ValueError]:
    ...

def float_to_int(f: float) -> Result[int, ValueError]:
    ...
I haven't tested my code with mypy, but pyright is OK with the types of flow & bind, so I think it's possible to adapt.
@minmax
I wanted to do something similar to what you describe and experimented a bit. I'm not sure I understand exactly what you mean, but I managed to come up with something similar using the @as_result decorator and the and_then method:
from typing import Any

from result import Result, as_result


@as_result(KeyError)
def process_step1(input: dict[str, Any]) -> dict[str, Any]:
    del input["test1"]
    return input


@as_result(ZeroDivisionError)
def process_step2(input: dict[str, Any]) -> dict[str, Any]:
    input["test2"] /= input["test3"]
    return input


def main(input: dict[str, Any]) -> Result[dict[str, Any], Exception]:
    return (
        process_step1(input)
        .and_then(process_step2)
        .and_then(process_step1)
    )


if __name__ == '__main__':
    final_result = main({"test1": 1, "test2": 3, "test3": 3})
    if final_result.is_ok():
        print(f"Result: {final_result.unwrap()}")
    if final_result.is_err():
        print(f"{type(final_result.unwrap_err())}: {final_result.unwrap_err()}")
Unfortunately there are no examples in the README or anywhere else, as far as I can find. The docstrings are not really clear on this either, so it was not really possible (at least for me) to understand how to use it this way. But with a bit of trial and error I managed to figure it out.
@golgor - great example. @as_result seems to be like return or pure from functional languages: it takes a function's output and wraps it in a Result, correct?
Maybe the last part of an example can be something like:
final_result.and_then(print_result)
?
Also thought of this:
match final_result:
    case Ok(content):
        print(f"Result: {content}")
    case Err(content):
        print(f"{type(final_result.unwrap_err())}: {content}")
Here is what I found to be working with and_then (aka bind):
from result import Result, Ok, Err

def inc(x: int | float) -> Result:
    return Ok(x + 1)

print(Ok(1).and_then(inc))  # Ok(2)
print(Err(None).and_then(inc))  # Err(None)
What do you think about adding some helper functions like flow & bind, just like returns already has?
I think you already have bind: it is called and_then. What would the type signature of flow() be? How about flow(1, str_to_float, float_to_int)? This way flow() is a wrapper around an initial value and a list of functions:
def flow(value, *callables):
    r = Ok(value)
    for f in callables:
        r = r.and_then(f)
    return r
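For illustration, here is how that flow() could be called with the two converters from the first post. This is only a sketch: to keep it runnable without installing anything, Ok and Err below are minimal stand-ins (not the classes from result), and the bodies of str_to_float and float_to_int are assumptions, since the original post leaves them as "...".

```python
from dataclasses import dataclass
from typing import Any, Callable


# Minimal stand-ins for result.Ok / result.Err (assumption: only and_then is needed).
@dataclass
class Ok:
    ok_value: Any

    def and_then(self, f: Callable[[Any], Any]) -> Any:
        return f(self.ok_value)


@dataclass
class Err:
    err_value: Any

    def and_then(self, f: Callable[[Any], Any]) -> "Err":
        return self  # short-circuit: f is never called


def flow(value, *callables):
    r = Ok(value)
    for f in callables:
        r = r.and_then(f)
    return r


# Hypothetical bodies, written only for this example.
def str_to_float(s: str):
    try:
        return Ok(float(s))
    except ValueError as exc:
        return Err(str(exc))


def float_to_int(f: float):
    return Ok(int(f))


good = flow("1", str_to_float, float_to_int)   # Ok(ok_value=1)
bad = flow("abc", str_to_float, float_to_int)  # an Err; float_to_int is skipped
```

With this shape there is no need for a separate bind: the plain functions are passed directly and flow() does the and_then chaining itself.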
@epogrebnyak
To be honest, I have no experience with functional programming languages, but I think it works as you describe. It takes a function as input (that is how a decorator works) and wraps the output in a Result.
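To make that concrete, here is a rough sketch of what a decorator in the spirit of @as_result does. It is not the library's implementation: wrap_as_result, divide, Ok and Err below are hypothetical stand-ins written only for this illustration.

```python
import functools
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Ok:   # stand-in, not result.Ok
    ok_value: Any


@dataclass
class Err:  # stand-in, not result.Err
    err_value: Any


def wrap_as_result(*exceptions: type) -> Callable:
    """Hypothetical helper: turn 'return or raise' into 'Ok or Err'."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any):
            try:
                return Ok(func(*args, **kwargs))
            except exceptions as exc:  # only the listed exception types are caught
                return Err(exc)
        return wrapper
    return decorator


@wrap_as_result(ZeroDivisionError)
def divide(a: float, b: float) -> float:
    return a / b


ok = divide(6, 3)   # Ok(ok_value=2.0)
err = divide(1, 0)  # Err holding the ZeroDivisionError instance
```

Any exception type that is not listed in the decorator call still propagates as a normal exception.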
I also looked into using a match statement, and it makes the code easier to read. You can even handle different kinds of errors separately:
match final_result:
    case Ok(content):
        print(f"Result: {content}")
    case Err(content):
        if type(content) == KeyError:
            print(f"KeyError: {content}")
        if type(content) == ZeroDivisionError:
            print(f"ZeroDivisionError: {content}")
I have this use case at work where we get IoT data as JSON structures (i.e. Python dicts) and we want to perform a series of transforms on them. I did some testing with creating a wrapper class for the "pipeline data". My main idea was to properly manage more complicated structures such as dicts. I also want to get some kind of result from each step.
I'm not sure if it just makes things more complicated, and I haven't tested it properly, but it might give you some ideas. Sorry for the lengthy code:
from __future__ import annotations
import copy
import json
from typing import Any, Generic, TypeVar
from result import Err, Ok, Result, as_result
Message = dict[str, Any]
V = TypeVar("V", bound=Message)
@as_result(KeyError)
def process_step1(input: PipelineData[Message]) -> PipelineData[Message]:
    data = input.get_last_step()
    del data["test1"]
    input.add_step("Remove 'test1'", data)
    return input


@as_result(ZeroDivisionError)
def process_step2(input: PipelineData[Message]) -> PipelineData[Message]:
    data = input.get_last_step()
    data["test2"] /= data["test3"]
    input.add_step("divide test2 with test3", data)
    return input


@as_result(ValueError)
def process_step3(input: PipelineData[Message]) -> PipelineData[Message]:
    data = input.get_last_step()
    data["test3"] = 5
    input.add_step("set test3 to 5", data)
    return input


@as_result(ValueError)
def process_step4(input: PipelineData[Message]) -> PipelineData[Message]:
    data = input.get_last_step()
    data["test1"] = 5
    input.add_step("recreate and set test1 to 5", data)
    return input


def main(input: PipelineData[Message]) -> Result[PipelineData[Message], Exception]:
    return (
        process_step1(input)
        .and_then(process_step2)
        .and_then(process_step3)
        .and_then(process_step4)
    )
class PipelineData(Generic[V]):
    """A class to hold data for processing in a pipeline.

    It keeps a record of all processing steps and the data from each step, so it is possible to get the data from
    any previous step at any time in the pipeline. It also provides the option to make the steps 'immutable' by
    not allowing the same key for two steps, i.e. overwriting one step with another.

    It is advised to set the step name to something descriptive of what the step does, e.g. 'Sum of run_log'.
    This will make it much easier to debug the pipeline if something goes wrong.
    """

    def __init__(self, data: V, safe: bool = True) -> None:
        """Initializes a new instance of PipelineData.

        Saves the provided data in the processing steps under the name 'original'.

        Args:
            data (V): Any kind of data to be processed in the pipeline.
            safe (bool): If True, adding a step whose name already exists raises ValueError.
        """
        self._safe = safe
        self._processing_steps: dict[str, V] = {"original": data}

    def add_step(self, step_name: str, data: V) -> None:
        """Add a processing step.

        This saves a new processing step and the data from that step.
        """
        if self._safe and step_name in self._processing_steps:
            raise ValueError(f"Step name '{step_name}' already exists!")
        self._processing_steps[step_name] = data

    def get_last_step(self) -> V:
        """Get the data from the last processing step.

        This is intended to serve the data from the last processing step ready for the next step. The data is copied
        using deepcopy to avoid referencing the old data when changes are made.

        Returns:
            V: The data from the last processing step.
        """
        data_copy = copy.deepcopy(self._processing_steps)
        return data_copy[self.last_step_name]

    @property
    def last_step_name(self) -> str:
        """Helper function to get the name of the last processing step.

        Returns:
            str: The name of the last step performed.
        """
        return list(self._processing_steps)[-1]

    @property
    def original_data(self) -> V:
        """Helper function to get the original data.

        Returns:
            V: The original data as provided during initialization.
        """
        return self._processing_steps["original"]

    def __str__(self) -> str:
        return f"{json.dumps(self._processing_steps, indent=4)}"
if __name__ == "__main__":
    data = PipelineData({"test1": 1, "test2": 3, "test3": 3})
    final_result = main(data)
    match final_result:
        case Ok(content):
            print(f"Result: {content}")
        case Err(content):
            if type(content) == KeyError:
                print(f"KeyError: {content}")
            if type(content) == ZeroDivisionError:
                print(f"ZeroDivisionError: {content}")
I realised that final_result.and_then(print_result) will not work, since and_then on an Err just returns the Err itself without applying the function.
As for printing - better use match err: case Err(KeyError(message)) or isinstance(content, KeyError).
As for the big JSON pipeline example: I would suggest keeping it as simple as possible. It seems like too many concerns are crammed into one class, which looks very risky. If I were doing something similar, I'd keep the original data and a list of transformations. I'm also not sure a dict is the perfect data structure for this.
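One way to read that suggestion, as a rough sketch only: leave the original message untouched and describe the pipeline as a plain list of transformation functions. The names run_pipeline, drop_test1 and divide_test2 are made up for this illustration, and the sketch returns a plain (history, error) pair instead of a Result so that it stays self-contained.

```python
import copy
from typing import Any, Callable

Message = dict[str, Any]
Step = Callable[[Message], Message]


# Each step returns a new dict instead of mutating its input,
# so earlier snapshots in the history stay valid.
def drop_test1(msg: Message) -> Message:
    return {k: v for k, v in msg.items() if k != "test1"}


def divide_test2(msg: Message) -> Message:
    return {**msg, "test2": msg["test2"] / msg["test3"]}


def run_pipeline(original: Message, steps: list[Step]):
    """Apply steps in order and return (history, error).

    history holds (step name, data after that step) pairs, starting with a
    copy of the original. error is None on success, otherwise the exception
    that stopped the run.
    """
    current = copy.deepcopy(original)
    history = [("original", current)]
    for step in steps:
        try:
            current = step(current)
        except (KeyError, ZeroDivisionError) as exc:  # the two errors used in this thread
            return history, exc
        history.append((step.__name__, current))
    return history, None


history, error = run_pipeline(
    {"test1": 1, "test2": 3, "test3": 3},
    [drop_test1, divide_test2],
)
```

The step names come from the function names, so there is no separate registry of step labels to keep in sync.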
As for printing - better use match err: case Err(KeyError(message)) or isinstance(content, KeyError).
That won't work with the standard exception classes: I get an error from Pylance (Pyright) and a runtime error when executing. The following will work, but still raises a Pyright error:
match final_result:
    case Ok(content):
        print(f"Result: {content}")
    case Err(KeyError):
        print(f"KeyError: {KeyError}")
    case Err(ZeroDivisionError):
        print(f"ZeroDivisionError: {ZeroDivisionError}")
    case _:
        print("Something else went wrong!")
I do see your concern about PipelineData. It was more of an experiment; maybe it is possible to use something similar but simpler.
That won't work with the standard exception classes: I get an error from Pylance (Pyright) and a runtime error when executing. The following will work, but still raises a Pyright error:
Here is code that works and also passes the mypy check:
https://github.com/epogrebnyak/result-playground/blob/a82477fce6adaea76bd85d1575aa7060c83aaf99/and_then.py#L15-L34
Overall, just printing the final result is fine; print(f"KeyError: {KeyError}") does not add much over final_result.value.__repr__().
I tried using my own implementation of flow, but came to the conclusion that it is too limited and not convenient in practice. and_then is certainly more convenient and flexible.
So I propose closing the ticket as not useful.
# Requires Python 3.11+ (typing.Never and *tuple[...] in *args annotations).
import functools
from typing import Any, Callable, Never, TypeVar, overload

from result import Ok, Result

T = TypeVar("T")
R = TypeVar("R")
E = TypeVar("E")
R1 = TypeVar("R1")
R2 = TypeVar("R2")
R3 = TypeVar("R3")
R4 = TypeVar("R4")
R5 = TypeVar("R5")
E1 = TypeVar("E1")
E2 = TypeVar("E2")
E3 = TypeVar("E3")
E4 = TypeVar("E4")
E5 = TypeVar("E5")

OnlyOk = Result[T, Never]
AnyResult = Result[Any, Any]


@overload
def flow(
    val: T,
    *functions: *tuple[Callable[[OnlyOk[T]], Result[R, E]]],
) -> Result[R, E]: ...
@overload
def flow(
    val: T,
    *functions: *tuple[
        Callable[[OnlyOk[T]], Result[R1, E1]],
        Callable[[Result[R1, E1]], Result[R, E]],
    ],
) -> Result[R, E]: ...
...
@overload
def flow(
    val: T,
    *functions: *tuple[
        Callable[[OnlyOk[T]], Result[R1, E1]],
        Callable[[Result[R1, E1]], Result[R2, E2]],
        Callable[[Result[R2, E2]], Result[R3, E3]],
        Callable[[Result[R3, E3]], Result[R4, E4]],
        Callable[[Result[R4, E4]], Result[R5, E5]],
        Callable[[Result[R5, E5]], Result[R, E]],
    ],
) -> Result[R, E]: ...
def flow(
    val: T,
    *functions: *tuple[
        Callable[[OnlyOk[T]], AnyResult],
        *tuple[Callable[[AnyResult], AnyResult], ...],
    ],
) -> Result[R, E]:
    """Build a pipe from functions and pass value into it.

    Typing:
    -------
    flow(
        A,
        (Container[A] -> Container[B]),
        (Container[B] -> Container[C]),
    ) -> Container[C]

    Example:
    -------
    flow(
        1.5,
        bind(int),
        bind(str)
    ) == Ok("1")
    """
    # Wrap the initial value in Ok, then feed it through each function in order.
    return functools.reduce(
        lambda value, func: func(value),
        functions,
        Ok(val),
    )
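The flow above expects every step to take a Result and return a Result, so a bind helper is needed to lift ordinary and_then-style functions. The thread does not show bind, so the following is only a guess at its shape: Ok and Err are stand-in classes, flow is reduced to the untyped reduce, and the converter bodies are hypothetical.

```python
import functools
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Ok:   # stand-in for result.Ok
    ok_value: Any

    def and_then(self, f: Callable[[Any], Any]) -> Any:
        return f(self.ok_value)


@dataclass
class Err:  # stand-in for result.Err
    err_value: Any

    def and_then(self, f: Callable[[Any], Any]) -> "Err":
        return self


def bind(f: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Lift f (value -> Result) into a function Result -> Result."""
    def bound(res: Any) -> Any:
        return res.and_then(f)
    return bound


def flow(val: Any, *functions: Callable[[Any], Any]) -> Any:
    # Same reduce as in the typed version, without the overloads.
    return functools.reduce(lambda value, func: func(value), functions, Ok(val))


# Hypothetical converters that return a Result-like value.
def str_to_float(s: str):
    try:
        return Ok(float(s))
    except ValueError:
        return Err(f"not a float: {s!r}")


def float_to_int(f: float):
    return Ok(int(f))


result = flow("1", bind(str_to_float), bind(float_to_int))  # Ok(ok_value=1)
failed = flow("x", bind(str_to_float), bind(float_to_int))  # an Err; float_to_int never runs
```

Under this reading, bind(f) is just a free-function spelling of res.and_then(f), which is why the chained and_then form ends up equivalent.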