libunifex icon indicating copy to clipboard operation
libunifex copied to clipboard

Investigate changing customisations of algorithms to be based on connect instead of invocation of algorithm itself

Open lewissbaker opened this issue 2 years ago • 2 comments

The current implementation of scheduler affinity for the coroutine task type relies on special behaviour of being able to identify when the coroutine awaits a sender returned by schedule(some_scheduler) so that it can treat that as a change of the current scheduler to some other scheduler.

The current solution complicates the implementation of schedule() a lot and makes it difficult to extend to other kinds of expressions as well.

One option could be to turn invocation of sender algorithms into generic algorithms that just curry their arguments into a sender<CPO, Args...> type which then customises connect() to then pass the arguments through to connect(receiver, CPO, args...).

As all senders would then be effectively just an argument currying mechanism we could customise await_transform() when the awaited object is of type sender<tag_t<schedule>, SomeScheduler> to handle this.

This would also allow simpler customisation of entire sender expressions when, e.g. the expression is passed to on(sched, some_sender_expression). The sender expression can just be an expression-template of a well-known structure/type that the scheduler can use to customise on(). See below for an example of how this might work.

Implementation Sketch

It might also help if you think of connect() instead as async_invoke().

For example: We can define a generic sender type, parameterised on a CPO which is just responsible for currying arguments.

template<typename CPO, typename... Args>
struct sender {
  constexpr sender(const sender& other) = default;
  constexpr sender(sender&& other) = default;

  template<typename... Args2>
    requires sizeof...(Args) == sizeof...(Args2) && (std::constructible<Args, Args2> && ...)
  constexpr sender(Args2&&... args2) : args(static_cast<Args2&&>(args2)...)

  // Allow further currying args to produce another sender.
  template<typename... ExtraArgs>
    requires (std::copy_constructible<Args> && ...) && (std::constructible<remove_cvref_t<ExtraArgs>, ExtraArgs> && ...)
  sender<CPO, Args..., remove_cvref_t<ExtraArgs>...> operator()(ExtraArgs&&... extraArgs) & const {
    return std::apply([&](const Args&... args) {
      return sender<CPO, Args..., remove_cvref_t<ExtraArgs>...>{args..., static_cast<ExtraArgs&&>(extraArgs)...};
    }, this->args);
  }
  
  // Allow pipe operator to prepend an argument.
  template<typename Arg>
  friend sender<CPO, remove_cvref_t<Arg>, Args...> operator|(Arg&& arg, const sender& s) {
    return std::apply([&](const Args&... args) {
      return sender<CPO, remove_cvref_t<Arg>, Args...>{static_cast<Arg&&>(arg), args...};
    }, s.args);
  }
  
  // etc.. for other value categories

  // Customise async_invoke() to forward to the CPO
  template<typename Receiver, typename... ExtraArgs>
    requires std::invocable<tag_t<async_invoke>, Receiver, CPO, const Args&..., ExtraArgs...>
  friend decltype(auto) tag_invoke(tag_t<async_invoke>, Receiver r, const sender& s, ExtraArgs&&... extraArgs) {
    return std::apply([&](const Args&... args) {
      return async_invoke(move(r), CPO{}, args..., static_cast<ExtraArgs&&>(extraArgs)...);
    }, s.args);
  }

  std::tuple<Args...> args;
};

Then we can define CPOs to have a default implementation of async_invoke() that has the default implementation. We can use a helper here to allow invocation of the CPO to return the sender.

template<typename CPO>
struct sender_cpo_base {
  template<typename... Args>
  sender<CPO, remove_cvref_t<Args>...> operator()(Args&&... args) const {
    return sender<CPO, remove_cvref_t<Args>...>{static_cast<Args&&>(args)...};
  }
};

struct just_t : sender_cpo_base<just_t> {
  template<typename Receiver, typename... Args>
  struct default_op_state {
    Receiver r;
    std::tuple<remove_cvref_t<Args>...> args;
    
