高效的进程内通讯

背景

ROS应用程序类型调用y由单个 "nodes" 组成,它执行狭窄的任务,并与系统的其他部分分离。这促进故障隔离,更快开发版本、模块化和代码重用,但经常的性能。在ROS 1初步开发后,对节点有效组成的需求变得明显,并且开发了节点组。ROS 2旨在提高设计Nodelets解决一些基金ament问题需要重组节点。 [待校准@7492]

在此例程中,我们将重点介绍如何通过分别定义节点,但在不更改节点代码或限制其功能的情况下将它们组合到不同的流程布局中来手动组成节点。 [待校准@7493]

安装例程 [待校准@7494]

有关安装ROS 2的详细信息,请参见 installation instructions[待校准@7495]

如果您已经从包中安装了ROS 2,请确保您已经安装了 ros-foxy-intra-process-demo 。如果您从源文件下载了archive或构建的ROS 2,它将已经是安装的一部分。 [待校准@7496]

运行和理解例程 [待校准@7497]

有几个不同的例程: 有些玩具问题设计高亮特征内过程通讯s功能有些端到端例子使用OpenCV例程需要表明能力重组节点不同配置。 [待校准@7498]

双节点管道例程 [待校准@7499]

此例程旨在显示进程内发布/订阅连接在发布和订阅 “std::unique_ptr” s时可能导致消息的零副本传输。 [待校准@7500]

首先让我们看一下源文件: [待校准@7501]

https://github.com/ros2/例程/blob/狡猾/进程内 _ 例程/src/two_node_pipeline/two_node_pipeline.cpp

#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

using namespace std::chrono_literals;

// Node that produces messages.
struct Producer : public rclcpp::Node
{
  Producer(const std::string & name, const std::string & output)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a publisher on the output topic.
    pub_ = this->create_publisher<std_msgs::msg::Int32>(output, 10);
    std::weak_ptr<std::remove_pointer<decltype(pub_.get())>::type> captured_pub = pub_;
    // Create a timer which publishes on the output topic at ~1Hz.
    auto callback = [captured_pub]() -> void {
        auto pub_ptr = captured_pub.lock();
        if (!pub_ptr) {
          return;
        }
        static int32_t count = 0;
        std_msgs::msg::Int32::UniquePtr msg(new std_msgs::msg::Int32());
        msg->data = count++;
        printf(
          "Published message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        pub_ptr->publish(std::move(msg));
      };
    timer_ = this->create_wall_timer(1s, callback);
  }

  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
  rclcpp::TimerBase::SharedPtr timer_;
};

// Node that consumes messages.
struct Consumer : public rclcpp::Node
{
  Consumer(const std::string & name, const std::string & input)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a subscription on the input topic which prints on receipt of new messages.
    sub_ = this->create_subscription<std_msgs::msg::Int32>(
      input,
      10,
      [](std_msgs::msg::Int32::UniquePtr msg) {
        printf(
          " Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
      });
  }

  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
};

int main(int argc, char * argv[])
{
  setvbuf(stdout, NULL, _IONBF, BUFSIZ);
  rclcpp::init(argc, argv);
  rclcpp::executors::SingleThreadedExecutor executor;

  auto producer = std::make_shared<Producer>("producer", "number");
  auto consumer = std::make_shared<Consumer>("consumer", "number");

  executor.add_node(producer);
  executor.add_node(consumer);
  executor.spin();

  rclcpp::shutdown();

  return 0;
}

通过查看 main 函数,您可以看到,我们有一个生产者和一个消费者节点,我们将它们添加到单线程执行程序中,然后调用spin。 [待校准@7503]

如果您查看 Producer 结构中 "producer" 节点的实现,您可以看到,我们已经创建了一个发布 "number" 话题的出版商,以及一个周期调用y的计时器创建了一条新消息,打印出其在内存中的地址及其内容的值,然后将其发布。 [待校准@7504]

consumer 节点稍微简单一点,你可以在 Consumer 结构中看到它的实现,因为它只订阅 "number" 话题,并打印它收到的消息的地址和值。 [小鱼@7505]

预期是生产者将打印出地址和价值,而消费者将打印出匹配的地址和价值。这个例程表明进程内通讯确实有效,并且避免了不必要的拷贝,至少对于简单的图表是这样。 [待校准@7506]

让我们通过执行 ros2 run intra_process_demo two_node_pipeline 可执行文件来运行例程 (不要忘记先源文件安装文件): [待校准@7507]

$ ros2 run intra_process_demo two_node_pipeline
Published message with value: 0, and address: 0x7fb02303faf0
Published message with value: 1, and address: 0x7fb020cf0520
 Received message with value: 1, and address: 0x7fb020cf0520
Published message with value: 2, and address: 0x7fb020e12900
 Received message with value: 2, and address: 0x7fb020e12900
Published message with value: 3, and address: 0x7fb020cf0520
 Received message with value: 3, and address: 0x7fb020cf0520
Published message with value: 4, and address: 0x7fb020e12900
 Received message with value: 4, and address: 0x7fb020e12900
