编写动作服务器和客户端 (Python) [待校准@6825]

Goal目标: Python在Python中实现动作服务器和客户端。 [待校准@6826]

Tutorial教程级别: Intermediate中级 [待校准@6713]

时间: 15分钟 [Alyssa@6755]

背景

[需手动修复的语法]Action是ROS 2中异步通讯的一种形式。* [需手动修复的语法]Action客户端 * 向 * 动作服务器 * 发送目标请求。* [需手动修复的语法]Action服务器 * 向 * 动作客户端 * 发送目标反馈和结果。 [待校准@6827]

先决条件

您将需要上一教程 “ 创建动作 [待校准@6711] ” 中定义的 action_tutorials_interfaces 包和 Fibonacci.action 接口。 [待校准@6757]

任务

1编写动作服务器 [待校准@6828]

让我们集中精力编写一个动作服务器,使用我们在 创建动作 [待校准@6711] 教程中创建的动作来计算斐波那契序列。 [待校准@6766]

到目前为止,您已经创建了包,并使用 ros2 run 运行您的节点。但是,为了在本教程中保持简单,我们将把动作服务器的范围限定为一个文件。如果你想看看动作教程的完整包是什么样子的,请查看 action_tutorials[待校准@6829]

在您的主目录中打开一个新文件,让我们调用它 fibonacci_action_server.py ,并添加以下代码: [待校准@6830]

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

第8行定义了 FibonacciActionServer 类,它是 Node 的子类。通过调用 Node 构造函数来初始化类,命名我们的节点 fibonacci_action_server : [待校准@6831]

        super().__init__('fibonacci_action_server')

在构造函数中,我们还实例化一个新的动作服务器: [待校准@6832]

        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

一个动作服务器需要四个参数: [待校准@6833]

  1. 一个ROS 2节点,用于将动作客户端添加到: self[待校准@6834]

  2. 动作类型: Fibonacci (第5行输入)。 [待校准@6835]

  3. 动作名称: “'fibonacci'”。 [待校准@6776]

  4. 用于执行可接受目标的调用返回函数: self.execute_callback 。此调用必须返回动作类型的结果消息。 [待校准@6836]

我们还在课堂上定义了一种 execute_callback 方法: [待校准@6837]

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result

一旦目标被接受,将调用此方法来执行目标。 [待校准@6838]

让我们尝试运行我们的动作服务器: [待校准@6839]

python3 fibonacci_action_server.py

在另一个终端中,我们可以使用命令行界面发送目标: [待校准@6840]

ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

在运行动作服务器的终端中,您应该会看到一条记录的消息 “正在执行目标...”,然后是未设置目标状态的警告。默认情况下,如果在execute调用中没有设置目标句柄状态,它将假定 * 中止 * 状态。 [待校准@6841]

我们可以使用目标手柄上的方法 succeed() 来表明目标是成功的: [待校准@6842]

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        goal_handle.succeed()
        result = Fibonacci.Result()
        return result

现在,如果您重新启动动作服务器并发送另一个目标,您应该会看到目标以 SUCCEEDED 状态完成。 [待校准@6843]

现在,让我们的目标执行实际计算并返回请求的斐波那契序列: [待校准@6844]

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            sequence.append(sequence[i] + sequence[i-1])

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = sequence
        return result

计算序列后,我们将其分配给result message字段,然后再返回。 [待校准@6845]

再次,重新启动动作服务器并发送另一个目标。你应该看到目标以正确的结果序列结束。 [待校准@6846]

1.2发布反馈 [待校准@6847]

动作的好处之一是能够在目标执行期间向动作客户提供反馈。我们可以通过调用目标手柄的 publish_feedback() 方法,让我们的动作服务器为动作客户端发布反馈。 [待校准@6848]

我们将替换 sequence 变量,并使用反馈消息来存储序列。每次更新for循环中的反馈消息后,我们都会发布反馈消息并休眠以获得戏剧性效果: [待校准@6849]

import time

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = feedback_msg.partial_sequence
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

重新启动动作服务器后,我们可以通过使用带有 --feedback 选项的命令行工具来确认反馈现在已发布: [待校准@6850]

ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

2编写动作客户端 [待校准@6851]

