non exhaustive pipes

Open tnovotny opened this issue 5 years ago • 3 comments

It would be really nice if pipes could communicate that they are done. Consider take: when there is an expensive transformation upstream of it, it would be sweet if take(n) only 'triggered' n calculations. So maybe something like a done() method could be used to communicate upstream that the pushing can stop. For example, the code for onReceive in transform looks like this:

    template<typename... Values, typename TailPipeline>
    void onReceive(Values&&... values, TailPipeline&& tailPipeline)
    {
        send(detail::invoke(function_.get(), FWD(values)...), tailPipeline);
    }

It does not check whether the tailPipeline still wants the result. If pipes had a way to check, we could rewrite this as

    template<typename... Values, typename TailPipeline>
    void onReceive(Values&&... values, TailPipeline&& tailPipeline)
    {
        if( not tailPipeline.done() )
        {
            send(detail::invoke(function_.get(), FWD(values)...), tailPipeline);
        }
        else
        {
            done_ = true;
        }
    }
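
A minimal, self-contained sketch of that idea follows. It does not use the pipes API: `collect`, `take`, `transform`, `push_all` and `count_transform_calls` are hypothetical stand-ins, and the only assumption carried over from the proposal is that every stage exposes a `done()` query that upstream stages consult before doing work.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

namespace sketch {  // hypothetical stand-ins, not the pipes library API

// Terminal stage: stores every value and never reports done.
template <typename T>
struct collect {
    std::vector<T>& out;
    void recv(T value) { out.push_back(std::move(value)); }
    bool done() const { return false; }
};

// Forwards at most `limit` values, then reports done.
template <typename Tail>
struct take {
    std::size_t limit;
    Tail tail;
    std::size_t seen = 0;

    template <typename T>
    void recv(T&& value) {
        if (done()) return;
        ++seen;
        tail.recv(std::forward<T>(value));
    }
    bool done() const { return seen >= limit || tail.done(); }
};

// Applies `function` only while the tail still wants values.
template <typename F, typename Tail>
struct transform {
    F function;
    Tail tail;

    template <typename T>
    void recv(T&& value) {
        if (tail.done()) return;  // skip the expensive call entirely
        tail.recv(function(std::forward<T>(value)));
    }
    bool done() const { return tail.done(); }
};

// Source loop: stops pushing as soon as the pipeline reports done.
template <typename Range, typename Pipeline>
void push_all(const Range& range, Pipeline& pipeline) {
    for (const auto& value : range) {
        if (pipeline.done()) break;
        pipeline.recv(value);
    }
}

// Returns how often the transformation ran for a given take limit.
inline std::size_t count_transform_calls(std::size_t limit) {
    const std::vector<int> input{1, 2, 3, 4, 5, 6};
    std::vector<int> output;
    std::size_t calls = 0;
    auto expensive = [&calls](int x) { ++calls; return x * x; };

    using Take = take<collect<int>>;
    transform<decltype(expensive), Take> pipeline{
        expensive, Take{limit, collect<int>{output}}};
    push_all(input, pipeline);

    assert(output.size() <= limit);
    return calls;
}

}  // namespace sketch
```

With six inputs, `sketch::count_transform_calls(2)` runs the transformation twice rather than six times, because the source loop and `transform` both stop once `take` reports done.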

tnovotny avatar Oct 27 '19 10:10 tnovotny

Here is a link to a simple working example based primarily on the existing pipes code. It no longer uses "smart output iterators", as they are too "dumb". I removed the proxies and the pipeline, as they weren't needed to get this running.

tnovotny avatar Oct 28 '19 11:10 tnovotny

Here is a more extensive example that removes some of the constraints in the previous one. In the example, the top part in the namespace pipes is essentially code from this library with some modifications. The lower part, after using namespace pipes, is mainly mine.

The example contains a transform >>= filter >>= take pipe because that is a compelling example of where this is useful.

I'm not sure whether this is a bug or not, but something like

    std::vector<int> a{ 1, 2, 3, 4 };
    std::vector<int> b{ 11, 22, 33, 44 };

    mux( a, b ) >>= filter( []( auto i, auto j ) { return i + j > 20.0; } )
        >>= transform( []( auto i, auto j ) { return std::make_tuple( i * 10, j - 5 ); } )
        >>= to_out_stream( std::cout );

fails with

    binary '<<': no operator found which takes a right-hand operand of type 'T' (or there is no acceptable conversion)
    with [T=std::tuple<int,int>]

I added extra code in my transform to handle the case where the transformation returns a tuple, as I didn't know how better to let it return multiple values.

    template<typename... Values, typename Pipe>
    void recv( Pipe&& tail, Values&&... values ) {
        // Forwards whatever the transformation produced to the next pipe.
        auto recv = [&]( auto&&... values ) { tail.recv( FWD( values )... ); };
        using transformed = decltype( transform_( FWD( values )... ) );

        if( not done() ) {
            if constexpr( is_tuple<transformed>::value ) {
                // A returned tuple is unpacked into separate values.
                std::apply( recv, transform_( FWD( values )... ) );
            } else {
                std::invoke( recv, transform_( FWD( values )... ) );
            }

            // Record whether the tail still accepts values.
            tail_done_ = tail.done();
        }
    }

I am still not sure why there is a distinction between pipes and pipelines. The code seems to work fine without it.

tnovotny avatar Nov 01 '19 21:11 tnovotny

It is probably a good idea.

A pipe needs to pass its output to another pipe or pipeline, whereas a pipeline doesn't (it ends with a closing point such as pipes::push_back). Therefore, an input can only be connected to a pipeline, not a pipe.
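
One way to picture the distinction is the toy model below. It is only a sketch under assumed names (`push_back_pipeline`, `transform_pipe`, `composed_pipeline`, `demo`) and does not reproduce how the library implements it: a pipe can only handle a value when it is handed a tail, while a pipeline can receive values on its own.

```cpp
#include <utility>
#include <vector>

namespace sketch {  // toy model with hypothetical names, not the pipes API

// A pipeline is complete: it ends in a closing point and can receive values.
template <typename T>
struct push_back_pipeline {
    std::vector<T>& out;
    void recv(const T& value) { out.push_back(value); }
};

// A pipe is incomplete: it needs a tail to pass its output to.
template <typename F>
struct transform_pipe {
    F function;
    template <typename T, typename Tail>
    void recv(const T& value, Tail& tail) { tail.recv(function(value)); }
};

// Attaching a pipe to a pipeline yields a new, complete pipeline.
template <typename Pipe, typename Tail>
struct composed_pipeline {
    Pipe pipe;
    Tail tail;
    template <typename T>
    void recv(const T& value) { pipe.recv(value, tail); }
};

// pipe >>= pipeline
template <typename F, typename Tail>
composed_pipeline<transform_pipe<F>, Tail> operator>>=(transform_pipe<F> pipe, Tail tail) {
    return {std::move(pipe), std::move(tail)};
}

// input >>= pipeline: pushes every element into a complete pipeline.
template <typename T, typename Pipeline>
void operator>>=(const std::vector<T>& input, Pipeline pipeline) {
    for (const auto& value : input) pipeline.recv(value);
}

inline std::vector<int> demo() {
    const std::vector<int> input{1, 2, 3};
    std::vector<int> output;
    auto twice = [](int x) { return x * 2; };

    // >>= is right-associative: the pipe is attached to the pipeline first,
    // and the input is then connected to the resulting pipeline.
    input >>= transform_pipe<decltype(twice)>{twice} >>= push_back_pipeline<int>{output};
    return output;
}

}  // namespace sketch
```

In this model, writing `input >>= transform_pipe<...>{twice}` on its own fails to compile, because a bare pipe has no one-argument `recv`; only after it is attached to a pipeline does it become something an input can feed.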

godefv avatar Oct 16 '20 14:10 godefv