网站首页 全球最实用的IT互联网站!

人工智能P2P分享Wind搜索发布信息网站地图标签大全

当前位置:诺佳网 > 人工智能 > 人形机器人 >

【ROS2】ROS 2 中 WaitSet(等待集)的简介与使用

时间:2026-03-17 18:26

人气:

作者:admin

标签:

导读:【ROS2】ROS 2 中 WaitSet(等待集)的简介与使用 XLevon AtomGit开源社区...

1、官方示例代码

#include <iostream>
#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

/* This example creates a subclass of Node and uses a wait-set based loop to wait on
 * a subscription to have messages available and then handles them manually without an executor */

class WaitSetSubscriber : public rclcpp::Node
{
public:
  explicit WaitSetSubscriber(rclcpp::NodeOptions options)
  : Node("wait_set_subscriber", options)
  {
    rclcpp::CallbackGroup::SharedPtr cb_group_waitset = this->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive, false);
    auto subscription_options = rclcpp::SubscriptionOptions();
    subscription_options.callback_group = cb_group_waitset;
    auto subscription_callback = [this](std_msgs::msg::String::UniquePtr msg) {
        RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
      };
    subscription_ = this->create_subscription<std_msgs::msg::String>(
      "topic",
      10,
      subscription_callback,
      subscription_options);
    wait_set_.add_subscription(subscription_);
    thread_ = std::thread([this]() -> void {spin_wait_set();});
  }

  ~WaitSetSubscriber()
  {
    if (thread_.joinable()) {
      thread_.join();
    }
  }

  void spin_wait_set()
  {
    while (rclcpp::ok()) {
      // Wait for the subscriber event to trigger. Set a 1 ms margin to trigger a timeout.
      const auto wait_result = wait_set_.wait(std::chrono::milliseconds(501));
      switch (wait_result.kind()) {
        case rclcpp::WaitResultKind::Ready:
          {
            std_msgs::msg::String msg;
            rclcpp::MessageInfo msg_info;
            if (subscription_->take(msg, msg_info)) {
              std::shared_ptr<void> type_erased_msg = std::make_shared<std_msgs::msg::String>(msg);
              subscription_->handle_message(type_erased_msg, msg_info);
            }
            break;
          }
        case rclcpp::WaitResultKind::Timeout:
          if (rclcpp::ok()) {
            RCLCPP_WARN(this->get_logger(), "Timeout. No message received after given wait-time");
          }
          break;
        default:
          RCLCPP_ERROR(this->get_logger(), "Error. Wait-set failed.");
      }
    }
  }

private:
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
  rclcpp::WaitSet wait_set_;
  std::thread thread_;
};

#include "rclcpp_components/register_node_macro.hpp"

RCLCPP_COMPONENTS_REGISTER_NODE(WaitSetSubscriber)

2、代码解析

以上代码是 ROS 2 中基于 WaitSet(等待集)的手动消息处理示例,核心特点是不依赖 ROS 2 默认的 Executor(执行器),而是通过 WaitSet 主动等待订阅者的消息事件,手动接收并处理消息。这种方式能让开发者完全掌控消息处理的时机和线程模型,是 ROS 2 进阶编程中 “手动控制事件循环” 的典型实现。

2.1、整体功能总结

这个程序实现了一个 ROS 2 订阅者节点(wait_set_subscriber),核心逻辑:

  1. 创建独立的回调组和订阅者,将订阅者加入 WaitSet(等待集);
  2. 启动一个独立线程,在该线程中运行基于 WaitSet 的事件循环;
  3. 循环等待订阅者有消息可用(超时时间 501ms):
  • 有消息时:手动 take(取出)消息,调用订阅者的回调函数处理;
  • 超时无消息时:打印警告日志;
  • 等待失败时:打印错误日志;
  1. 节点销毁时,等待线程退出,保证资源安全释放;
  2. 核心特性:脱离 ROS 2 内置的 spin()/spin_some() 执行器,完全手动控制消息的等待、接收和处理流程。

2.2、核心前置概念

