时间:2026-03-17 18:26
人气:
作者:admin
#include <iostream>
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
/* This example creates a subclass of Node and uses a wait-set based loop to wait on
* a subscription to have messages available and then handles them manually without an executor */
class WaitSetSubscriber : public rclcpp::Node
{
public:
explicit WaitSetSubscriber(rclcpp::NodeOptions options)
: Node("wait_set_subscriber", options)
{
rclcpp::CallbackGroup::SharedPtr cb_group_waitset = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, false);
auto subscription_options = rclcpp::SubscriptionOptions();
subscription_options.callback_group = cb_group_waitset;
auto subscription_callback = [this](std_msgs::msg::String::UniquePtr msg) {
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
};
subscription_ = this->create_subscription<std_msgs::msg::String>(
"topic",
10,
subscription_callback,
subscription_options);
wait_set_.add_subscription(subscription_);
thread_ = std::thread([this]() -> void {spin_wait_set();});
}
~WaitSetSubscriber()
{
if (thread_.joinable()) {
thread_.join();
}
}
void spin_wait_set()
{
while (rclcpp::ok()) {
// Wait for the subscriber event to trigger. Set a 1 ms margin to trigger a timeout.
const auto wait_result = wait_set_.wait(std::chrono::milliseconds(501));
switch (wait_result.kind()) {
case rclcpp::WaitResultKind::Ready:
{
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
if (subscription_->take(msg, msg_info)) {
std::shared_ptr<void> type_erased_msg = std::make_shared<std_msgs::msg::String>(msg);
subscription_->handle_message(type_erased_msg, msg_info);
}
break;
}
case rclcpp::WaitResultKind::Timeout:
if (rclcpp::ok()) {
RCLCPP_WARN(this->get_logger(), "Timeout. No message received after given wait-time");
}
break;
default:
RCLCPP_ERROR(this->get_logger(), "Error. Wait-set failed.");
}
}
}
private:
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
rclcpp::WaitSet wait_set_;
std::thread thread_;
};
#include "rclcpp_components/register_node_macro.hpp"
RCLCPP_COMPONENTS_REGISTER_NODE(WaitSetSubscriber)
以上代码是 ROS 2 中基于 WaitSet(等待集)的手动消息处理示例,核心特点是不依赖 ROS 2 默认的 Executor(执行器),而是通过 WaitSet 主动等待订阅者的消息事件,手动接收并处理消息。这种方式能让开发者完全掌控消息处理的时机和线程模型,是 ROS 2 进阶编程中 “手动控制事件循环” 的典型实现。
这个程序实现了一个 ROS 2 订阅者节点(wait_set_subscriber),核心逻辑:
在解析代码前,先理解 3 个关键概念(ROS 2 事件驱动核心):
#include <iostream> // 基础输入输出(示例中未直接使用,预留)
#include <memory> // 智能指针(SharedPtr/UniquePtr)
#include <thread> // 线程创建与管理(std::thread)
#include "rclcpp/rclcpp.hpp" // ROS 2 核心 API
#include "std_msgs/msg/string.hpp" // ROS 2 标准字符串消息
#include "rclcpp_components/register_node_macro.hpp" // 组件化注册宏(节点可编译为组件)
重点: 用于创建独立的事件循环线程;rclcpp_components/register_node_macro.hpp 支持将节点注册为 ROS 2 组件(可选特性)。
class WaitSetSubscriber : public rclcpp::Node
{
public:
// 构造函数:初始化节点、回调组、订阅者、WaitSet、事件线程
explicit WaitSetSubscriber(rclcpp::NodeOptions options)
: Node("wait_set_subscriber", options) // 节点名 + 自定义节点选项
{
// ========== 第一步:创建独立的回调组 ==========
// 类型:MutuallyExclusive(互斥),第二个参数 false 表示不自动添加到节点的默认执行器
rclcpp::CallbackGroup::SharedPtr cb_group_waitset = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, false);
// ========== 第二步:配置订阅者选项(绑定回调组) ==========
auto subscription_options = rclcpp::SubscriptionOptions();
subscription_options.callback_group = cb_group_waitset; // 订阅者绑定到自定义回调组
// ========== 第三步:定义订阅者回调函数(Lambda) ==========
auto subscription_callback = [this](std_msgs::msg::String::UniquePtr msg) {
// 回调逻辑:打印收到的消息
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
};
// ========== 第四步:创建订阅者 ==========
subscription_ = this->create_subscription<std_msgs::msg::String>(
"topic", // 订阅的话题名
10, // 队列大小
subscription_callback, // 消息回调函数
subscription_options // 自定义选项(绑定回调组)
);
// ========== 第五步:将订阅者加入 WaitSet(监听其消息事件) ==========
wait_set_.add_subscription(subscription_);
// ========== 第六步:启动独立线程运行 WaitSet 事件循环 ==========
// 线程执行 spin_wait_set() 方法,脱离主线程的 spin()
thread_ = std::thread([this]() -> void {spin_wait_set();});
}
// ========== 析构函数:安全释放线程资源 ==========
~WaitSetSubscriber()
{
// 检查线程是否可连接(避免重复 join),确保线程退出后再销毁节点
if (thread_.joinable()) {
thread_.join();
}
}
// ========== 核心方法:WaitSet 事件循环 ==========
void spin_wait_set()
{
// 循环条件:ROS 2 上下文正常(未调用 shutdown)
while (rclcpp::ok()) {
// 1. 阻塞等待订阅者事件,超时时间 501ms
// 若订阅者有消息,返回 Ready;超时返回 Timeout;失败返回 Error
const auto wait_result = wait_set_.wait(std::chrono::milliseconds(501));
// 2. 根据等待结果处理
switch (wait_result.kind()) {
// 情况 1:有消息可用(Ready)
case rclcpp::WaitResultKind::Ready:
{
// 定义存储消息的变量和消息信息(时间戳、发布者等)
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
// 手动 take(取出)消息:从订阅者队列中取出一条消息
// take 返回 bool:true 表示成功取出,false 表示队列空(理论上不会发生,因 WaitSet 已通知 Ready)
if (subscription_->take(msg, msg_info)) {
// 将消息封装为类型擦除的共享指针(适配 handle_message 的参数要求)
std::shared_ptr<void> type_erased_msg = std::make_shared<std_msgs::msg::String>(msg);
// 手动调用订阅者的回调函数处理消息
subscription_->handle_message(type_erased_msg, msg_info);
}
break;
}
// 情况 2:超时(Timeout)
case rclcpp::WaitResultKind::Timeout:
// 若 ROS 2 上下文仍正常,打印超时警告
if (rclcpp::ok()) {
RCLCPP_WARN(this->get_logger(), "Timeout. No message received after given wait-time");
}
break;
// 情况 3:等待失败(Error)
default:
RCLCPP_ERROR(this->get_logger(), "Error. Wait-set failed.");
}
}
}
private:
// ========== 成员变量 ==========
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_; // 订阅者智能指针
rclcpp::WaitSet wait_set_; // 等待集(监听订阅者事件)
std::thread thread_; // 事件循环线程
};
// ========== 组件化注册:将节点注册为 ROS 2 组件(可选) ==========
RCLCPP_COMPONENTS_REGISTER_NODE(WaitSetSubscriber)
这个示例的核心价值是 脱离 Executor 手动控制事件循环,适用于以下场景:
| 维度 | WaitSet(手动) | Executor(自动,如 spin()) |
|---|---|---|
| 控制粒度 | 极细(完全掌控等待 / 接收 / 处理) | 较粗(自动分发,开发者仅写回调) |
| 灵活性 | 极高(可集成到任意事件循环) | 较低(依赖 ROS 2 内置逻辑) |
| 代码复杂度 | 高(需手动处理等待 / 错误 / 线程) | 低(一行 spin() 搞定) |
| 性能 | 略高(减少 Executor 封装开销) | 略低(额外封装层) |
| 适用场景 | 进阶 / 定制化需求 | 普通 / 快速开发需求 |
ROS 2 WaitSet(等待集):推出时间与适用场景
WaitSet 是 ROS 2 从底层对接 DDS 标准的核心特性,其演进历程如下:
WaitSet 是 ROS 2 最底层的事件等待机制,本质是:
它是 ROS 2 内置 Executor(spin()/spin_some())的底层实现基础——Executor 本质就是 “WaitSet + 自动事件分发 + 线程管理” 的封装。
| 维度 | WaitSet(手动) | Executor(自动,如 spin()) |
|---|---|---|
| 控制粒度 | 极细(底层事件级) | 较粗(回调级) |
| 灵活性 | 完全自定义(适配任意场景) | 固定逻辑(仅可配置线程数) |
| 代码复杂度 | 高(需手动处理等待 / 取消息 / 错误) | 低(仅需编写回调函数) |
| 性能 | 无封装开销(更高) | 有调度 / 分发开销(略低) |
| 学习成本 | 高(需理解 ROS 2 底层事件模型) | 低(开箱即用) |