    friend void tag_invoke(tag_t<start>, default_op_state& op) noexcept {
      std::apply([&](Args&&... args) noexcept {
        set_value(move(op.r), move(args)...);
      }, move(op.args));
    }
  };

  template<typename Receiver, typename... Args>
    requires /* ... */
  default_op_state<Receiver, remove_cvref_t<Args>...> tag_invoke(tag_t<async_invoke>, Receiver r, just_t, Args&&... args) {
    return {move(r), {static_cast<Args&&>(args)...}};
  }
};

struct transform_t : sender_cpo_base<transform_t> {
  template<typename Receiver, typename Sender, typename... Funcs>
  struct default_op_state {
    struct receiver {
      op_state& op;
      
      template<typename... Values>
      friend void tag_invoke(tag_t<set_value>, receiver&& r, Values&&... values) {
        try {
          std::apply([&](Funcs&&... funcs) {
            if constexpr (sizeof...(Funcs) == 1) {
              set_value(move(r.op.r), move(funcs)(static_cast<Values&&>(values)...)...);
            } else {
              set_value(move(r.op.r), move(funcs)(values...)...);
            }
          }, move(r.op.funcs));
        } catch (...) {
          set_error(move(r.op.r), std::current_exception());
        }
      }
    };
    
    friend void tag_invoke(tag_t<start>, default_op_state& op) noexcept {
      start(op.childOp);
    }
    
    template<typename... Fs2>
    default_op_state(Receiver r, Sender&& s, Fs2&&... fs)
    : r(move(r))
    , funcs(static_cast<Fs2&&>(fs)...)
    , childOp(async_invoke(receiver{*this}, static_cast<Sender&&>(s)))
    {}
    
    Receiver r;
    std::tuple<Fs...> funcs;
    async_invoke_result_t<receiver, Sender> childOp;
};

Then a customisation of the algorithm could be implemented as follows:

struct my_scheduler {
  template<typename Receiver>
  struct schedule_op { ... };

  template<typename Receiver>
  friend auto tag_invoke(tag_t<async_invoke>, Receiver r, schedule_t, my_scheduler s) {
    return schedule_op<Receiver>{ ... };
  }
};

This doesn't yet handle things like sender-queries or sender-traits which also need to be considered, however.

Sketch of Scheduler Customisation

struct my_scheduler {
  // ...
};

// customise: on(my_scheduler{}, bulk(src, count, f))
template<typename Receiver, typename Src, typename Count, typename Func>
auto tag_invoke(tag_t<async_invoke>, Receiver r, tag_t<on>, my_scheduler s, sender<tag_t<bulk>, Src, Count, Func> bulkOp) {
  // customisation goes here...
}

lewissbaker avatar Mar 04 '22 22:03 lewissbaker

I have a prototype of a simple, non-intrusive way to add receiver-guided sender algorithm customization. It involves adding a connector receiver query. A connector is a binary callable that can be used to connect a sender to a receiver. A call to connect(s, r) is evaluated as follows:

  1. If connector(r)(s, r) is well-formed, that is what is used.
  2. Otherwise, if tag_invoke(connect, s, r) is well-formed, that is what is used.
  3. Otherwise, the expression connect(s, r) is ill-formed.

This is prototyped here, together with a pattern-matching facility to make discovering the deep structure of a sender tree simple.

EDIT: The pattern matching stuff (matches[_v], etc) is not a fundamental part of customization mechanism and can safely be ignored when evaluating the efficacy of customizing connect like this.

ericniebler avatar May 28 '22 23:05 ericniebler

@lewissbaker I would really appreciate your thoughts about how compatible this approach is with your language-based CPO mechanism. I don't want P2300 adopting a design for the connect CPO that makes migration to a language-based mechanism difficult.

ericniebler avatar May 31 '22 23:05 ericniebler