在解析代码前,先理解 3 个关键概念(ROS 2 事件驱动核心):

  1. WaitSet(等待集):
  • 本质:ROS 2 封装的事件等待机制,底层对接 DDS 的 WaitSet,可监听订阅者、定时器、服务、客户端等事件;
  • 作用:主动阻塞等待 “目标事件触发”(如订阅者有消息),替代 Executor 的自动事件分发;
  • 核心方法:add_subscription()(添加监听的订阅者)、wait()(阻塞等待事件,可设超时)。
  1. Executor(执行器):
  • ROS 2 默认的事件处理机制(spin()/spin_some() 底层就是 Executor),自动监听事件、分发回调;
  • 本示例完全绕过 Executor,用 WaitSet + 手动线程实现事件循环,灵活性更高但需手动处理所有逻辑。
  1. CallbackGroup(回调组):
  • 用于管理回调函数的线程模型,示例中创建 MutuallyExclusive(互斥)回调组,保证同一时间只有一个回调执行;
  • 这里的回调组主要是为订阅者绑定上下文,实际消息处理由手动线程完成。

2.3、逐模块代码解析

  1. 头文件引入(基础依赖)
#include <iostream>      // 基础输入输出(示例中未直接使用,预留)
#include <memory>        // 智能指针(SharedPtr/UniquePtr)
#include <thread>        // 线程创建与管理(std::thread)

#include "rclcpp/rclcpp.hpp"                // ROS 2 核心 API
#include "std_msgs/msg/string.hpp"          // ROS 2 标准字符串消息
#include "rclcpp_components/register_node_macro.hpp"  // 组件化注册宏(节点可编译为组件)

重点: 用于创建独立的事件循环线程;rclcpp_components/register_node_macro.hpp 支持将节点注册为 ROS 2 组件(可选特性)。

  1. 节点类定义(核心逻辑)
class WaitSetSubscriber : public rclcpp::Node
{
public:
  // 构造函数:初始化节点、回调组、订阅者、WaitSet、事件线程
  explicit WaitSetSubscriber(rclcpp::NodeOptions options)
  : Node("wait_set_subscriber", options)  // 节点名 + 自定义节点选项
  {
    // ========== 第一步:创建独立的回调组 ==========
    // 类型:MutuallyExclusive(互斥),第二个参数 false 表示不自动添加到节点的默认执行器
    rclcpp::CallbackGroup::SharedPtr cb_group_waitset = this->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive, false);

    // ========== 第二步:配置订阅者选项(绑定回调组) ==========
    auto subscription_options = rclcpp::SubscriptionOptions();
    subscription_options.callback_group = cb_group_waitset;  // 订阅者绑定到自定义回调组

    // ========== 第三步:定义订阅者回调函数(Lambda) ==========
    auto subscription_callback = [this](std_msgs::msg::String::UniquePtr msg) {
        // 回调逻辑:打印收到的消息
        RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
      };

    // ========== 第四步:创建订阅者 ==========
    subscription_ = this->create_subscription<std_msgs::msg::String>(
      "topic",                  // 订阅的话题名
      10,                       // 队列大小
      subscription_callback,    // 消息回调函数
      subscription_options      // 自定义选项(绑定回调组)
    );

    // ========== 第五步:将订阅者加入 WaitSet(监听其消息事件) ==========
    wait_set_.add_subscription(subscription_);

