Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Clean repeat_ states
  • Loading branch information
justend29 committed Jan 27, 2026
commit 88883afe8cc805eac3222de53c2268a946fbaacf
27 changes: 18 additions & 9 deletions include/exec/repeat_n.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ namespace exec {

_Receiver __rcvr_;
std::size_t __count_;
trampoline_scheduler __sched_;
trampoline_scheduler __sched_{};

protected:
~__repeat_n_state_base() noexcept = default;
};

template <class _SenderId, class _ReceiverId>
Expand Down Expand Up @@ -101,25 +104,29 @@ namespace exec {
__child_count_pair(_Child, std::size_t) -> __child_count_pair<_Child>;

template <class _Sender, class _Receiver>
struct __repeat_n_state : __repeat_n_state_base<_Receiver> {
struct __repeat_n_state final : __repeat_n_state_base<_Receiver> {
using __child_count_pair_t = __decay_t<__data_of<_Sender>>;
using __child_t = decltype(__child_count_pair_t::__child_);
using __receiver_t = STDEXEC::__t<__receiver<__id<_Sender>, __id<_Receiver>>>;
using __child_on_sched_sender_t =
__result_of<exec::sequence, schedule_result_t<trampoline_scheduler>, __child_t &>;
using __child_op_t = STDEXEC::connect_result_t<__child_on_sched_sender_t, __receiver_t>;

constexpr explicit __repeat_n_state(_Sender &&__sndr, _Receiver &&__rcvr)
: __repeat_n_state_base<_Receiver>{
static_cast<_Receiver &&>(__rcvr),
constexpr explicit __repeat_n_state(_Sender &&__sndr, _Receiver &&__rcvr) noexcept(
std::is_nothrow_constructible_v<__child_t, STDEXEC::__tuple_element_t<1, _Sender &&>>
&& noexcept(__connect()))
: __repeat_n_state_base<
_Receiver
>{static_cast<_Receiver &&>(__rcvr),
STDEXEC::__get<1>(static_cast<_Sender &&>(__sndr)).__count_}
, __child_(STDEXEC::__get<1>(static_cast<_Sender &&>(__sndr)).__child_) {
if (this->__count_ != 0) {
__connect();
}
}

constexpr auto __connect() -> __child_op_t & {
constexpr __child_op_t &__connect()
noexcept(STDEXEC::__nothrow_connectable<__child_on_sched_sender_t, __receiver_t>) {
return __child_op_.__emplace_from(
STDEXEC::connect,
exec::sequence(STDEXEC::schedule(this->__sched_), __child_),
Expand All @@ -134,11 +141,11 @@ namespace exec {
}
}

constexpr void __cleanup() noexcept final {
constexpr void __cleanup() noexcept override {
__child_op_.reset();
}

constexpr void __repeat() noexcept final {
constexpr void __repeat() noexcept override {
STDEXEC_ASSERT(this->__count_ > 0);
STDEXEC_TRY {
if (--this->__count_ == 0) {
Expand All @@ -149,7 +156,9 @@ namespace exec {
}
}
STDEXEC_CATCH_ALL {
STDEXEC::set_error(std::move(this->__rcvr_), std::current_exception());
if constexpr (!STDEXEC::__nothrow_connectable<__child_on_sched_sender_t, __receiver_t>) {
STDEXEC::set_error(std::move(this->__rcvr_), std::current_exception());
}
}
}

Expand Down
64 changes: 41 additions & 23 deletions include/exec/repeat_until.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,25 @@ namespace exec {

template <class _Receiver>
struct __repeat_state_base {
constexpr explicit __repeat_state_base(_Receiver &&__rcvr)
: __rcvr_{static_cast<_Receiver &&>(__rcvr)} {
constexpr explicit __repeat_state_base(_Receiver &&__rcvr) noexcept
: __rcvr_{std::move(__rcvr)} {
static_assert(
std::is_nothrow_default_constructible_v<trampoline_scheduler>,
"trampoline_scheduler c'tor is always expected to be noexcept");
}

virtual constexpr void __cleanup() noexcept = 0;
virtual constexpr void __repeat() noexcept = 0;

_Receiver __rcvr_;
trampoline_scheduler __sched_;
trampoline_scheduler __sched_{};

protected:
~__repeat_state_base() noexcept = default;
};

template <class _Bool, bool _Expected>
concept __bool_constant = __decay_t<_Bool>::value == _Expected;
template <class _Boolean, bool _Expected>
concept __bool_constant = __decay_t<_Boolean>::value == _Expected;

template <class _ReceiverId>
struct __receiver {
Expand All @@ -55,22 +61,22 @@ namespace exec {
using __id = __receiver;
using receiver_concept = STDEXEC::receiver_t;

template <class... _Bools>
constexpr void set_value(_Bools &&...__bools) noexcept {
if constexpr ((__bool_constant<_Bools, true> && ...)) {
template <class... _Booleans>
constexpr void set_value(_Booleans &&...__bools) noexcept {
if constexpr ((__bool_constant<_Booleans, true> && ...)) {
// Always done:
__state_->__cleanup();
STDEXEC::set_value(std::move(__state_->__rcvr_));
} else if constexpr ((__bool_constant<_Bools, false> && ...)) {
} else if constexpr ((__bool_constant<_Booleans, false> && ...)) {
// Never done:
__state_->__repeat();
} else {
// Mixed results:
constexpr bool __is_nothrow = noexcept(
(static_cast<bool>(static_cast<_Bools &&>(__bools)) && ...));
(static_cast<bool>(static_cast<_Booleans &&>(__bools)) && ...));
STDEXEC_TRY {
// If the child sender completed with true, we're done
const bool __done = (static_cast<bool>(static_cast<_Bools &&>(__bools)) && ...);
const bool __done = (static_cast<bool>(static_cast<_Booleans &&>(__bools)) && ...);
if (__done) {
__state_->__cleanup();
STDEXEC::set_value(std::move(__state_->__rcvr_));
Expand All @@ -81,8 +87,7 @@ namespace exec {
STDEXEC_CATCH_ALL {
if constexpr (!__is_nothrow) {
__state_->__cleanup();
STDEXEC::set_error(
std::move(__state_->__rcvr_), std::current_exception());
STDEXEC::set_error(std::move(__state_->__rcvr_), std::current_exception());
}
}
}
Expand All @@ -98,8 +103,7 @@ namespace exec {
STDEXEC_CATCH_ALL {
if constexpr (!__nothrow_decay_copyable<_Error>) {
__state_->__cleanup();
STDEXEC::set_error(
std::move(__state_->__rcvr_), std::current_exception());
STDEXEC::set_error(std::move(__state_->__rcvr_), std::current_exception());
}
}
}
Expand All @@ -122,20 +126,23 @@ namespace exec {
STDEXEC_PRAGMA_IGNORE_GNU("-Wtsan")

template <class _Sender, class _Receiver>
struct __repeat_state : __repeat_state_base<_Receiver> {
struct __repeat_state final : __repeat_state_base<_Receiver> {
using __child_t = __decay_t<__data_of<_Sender>>;
using __receiver_t = STDEXEC::__t<__receiver<__id<_Receiver>>>;
using __child_on_sched_sender_t =
__result_of<exec::sequence, schedule_result_t<trampoline_scheduler>, __child_t &>;
using __child_op_t = STDEXEC::connect_result_t<__child_on_sched_sender_t, __receiver_t>;

constexpr explicit __repeat_state(_Sender &&__sndr, _Receiver &&__rcvr)
constexpr explicit __repeat_state(_Sender &&__sndr, _Receiver &&__rcvr) noexcept(
std::is_nothrow_constructible_v<__child_t, STDEXEC::__tuple_element_t<1, _Sender &&>>
&& noexcept(__connect()))
: __repeat_state_base<_Receiver>(static_cast<_Receiver &&>(__rcvr))
, __child_(STDEXEC::__get<1>(static_cast<_Sender &&>(__sndr))) {
__connect();
}

constexpr auto __connect() -> __child_op_t & {
constexpr __child_op_t &__connect()
noexcept(STDEXEC::__nothrow_connectable<__child_on_sched_sender_t, __receiver_t>) {
return __child_op_.__emplace_from(
STDEXEC::connect,
exec::sequence(STDEXEC::schedule(this->__sched_), __child_),
Expand All @@ -146,16 +153,18 @@ namespace exec {
STDEXEC::start(*__child_op_);
}

constexpr void __cleanup() noexcept final {
constexpr void __cleanup() noexcept override {
__child_op_.reset();
}

constexpr void __repeat() noexcept final {
constexpr void __repeat() noexcept override {
STDEXEC_TRY {
STDEXEC::start(__connect());
}
STDEXEC_CATCH_ALL {
STDEXEC::set_error(static_cast<_Receiver &&>(this->__rcvr_), std::current_exception());
if constexpr (!STDEXEC::__nothrow_connectable<__child_on_sched_sender_t, __receiver_t>) {
STDEXEC::set_error(static_cast<_Receiver &&>(this->__rcvr_), std::current_exception());
}
}
}

Expand Down Expand Up @@ -202,8 +211,6 @@ namespace exec {
__mbind_front_q<__values_t, _Sender>::template __f
>;

struct __repeat_tag { };

struct __repeat_until_tag { };

struct __repeat_until_impl : __sexpr_defaults {
Expand Down Expand Up @@ -304,4 +311,15 @@ namespace STDEXEC {
return STDEXEC::get_completion_signatures<__sndr_t, _Env...>();
}
};

template <>
struct __sexpr_impl<exec::repeat_t> : __sexpr_defaults {
template <class _Sender, class... _Env>
static consteval auto get_completion_signatures() {
static_assert(sender_expr_for<_Sender, exec::repeat_t>);
using __sndr_t =
__detail::__transform_sender_result_t<exec::repeat_t, set_value_t, _Sender, env<>>;
return STDEXEC::get_completion_signatures<__sndr_t, _Env...>();
}
};
} // namespace STDEXEC
4 changes: 2 additions & 2 deletions include/stdexec/__detail/__bulk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ namespace STDEXEC {
>
requires is_execution_policy_v<std::remove_cvref_t<_Policy>>
STDEXEC_ATTRIBUTE(host, device)
auto operator()(_Sender&& __sndr, _Policy&& __pol, _Shape __shape, _Fun __fun) const
constexpr auto operator()(_Sender&& __sndr, _Policy&& __pol, _Shape __shape, _Fun __fun) const
-> __well_formed_sender auto {
return __make_sexpr<_AlgoTag>(
__data{__pol, __shape, static_cast<_Fun&&>(__fun)}, static_cast<_Sender&&>(__sndr));
Expand All @@ -197,7 +197,7 @@ namespace STDEXEC {
template <typename _Policy, __std::integral _Shape, __std::copy_constructible _Fun>
requires is_execution_policy_v<std::remove_cvref_t<_Policy>>
STDEXEC_ATTRIBUTE(always_inline)
auto operator()(_Policy&& __pol, _Shape __shape, _Fun __fun) const {
constexpr auto operator()(_Policy&& __pol, _Shape __shape, _Fun __fun) const {
return __closure(
*this,
static_cast<_Policy&&>(__pol),
Expand Down
32 changes: 19 additions & 13 deletions include/stdexec/__detail/__let.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ namespace STDEXEC {
}

[[nodiscard]]
constexpr auto get_env() const noexcept {
constexpr decltype(__env::__join(
__declval<const _Env2&>(),
__declval<STDEXEC::env_of_t<_Receiver&>>())) get_env() const noexcept {
return __env::__join(__env_, STDEXEC::get_env(__rcvr_));
}

Expand Down Expand Up @@ -648,7 +650,20 @@ namespace STDEXEC {
};

template <class _SetTag>
struct __let_impl : __sexpr_defaults {
class __let_impl : public __sexpr_defaults {
template <typename _Sender, typename _Receiver>
using __state_t = __gather_completions_of_t<
_SetTag,
__child_of_t<_Sender>,
__fwd_env_t<env_of_t<_Receiver>>,
__q<__decayed_tuple>,
__mbind_front_q<
__state,
_SetTag,
__child_of_t<_Sender>,
__decay_t<__fn_of_t<_Sender>>,
_Receiver>>;
public:
static constexpr auto get_attrs =
[]<class _Child, class _Fun>(
const __data<_Child, _Fun>& __data) noexcept -> decltype(auto) {
Expand Down Expand Up @@ -679,23 +694,14 @@ namespace STDEXEC {

static constexpr auto get_state =
[]<class _Receiver, __decay_copyable _Sender>(_Sender&& __sndr, _Receiver&& __rcvr)
-> __state_t<_Sender, _Receiver>
requires sender_in<__child_of_t<_Sender>, __fwd_env_t<env_of_t<_Receiver>>>
// TODO(ericniebler): make this conditionally noexcept
{
static_assert(sender_expr_for<_Sender, __let_tag<_SetTag>>);
using __child_t = __child_of_t<_Sender>;
using __fn_t = __decay_t<__fn_of_t<_Sender>>;
using __mk_state = __mbind_front_q<__state, _SetTag, __child_t, __fn_t, _Receiver>;
using __state_t = __gather_completions_of_t<
_SetTag,
__child_t,
__fwd_env_t<env_of_t<_Receiver>>,
__q<__decayed_tuple>,
__mk_state
>;
auto& [__tag, __data] = __sndr;
auto& [__child, __fn] = __data;
return __state_t(
return __state_t<_Sender, _Receiver>(
STDEXEC::__forward_like<_Sender>(__child),
STDEXEC::__forward_like<_Sender>(__fn),
static_cast<_Receiver&&>(__rcvr));
Expand Down
5 changes: 3 additions & 2 deletions include/stdexec/__detail/__sender_adaptor_closure.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ namespace STDEXEC {
template <sender _Sender>
requires __callable<_Fn, _Sender, _As...>
STDEXEC_ATTRIBUTE(host, device, always_inline)
auto operator()(_Sender&& __sndr) && noexcept(__nothrow_callable<_Fn, _Sender, _As...>) {
constexpr auto
operator()(_Sender&& __sndr) && noexcept(__nothrow_callable<_Fn, _Sender, _As...>) {
return STDEXEC::__apply(
static_cast<_Fn&&>(__fn_),
static_cast<__tuple<_As...>&&>(__args_),
Expand All @@ -103,7 +104,7 @@ namespace STDEXEC {
template <sender _Sender>
requires __callable<const _Fn&, _Sender, const _As&...>
STDEXEC_ATTRIBUTE(host, device, always_inline)
auto operator()(_Sender&& __sndr) const & noexcept(
constexpr auto operator()(_Sender&& __sndr) const & noexcept(
__nothrow_callable<const _Fn&, _Sender, const _As&...>) {
return STDEXEC::__apply(__fn_, __args_, static_cast<_Sender&&>(__sndr));
}
Expand Down
10 changes: 6 additions & 4 deletions include/stdexec/__detail/__tuple.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,10 @@ namespace STDEXEC {
template <class... _Ts>
using __tuple_t = __mcall<_CvRef, __tuple<_Ts...>>;

template <class... _Ts, class... _Us, __callable<_Us..., __mcall<_CvRef, _Ts>...> _Fn>
void operator()(_Fn&& __fn, __tuple_t<_Ts...>&& __tupl, _Us&&... __us) const
noexcept(__nothrow_callable<_Fn, _Us..., __mcall<_CvRef, _Ts>...>);
template <class... _Ts, class... _Us, __callable<_Us..., __mcall1<_CvRef, _Ts>...> _Fn>
auto operator()(_Fn&& __fn, __tuple_t<_Ts...>&& __tupl, _Us&&... __us) const
noexcept(__nothrow_callable<_Fn, _Us..., __mcall1<_CvRef, _Ts>...>)
-> __call_result_t<_Fn, _Us..., __mcall1<_CvRef, _Ts>...>;
};

template <class _Tuple>
Expand All @@ -232,7 +233,8 @@ namespace STDEXEC {
requires __callable<__impl_t<_Tuple>, _Fn, _Tuple, _Us...>
STDEXEC_ATTRIBUTE(always_inline, host, device)
constexpr auto operator()(_Fn&& __fn, _Tuple&& __tupl, _Us&&... __us) const
noexcept(__nothrow_callable<__impl_t<_Tuple>, _Fn, _Tuple, _Us...>) -> decltype(auto) {
noexcept(__nothrow_callable<__impl_t<_Tuple>, _Fn, _Tuple, _Us...>)
-> __call_result_t<__impl_t<_Tuple>, _Fn, _Tuple, _Us...> {
constexpr size_t __size = STDEXEC_REMOVE_REFERENCE(_Tuple)::__size;

if constexpr (__size == 0) {
Expand Down
9 changes: 9 additions & 0 deletions test/exec/test_fork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,13 @@ namespace {
CHECK(i1 == 42);
CHECK(i2 == 42);
}

TEST_CASE("fork_join with empty value channel", "[adaptors][fork_join]") {
auto sndr = ::STDEXEC::just() | ::STDEXEC::then([]() noexcept -> void { })
| exec::fork_join(
::STDEXEC::then([]() noexcept -> void { }),
::STDEXEC::then([]() noexcept -> void { }));

::STDEXEC::sync_wait(std::move(sndr));
}
} // namespace
Loading