Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add stdexec::associate
This diff defines `stdexec::associate` and adds some initial tests to
confirm it works properly. Still a work in progress.
  • Loading branch information
ispeters committed Dec 17, 2025
commit 66f394c604d6f6cfb65f49d9a000d80f81fb304b
216 changes: 216 additions & 0 deletions include/stdexec/__detail/__associate.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Copyright (c) 2025 Ian Petersen
* Copyright (c) 2025 NVIDIA Corporation
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "__execution_fwd.hpp"

#include "__basic_sender.hpp"
#include "__concepts.hpp"
#include "__diagnostics.hpp"
#include "__queries.hpp"
#include "__scope_concepts.hpp"
#include "__senders.hpp"
#include "__sender_adaptor_closure.hpp"

namespace stdexec {
/////////////////////////////////////////////////////////////////////////////
// [exec.associate]
namespace __associate {
template <scope_token _Token, sender _Sender>
struct __associate_data {
using __wrap_result_t = decltype(__declval<_Token&>().wrap(__declval<_Sender>()));
using __wrap_sender_t = std::remove_cvref_t<__wrap_result_t>;

using __assoc_t = decltype(__declval<_Token&>().try_associate());

using __sender_ref =
std::unique_ptr<__wrap_sender_t, decltype([](auto* p) noexcept { std::destroy_at(p); })>;

// BUGBUG: should the spec require __token to be declared as a const _Token, or should this be
// changed to declare __token as a mutable _Token?
explicit __associate_data(const _Token __token, _Sender&& __sndr) noexcept(
__nothrow_constructible_from<__wrap_sender_t, __wrap_result_t>
&& noexcept(__token.wrap(static_cast<_Sender&&>(__sndr)))
&& noexcept(__token.try_associate()))
: __sndr_(__token.wrap(static_cast<_Sender&&>(__sndr)))
, __assoc_([&] {
__sender_ref guard{std::addressof(__sndr_)};

auto assoc = __token.try_associate();

if (assoc) {
(void) guard.release();
}

return assoc;
}()) {
}

__associate_data(const __associate_data& __other) noexcept(
__nothrow_copy_constructible<__wrap_sender_t> && noexcept(__other.__assoc_.try_associate()))
requires copy_constructible<__wrap_sender_t>
: __assoc_(__other.__assoc_.try_associate()) {
if (__assoc_) {
std::construct_at(&__sndr_, __other.__sndr_);
}
}

__associate_data(__associate_data&& __other)
noexcept(__nothrow_move_constructible<__wrap_sender_t>)
: __associate_data(std::move(__other).release()) {
}

~__associate_data() {
if (__assoc_) {
std::destroy_at(&__sndr_);
}
}

std::pair<__assoc_t, __sender_ref> release() && noexcept {
__sender_ref u(__assoc_ ? std::addressof(__sndr_) : nullptr);
return {std::move(__assoc_), std::move(u)};
}

private:
__associate_data(std::pair<__assoc_t, __sender_ref> __parts)
: __assoc_(std::move(__parts.first)) {
if (__assoc_) {
std::construct_at(&__sndr_, std::move(*__parts.second));
}
}

union {
__wrap_sender_t __sndr_;
};
__assoc_t __assoc_;
};

template <scope_token _Token, sender _Sender>
__associate_data(_Token, _Sender&&) -> __associate_data<_Token, _Sender>;

////////////////////////////////////////////////////////////////////////////////////////////////
struct associate_t {
template <sender _Sender, scope_token _Token>
auto operator()(_Sender&& __sndr, _Token&& __token) const
noexcept(__nothrow_constructible_from<
__associate_data<std::remove_cvref_t<_Token>, _Sender>,
_Token,
_Sender
>) -> __well_formed_sender auto {
return __make_sexpr<associate_t>(
__associate_data(static_cast<_Token&&>(__token), static_cast<_Sender&&>(__sndr)));
}

template <scope_token _Token>
STDEXEC_ATTRIBUTE(always_inline)
auto operator()(_Token&& __token) const noexcept {
return __closure(*this, static_cast<_Token&&>(__token));
}
};

struct __associate_impl : __sexpr_defaults {
static constexpr auto get_attrs = []<class _Child>(__ignore, const _Child& __child) noexcept {
return __sync_attrs{__child};
};

static constexpr auto get_completion_signatures =
[]<class _Sender, class... _Env>(_Sender&&, _Env&&...) noexcept
-> transform_completion_signatures<
__completion_signatures_of_t<typename __data_of<_Sender>::__wrap_sender_t>,
completion_signatures<set_stopped_t()>
> {
static_assert(sender_expr_for<_Sender, associate_t>);
return {};
};

static constexpr auto get_state =
[]<class _Self, class _Receiver>(_Self&& __self, _Receiver& __rcvr) noexcept(
(same_as<_Self, std::remove_cvref_t<_Self>>
|| __nothrow_constructible_from<std::remove_cvref_t<_Self>, _Self>) &&
__nothrow_callable<
connect_t,
typename std::remove_cvref_t<__data_of<_Self>>::__wrap_sender_t,
_Receiver
>) {
auto&& [_, data] = std::forward<_Self>(__self);

using associate_data_t = std::remove_cvref_t<decltype(data)>;
using assoc_t = associate_data_t::__assoc_t;
using sender_ref_t = associate_data_t::__sender_ref;

using op_t = connect_result_t<typename sender_ref_t::element_type, _Receiver>;

struct op_state {
assoc_t __assoc_;
union {
_Receiver* __rcvr_;
op_t __op_;
};

explicit op_state(std::pair<assoc_t, sender_ref_t> parts, _Receiver r)
: __assoc_(std::move(parts.first)) {
if (__assoc_) {
::new ((void*) std::addressof(__op_))
op_t(connect(std::move(*parts.second), std::move(r)));
} else {
__rcvr_ = std::addressof(r);
}
}

explicit op_state(associate_data_t&& ad, _Receiver& r)
: op_state(std::move(ad).release(), r) {
}

explicit op_state(const associate_data_t& ad, _Receiver& r)
requires copy_constructible<associate_data_t>
: op_state(associate_data_t(ad).release(), r) {
}

~op_state() {
if (__assoc_) {
std::destroy_at(&__op_);
}
}

void __run() noexcept {
if (__assoc_) {
stdexec::start(__op_);
} else {
stdexec::set_stopped(std::move(*__rcvr_));
}
}
};

return op_state{__forward_like<_Self>(data), __rcvr};
};

static constexpr auto start = [](auto& __state, auto&) noexcept -> void {
__state.__run();
};
};
} // namespace __associate

using __associate::associate_t;

/// @brief The associate sender adaptor, which associates a sender with the
/// async scope referred to by the given token
/// @hideinitializer
inline constexpr associate_t associate{};

template <>
struct __sexpr_impl<associate_t> : __associate::__associate_impl { };
} // namespace stdexec
3 changes: 2 additions & 1 deletion include/stdexec/__detail/__sender_adaptor_closure.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ namespace stdexec {

template <sender _Sender, __sender_adaptor_closure_for<_Sender> _Closure>
STDEXEC_ATTRIBUTE(always_inline)
auto operator|(_Sender&& __sndr, _Closure&& __clsur) -> __call_result_t<_Closure, _Sender> {
auto operator|(_Sender&& __sndr, _Closure&& __clsur)
noexcept(__nothrow_callable<_Closure, _Sender>) -> __call_result_t<_Closure, _Sender> {
return static_cast<_Closure&&>(__clsur)(static_cast<_Sender&&>(__sndr));
}

Expand Down
1 change: 1 addition & 0 deletions include/stdexec/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

// include these after __execution_fwd.hpp
#include "__detail/__as_awaitable.hpp" // IWYU pragma: export
#include "__detail/__associate.hpp" // IWYU pragma: export
#include "__detail/__basic_sender.hpp" // IWYU pragma: export
#include "__detail/__bulk.hpp" // IWYU pragma: export
#include "__detail/__completion_signatures.hpp" // IWYU pragma: export
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ set(stdexec_test_sources
stdexec/algos/factories/test_just_stopped.cpp
stdexec/algos/factories/test_read.cpp
stdexec/algos/factories/test_schedule.cpp
stdexec/algos/adaptors/test_associate.cpp
stdexec/algos/adaptors/test_starts_on.cpp
stdexec/algos/adaptors/test_on.cpp
stdexec/algos/adaptors/test_on2.cpp
Expand Down
121 changes: 121 additions & 0 deletions test/stdexec/algos/adaptors/test_associate.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2025 Ian Petersen
* Copyright (c) 2025 NVIDIA Corporation
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <catch2/catch.hpp>
#include <stdexec/execution.hpp>

namespace ex = stdexec;

namespace {
struct null_token {
struct assoc {
constexpr operator bool() const noexcept {
return true;
}

constexpr assoc try_associate() const noexcept {
return {};
}
};

template <ex::sender Sender>
constexpr Sender&& wrap(Sender&& sndr) const noexcept {
return std::forward<Sender>(sndr);
}

constexpr assoc try_associate() const noexcept {
return {};
}
};

TEST_CASE("associate returns a sender", "[adaptors][associate]") {
auto snd = ex::associate(ex::just(), null_token{});
STATIC_REQUIRE(ex::sender<decltype(snd)>);
(void) snd;
}

TEST_CASE("associate is appropriately noexcept", "[adaptors][associate]") {
// double-check our dependencies
STATIC_REQUIRE(noexcept(ex::just()));
STATIC_REQUIRE(noexcept(null_token{}));

// null_token is no-throw default constructible and tokens must be no-throw
// copyable and movable so this whole thing had better be no-throw
STATIC_REQUIRE(noexcept(ex::associate(null_token{})));

// constructing and passing in a no-throw sender should let the whole
// expression be no-throw
STATIC_REQUIRE(noexcept(ex::associate(ex::just(), null_token{})));
STATIC_REQUIRE(noexcept(ex::just() | ex::associate(null_token{})));

// conversely, trafficking in senders with potentially-throwing copy
// constructors should lead to the whole expression becoming potentially-throwing
const auto justString = ex::just(std::string{"Copying strings is potentially-throwing"});
STATIC_REQUIRE(!noexcept(ex::associate(justString, null_token{})));
STATIC_REQUIRE(!noexcept(justString | ex::associate(null_token{})));
(void) justString;
}

template <class Sender, class... CompSig>
constexpr bool expected_completion_signatures() {
using expected_sigs = ex::completion_signatures<CompSig...>;
using actual_sigs = ex::completion_signatures_of_t<Sender>;
return expected_sigs{} == actual_sigs{};
}

TEST_CASE("associate has appropriate completion signatures", "[adaptors][associate]") {
STATIC_REQUIRE(
expected_completion_signatures<
decltype(ex::associate(ex::just(), null_token{})),
ex::set_value_t(),
ex::set_stopped_t()
>());

STATIC_REQUIRE(
expected_completion_signatures<
decltype(ex::associate(ex::just(std::string{}), null_token{})),
ex::set_value_t(std::string),
ex::set_stopped_t()
>);

STATIC_REQUIRE(
expected_completion_signatures<
decltype(ex::associate(ex::just_stopped(), null_token{})),
ex::set_stopped_t()
>);

STATIC_REQUIRE(
expected_completion_signatures<
decltype(ex::associate(ex::just_error(5), null_token{})),
ex::set_error_t(int),
ex::set_stopped_t()
>);
}

// TODO: confirm that running an associate-sender produces the expected output
// variations:
// - with a null_token, it's the identity
// - with an always_expired_token, it's just_stopped
// - change the state of a token between copies; the copy is just_stopped
// TODO: confirm that `associate(foo(), token{})` destroys resources owned by foo()
// when token{} is expired
// TODO: check the pass-through nature of __sync_attrs
// TODO: check the pass-through stop request behaviour
// TODO: confirm that senders-of-references forward references when associated
// TODO: confirm timing of destruction of opstate relative to release of association
// TODO: confirm that the TODO list is exhaustive
} // namespace