Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions include/stdexec/__detail/__affine_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ namespace STDEXEC
struct affine_on_t
{
template <sender _Sender>
constexpr auto operator()(_Sender&& __sndr) const -> __well_formed_sender auto
constexpr auto operator()(_Sender &&__sndr) const -> __well_formed_sender auto
{
return __make_sexpr<affine_on_t>({}, static_cast<_Sender&&>(__sndr));
return __make_sexpr<affine_on_t>({}, static_cast<_Sender &&>(__sndr));
}

constexpr auto operator()() const noexcept
Expand All @@ -62,10 +62,10 @@ namespace STDEXEC
}

template <class _Sender, class _Env>
static constexpr auto transform_sender(set_value_t, _Sender&& __sndr, _Env const & __env)
static constexpr auto transform_sender(set_value_t, _Sender &&__sndr, _Env const &__env)
{
static_assert(sender_expr_for<_Sender, affine_on_t>);
auto& [__tag, __ign, __child] = __sndr;
auto &[__tag, __ign, __child] = __sndr;
using __child_t = decltype(__child);
using __cv_child_t = __copy_cvref_t<_Sender, __child_t>;
using __sched_t = __call_result_or_t<get_scheduler_t, __not_a_scheduler<>, _Env const &>;
Expand Down Expand Up @@ -116,14 +116,26 @@ namespace STDEXEC

namespace __affine_on
{
template <class _Attrs>
struct __attrs
{
template <class _Tag>
constexpr auto query(__get_completion_behavior_t<_Tag>) const noexcept
template <class _Tag, class... _Env>
requires __queryable_with<_Attrs, __get_completion_behavior_t<_Tag>, _Env const &...>
constexpr auto query(__get_completion_behavior_t<_Tag>, _Env const &...) const noexcept
{
// FUTURE: when the child sender completes inline *and* the current scheduler also
// completes inline, we can return "inline" here instead of "__asynchronous_affine".
return __completion_behavior::__asynchronous_affine;
using __behavior_t =
__query_result_t<_Attrs, __get_completion_behavior_t<_Tag>, _Env const &...>;

// When the child sender completes inline, we can return "inline" here instead of
// "__asynchronous_affine".
if constexpr (__behavior_t::value == __completion_behavior::__inline_completion)
{
return __completion_behavior::__inline_completion;
}
else
{
return __completion_behavior::__asynchronous_affine;
}
}
};
} // namespace __affine_on
Expand All @@ -132,9 +144,9 @@ namespace STDEXEC
struct __sexpr_impl<affine_on_t> : __sexpr_defaults
{
static constexpr auto __get_attrs = //
[](__ignore, __ignore, __ignore) noexcept
[]<class _Child>(__ignore, __ignore, _Child const &) noexcept
{
return __affine_on::__attrs{};
return __affine_on::__attrs<env_of_t<_Child>>{};
};
};
} // namespace STDEXEC
194 changes: 142 additions & 52 deletions include/stdexec/__detail/__as_awaitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,123 +84,214 @@ namespace STDEXEC
using __expected_t =
std::variant<std::monostate, __value_or_void_t<_Value>, std::exception_ptr>;

// Helper to cast a coroutine_handle<void> to coroutine_handle<_Promise>
template <class _Promise>
constexpr auto __coroutine_handle_cast(__std::coroutine_handle<> __hcoro) noexcept
-> __std::coroutine_handle<_Promise>
{
return __std::coroutine_handle<_Promise>::from_address(__hcoro.address());
}
template <class _Tag, class _Sender, class... _Env>
concept __completes_inline_for = __never_sends<_Tag, _Sender, _Env...>
|| STDEXEC::__completes_inline<_Tag, env_of_t<_Sender>, _Env...>;

template <class _Sender, class... _Env>
concept __completes_inline = __completes_inline_for<set_value_t, _Sender, _Env...>
&& __completes_inline_for<set_error_t, _Sender, _Env...>
&& __completes_inline_for<set_stopped_t, _Sender, _Env...>;

