Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Properly construct and reset stop callback
  • Loading branch information
maikel committed Nov 16, 2023
commit b3e7675aaf7be87be00c0f11a6c71aaaa59f6605
15 changes: 15 additions & 0 deletions include/exec/sequence/merge_each.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ namespace exec {
template <same_as<set_value_t> _SetValue, same_as<__t> _Self>
requires __callable<_SetValue, _ItemReceiver&&>
friend void tag_invoke(_SetValue, _Self&& __self) noexcept {
__self.__op_->__on_item_receiver_stopped_.reset();
__operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_;
stdexec::set_value(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_));
__parent->__notify_completion();
Expand All @@ -251,6 +252,7 @@ namespace exec {
template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self>
requires __callable<_SetStopped, _ItemReceiver&&>
friend void tag_invoke(_SetStopped, _Self&& __self) noexcept {
__self.__op_->__on_item_receiver_stopped_.reset();
__operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_;
stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_));
__parent->__notify_completion();
Expand All @@ -259,6 +261,7 @@ namespace exec {
template <same_as<set_error_t> _SetError, same_as<__t> _Self, class _Error>
requires __callable<set_stopped_t, _ItemReceiver&&>
friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept {
__self.__op_->__on_item_receiver_stopped_.reset();
__operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_;
__parent->__emplace_error(static_cast<_Error&&>(__error));
__parent->__stop_source_.request_stop();
Expand Down Expand Up @@ -339,6 +342,7 @@ namespace exec {
stdexec::start(__next_op);
} catch (...) {
__operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_;
__self.__op_->__on_item_receiver_stopped_.reset();
__parent->__emplace_error(std::current_exception());
__parent->__stop_source_.request_stop();
stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_));
Expand All @@ -348,13 +352,15 @@ namespace exec {

template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self>
friend void tag_invoke(_SetStopped, _Self&& __self) noexcept {
__self.__op_->__on_item_receiver_stopped_.reset();
__operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_;
stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_));
__parent->__notify_completion();
}

template <same_as<set_error_t> _SetError, same_as<__t> _Self, class _Error>
friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept {
__self.__op_->__on_item_receiver_stopped_.reset();
__operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_;
__parent->__emplace_error(static_cast<_Error&&>(__error));
__parent->__stop_source_.request_stop();
Expand Down Expand Up @@ -401,6 +407,9 @@ namespace exec {

template <same_as<__t> _Self>
friend void tag_invoke(stdexec::start_t, _Self& __self) noexcept {
__self.__on_item_receiver_stopped_.emplace(
stdexec::get_stop_token(stdexec::get_env(__self.__item_receiver_)),
__dynamic_item_stop<_Receiver, _ErrorsVariant>{__self.__parent_});
stdexec::start(__self.__receive_op_);
}
};
Expand Down Expand Up @@ -480,6 +489,7 @@ namespace exec {

template <same_as<set_value_t> _SetValue, same_as<__t> _Self>
friend void tag_invoke(_SetValue, _Self&& __self) noexcept {
__self.__parent_->__on_receiver_stopped_.reset();
int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire);
if (__error_emplaced == 2) {
std::visit(
Expand All @@ -492,6 +502,7 @@ namespace exec {

template <same_as<set_stopped_t> _SetStopped, same_as<__t> _Self>
friend void tag_invoke(_SetStopped, _Self&& __self) noexcept {
__self.__parent_->__on_receiver_stopped_.reset();
int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire);
if (__error_emplaced == 2) {
std::visit(
Expand All @@ -504,6 +515,7 @@ namespace exec {

template <same_as<set_error_t> _SetError, same_as<__t> _Self, class _Error>
friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept {
__self.__parent_->__on_receiver_stopped_.reset();
stdexec::set_error(
static_cast<_Receiver&&>(__self.__parent_->__receiver_), static_cast<_Error&&>(__error));
}
Expand All @@ -529,6 +541,9 @@ namespace exec {

template <same_as<__t> _Self>
friend void tag_invoke(stdexec::start_t, _Self& __self) noexcept {
__self.__on_receiver_stopped_.emplace(
stdexec::get_stop_token(stdexec::get_env(__self.__receiver_)),
__default_stop_callback{__self.__stop_source_});
stdexec::start(__self.__op_);
}

Expand Down