我们还将把动作客户端的范围限定为一个文件。打开一个新文件,让我们调用它 fibonacci_action_client.py ,并添加以下样板代码: [待校准@6852]

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    future = action_client.send_goal(10)

    rclpy.spin_until_future_complete(action_client, future)


if __name__ == '__main__':
    main()

我们已经定义了 FibonacciActionClient 类,它是 Node 的子类。通过调用 Node 构造函数来初始化类,命名我们的节点 fibonacci_action_client : [待校准@6853]

        super().__init__('fibonacci_action_client')

同样在类构造函数中,我们使用上一个关于 创建动作 [待校准@6711] 的教程中的自定义动作定义创建了一个动作客户端: [待校准@6854]

        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

我们通过传递三个参数来创建 ActionClient : [待校准@6855]

  1. 一个ROS 2节点,用于将动作客户端添加到: self [待校准@6856]

  2. 动作类型: Fibonacci [待校准@6857]

  3. 动作名称: “'fibonacci'” [待校准@6858]

我们动作客户端不能与动作服务器相同动作名称和类型。 [待校准@6859]

我们还定义方法 send_goalFibonacciActionClient 类: [待校准@6860]

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)

此方法等待动作服务器可用,然后向服务器发送目标。它返回了我们以后可以等待的未来。 [待校准@6861]

在类定义之后,我们定义了一个函数 main() ,该函数初始化ROS 2并创建我们的 FibonacciActionClient 节点的实例。然后它发送一个目标,并等待直到该目标完成。 [待校准@6862]

最后,我们在Python程序的入口调用 main()[待校准@6863]

让我们先运行之前构建的动作服务器来测试我们的动作客户端: [待校准@6864]

python3 fibonacci_action_server.py

在另一个终端中,运行动作客户端: [待校准@6865]

python3 fibonacci_action_client.py

当动作服务器成功执行目标时,您应该会看到它打印的消息: [待校准@6866]

[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.

动作客户端应该启动,然后快速完成。此时,我们有一个功能正常的动作客户端,但是我们没有看到任何结果或得到任何反馈。 [待校准@6867]

2.1得到结果 [待校准@6868]

所以我们可以发送一个目标,但是我们怎么知道它什么时候完成呢?我们可以通过几个步骤获得结果信息。首先,我们需要为我们发送的目标获得一个目标句柄。然后,我们可以使用目标句柄来请求结果。 [待校准@6869]

以下是此示例的完整代码: [待校准@6870]

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

[需手动修复的语法] ActionClient.send_goal_async() 法返回目标句柄的未来。首先,我们注册一个对未来完成时的调用: [待校准@6871]

        self._send_goal_future.add_done_callback(self.goal_response_callback)

请注意,当动作服务器接受或拒绝目标请求时,未来就完成了。让我们更详细地看看 goal_response_callback 。我们可以检查目标是否被拒绝,并尽早返回,因为我们知道不会有结果: [待校准@6872]

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

现在我们有了一个目标句柄,我们可以用它来请求方法 get_result_async() 的结果。类似于发送目标,我们将得到一个未来,当结果准备好时就会完成。让我们注册一个调用,就像我们对目标响应所做的那样: [待校准@6873]

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

在调用back时,我们记录结果序列并关闭ROS 2以进行干净退出: [待校准@6874]

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

通过在单独终端中运行动作服务器,请继续尝试运行我们的斐波那契动作客户端! [待校准@6875]

python3 fibonacci_action_client.py

您应该会看到被接受目标和最终结果的记录消息。 [待校准@6876]

2.2获得反馈 [待校准@6877]

我们的动作客户端可以发送目标。不错!但是如果我们能从动作服务器得到一些关于我们发送的目标的反馈,那就太好了。 [待校准@6878]

以下是此示例的完整代码: [待校准@6870]

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

以下是反馈消息的调用返回函数: [待校准@6879]

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

在调用back中,我们得到消息的反馈部分,并将 partial_sequence 字段打印到屏幕上。 [待校准@6880]

我们需要将调用注册回动作客户端。这是通过在我们发送目标时另外将调用传递回动作客户端来实现的: [待校准@6881]

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

我们都准备好了。如果我们运行我们的动作客户端,你应该会看到反馈被打印到屏幕上。 [待校准@6882]

总结

在本教程中,您将Python动作服务器和动作客户端逐行组合在一起,并将它们配置为交换目标、反馈和结果。 [待校准@6883]