Published message with value: 5, and address: 0x7fb02303cea0
 Received message with value: 5, and address: 0x7fb02303cea0
[...]

您会注意到的一件事是,消息以每秒约1的速度滴答作响。这是因为我们告诉计时器大约每秒触发一次。 [待校准@7508]

此外,您可能已经注意到第一条消息 (值为 0 ) 没有相应的 “接收消息...” 行。这是因为发布/订阅是 "best effort" ,我们没有启用任何类似 "latching" 的行为。这意味着,如果发布者在订阅建立之前发布消息,则订阅将不会收到该消息。此竞争条件可能导致前几条消息丢失。在这种情况下,由于它们每秒只出现一次,通常只有第一条消息丢失。 [待校准@7509]

最后,您可以看到 “已发布消息... " and " 收到消息...” 具有相同值的行也具有相同的地址。这表明正在接收的消息的地址与发布的地址相同,并且不是副本。这是因为我们发布并订阅了 “std::unique_ptr” s,它允许消息的所有权在系统中安全地移动。您也可以使用 “const & `` and `` std::shared_ptr” 发布和订阅,但在这种情况下不会发生零复制。 [待校准@7510]

循环管道例程 [待校准@7511]

此例程与前一个例程相似,但是该例程只使用一个消息实例,而不是生产者为每次迭代创建新消息。这是通过在图表中创建一个循环来实现的,并且通过在旋转执行程序之前外部发布其中一个节点来实现 "kicking off" 通讯: [待校准@7512]

https://github.com/ros2/例程/blob/狡猾/进程内 _ 例程/src/cyclic_pipeline/cyclic_pipeline.cpp [待校准@7513]

#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

using namespace std::chrono_literals;