template <class _Value>
struct __receiver_base
{
using receiver_concept = receiver_t;

template <class... _Us>
requires __std::constructible_from<__value_or_void_t<_Value>, _Us...>
void set_value(_Us&&... __us) noexcept
{
STDEXEC_TRY
{
__result_->template emplace<1>(static_cast<_Us&&>(__us)...);
__continuation_.resume();
__result_.template emplace<1>(static_cast<_Us&&>(__us)...);
}
STDEXEC_CATCH_ALL
{
STDEXEC::set_error(static_cast<__receiver_base&&>(*this), std::current_exception());
__result_.template emplace<2>(std::current_exception());
}
}

template <class _Error>
void set_error(_Error&& __err) noexcept
{
if constexpr (__decays_to<_Error, std::exception_ptr>)
__result_->template emplace<2>(static_cast<_Error&&>(__err));
__result_.template emplace<2>(static_cast<_Error&&>(__err));
else if constexpr (__decays_to<_Error, std::error_code>)
__result_->template emplace<2>(std::make_exception_ptr(std::system_error(__err)));
__result_.template emplace<2>(std::make_exception_ptr(std::system_error(__err)));
else
__result_->template emplace<2>(std::make_exception_ptr(static_cast<_Error&&>(__err)));
__continuation_.resume();
__result_.template emplace<2>(std::make_exception_ptr(static_cast<_Error&&>(__err)));
}

__expected_t<_Value>* __result_;
__std::coroutine_handle<> __continuation_;
__expected_t<_Value>& __result_;
};

template <class _Promise, class _Value>
struct __receiver : __receiver_base<_Value>
struct __sync_receiver : __receiver_base<_Value>
{
constexpr void set_stopped() noexcept
constexpr explicit __sync_receiver(__expected_t<_Value>& __result,
__std::coroutine_handle<_Promise> __continuation) noexcept
: __receiver_base<_Value>{__result}
, __continuation_{__continuation}
{}

void set_stopped() noexcept
{
auto __continuation = __coroutine_handle_cast<_Promise>(this->__continuation_);
// Do not use type deduction here so that we perform any conversions necessary on
// the stopped continuation:
__std::coroutine_handle<> __on_stopped = __continuation.promise().unhandled_stopped();
__on_stopped.resume();
// no-op: the __result_ variant will remain engaged with the monostate
// alternative, which signals that the operation was stopped.
}

// Forward get_env query to the coroutine promise
constexpr auto get_env() const noexcept -> env_of_t<_Promise&>
{
auto const __continuation = __coroutine_handle_cast<_Promise>(this->__continuation_);
return STDEXEC::get_env(__continuation.promise());
return STDEXEC::get_env(__continuation_.promise());
}

__std::coroutine_handle<_Promise> __continuation_;
};

// The receiver type used to connect to senders that could complete asynchronously.
template <class _Promise, class _Value>
struct __async_receiver : __sync_receiver<_Promise, _Value>
{
constexpr explicit __async_receiver(__expected_t<_Value>& __result,
__std::coroutine_handle<_Promise> __continuation) noexcept
: __sync_receiver<_Promise, _Value>{__result, __continuation}
{}

template <class... _Us>
void set_value(_Us&&... __us) noexcept
{
this->__sync_receiver<_Promise, _Value>::set_value(static_cast<_Us&&>(__us)...);
this->__continuation_.resume();
}

template <class _Error>
void set_error(_Error&& __err) noexcept
{
this->__sync_receiver<_Promise, _Value>::set_error(static_cast<_Error&&>(__err));
this->__continuation_.resume();
}

constexpr void set_stopped() noexcept
{
STDEXEC_TRY
{
// Resuming the stopped continuation unwinds the coroutine stack until we reach
// a promise that can handle the stopped signal. The coroutine referred to by
// __continuation_ will never be resumed.
__std::coroutine_handle<> __on_stopped =
this->__continuation_.promise().unhandled_stopped();
__on_stopped.resume();
}
STDEXEC_CATCH_ALL
{
this->__result_.template emplace<2>(std::current_exception());
this->__continuation_.resume();
}
}
};

template <class _Sender, class _Promise>
using __receiver_t = __receiver<_Promise, __detail::__value_t<_Sender, _Promise>>;
using __sync_receiver_t = __sync_receiver<_Promise, __detail::__value_t<_Sender, _Promise>>;

template <class _Sender, class _Promise>
using __async_receiver_t = __async_receiver<_Promise, __detail::__value_t<_Sender, _Promise>>;

