Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions behaviortree_ros2/include/behaviortree_ros2/bt_topic_sub_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class RosTopicSubNode : public BT::ConditionNode
// The callback will broadcast to all the instances of RosTopicSubNode<T>
auto callback = [this](const std::shared_ptr<TopicT> msg)
{
last_msg = msg;
broadcaster(msg);
};
subscriber = node->create_subscription<TopicT>(topic_name, 1, callback, option);
Expand All @@ -75,6 +76,7 @@ class RosTopicSubNode : public BT::ConditionNode
rclcpp::CallbackGroup::SharedPtr callback_group;
rclcpp::executors::SingleThreadedExecutor callback_group_executor;
boost::signals2::signal<void (const std::shared_ptr<TopicT>)> broadcaster;
std::shared_ptr<TopicT> last_msg;


};
Expand Down Expand Up @@ -173,6 +175,16 @@ class RosTopicSubNode : public BT::ConditionNode
*/
virtual NodeStatus onTick(const std::shared_ptr<TopicT>& last_msg) = 0;

/** latch the message that has been processed. If returns false and no new message is
* received, before next call there will be no message to process. If returns true,
* the next call will process the same message again, if no new message received.
*
* This can be equated with latched vs non-latched topics in ros 1.
*
* @return false will clear the message after ticking/processing.
*/
virtual bool latchLastMessage() const { return false; }

private:

bool createSubscriber(const std::string& topic_name);
Expand Down Expand Up @@ -260,6 +272,11 @@ template<class T> inline
sub_instance_ = it->second;
}

// Check if there was a message received before the creation of this subscriber action
if (sub_instance_->last_msg)
{
last_msg_ = sub_instance_->last_msg;
}

// add "this" as received of the broadcaster
signal_connection_ = sub_instance_->broadcaster.connect(
Expand All @@ -276,12 +293,18 @@ template<class T> inline
// First, check if the subscriber_ is valid and that the name of the
// topic_name in the port didn't change.
// otherwise, create a new subscriber
std::string topic_name;
getInput("topic_name", topic_name);

if(!sub_instance_)
{
std::string topic_name;
getInput("topic_name", topic_name);
createSubscriber(topic_name);
}
else if(topic_name_ != topic_name)
{
sub_instance_.reset();
createSubscriber(topic_name);
}

auto CheckStatus = [](NodeStatus status)
{
Expand All @@ -294,8 +317,10 @@ template<class T> inline
};
sub_instance_->callback_group_executor.spin_some();
auto status = CheckStatus (onTick(last_msg_));
last_msg_ = nullptr;

if (!latchLastMessage())
{
last_msg_.reset();
}
return status;
}

Expand Down