Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add a test for the dynamic version
  • Loading branch information
maikel committed Nov 14, 2023
commit f57e5a2b0297623e23e02391148665d893bfef43
59 changes: 38 additions & 21 deletions include/exec/sequence/merge_each.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ namespace exec {
template <class _ItemReceiver, class _Receiver, class _ErrorsVariant>
struct __dynamic_item_operation_base {
_ItemReceiver __item_receiver_;
__operation_base<_Receiver, _ErrorsVariant>* __parent_;
using __stop_token_t = stop_token_of_t<env_of_t<_ItemReceiver>>;
using __stop_callback_t = typename __stop_token_t::template callback_type<
__dynamic_item_stop<_Receiver, _ErrorsVariant>>;
std::optional<__stop_callback_t> __on_item_receiver_stopped_{};
__operation_base<_Receiver, _ErrorsVariant>* __parent_;
};

template <class _ItemReceiverId, class _ReceiverId, class _ErrorsVariant>
Expand All @@ -220,10 +220,6 @@ namespace exec {

template <class _ItemReceiverId, class _ReceiverId, class _ErrorsVariant>
class __dynamic_next_receiver<_ItemReceiverId, _ReceiverId, _ErrorsVariant>::__t {
public:
using __id = __dynamic_next_receiver;
using is_receiver = void;

private:
using _Receiver = stdexec::__t<_ReceiverId>;
using _ItemReceiver = stdexec::__t<_ItemReceiverId>;
Expand All @@ -240,7 +236,7 @@ namespace exec {
template <same_as<set_next_t> _SetNext, same_as<__t> _Self, class _Sender>
requires __callable<_SetNext, _Receiver&, _Sender>
friend auto tag_invoke(_SetNext, _Self& __self, _Sender&& sender) noexcept(
__nothrow_callable<_SetNext, _Receiver&, _Sender>) -> next_sender_of_t<_Receiver, _Sender> {
__nothrow_callable<_SetNext, _Receiver&, _Sender>) { // -> next_sender_of_t<_Receiver, _Sender> {
return exec::set_next(__self.__op_->__parent_->__receiver_, static_cast<_Sender&&>(sender));
}

Expand Down Expand Up @@ -269,6 +265,14 @@ namespace exec {
stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_));
__parent->__notify_completion();
}
public:
using __id = __dynamic_next_receiver;
using is_receiver = void;

explicit __t(
__dynamic_item_operation_base<_ItemReceiver, _Receiver, _ErrorsVariant>* __op) noexcept
: __op_{__op} {
}
};

template <class _Item, class _ItemReceiverId, class _ReceiverId, class _ErrorsVariant>
Expand All @@ -287,7 +291,7 @@ namespace exec {

std::optional<subscribe_result_t<
_Subsequence,
__dynamic_next_receiver<_ItemReceiverId, _ReceiverId, _ErrorsVariant>>>
stdexec::__t<__dynamic_next_receiver<_ItemReceiverId, _ReceiverId, _ErrorsVariant>>>>
__op_;

__subsequence_operation(
Expand All @@ -309,7 +313,7 @@ namespace exec {
using _ItemReceiver = stdexec::__t<_ItemReceiverId>;
using _Receiver = stdexec::__t<_ReceiverId>;
using __subsequence_operation_t =
stdexec::__t<__subsequence_operation<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant>>;
__subsequence_operation<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant>;

using __dynamic_next_receiver_t =
stdexec::__t<__dynamic_next_receiver<_ItemReceiverId, _ReceiverId, _ErrorsVariant>>;
Expand Down Expand Up @@ -345,7 +349,7 @@ namespace exec {
template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self>
friend void tag_invoke(_SetStopped, _Self&& __self) noexcept {
__operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_;
stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__receiver_));
stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_));
__parent->__notify_completion();
}

Expand All @@ -354,12 +358,16 @@ namespace exec {
__operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_;
__parent->__emplace_error(static_cast<_Error&&>(__error));
__parent->__stop_source_.request_stop();
stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__receiver_));
stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_));
__parent->__notify_completion();
}
public:
using __id = __receive_subsequence;
using is_receiver = void;

explicit __t(__subsequence_operation_t* __op) noexcept
: __op_{__op} {
}
};