// This node receives an Int32, waits 1 second, then increments and sends it.
struct IncrementerPipe : public rclcpp::Node
{
  IncrementerPipe(const std::string & name, const std::string & in, const std::string & out)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a publisher on the output topic.
    pub = this->create_publisher<std_msgs::msg::Int32>(out, 10);
    std::weak_ptr<std::remove_pointer<decltype(pub.get())>::type> captured_pub = pub;
    // Create a subscription on the input topic.
    sub = this->create_subscription<std_msgs::msg::Int32>(
      in,
      10,
      [captured_pub](std_msgs::msg::Int32::UniquePtr msg) {
        auto pub_ptr = captured_pub.lock();
        if (!pub_ptr) {
          return;
        }
        printf(
          "Received message with value:         %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        printf("  sleeping for 1 second...\n");
        if (!rclcpp::sleep_for(1s)) {
          return;    // Return if the sleep failed (e.g. on ctrl-c).
        }
        printf("  done.\n");
        msg->data++;    // Increment the message's data.
        printf(
          "Incrementing and sending with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        pub_ptr->publish(std::move(msg));    // Send the message along to the output topic.
      });
  }

  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub;
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub;
};

int main(int argc, char * argv[])
{
  setvbuf(stdout, NULL, _IONBF, BUFSIZ);
  rclcpp::init(argc, argv);
  rclcpp::executors::SingleThreadedExecutor executor;

  // Create a simple loop by connecting the in and out topics of two IncrementerPipe's.
  // The expectation is that the address of the message being passed between them never changes.
  auto pipe1 = std::make_shared<IncrementerPipe>("pipe1", "topic1", "topic2");
  auto pipe2 = std::make_shared<IncrementerPipe>("pipe2", "topic2", "topic1");
  rclcpp::sleep_for(1s);  // Wait for subscriptions to be established to avoid race conditions.
  // Publish the first message (kicking off the cycle).
  std::unique_ptr<std_msgs::msg::Int32> msg(new std_msgs::msg::Int32());
  msg->data = 42;
  printf(
    "Published first message with value:  %d, and address: 0x%" PRIXPTR "\n", msg->data,
    reinterpret_cast<std::uintptr_t>(msg.get()));
  pipe1->pub->publish(std::move(msg));

  executor.add_node(pipe1);
  executor.add_node(pipe2);
  executor.spin();

  rclcpp::shutdown();

  return 0;
}

与以前的例程不同,此例程仅使用一个节点,该节点使用不同的名称和配置实例化两次。图表最终是 pipe1 -> pipe2 -> pipe1 ...循环。 [待校准@7514]

“pipe1-> 发布 (msg);” 行启动流程,但是从那时起,通过在自己的订阅调用中发布的每个调用在节点之间来回传递消息。 [待校准@7515]

这里的期望是节点每秒来回传递消息一次,每次递增消息的值。因为消息作为 unique_ptr 被发布和订阅,所以在开始时创建的相同消息被持续使用。 [待校准@7516]

为了测试这些期望,让我们运行它: [待校准@7517]

$ ros2 run intra_process_demo cyclic_pipeline
Published first message with value:  42, and address: 0x7fd2ce0a2bc0
Received message with value:         42, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 43, and address: 0x7fd2ce0a2bc0
Received message with value:         43, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 44, and address: 0x7fd2ce0a2bc0
Received message with value:         44, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 45, and address: 0x7fd2ce0a2bc0
Received message with value:         45, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 46, and address: 0x7fd2ce0a2bc0
Received message with value:         46, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 47, and address: 0x7fd2ce0a2bc0
Received message with value:         47, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
[...]

从42开始,您应该会在每次迭代中看到不断增加的数字...因为42,并且在整个过程中,它重复使用相同的消息,因为指针地址所定义的例程不会改变,这避免了不必要的复制。 [待校准@7518]

图像管道例程 [待校准@7519]

在这个例程中,我们将使用OpenCV来捕获、注释,然后查看图像。 [待校准@7520]

注解

如果您使用的是macOS,但这些示例不起作用,或者您收到了类似 ddsi_conn_write failed -1 的错误,那么您需要增加系统范围内的UDP数据包大小: [待校准@7521]

$ sudo sysctl -w net.inet.udp.recvspace=209715
$ sudo sysctl -w net.inet.udp.maxdgram=65500

重新启动后,这些更改将不会持续。 [待校准@7522]

简单管道 [待校准@7523]

首先,我们将有一个由三个节点组成的管道,排列如下: camera_node -> watermark_node -> image_view_node [待校准@7524]

[需手动修复的语法] camera_node 从你电脑上的相机设备 0 读取,在图像上写一些信息并发布。 watermark_node 签署了[需手动修复的语法] camera_node 的输出,并在出版前增加了更多的文本。最后, image_view_node 订阅了 watermark_node 的输出,在图像上写了更多的文本,然后用 “cv::imshow” 将其可视化。 [待校准@7525]

在每个节点中,正在发送或已经接收或两者的消息的地址被写入图像。水印和图像视图节点被设计为在不复制图像的情况下修改图像,因此只要节点处于相同的过程中,印在图像上的地址应该都是相同的如上所示,图表仍被组织在管道中。 [待校准@7526]

注解

在某些系统上 (我们已经在Linux上看到过),打印到屏幕上的地址可能不会改变。这是因为相同的唯一指针正在被重用。在这种情况下,管道仍在运行。 [待校准@7527]

让我们通过执行以下可执行文件来运行例程: [待校准@7528]

ros2 run intra_process_demo image_pipeline_all_in_one

你应该看到这样的东西: [待校准@7529]

您可以通过按空格键来暂停图像的渲染,也可以通过再次按空格键来恢复。你也可以按 qESC 退出。 [待校准@7530]

如果暂停图像查看器,您应该能够比较写在图像上的地址,并看到它们是相同的。 [待校准@7531]

带有两个图像查看器的管道 [待校准@7532]

现在让我们看一个和上面一样的例子,除了它有两个图像视图节点。所有节点仍在同一进程中,但现在应显示两个图像视图窗口。(macOS用户注意: 您的图像视图窗口可能位于彼此之上)。让我们使用以下命令运行它: [待校准@7533]

ros2 run intra_process_demo image_pipeline_with_two_image_view

就像上一个示例一样,您可以使用空格键暂停渲染,然后再次按空格键继续。您可以停止更新以检查写入屏幕的指针。 [待校准@7534]

如您在上面的示例图像中看到的,对于前两个条目,我们有一个具有所有指针相同的图像,然后是另一个具有与第一个图像相同的指针的图像,但是第二个图像上的最后一个指针是不同的。要理解为什么会发生这种情况,请考虑图的拓扑结构: [待校准@7535]

camera_node -> watermark_node -> image_view_node
                              -> image_view_node2

[需手动修复的语法] camera_nodewatermark_node 之间的链接可以使用相同的指针而不需要复制,因为只有一个进程内订阅应该传递消息。但是对于 watermark_node 和两个图像视图节点之间的联系,这种关系是一对多的,因此,如果图像视图节点使用 unique_ptr 调用,那么就不可能将相同指针的所有权传递给两者。然而,它可以被交付给他们中的一个。哪一个会得到原始指针是未定义的,而是最后一个要传递的指针。 [待校准@7536]

请注意,图像视图节点未订阅 unique_ptr 调用back。相反,他们订阅了 const shared_ptr 。这意味着系统将相同的 shared_ptr 传递给两个调用。当处理第一个进程内订阅时,内部存储的 unique_ptr 被提升为 shared_ptr 。每个调用将获得同一消息的共享所有权。 [待校准@7537]

带有进程间查看器的管道 [待校准@7538]

正确的另一件重要事情是避免在进行进程间订阅时中断进程内零复制行为。为了测试这一点,我们可以运行第一个图像管道例程, image_pipeline_all_in_one ,然后运行独立 image_view_node 的实例 (不要忘记在终端中用 ros2 run intra_process_demo 作为前缀)。这看起来像这样: [待校准@7539]

很难同时暂停两个图像,因此图像可能不会对齐,但需要注意的重要一点是, image_pipeline_all_in_one 图像视图为每个步骤显示相同的地址。这意味着即使订阅了外部视图,进程内的零副本也会保留。您还可以看到,进程间图像视图对于前两行文本和第三行文本中独立图像查看器的进程账号具有不同的进程账号。 [待校准@7540]