template <class _Value>
struct __sender_awaitable_base
{
[[nodiscard]]
constexpr auto await_ready() const noexcept -> bool
static constexpr auto await_ready() noexcept -> bool
{
return false;
}

constexpr auto await_resume() -> _Value
{
switch (__result_.index())
// If the operation completed with set_stopped (as denoted by the monostate
// alternative being active), we should not be resuming this coroutine at all.
STDEXEC_ASSERT(__result_.index() != 0);
if (__result_.index() == 2)
{
case 0: // receiver contract not satisfied
STDEXEC_ASSERT(false && +"_Should never get here" == nullptr);
break;
case 1: // set_value
if constexpr (!__same_as<_Value, void>)
return static_cast<_Value&&>(std::get<1>(__result_));
else
return;
case 2: // set_error
std::rethrow_exception(std::get<2>(__result_));
// The operation completed with set_error, so we need to rethrow the exception.
std::rethrow_exception(std::move(std::get<2>(__result_)));
}
std::terminate();
// The operation completed with set_value, so we can just return the value, which
// may be void.
return static_cast<std::add_rvalue_reference_t<_Value>>(std::get<1>(__result_));
}

protected:
__expected_t<_Value> __result_;
__expected_t<_Value> __result_{};
};

//////////////////////////////////////////////////////////////////////////////////////
// __sender_awaitable: awaitable type returned by as_awaitable when given a sender
// that does not have an as_awaitable member function
template <class _Promise, class _Sender>
struct __sender_awaitable : __sender_awaitable_base<__detail::__value_t<_Sender, _Promise>>
{
constexpr __sender_awaitable(_Sender&& sndr, __std::coroutine_handle<_Promise> __hcoro)
noexcept(__nothrow_connectable<_Sender, __receiver>)
: __op_state_(connect(static_cast<_Sender&&>(sndr),
__receiver{
{&this->__result_, __hcoro}
}))
constexpr explicit __sender_awaitable(_Sender&& __sndr,
__std::coroutine_handle<_Promise> __hcoro)
noexcept(__nothrow_connectable<_Sender, __receiver_t>)
: __opstate_(STDEXEC::connect(static_cast<_Sender&&>(__sndr),
__receiver_t(this->__result_, __hcoro)))
{}

constexpr void await_suspend(__std::coroutine_handle<_Promise>) noexcept
{
STDEXEC::start(__op_state_);
STDEXEC::start(__opstate_);
}

private:
using __receiver_t = __async_receiver_t<_Sender, _Promise>;
connect_result_t<_Sender, __receiver_t> __opstate_;
};

// When the sender is known to complete inline, we can connect and start the operation
// in await_suspend.
template <class _Promise, class _Sender>
requires __completes_inline<_Sender, env_of_t<_Promise&>>
struct __sender_awaitable<_Promise, _Sender>
: __sender_awaitable_base<__detail::__value_t<_Sender, _Promise>>
{
constexpr explicit __sender_awaitable(_Sender&& sndr, __ignore)
noexcept(__nothrow_move_constructible<_Sender>)
: __sndr_(static_cast<_Sender&&>(sndr))
{}

bool await_suspend(__std::coroutine_handle<_Promise> __hcoro)
{
{
auto __opstate = STDEXEC::connect(static_cast<_Sender&&>(__sndr_),
__receiver_t(this->__result_, __hcoro));
// The following call to start will complete synchronously, writing its result
// into the __result_ variant.
STDEXEC::start(__opstate);
}

if (this->__result_.index() == 0)
{
// The operation completed with set_stopped, so we need to call
// unhandled_stopped() on the promise to propagate the stop signal. That will
// result in the coroutine being torn down, so beware. We then resume the
// returned coroutine handle (which may be a noop_coroutine).
__std::coroutine_handle<> __on_stopped = __hcoro.promise().unhandled_stopped();
__on_stopped.resume();

// By returning true, we indicate that the coroutine should not be resumed
// (because it no longer exists).
return true;
}

// The operation completed with set_value or set_error, so we can just resume the
// current coroutine. await_resume with either return the value or throw as
// appropriate.
return false;
}

private:
using __receiver = __receiver_t<_Sender, _Promise>;
connect_result_t<_Sender, __receiver> __op_state_;
using __receiver_t = __sync_receiver_t<_Sender, _Promise>;
_Sender __sndr_;
};

template <class _Sender, class _Promise>
Expand All @@ -211,7 +302,6 @@ namespace STDEXEC
template <class _Sender, class _Promise>
concept __awaitable_adapted_sender = sender_in<_Sender, env_of_t<_Promise&>>
&& __minvocable_q<__detail::__value_t, _Sender, _Promise>
&& sender_to<_Sender, __receiver_t<_Sender, _Promise>>
&& requires(_Promise& __promise) {
{
__promise.unhandled_stopped()
Expand Down
Loading
Loading