    // ========== 第六步:启动独立线程运行 WaitSet 事件循环 ==========
    // 线程执行 spin_wait_set() 方法,脱离主线程的 spin()
    thread_ = std::thread([this]() -> void {spin_wait_set();});
  }

  // ========== 析构函数:安全释放线程资源 ==========
  ~WaitSetSubscriber()
  {
    // 检查线程是否可连接(避免重复 join),确保线程退出后再销毁节点
    if (thread_.joinable()) {
      thread_.join();
    }
  }

  // ========== 核心方法:WaitSet 事件循环 ==========
  void spin_wait_set()
  {
    // 循环条件:ROS 2 上下文正常(未调用 shutdown)
    while (rclcpp::ok()) {
      // 1. 阻塞等待订阅者事件,超时时间 501ms
      // 若订阅者有消息,返回 Ready;超时返回 Timeout;失败返回 Error
      const auto wait_result = wait_set_.wait(std::chrono::milliseconds(501));

      // 2. 根据等待结果处理
      switch (wait_result.kind()) {
        // 情况 1:有消息可用(Ready)
        case rclcpp::WaitResultKind::Ready:
          {
            // 定义存储消息的变量和消息信息(时间戳、发布者等)
            std_msgs::msg::String msg;
            rclcpp::MessageInfo msg_info;

            // 手动 take(取出)消息:从订阅者队列中取出一条消息
            // take 返回 bool:true 表示成功取出,false 表示队列空(理论上不会发生,因 WaitSet 已通知 Ready)
            if (subscription_->take(msg, msg_info)) {
              // 将消息封装为类型擦除的共享指针(适配 handle_message 的参数要求)
              std::shared_ptr<void> type_erased_msg = std::make_shared<std_msgs::msg::String>(msg);
              // 手动调用订阅者的回调函数处理消息
              subscription_->handle_message(type_erased_msg, msg_info);
            }
            break;
          }

        // 情况 2:超时(Timeout)
        case rclcpp::WaitResultKind::Timeout:
          // 若 ROS 2 上下文仍正常,打印超时警告
          if (rclcpp::ok()) {
            RCLCPP_WARN(this->get_logger(), "Timeout. No message received after given wait-time");
          }
          break;

        // 情况 3:等待失败(Error)
        default:
          RCLCPP_ERROR(this->get_logger(), "Error. Wait-set failed.");
      }
    }
  }

private:
  // ========== 成员变量 ==========
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;  // 订阅者智能指针
  rclcpp::WaitSet wait_set_;                                             // 等待集(监听订阅者事件)
  std::thread thread_;                                                   // 事件循环线程
};

// ========== 组件化注册:将节点注册为 ROS 2 组件(可选) ==========
RCLCPP_COMPONENTS_REGISTER_NODE(WaitSetSubscriber)
  1. 核心逻辑补充说明
  • take() vs callback:
    • 普通订阅者由 Executor 自动调用 take() + handle_message();
    • 本示例中,WaitSet 通知 “有消息” 后,手动调用 take() 取出消息,再调用 handle_message() 触发回调,完全复刻 Executor 的核心逻辑,但由开发者掌控时机。
  • 线程安全:
    • 事件循环运行在独立线程 thread_ 中,与主线程隔离;
    • 析构函数中 join() 线程,避免节点销毁时线程仍在运行导致的资源泄漏。
  • 超时设置 501ms:
    • 预留 1ms 余量,避免与 500ms 等常见周期冲突,确保超时逻辑能稳定触发。

2.4、运行逻辑梳理(完整流程)

  1. 程序启动 → 初始化 ROS 2 上下文,创建 WaitSetSubscriber 节点;
  2. 节点构造函数执行:
    • 创建回调组 → 配置订阅者 → 绑定到回调组 → 将订阅者加入 WaitSet;
    • 启动独立线程,执行 spin_wait_set() 方法;
  3. 线程进入 spin_wait_set() 循环:
  • 调用 wait_set_.wait(501ms) 阻塞等待订阅者消息;
  • 若发布者向 topic 发布消息:
    • WaitSet 检测到事件,返回 Ready;
    • 手动 take() 取出消息,调用 handle_message() 触发回调,打印消息;
  • 若 501ms 内无消息:
    • WaitSet 返回 Timeout,打印超时警告;
  • 若等待失败(如订阅者销毁):
    • 返回 Error,打印错误日志;
  1. 用户按下 Ctrl+C → rclcpp::ok() 变为 false,循环退出;
  2. 节点析构 → join() 线程,释放资源,程序退出。

2.5、应用场景与价值

这个示例的核心价值是 脱离 Executor 手动控制事件循环,适用于以下场景:

  1. 自定义事件循环:
    • 需要将 ROS 2 事件(消息、定时器)集成到外部事件循环(如 Qt 事件循环、工业控制循环);
    • 需精确控制消息处理的时机(如每 100ms 批量处理一次消息,而非收到即处理)。
  2. 高性能 / 低延迟场景:
    • 绕过 Executor 的封装,直接调用底层 take()/handle_message(),减少中间层开销;
    • 对消息处理线程做精细化调度(如绑定到特定 CPU 核心)。
  3. 多事件协同等待:
    • WaitSet 可同时监听多个订阅者、定时器、服务端事件,实现 “任意一个事件触发就处理” 的逻辑(如同时等待传感器消息和控制指令)。
  4. 嵌入式 / 资源受限系统:
    • 替代笨重的 Executor,仅保留必要的 WaitSet 逻辑,减少内存 / CPU 占用。
  5. 调试 / 故障排查:
    • 手动控制消息接收流程,便于断点调试(如在 take() 后暂停,检查消息内容)。

