编写动作服务器和客户端 (Python) [待校准@6825]
Goal目标: Python在Python中实现动作服务器和客户端。 [待校准@6826]
Tutorial教程级别: Intermediate中级 [待校准@6713]
时间: 15分钟 [Alyssa@6755]
背景
[需手动修复的语法]Action是ROS 2中异步通讯的一种形式。* [需手动修复的语法]Action客户端 * 向 * 动作服务器 * 发送目标请求。* [需手动修复的语法]Action服务器 * 向 * 动作客户端 * 发送目标反馈和结果。 [待校准@6827]
先决条件
您将需要上一教程 “ 创建动作 [待校准@6711] ” 中定义的 action_tutorials_interfaces
包和 Fibonacci.action
接口。 [待校准@6757]
任务
1编写动作服务器 [待校准@6828]
让我们集中精力编写一个动作服务器,使用我们在 创建动作 [待校准@6711] 教程中创建的动作来计算斐波那契序列。 [待校准@6766]
到目前为止,您已经创建了包,并使用 ros2 run
运行您的节点。但是,为了在本教程中保持简单,我们将把动作服务器的范围限定为一个文件。如果你想看看动作教程的完整包是什么样子的,请查看 action_tutorials 。 [待校准@6829]
在您的主目录中打开一个新文件,让我们调用它 fibonacci_action_server.py
,并添加以下代码: [待校准@6830]
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()
第8行定义了 FibonacciActionServer
类,它是 Node
的子类。通过调用 Node
构造函数来初始化类,命名我们的节点 fibonacci_action_server
: [待校准@6831]
super().__init__('fibonacci_action_server')
在构造函数中,我们还实例化一个新的动作服务器: [待校准@6832]
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
一个动作服务器需要四个参数: [待校准@6833]
一个ROS 2节点,用于将动作客户端添加到:
self
。 [待校准@6834]动作类型:
Fibonacci
(第5行输入)。 [待校准@6835]动作名称: “'fibonacci'”。 [待校准@6776]
用于执行可接受目标的调用返回函数:
self.execute_callback
。此调用必须返回动作类型的结果消息。 [待校准@6836]
我们还在课堂上定义了一种 execute_callback
方法: [待校准@6837]
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return result
一旦目标被接受,将调用此方法来执行目标。 [待校准@6838]
让我们尝试运行我们的动作服务器: [待校准@6839]
python3 fibonacci_action_server.py
python3 fibonacci_action_server.py
python fibonacci_action_server.py
在另一个终端中,我们可以使用命令行界面发送目标: [待校准@6840]
ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
在运行动作服务器的终端中,您应该会看到一条记录的消息 “正在执行目标...”,然后是未设置目标状态的警告。默认情况下,如果在execute调用中没有设置目标句柄状态,它将假定 * 中止 * 状态。 [待校准@6841]
我们可以使用目标手柄上的方法 succeed() 来表明目标是成功的: [待校准@6842]
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
goal_handle.succeed()
result = Fibonacci.Result()
return result
现在,如果您重新启动动作服务器并发送另一个目标,您应该会看到目标以 SUCCEEDED
状态完成。 [待校准@6843]
现在,让我们的目标执行实际计算并返回请求的斐波那契序列: [待校准@6844]
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
sequence = [0, 1]
for i in range(1, goal_handle.request.order):
sequence.append(sequence[i] + sequence[i-1])
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = sequence
return result
计算序列后,我们将其分配给result message字段,然后再返回。 [待校准@6845]
再次,重新启动动作服务器并发送另一个目标。你应该看到目标以正确的结果序列结束。 [待校准@6846]
1.2发布反馈 [待校准@6847]
动作的好处之一是能够在目标执行期间向动作客户提供反馈。我们可以通过调用目标手柄的 publish_feedback() 方法,让我们的动作服务器为动作客户端发布反馈。 [待校准@6848]
我们将替换 sequence
变量,并使用反馈消息来存储序列。每次更新for循环中的反馈消息后,我们都会发布反馈消息并休眠以获得戏剧性效果: [待校准@6849]
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()
重新启动动作服务器后,我们可以通过使用带有 --feedback
选项的命令行工具来确认反馈现在已发布: [待校准@6850]
ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
2编写动作客户端 [待校准@6851]
我们还将把动作客户端的范围限定为一个文件。打开一个新文件,让我们调用它 fibonacci_action_client.py
,并添加以下样板代码: [待校准@6852]
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
future = action_client.send_goal(10)
rclpy.spin_until_future_complete(action_client, future)
if __name__ == '__main__':
main()
我们已经定义了 FibonacciActionClient
类,它是 Node
的子类。通过调用 Node
构造函数来初始化类,命名我们的节点 fibonacci_action_client
: [待校准@6853]
super().__init__('fibonacci_action_client')
同样在类构造函数中,我们使用上一个关于 创建动作 [待校准@6711] 的教程中的自定义动作定义创建了一个动作客户端: [待校准@6854]
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
我们通过传递三个参数来创建 ActionClient
: [待校准@6855]
一个ROS 2节点,用于将动作客户端添加到:
self
[待校准@6856]动作类型:
Fibonacci
[待校准@6857]动作名称: “'fibonacci'” [待校准@6858]
我们动作客户端不能与动作服务器相同动作名称和类型。 [待校准@6859]
我们还定义方法 send_goal
在 FibonacciActionClient
类: [待校准@6860]
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
此方法等待动作服务器可用,然后向服务器发送目标。它返回了我们以后可以等待的未来。 [待校准@6861]
在类定义之后,我们定义了一个函数 main()
,该函数初始化ROS 2并创建我们的 FibonacciActionClient
节点的实例。然后它发送一个目标,并等待直到该目标完成。 [待校准@6862]
最后,我们在Python程序的入口调用 main()
。 [待校准@6863]
让我们先运行之前构建的动作服务器来测试我们的动作客户端: [待校准@6864]
python3 fibonacci_action_server.py
python3 fibonacci_action_server.py
python fibonacci_action_server.py
在另一个终端中,运行动作客户端: [待校准@6865]
python3 fibonacci_action_client.py
python3 fibonacci_action_client.py
python fibonacci_action_client.py
当动作服务器成功执行目标时,您应该会看到它打印的消息: [待校准@6866]
[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.
动作客户端应该启动,然后快速完成。此时,我们有一个功能正常的动作客户端,但是我们没有看到任何结果或得到任何反馈。 [待校准@6867]
2.1得到结果 [待校准@6868]
所以我们可以发送一个目标,但是我们怎么知道它什么时候完成呢?我们可以通过几个步骤获得结果信息。首先,我们需要为我们发送的目标获得一个目标句柄。然后,我们可以使用目标句柄来请求结果。 [待校准@6869]
以下是此示例的完整代码: [待校准@6870]
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()
[需手动修复的语法] ActionClient.send_goal_async() 法返回目标句柄的未来。首先,我们注册一个对未来完成时的调用: [待校准@6871]
self._send_goal_future.add_done_callback(self.goal_response_callback)
请注意,当动作服务器接受或拒绝目标请求时,未来就完成了。让我们更详细地看看 goal_response_callback
。我们可以检查目标是否被拒绝,并尽早返回,因为我们知道不会有结果: [待校准@6872]
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
现在我们有了一个目标句柄,我们可以用它来请求方法 get_result_async() 的结果。类似于发送目标,我们将得到一个未来,当结果准备好时就会完成。让我们注册一个调用,就像我们对目标响应所做的那样: [待校准@6873]
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
在调用back时,我们记录结果序列并关闭ROS 2以进行干净退出: [待校准@6874]
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
通过在单独终端中运行动作服务器,请继续尝试运行我们的斐波那契动作客户端! [待校准@6875]
python3 fibonacci_action_client.py
python3 fibonacci_action_client.py
python fibonacci_action_client.py
您应该会看到被接受目标和最终结果的记录消息。 [待校准@6876]
2.2获得反馈 [待校准@6877]
我们的动作客户端可以发送目标。不错!但是如果我们能从动作服务器得到一些关于我们发送的目标的反馈,那就太好了。 [待校准@6878]
以下是此示例的完整代码: [待校准@6870]
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()
以下是反馈消息的调用返回函数: [待校准@6879]
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
在调用back中,我们得到消息的反馈部分,并将 partial_sequence
字段打印到屏幕上。 [待校准@6880]
我们需要将调用注册回动作客户端。这是通过在我们发送目标时另外将调用传递回动作客户端来实现的: [待校准@6881]
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
我们都准备好了。如果我们运行我们的动作客户端,你应该会看到反馈被打印到屏幕上。 [待校准@6882]
总结
在本教程中,您将Python动作服务器和动作客户端逐行组合在一起,并将它们配置为交换目标、反馈和结果。 [待校准@6883]