Proposal: `demux_enum_by` operator
Motivation/User Story
demux_enum makes it easy to write pipelines that process enum variants. One limitation of the operator is that it cannot easily carry contextual information alongside the enum. For example, networking operators often return (Message, SocketAddress) pairs, where Message is an enum. Processing the message and sending a response requires a demux_enum on the Message, which drops the SocketAddress and makes it difficult to send the response back to that address. To work around this, all networking and example code defines two versions of the enum: Message, which is what the wire protocol contains, and MessageWithAddress, which holds the address inside it and is an alternative representation of (Message, SocketAddress). This is cumbersome.
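To make the workaround concrete, here is a minimal sketch in plain Rust (no hydroflow code). The variants Ping and Echo, their fields, and the helper attach_address are hypothetical and chosen only for illustration; std::net::SocketAddr stands in for SocketAddress.

```rust
use std::net::SocketAddr;

// Hypothetical wire-protocol enum; the variants are illustrative only.
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Ping { seq: u32 },
    Echo { payload: String },
}

// The parallel enum the workaround requires: the same variants,
// each with the sender's address added as an extra field.
#[derive(Debug, Clone, PartialEq)]
enum MessageWithAddress {
    Ping { seq: u32, addr: SocketAddr },
    Echo { payload: String, addr: SocketAddr },
}

// Boilerplate conversion from the (Message, SocketAddr) pair.
// It has to be updated by hand whenever a variant is added or changed.
fn attach_address((msg, addr): (Message, SocketAddr)) -> MessageWithAddress {
    match msg {
        Message::Ping { seq } => MessageWithAddress::Ping { seq, addr },
        Message::Echo { payload } => MessageWithAddress::Echo { payload, addr },
    }
}
```

Every variant is declared twice and converted by hand, which is the duplication this proposal aims to remove.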
Proposal
Introduce a demux_enum_by operator that accepts a MapFn: Fn(U) -> T, where U is the type of the input items and T is the enum we want to demux_enum on.
Challenges
Type of enum cannot be inferred from MapFn
The signature for demux_enum is demux_enum::<EnumType>(). Specifying the enum type explicitly is required so that various checks can be performed in the generated Rust code. For example, we verify that all variants of the enum are covered by generating a no-op match statement over the variants specified in the hydroflow program.
These checks cannot rely on type inference alone, because type information is not available during macro processing. The same limitation applies to the proposed demux_enum_by operator: it cannot infer the enum's type by examining the tokens of the supplied MapFn, so the user must specify the enum type explicitly.
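As a rough illustration of the no-op match described above, the sketch below shows the kind of check that could be emitted. It is not the actual generated code; the enum Message and the function name assert_all_variants_covered are hypothetical.

```rust
// Hypothetical enum; the variants are illustrative only.
#[allow(dead_code)]
enum Message {
    Ping { seq: u32 },
    Echo { payload: String },
}

// Stand-in for a generated check. It does nothing at runtime; its only
// purpose is that the compiler rejects the match if a variant is missing.
// Writing `Message::Ping` and `Message::Echo` as paths requires the enum's
// name as tokens, which is why the type must be spelled out by the user.
fn assert_all_variants_covered(msg: &Message) {
    match msg {
        Message::Ping { .. } => (),
        Message::Echo { .. } => (),
    }
}
```

Removing either arm makes the match non-exhaustive and the program fails to compile, which is the property the check is after.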
The final signature for the demux_enum_by operator, therefore, looks something like this: demux_enum_by::<T>(MapFn: Fn(U) -> T).
Output of demux_enum_by
Since variants of enums aren't types themselves, the output side of demux_enum "flattens" the fields of each variant into a tuple. This makes it difficult to pass the original input through to the output side.
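The sketch below models the flattening in plain Rust and shows one hypothetical way the original input could travel with it: pairing each flattened tuple with a clone of the input. This is not the proposed design and not hydroflow code. The names Outputs and demux_by, the variants, and the U: Clone bound are all assumptions. Both variants carry two fields so that the sketch does not have to assume how single-field variants are represented.

```rust
use std::net::SocketAddr;

// Hypothetical enum; the variants and fields are illustrative only.
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Ping { seq: u32, sent_at_ms: u64 },
    Echo { payload: String, ttl: u8 },
}

// One output per variant. Each item is the original input `U`
// paired with the variant's fields flattened into a tuple.
struct Outputs<U> {
    ping: Vec<(U, (u32, u64))>,
    echo: Vec<(U, (String, u8))>,
}

// Plain-Rust model of the routing. Because `map_fn` consumes its argument,
// the input has to be cloned to survive on the output side, which is one
// visible cost of this particular shape.
fn demux_by<U: Clone>(inputs: Vec<U>, map_fn: impl Fn(U) -> Message) -> Outputs<U> {
    let mut out = Outputs { ping: Vec::new(), echo: Vec::new() };
    for input in inputs {
        match map_fn(input.clone()) {
            Message::Ping { seq, sent_at_ms } => out.ping.push((input, (seq, sent_at_ms))),
            Message::Echo { payload, ttl } => out.echo.push((input, (payload, ttl))),
        }
    }
    out
}
```

With inputs of type (Message, SocketAddr) and a MapFn of |(msg, _addr)| msg, the address stays reachable on each output, but the enum value is duplicated alongside its flattened fields. Avoiding that kind of redundancy is part of what makes the output type hard to settle.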
The output side of demux_enum_by isn't solved yet. Apart from that, there is a working prototype here: https://github.com/hydro-project/hydroflow/tree/rohit_demux_enum_by
Keyed streams (#1761) may help with this.