Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
coro promise mixin to make senders awaitable. doesn't handle done()
  • Loading branch information
ericniebler committed Aug 24, 2021
commit 3ce7736b0f7f3be967586f942d4c7c2c0176d802
11 changes: 8 additions & 3 deletions examples/hello_coro.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@

#include "./task.hpp"

task<int> async_answer() {
co_return 42;
using namespace std::execution;

template <typed_sender S1, typed_sender S2>
task<int> async_answer(S1 s1, S2 s2) {
// Senders are implicitly awaitable (int this coroutine type):
co_await (S2&&) s2;
co_return co_await (S1&&) s1;
}

int main() {
// Awaitables are implicitly senders:
auto [i] = std::this_thread::sync_wait(async_answer()).value();
auto [i] = std::this_thread::sync_wait(async_answer(just(42), just())).value();
std::cout << "The answer is " << i << '\n';
}
3 changes: 2 additions & 1 deletion examples/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ struct task {
void await_resume() const noexcept {
}
};
// In a base class so it can be specialized when T is void:
struct _promise_base {
void return_value(T value) noexcept {
data_.template emplace<1>(std::move(value));
}
std::variant<std::monostate, T, std::exception_ptr> data_{};
};
struct promise_type : _promise_base {
struct promise_type : _promise_base, std::execution::with_awaitable_senders<promise_type> {
task get_return_object() noexcept {
return task(coro::coroutine_handle<promise_type>::from_promise(*this));
}
Expand Down
172 changes: 164 additions & 8 deletions include/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include <cassert>
#include <stdexcept>
#include <type_traits>
#include <tuple>
Expand Down Expand Up @@ -168,6 +169,25 @@ namespace std::execution {
sender<S> &&
__has_sender_types<sender_traits<remove_cvref_t<S>>>;

template <class... As>
using __back_t = decltype((static_cast<As(*)()>(0),...)());
template <class... As>
requires (sizeof...(As) == 1)
using __single_t = __back_t<As...>;
template <class... As>
requires (sizeof...(As) <= 1)
using __single_or_void_t = __back_t<void, As...>;

template<class S>
using __single_sender_result_t =
typename sender_traits<remove_cvref_t<S>>
::template value_types<__single_or_void_t, __single_t>;

template<class S>
concept __single_typed_sender =
typed_sender<S> &&
requires { typename __single_sender_result_t<S>; };

/////////////////////////////////////////////////////////////////////////////
// [execution.op_state]
inline namespace __start {
Expand Down Expand Up @@ -275,7 +295,7 @@ namespace std::execution {

inline constexpr struct __fn {
private:
template <__awaitable A, receiver R>
template <class A, class R>
static __impl::__op<__id_t<remove_cvref_t<R>>> __impl(A&& a, R&& r) {
exception_ptr ex;
try {
Expand Down Expand Up @@ -344,6 +364,145 @@ namespace std::execution {
template<class S, class R>
using connect_result_t = tag_invoke_result_t<connect_t, S, R>;

/////////////////////////////////////////////////////////////////////////////
// NOT TO SPEC: as_awaitable and with_awaitable_senders
inline namespace __with_awaitable_senders {
namespace __impl {
struct __void {};
template <class Value>
using __value_or_void_t =
conditional_t<is_void_v<Value>, __void, Value>;
template <class Value>
using __expected_t =
variant<monostate, __value_or_void_t<Value>, std::exception_ptr, set_done_t>;

template <class Value>
struct __rec_base {
explicit __rec_base(__expected_t<Value>* result, coro::coroutine_handle<> continuation) noexcept
: result_(result)
, continuation_(continuation)
{}

__rec_base(__rec_base&& r) noexcept
: result_(std::exchange(r.result_, nullptr))
, continuation_(std::exchange(r.continuation_, nullptr))
{}

template <class... Us>
requires constructible_from<Value, Us...> ||
(is_void_v<Value> && sizeof...(Us) == 0)
friend void tag_invoke(set_value_t, __rec_base&& self, Us&&... us)
noexcept(is_nothrow_constructible_v<Value, Us...> ||
is_void_v<Value>) {
self.result_->template emplace<1>((Us&&) us...);
self.continuation_.resume();
}

friend void tag_invoke(set_error_t, __rec_base&& self, exception_ptr eptr) noexcept {
self.result_->template emplace<2>((exception_ptr&&) eptr);
self.continuation_.resume();
}

__expected_t<Value>* result_;
coro::coroutine_handle<> continuation_;
};

template <typename P_, typename Value>
struct __awaitable_base {
using Promise = __t<P_>;
struct __rec : __rec_base<Value> {
using __rec_base<Value>::__rec_base;

friend void tag_invoke(set_done_t, __rec&& self) noexcept {
self.result_->template emplace<3>(set_done);
self.continuation_.resume(); // TODO: propose unhandled_done() ?
//self.continuation_.promise().unhandled_done().resume();
}

// Forward other tag_invoke overloads to the promise
template <__same_<__rec> Self, class... As, invocable<__member_t<Self, Promise>&, As...> CPO>
friend auto tag_invoke(CPO cpo, Self&& self, As&&... as)
noexcept(is_nothrow_invocable_v<CPO, __member_t<Self, Promise>&, As...>)
-> invoke_result_t<CPO, __member_t<Self, Promise>&, As...> {
auto continuation = coro::coroutine_handle<Promise>::from_address(
self.continuation_.address());
__member_t<Self, Promise>& p = continuation.promise();
return ((CPO&&) cpo)(p);
}
};

bool await_ready() const noexcept {
return false;
}

Value await_resume() {
switch (result_.index()) {
case 0:
assert(!"Should never get here");
break;
case 1: // value
if constexpr (!is_void_v<Value>)
return (Value&&) std::get<1>(result_);
else
return;
case 2: // exception
std::rethrow_exception(std::get<2>(result_));
case 3: // done
assert(!"Not implemented yet");
break;
}
terminate();
}

protected:
__expected_t<Value> result_;
};

template <typename P_, typename S_>
struct __awaitable
: __awaitable_base<P_, __single_sender_result_t<__t<S_>>> {
private:
using Promise = __t<P_>;
using Sender = __t<S_>;
using Base = __awaitable_base<P_, __single_sender_result_t<Sender>>;
using __rec = typename Base::__rec;
connect_result_t<Sender, __rec> op_;
public:
__awaitable(Sender&& sender, coro::coroutine_handle<Promise> h)
noexcept(/* TODO: is_nothrow_connectable_v<Sender, __rec>*/ false)
: op_(connect((Sender&&)sender, __rec{&this->result_, h}))
{}

void await_suspend(coro::coroutine_handle<Promise>) noexcept {
start(op_);
}
};
}

inline constexpr struct as_awaitable_t {
template <__single_typed_sender S, class Promise>
auto operator()(S&& s, Promise& promise) const
noexcept(/*TODO*/ false)
-> __impl::__awaitable<__id_t<Promise>, __id_t<remove_cvref_t<S>>> {
auto h = coro::coroutine_handle<Promise>::from_promise(promise);
return {(S&&) s, h};
}
} as_awaitable{};

template <class Promise>
struct with_awaitable_senders {
template <class Value>
decltype(auto) await_transform(Value&& value) {
if constexpr (__awaitable<Value>)
return (Value&&) value;
else if constexpr (sender<Value>)
return as_awaitable((Value&&) value, static_cast<Promise&>(*this));
else
return (Value&&) value;
}
};
}

/////////////////////////////////////////////////////////////////////////////
// [execution.senders.consumer.start_detached]
// TODO: turn this into start_detached
Expand Down Expand Up @@ -414,8 +573,9 @@ namespace std::execution {
schedule((S&&) s);
};

// NOT TO SPEC
template <scheduler S>
using __schedule_result_t = decltype(schedule(std::declval<S>()));
using schedule_result_t = decltype(schedule(std::declval<S>()));

/////////////////////////////////////////////////////////////////////////////
// [execution.senders.factories]
Expand Down Expand Up @@ -494,6 +654,7 @@ namespace std::execution {
friend void tag_invoke(set_done_t, __as_receiver&&) noexcept {}
};
}

inline constexpr struct execute_t {
template<scheduler Sch, class F>
requires invocable<F&> && move_constructible<F>
Expand Down Expand Up @@ -714,16 +875,11 @@ namespace std::this_thread {
// [execution.senders.consumers.sync_wait]
inline namespace __sync_wait {
namespace __impl {
template <class... As>
using __back_t = decltype((static_cast<As(*)()>(0),...)());
template <class... As>
requires (sizeof...(As) == 1)
using __single_t = __back_t<As...>;
template <class S>
using __sync_wait_result_t =
typename execution::sender_traits<
remove_cvref_t<S>
>::template value_types<tuple, __impl::__single_t>;
>::template value_types<tuple, execution::__single_t>;

template <class Tuple>
struct __state {
Expand Down