2.6、关键对比(WaitSet vs Executor)

维度 WaitSet(手动) Executor(自动,如 spin())
控制粒度 极细(完全掌控等待 / 接收 / 处理) 较粗(自动分发,开发者仅写回调)
灵活性 极高(可集成到任意事件循环) 较低(依赖 ROS 2 内置逻辑)
代码复杂度 高(需手动处理等待 / 错误 / 线程) 低(一行 spin() 搞定)
性能 略高(减少 Executor 封装开销) 略低(额外封装层)
适用场景 进阶 / 定制化需求 普通 / 快速开发需求

2.7、小结

  1. 核心功能:实现不依赖 Executor 的 ROS 2 订阅者,通过 WaitSet + 独立线程手动控制消息的等待、接收和处理;
  2. 关键技术:rclcpp::WaitSet 事件监听、take() 手动取消息、handle_message() 手动触发回调、独立线程管理;
  3. 核心价值:完全掌控消息处理流程,适用于自定义事件循环、高性能 / 低延迟、嵌入式系统等进阶场景;
  4. 学习重点:理解 ROS 2 事件驱动的底层逻辑(Executor 本质是 WaitSet + 自动循环),是 ROS 2 进阶开发的核心知识点。
    这个示例是 ROS 2 从 “使用封装” 到 “理解底层” 的关键过渡,掌握它能深入理解 ROS 2 事件处理的本质。

3、技术背景与应用场景

ROS 2 WaitSet(等待集):推出时间与适用场景

3.1、推出时间与版本演进

WaitSet 是 ROS 2 从底层对接 DDS 标准的核心特性,其演进历程如下:

  • 底层基础能力:随 ROS 2 首个正式版本 Ardent Apalone(2017 年 12 月) 引入,作为 rcl 层(ROS Client Library)的基础接口,直接封装 DDS 的 WaitSet 机制;
  • rclcpp 层标准化:在 Foxy Fitzroy(2020 年 6 月) 版本完成 rclcpp::WaitSet 接口定型,提供易用的 C++ 封装(如 add_subscription()、wait() 等方法);
  • 功能完善与稳定:
    • Humble Hawksbill(2022 年 5 月,LTS):补充超时处理、多事件监听、错误码标准化,成为生产级可用特性;
    • Iron Irwini(2023 年):优化性能,支持更多事件类型(如定时器、服务、客户端、GuardCondition);
  • 兼容性:所有 ROS 2 版本(Ardent 及以后)均支持 WaitSet,无版本兼容限制,是 ROS 2 事件驱动的底层核心。

3.2、核心原理

WaitSet 是 ROS 2 最底层的事件等待机制,本质是:

  1. 开发者将需要监听的事件源(订阅者、定时器、服务端、客户端等)加入 WaitSet;
  2. 调用 wait() 方法阻塞当前线程,直到任意一个事件源触发(如订阅者有消息、定时器到期)或超时;
  3. 开发者手动处理触发的事件(如取出消息、执行定时器逻辑),完全掌控事件处理的时机和流程。

它是 ROS 2 内置 Executor(spin()/spin_some())的底层实现基础——Executor 本质就是 “WaitSet + 自动事件分发 + 线程管理” 的封装。