template <class _Item, class _ItemReceiverId, class _ReceiverId, class _ErrorsVariant>
Expand Down Expand Up @@ -475,7 +483,7 @@ namespace exec {
int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire);
if (__error_emplaced == 2) {
std::visit(
__error_visitor<_Receiver>{&__self.__parent_->__receiver_},
__error_visitor<_Receiver>{__self.__parent_->__receiver_},
static_cast<_ErrorsVariant&&>(__self.__parent_->__errors_));
} else {
stdexec::set_value(static_cast<_Receiver&&>(__self.__parent_->__receiver_));
Expand All @@ -487,7 +495,7 @@ namespace exec {
int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire);
if (__error_emplaced == 2) {
std::visit(
__error_visitor<_Receiver>{&__self.__parent_->__receiver_},
__error_visitor<_Receiver>{__self.__parent_->__receiver_},
static_cast<_ErrorsVariant&&>(__self.__parent_->__errors_));
} else {
exec::__set_value_unless_stopped(static_cast<_Receiver&&>(__self.__parent_->__receiver_));
Expand Down Expand Up @@ -524,14 +532,15 @@ namespace exec {
stdexec::start(__self.__op_);
}

subscribe_result_t<_Sender, __dynamic_receiver<_ReceiverId, _ErrorsVariant>> __op_;
subscribe_result_t<_Sender, stdexec::__t<__dynamic_receiver<_ReceiverId, _ErrorsVariant>>>
__op_;

public:
__t(_Sender&& sndr, _Receiver rcvr)
: __operation_base<_Receiver, _ErrorsVariant>{1, static_cast<_Receiver&&>(rcvr)}
, __op_{exec::subscribe(
static_cast<_Sender&&>(sndr),
__dynamic_receiver<_ReceiverId, _ErrorsVariant>{this})} {
stdexec::__t<__dynamic_receiver<_ReceiverId, _ErrorsVariant>>{this})} {
}
};

Expand Down Expand Up @@ -593,16 +602,24 @@ namespace exec {
friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
-> __to_sequence_completion_signatures<__value_type_t<_Self, _Env>, _Env>;

template <__decays_to<__t> _Self, class _Env>
requires(!__sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...>)
friend auto tag_invoke(get_item_types_t, _Self&&, _Env&&) -> __minvoke<
__mconcat<__q<item_types>>,
item_types_of_t<__copy_cvref_t<_Self, _Senders>, _Env>...>;
template <class _Self, class _Env>
static auto get_item_types() noexcept {
if constexpr (!__sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...>) {
using _Result = __minvoke<
__mconcat<__q<item_types>>,
item_types_of_t<__copy_cvref_t<_Self, _Senders>, _Env>...>;
return (_Result(*)()) nullptr;
} else {
using _Result = item_types_of_t<__value_type_t<_Self, _Env>, _Env>;
return (_Result(*)()) nullptr;
}
}

template <__decays_to<__t> _Self, class _Env>
requires __sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...>
friend auto tag_invoke(get_item_types_t, _Self&&, _Env&&)
-> item_types_of_t<__value_type_t<_Self, _Env>, _Env>;
-> decltype(get_item_types<_Self, _Env>()()) {
return {};
}

public:
using __id = __sequence;
Expand Down
46 changes: 39 additions & 7 deletions test/exec/sequence/test_merge_each.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "exec/sequence/transform_each.hpp"
#include "exec/sequence/ignore_all_values.hpp"
#include "exec/sequence/iterate.hpp"

#include <catch2/catch.hpp>

Expand All @@ -21,13 +22,44 @@ inline constexpr then_each_t then_each;

TEST_CASE("merge_each - with plain senders", "[sequence_senders][merge_each]") {
int checked = 0;
auto s1 = //
exec::merge_each(stdexec::just(42)) //
| then_each([&](int x) noexcept {
CHECK(x == 42);
SECTION("one just") {
auto s1 = //
exec::merge_each(stdexec::just(42)) //
| then_each([&](int x) noexcept {
CHECK(x == 42);
++checked;
})
| exec::ignore_all_values();
stdexec::sync_wait(s1);
CHECK(checked == 1);
}
SECTION("two senders") {
auto s1 = //
exec::merge_each(
stdexec::just(42), //
stdexec::just(43)) //
| then_each([&](int x) noexcept {
CHECK(x == 42 + checked);
++checked;
})
| exec::ignore_all_values();
stdexec::sync_wait(s1);
CHECK(checked == 2);
}
}

TEST_CASE("merge_each - with iterate", "[sequence_senders][merge_each]") {
std::array<int, 3> arr = {1, 2, 3};
auto view = std::views::all(arr);
int checked = 0;
auto s1 = //
exec::iterate(view) //
| then_each([=](int x) noexcept { return exec::iterate(std::views::iota(0, x)); }) //
| exec::merge_each() //
| then_each([&](int) noexcept {
++checked;
})
| exec::ignore_all_values();
}) //
| exec::ignore_all_values(); //
stdexec::sync_wait(s1);
CHECK(checked == 1);
CHECK(checked == 6);
}