3.3、核心适用场景(按优先级)

  1. 自定义事件循环(最核心场景)
    • 集成外部事件循环:将 ROS 2 事件(消息、定时器)融入非 ROS 原生的事件循环(如 Qt/QML 事件循环、工业控制周期循环、Unity/Unreal 游戏引擎循环);
    • 批量事件处理:不 “收到消息即处理”,而是按固定周期(如 100ms)批量取出所有待处理消息 / 事件,适配高实时性的控制周期;
    • 单线程多事件协同:在一个线程中同时监听 “传感器消息 + 控制指令 + 定时器”,任意一个事件触发即处理,避免多线程同步开销。
  2. 高性能 / 低延迟场景
    • 嵌入式 / 边缘设备:绕过 Executor 的封装开销(如自动回调分发、线程池管理),直接调用底层 take() 取消息,减少 CPU / 内存占用;
    • 硬实时系统:手动控制 WaitSet 的等待超时、事件处理时机,可绑定到特定 CPU 核心,满足微秒级延迟要求(如工业机器人运动控制);
    • 高频消息处理:对激光雷达、图像等高频消息,通过 WaitSet 精准控制 “取消息→处理→反馈” 的全流程,避免 Executor 调度的不确定性。
  3. 精细化事件控制
    • 选择性处理事件:监听多个订阅者,但仅处理 “优先级高” 的事件(如先处理急停指令,再处理普通传感器消息);
    • 自定义超时逻辑:为不同事件设置差异化超时(如传感器消息超时 100ms 报警,控制指令超时 10ms 触发应急逻辑);
    • 事件依赖处理:等待 “多个事件都触发” 后再处理(如同时收到 “传感器数据 + 校准指令” 才执行校准,通过多次 WaitSet 等待实现)。
  4. 多事件源统一监听
    • 一个 WaitSet 可同时监听多种类型的事件源:订阅者(消息到达)、定时器(周期触发)、服务端(客户端请求)、客户端(服务响应)、GuardCondition(手动触发的条件);
    • 典型场景:机器人 “待机状态” 下,同时监听 “唤醒指令(订阅者)、定时唤醒(定时器)、手动触发(GuardCondition)”,任意一个事件触发即退出待机。
  5. 调试 / 底层开发
    • 深入理解 ROS 2 事件机制:通过 WaitSet 手动实现 Executor 的核心逻辑,理解 spin() 的底层原理;
    • 精准调试事件流程:在 wait() 后断点,检查哪些事件触发、触发时机,定位 “消息丢失”“定时器延迟” 等底层问题;
    • 自定义 Executor:基于 WaitSet 开发适配特定场景的 Executor(如单线程高优先级 Executor、批量处理 Executor)。
  6. 资源极度受限的场景
    • 单片机、裸机移植的 ROS 2 Micro(RMW 为 MicroXRCE-DDS):仅保留 WaitSet 核心逻辑,无需加载完整 Executor,大幅降低资源占用。

3.4、不适用场景

  • 普通快速开发:仅需简单订阅 / 发布消息,使用 spin()/spin_some() 更高效,无需手动编写 WaitSet 循环;
  • 多线程并发处理:需要多个回调并行执行时,Executor 的线程池机制比手动管理 WaitSet + 多线程更简单;
  • 对代码复杂度敏感的场景:WaitSet 需手动处理事件检测、消息取出、错误处理,代码量远大于 Executor。

3.5、与 Executor 的核心对比(补充)

维度 WaitSet(手动) Executor(自动,如 spin())
控制粒度 极细(底层事件级) 较粗(回调级)
灵活性 完全自定义(适配任意场景) 固定逻辑(仅可配置线程数)
代码复杂度 高(需手动处理等待 / 取消息 / 错误) 低(仅需编写回调函数)
性能 无封装开销(更高) 有调度 / 分发开销(略低)
学习成本 高(需理解 ROS 2 底层事件模型) 低(开箱即用)

3.6、小结

  1. 推出核心节点:2017 年随 ROS 2 首个版本引入底层能力,2020 年 Foxy 版本完成 C++ 接口标准化,2022 年 Humble 成为稳定的生产级特性;
  2. 核心定位:ROS 2 事件驱动的底层基石,Executor 是其 “封装后的易用版”;
  3. 最佳场景:自定义事件循环、高性能 / 低延迟系统、嵌入式 / 硬实时场景、多事件源统一监听,是 ROS 2 进阶开发(从 “使用” 到 “定制”)的核心工具。
温馨提示:以上内容整理于网络,仅供参考,如果对您有帮助,留下您的阅读感言吧!
相关阅读
本类排行
相关标签
本类推荐

CPU | 内存 | 硬盘 | 显卡 | 显示器 | 主板 | 电源 | 键鼠 | 网站地图

Copyright © 2025-2035 诺佳网 版权所有 备案号:赣ICP备2025066733号
本站资料均来源互联网收集整理,作品版权归作者所有,如果侵犯了您的版权,请跟我们联系。

关注微信