工作空间

创建工作空间

  • cd yahboomcar_ros2_ws
  • mkdir -p yahboomcar_ws_ptz/src
  • cd yahboomcar_ws_ptz

编译工作空间

  • colcon build
  • source install/setup.bash
  • echo "source ~/yahboomcar_ros2_ws/yahboomcar_ws/install/setup.bash" >> ~/.bashrc

ROS2 功能包

创建功能包命令

ros2 pkg create <package_name> --build-type <build-type> --dependencies <dependencies> --node-name <node-name>

  • cpp, ros2 pkg create pkg_helloworld_cpp --build-type ament_cmake --dependencies rclcpp --node-name helloworld
  • py, ros2 pkg create pkg_helloworld_py --build-type ament_python --dependencies rclpy --node-name helloworld

功能包简单流程

  • cd ~/yahboomcar_ros2_ws/yahboomcar_ws/src
  • ros2 pkg create pkg_helloworld_py --build-type ament_python --dependencies rclpy --node-name helloworld
  • cd ~/yahboomcar_ros2_ws/yahboomcar_ws
  • colcon build --packages-select pkg_helloworld_py
  • source install/setup.bash
  • ros2 run pkg_helloworld_py helloworld

自动生成的 helloworld.py 中,main 里面只有一条 print, 所以运行的时候,也只打印了一条字符串。

ros2 节点

把 helloworld.py 中引入 rclpy 的节点,再使用 logger 打印,然后 build, source, run,就可以看到循环打印了。

import rclpy                                        # ROS2 Python接口库
from rclpy.node import Node                         # ROS2 节点类
import time

"""
创建一个HelloWorld节点, 初始化时输出“hello world”日志
"""
class HelloWorldNode(Node):
    def __init__(self, name):
        super().__init__(name)                      # ROS2节点父类初始化
        while rclpy.ok():                           # ROS2系统是否正常运行
            self.get_logger().info("Hello World")   # ROS2日志输出
            time.sleep(0.5)                         # 休眠控制循环时间

def main(args=None):                                # ROS2节点主入口main函数
    rclpy.init(args=args)                           # ROS2 Python接口初始化
    node = HelloWorldNode("helloworld")             # 创建ROS2节点对象并进行初始化
    rclpy.spin(node)                                # 循环等待ROS2退出
    node.destroy_node()                             # 销毁节点对象
    rclpy.shutdown()                                # 关闭ROS2 Python接口
  • import rclpy from rclpy.node import Node
  • 开关 ros2 接口,rclpy.init(args=args) rclpy.shutdown()
  • 创建和销毁节点,node = HelloWorldNode("helloworld") node.destroy_node()
  • 等待节点退出,rclpy.spin(node)
  • 日志输出 self.get_logger().info("Hello World")
  • 检查 rclpy 正常, while rclpy.ok()

ros2 话题通信 publish

#导入rclpy库
import rclpy
from rclpy.node import Node
#导入String字符串消息
from std_msgs.msg import String 
#创建一个继承于Node基类的Topic_Pub节点子类 传入一个参数name
class Topic_Pub(Node):
    def __init__(self,name):
        super().__init__(name)
        #创建一个发布者,使用create_publisher的函数,传入的参数分别是:
        #话题数据类型、话题名称、保存消息的队列长度
        self.pub = self.create_publisher(String,"/topic_demo",1) 
        #创建一个定时器,间隔1s进入中断处理函数,传入的参数分别是:
        #中断函数执行的间隔时间,中断处理函数
        self.timer = self.create_timer(1,self.pub_msg)
    #定义中断处理函数
    def pub_msg(self):
        msg = String()  #创建一个String类型的变量msg
        msg.data = "Hi,I send a message." #给msg里边的data赋值
        self.pub.publish(msg) #发布话题数据

#主函数
def main():
    rclpy.init() #初始化
    pub_demo = Topic_Pub("publisher_node") #创建Topic_Pub类对象,传入的参数就是节点的名字
    rclpy.spin(pub_demo)    #执行rclpy.spin函数,里边传入一个参数,参数是刚才创建好的Topic_Pub类对象
    pub_demo.destroy_node()    #销毁节点对象
    rclpy.shutdown()        #关闭ROS2 Python接口
  • 创建 publisher, self.pub = self.create_publisher(String,"/topic_demo",1)
  • 发布话题数据,self.pub.publish(msg) #发布话题数据
  • 创建定时器,self.timer = self.create_timer(1,self.pub_msg)

调试:

  • ros2 run pkg_topic publisher_demo
  • ros2 topic list
  • ros2 topic echo /topic_demo

ros2 话题通信 subscriber

#导入相关的库
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Topic_Sub(Node):
    def __init__(self,name):
        super().__init__(name)  
        #创建订阅者使用的是create_subscription,传入的参数分别是:话题数据类型,话题名称,回调函数名称,队列长度
        self.sub = self.create_subscription(String,"/topic_demo",self.sub_callback,1) 
    #回调函数执行程序:打印接收的到信息
    def sub_callback(self,msg):
        print(msg.data)
def main():
    rclpy.init() #ROS2 Python接口初始化
    sub_demo = Topic_Sub("subscriber_node") # 创建对象并进行初始化
    rclpy.spin(sub_demo)
    sub_demo.destroy_node() #销毁节点对象
    rclpy.shutdown()        #关闭ROS2 Python接口
// setup.py
'subscriber_demo = pkg_topic.subscriber_demo:main'
  • 创建 subscriber, self.sub = self.create_subscription(String,"/topic_demo",self.sub_callback,1)

调试:

  • ros2 run pkg_topic publisher_demo
  • ros2 run pkg_topic subscriber_demo

ros2 服务通信 server

#导入相关的库文件
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class Service_Server(Node):
    def __init__(self,name):
        super().__init__(name)
        #创建一个服务端,使用的是create_service函数,传入的参数分别是:
        #服务数据的数据类型、服务的名称,服务回调函数(也就是服务的内容)
        self.srv = self.create_service(AddTwoInts, '/add_two_ints', self.Add2Ints_callback)
    #这里的服务回调函数的内容是把两个整型数相加,然后返回相加的结果    
    def Add2Ints_callback(self,request,response):
        response.sum = request.a + request.b
        print("response.sum = ",response.sum)
        return response
def main():
    rclpy.init()
    server_demo = Service_Server("publisher_node")
    rclpy.spin(server_demo)
    server_demo.destroy_node()      # 销毁节点对象
    rclpy.shutdown()                # 关闭ROS2 Python接口
  • 创建服务 self.srv = self.create_service(AddTwoInts, '/add_two_ints', self.Add2Ints_callback)

服务相关:

  • 查看服务接口, ros2 interface show example_interfaces/srv/AddTwoInts
  • 查看服务路径, ros2 pkg prefix example_interfaces
  • 服务 AddTwoInts 在 /opt/ros/foxy/share/example_interfaces/srv/AddTwoInts.srv

调试:

  • ros2 run pkg_service server_demo
  • ros2 service list
  • ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 1,b: 4}" 注意, : 后面需要空格,否则报错。

ros2 服务通信 client

#导入相关的库
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class Service_Client(Node):
    def __init__(self,name):
        super().__init__(name)
        #创建客户端,使用的是create_client函数,传入的参数是服务数据的数据类型、服务的话题名称
        self.client = self.create_client(AddTwoInts,'/add_two_ints')
        # 循环等待服务器端成功启动
        while not self.client.wait_for_service(timeout_sec=1.0):
            print("service not available, waiting again...")
        # 创建服务请求的数据对象
        self.request = AddTwoInts.Request()

    def send_request(self): 
        self.request.a = 10
        self.request.b = 90
        #发送服务请求
        self.future = self.client.call_async(self.request)

def main():
    rclpy.init() #节点初始化
    service_client = Service_Client("client_node") #创建对象
    service_client.send_request() #发送服务请求
    while rclpy.ok():
        rclpy.spin_once(service_client)
        #判断数据是否处理完成
        if service_client.future.done():
            try:
                #获得服务反馈的信息并且打印
                response = service_client.future.result()
                print("service_client.request.a = ",service_client.request.a)
                print("service_client.request.b = ",service_client.request.b)
                print("Result = ",response.sum)
            except Exception as e:
                service_client.get_logger().info('Service call failed %r' % (e,))
        break
    service_client.destroy_node()
    rclpy.shutdown()
  • 等待服务启动成功,self.client.wait_for_service(timeout_sec=1.0)
  • self.request = AddTwoInts.Request()
  • self.future = self.client.call_async(self.request)
  • if service_client.future.done():
  • response = service_client.future.result()

调试:

  • ros2 run pkg_service server_demo
  • ros2 run pkg_service client_demo

ros2 动作通讯 server

接口功能包

  • cd ~/yahboomcar_ros2_ws/yahboomcar_ws/src
  • ros2 pkg create --build-type ament_cmake pkg_interfaces
  • mkdir pkg_interfaces/action
  • vim pkg_interfaces/action/Progress.action 新建
    # 目标(Goal)部分
    int64 num
    ---
    # 结果(Result)部分
    int64 sum
    ---
    # 反馈(Feedback)部分
    float64 progress
  • vim pkg_interfaces/package.xml 添加
    <buildtool_depend>rosidl_default_generators</buildtool_depend>
    <depend>action_msgs</depend>
    <member_of_group>rosidl_interface_packages</member_of_group>
    • <buildtool_depend>rosidl_default_generators</buildtool_depend>:用于生成接口代码的工具。rosidl_default_generators 是 ROS 2 的一个工具包,用于从接口定义文件(如 .msg、.srv、.action)生成相应的 Python、C++ 等语言的代码。
    • <depend>action_msgs</depend>:动作接口的运行时依赖。action_msgs 是 ROS 2 提供的一个标准包,包含了动作(Action)接口的基本消息类型(如 GoalStatus、GoalInfo 等)。
    • <member_of_group>rosidl_interface_packages</member_of_group>:标记当前包为接口包。rosidl_interface_packages 是 ROS 2 中的一个组,用于标识包含接口定义(如 .msg、.srv、.action)的包。
  • vim pkg_interfaces/CMakeLists.txt 添加
    find_package(rosidl_default_generators REQUIRED)
    rosidl_generate_interfaces(${PROJECT_NAME}
    "action/Progress.action"
    )
    • find_package(rosidl_default_generators REQUIRED) 查找并加载 rosidl_default_generators 包。rosidl_default_generators 是 ROS 2 提供的一个工具包,用于从接口定义文件(如 .msg、.srv、.action)生成相应的 Python、C++ 等语言的代码。
    • rosidl_generate_interfaces(${PROJECT_NAME} "action/Progress.action") 生成接口代码。它会根据指定的接口定义文件(如 Progress.action),生成相应的 Python、C++ 等语言的代码。生成的代码会被放置在 install 目录中,供其他包使用。
  • colcon build --packages-select pkg_interfaces
  • source install/setup.bash
  • ros2 interface show pkg_interfaces/action/Progress

服务端

  • ros2 pkg create pkg_action --build-type ament_python --dependencies rclpy pkg_interfaces --node-name action_server_demo 注意 --dependencies 后面需要添加 pkg_interfaces
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from pkg_interfaces.action import Progress
class Action_Server(Node):
    def __init__(self):
        super().__init__('progress_action_server')
        # 创建动作服务端
        self._action_server = ActionServer(self, Progress, 'get_sum', self.execute_callback)
        self.get_logger().info('动作服务已经启动!')
    def execute_callback(self, goal_handle):
        self.get_logger().info('开始执行任务....')
        # 生成连续反馈;
        feedback_msg = Progress.Feedback()
        sum = 0
        for i in range(1, goal_handle.request.num + 1):
            sum += i
            feedback_msg.progress = i / goal_handle.request.num
            self.get_logger().info('连续反馈: %.2f' % feedback_msg.progress)
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)
        # 生成最终响应。
        goal_handle.succeed()
        result = Progress.Result()
        result.sum = sum
        self.get_logger().info('任务完成!')
        return result
def main(args=None):
    rclpy.init(args=args)
    # 调用spin函数,并传入节点对象
    Progress_action_server = Action_Server()
    rclpy.spin(Progress_action_server)
    Progress_action_server.destroy_node() 
    # 释放资源
    rclpy.shutdown()
  • from rclpy.action import ActionServer from pkg_interfaces.action import Progress
  • self._action_server = ActionServer(self, Progress, 'get_sum', self.execute_callback)
  • feedback_msg = Progress.Feedback() 用于存储动作执行过程中的反馈数据。Progress.Feedback() 是根据动作接口文件 Progress.action 生成的反馈消息类型。
  • goal_handle.request.num 客户端发送的目标数据。goal_handle 是动作服务器的目标句柄,request 是客户端发送的目标请求对象。num 是目标请求中的一个字段,表示客户端发送的整数。
  • feedback_msg.progress = i / goal_handle.request.num 计算当前任务的进度,并将其存储在反馈消息中。feedback_msg.progress 是反馈消息中的一个字段,表示任务完成的进度(通常是百分比)。
  • goal_handle.publish_feedback(feedback_msg) 发布反馈消息给客户端。客户端可以通过回调函数接收到这些反馈消息,用于实时监控任务进度。
  • goal_handle.succeed() 标记动作执行成功。调用 succeed() 后,动作服务器会向客户端返回结果,并结束动作执行。
  • result = Progress.Result() 创建一个 Result 对象,用于存储动作执行的结果数据。 Progress.Result() 是根据动作接口文件 Progress.action 生成的结果消息类型。

调试:

  • colcon build --packages-select pkg_action
  • ros2 run pkg_action action_server_demo
  • ros2 action list
  • ros2 action send_goal /get_sum pkg_interfaces/action/Progress num:\ 10\

客户端

  • action_server_demo.py 同级目录下新建文件 action_client_demo.py
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from pkg_interfaces.action import Progress
class Action_Client(Node):
    def __init__(self):
        super().__init__('progress_action_client')
        # 创建动作客户端;
        self._action_client = ActionClient(self, Progress, 'get_sum')
    def send_goal(self, num):
        # 发送请求;
        goal_msg = Progress.Goal()
        goal_msg.num = num
        self._action_client.wait_for_server()
        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
        self._send_goal_future.add_done_callback(self.goal_response_callback)
    def goal_response_callback(self, future):
        # 处理目标发送后的反馈;
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('请求被拒绝')
            return
        self.get_logger().info('请求被接收,开始执行任务!')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)
    # 处理最终响应。
    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('最终计算结果:sum = %d' % result.sum)
        # 5.释放资源。
        rclpy.shutdown()
    # 处理连续反馈;
    def feedback_callback(self, feedback_msg):
        feedback = (int)(feedback_msg.feedback.progress * 100)
        self.get_logger().info('当前进度: %d%%' % feedback)
def main(args=None):
    rclpy.init(args=args)
    action_client = Action_Client()
    action_client.send_goal(10)
    rclpy.spin(action_client)
  • self._action_client = ActionClient(self, Progress, 'get_sum')
  • self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
    • 异步发送目标请求给动作服务器。
    • send_goal_async 是动作客户端的方法,用于发送目标请求并返回一个 Future 对象。
    • goal_msg 是客户端发送的目标消息。
    • feedback_callback=self.feedback_callback 是可选参数,用于指定接收反馈消息的回调函数。
  • self._send_goal_future.add_done_callback(self.goal_response_callback)
    • 为目标发送操作添加一个回调函数,当目标发送完成时调用。
    • self.goal_response_callback 是回调函数,用于处理目标发送的结果(例如目标是否被服务器接受)。
  • feedback = (int)(feedback_msg.feedback.progress * 100)
    • 从反馈消息中提取进度信息,并将其转换为整数百分比。
    • feedback_msg.feedback.progress 是反馈消息中的进度字段(通常是 0 到 1 之间的浮点数)。
  • goal_handle = future.result()
    • future 是目标发送操作(send_goal_async)返回的 Future 对象。
    • goal_handle 是目标句柄,用于跟踪目标的状态(例如是否被接受、是否完成)。
  • if not goal_handle.accepted 检查目标是否被动作服务器接受。
  • self._get_result_future = goal_handle.get_result_async()
    • 异步获取动作执行的结果。
    • get_result_async 是目标句柄的方法,用于异步获取结果并返回一个 Future 对象。
  • self._get_result_future.add_done_callback(self.get_result_callback) 为获取结果操作添加一个回调函数,当结果获取完成时调用。
  • result = future.result().result 从 Future 对象中提取动作执行的结果。future.result() 返回一个包含结果的对象,result 字段是实际的结果数据。

调试:

  • ros2 run pkg_action action_server_demo
  • ros2 run pkg_action action_client_demo

自定义接口

话题通信接口

  • mkdir pkg_interfaces/msg
  • vim pkg_interfaces/msg/Person.msg
    string   name
    int32    age
    float64  height
  • pkg_interfaces/package.xml
    <buildtool_depend>rosidl_default_generators</buildtool_depend>
    <exec_depend>rosidl_default_runtime</exec_depend>
    <depend>action_msgs</depend>
    <member_of_group>rosidl_interface_packages</member_of_group>
  • pkg_interfaces/CMakeLists.txt
    find_package(rosidl_default_generators REQUIRED)
    rosidl_generate_interfaces(${PROJECT_NAME}
    "action/Progress.action"
    "msg/Person.msg"
    )
  • colcon build --packages-select pkg_interfaces
  • source install/setup.bash
  • ros2 interface show pkg_interfaces/msg/Person

服务通信接口

  • mkdir -p pkg_interfaces/srv
  • vim pkg_interfaces/srv/Add.srv
    int32 num1
    int32 num2
    ---
    int32 sum
  • colcon build --packages-select pkg_interfaces
  • source ../install/setup.bash
  • ros2 interface show pkg_interfaces/srv/Add

参数服务

介绍

  • ros2 param list
  • 参数查询与修改
    ros2 param describe turtlesim background_b   # 查看某个参数的描述信息
    ros2 param get turtlesim background_b        # 查询某个参数的值
    ros2 param set turtlesim background_b 10     # 修改某个参数的值
  • 参数文件保存与加载
    ros2 param dump turtlesim >> turtlesim.yaml  # 将某个节点的参数保存到参数文件中
    ros2 param load turtlesim turtlesim.yaml     # 一次性加载某一个文件中的所有参数

参数实例

新建功能包

  • ros2 pkg create pkg_param --build-type ament_python --dependencies rclpy --node-name param_demo

代码

import rclpy                    # ROS2 Python接口库
from rclpy.node import Node     # ROS2 节点类
class ParameterNode(Node):
    def __init__(self, name):
        super().__init__(name)                                      # ROS2节点父类初始化
        self.timer = self.create_timer(2, self.timer_callback)      # 创建一个定时器(单位为秒的周期,定时执行的回调函数)
        self.declare_parameter('robot_name', 'muto')                # 创建一个参数,并设置参数的默认值
    def timer_callback(self):                                       # 创建定时器周期执行的回调函数
        robot_name_param = self.get_parameter('robot_name').get_parameter_value().string_value  # 从ROS2系统中读取参数的值
        self.get_logger().info('Hello %s!' % robot_name_param)      # 输出日志信息,打印读取到的参数值
def main(args=None):                                    # ROS2节点主入口main函数
    rclpy.init(args=args)                               # ROS2 Python接口初始化
    node = ParameterNode("param_declare")               # 创建ROS2节点对象并进行初始化
    rclpy.spin(node)                                    # 循环等待ROS2退出
    node.destroy_node()                                 # 销毁节点对象
    rclpy.shutdown()                                    # 关闭ROS2 Python接口
  • self.declare_parameter('robot_name', 'muto') 创建参数
  • robot_name_param = self.get_parameter('robot_name').get_parameter_value().string_value 读取参数

编译运行

  • colcon build --packages-select pkg_param
  • source install/setup.bash
  • ros2 run pkg_param param_demo
  • ros2 param set param_declare robot_name robot

元功能包

  • ros2 pkg create pkg_metapackage
  • 修改 package.xml
    <exec_depend>pkg_interfaces</exec_depend>
    <exec_depend>pkg_helloworld_py</exec_depend>
    <exec_depend>pkg_topic</exec_depend>
    <exec_depend>pkg_service</exec_depend>
    <exec_depend>pkg_action</exec_depend>
    <exec_depend>pkg_param</exec_depend>
  • colcon build --packages-select pkg_metapackage

分布式

实现

  • 小车 ./run_docker.sh, ros2 run demo_nodes_py talker
  • 虚拟机 ros2 run demo_nodes_py listener
  • 需要确保 bashrc 中的 ROS_DOMAIN_ID 相同。

取值

  • 建议ROS_DOMAIN_ID的取值在[0,101] 之间,包含0和101
  • 每个域ID内的节点总数是有限制的,需要小于等于120个
  • 如果域ID为101,那么该域的节点总数需要小于等于54个

计算规则

  • DDS协议规定以7400作为起始端口,也即可用端口为[7400,65535],又已知按照DDS协议默认情况下,每个域ID占用250个端口,那么域ID的个数为:(65535-7400)/250 = 232(个),对应的其取值范围为[0,231];
  • 在Linux下,可用的域ID为[0,101]与[215-231],在Windows和Mac中可用的域ID为[0,166],综上,为了兼容多平台,建议域ID在[0,101] 范围内取值。
  • 每个域ID默认占用250个端口,且每个ROS2节点需要占用两个端口,另外,按照DDS协议每个域ID的端口段内,第1、2个端口是Discovery Multicast端口与User Multicast端口,从第11、12个端口开始是域内第一个节点的Discovery Unicast端口与User Unicast,后续节点所占用端口依次顺延,那么一个域ID中的最大节点个数为:(250-10)/2 = 120(个)
  • 特殊情况:域ID值为101时,其后半段端口属于操作系统的预留端口,其节点最大个数为54个。

DDS

QoS

  • DEADLINE策略,表示通信数据必须要在每次截止时间内完成一次通信;
  • HISTORY策略,表示针对历史数据的一个缓存大小;
  • RELIABILITY策略,表示数据通信的模式,配置成BEST_EFFORT,就是尽力传输模式,网络情况不好的时候,也要保证数据流畅,此时可能会导致数据丢失,配置成RELIABLE,就是可信赖模式,可以在通信中尽量保证图像的完整性,我们可以根据应用功能场景选择合适的通信模式;
  • DURABILITY策略,可以配置针对晚加入的节点,也保证有一定的历史数据发送过去,可以让新节点快速适应系统。

ROS2时间相关API

rate

rclpy 中的 Rate 对象可以通过节点创建,Rate 对象的 sleep() 函数需要在子线程中执行,否咋会阻塞程序。

import rclpy
import threading
from rclpy.timer import Rate
rate = None
node = None
def do_some():
    global rate
    global node
    while rclpy.ok():
        node.get_logger().info("hello ---------")
        # 休眠
        rate.sleep()
def main():
    global rate
    global node
    rclpy.init()
    node = rclpy.create_node("rate_demo")
    # 创建 Rate 对象
    rate = node.create_rate(1.0)
    # 创建子线程
    thread = threading.Thread(target=do_some)
    thread.start()
    rclpy.spin(node)  # 进入事件循环
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Time

    node = rclpy.create_node("time_demo")
    right_now = node.get_clock().now()
    t1 = Time(seconds=10,nanoseconds=500000000)
    node.get_logger().info("s = %.2f, ns = %d" % (right_now.seconds_nanoseconds()[0], right_now.seconds_nanoseconds()[1]))
    node.get_logger().info("s = %.2f, ns = %d" % (t1.seconds_nanoseconds()[0], t1.seconds_nanoseconds()[1]))
    node.get_logger().info("ns = %d" % right_now.nanoseconds)
    node.get_logger().info("ns = %d" % t1.nanoseconds)

Duration

du1 = Duration(seconds = 2,nanoseconds = 500000000)

Time 与 Duration 运算

Time 和 Duration 之间可以直接进行数值计算。

    t1 = Time(seconds=10)
    t2 = Time(seconds=4)
    du1 = Duration(seconds=3)
    du2 = Duration(seconds=5)
    # 比较
    node.get_logger().info("t1 >= t2 ? %d" % (t1 >= t2))
    node.get_logger().info("t1 < t2 ? %d" % (t1 < t2))
    # 数学运算
    t3 = t1 + du1
    t4 = t1 - t2
    t5 = t1 - du1
    node.get_logger().info("t3 = %d" % t3.nanoseconds)
    node.get_logger().info("t4 = %d" % t4.nanoseconds)
    node.get_logger().info("t5 = %d" % t5.nanoseconds)
    # 比较
    node.get_logger().info("-" * 80)
    node.get_logger().info("du1 >= du2 ? %d" % (du1 >= du2))
    node.get_logger().info("du1 < du2 ? %d" % (du1 < du2))

ROS2 常用命令工具

包管理工具ros2 pkg

  • ros2 pkg create PKG_NAME --build-type ament_python --dependencies rclpy --node-name NODE_NAME
  • ros2 pkg list
  • ros2 pkg executables PKG_NAME

节点运行 ros2 run

  • ros2 run PKG_NAME NODE_NAM

节点相关工具 ros2 node

  • ros2 node list
  • ros2 node info 查看节点详细信息,包括订阅、发布的消息,开启的服务和动作等

主题相关工具 ros2 topic

  • ros2 topic list
  • ros2 topic info topic_name 显示主题消息类型,订阅者/发布者数量
  • ros2 topic type topic_name 查看话题的消息类型
  • ros2 topic hz topic_name 显示主题平均发布频率
  • ros2 topic echo topic_name 在终端打印主题消息,类似于一个订阅者
  • ros2 topic pub topic_name message_type message_content 在终端发布指定话题消息 ros2 topic pub turtle1/cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.2}}"
    • 参数-1只发布一次,ros2 topic pub -1 topic_name message_type message_content
    • 参数-t count循环发布count次结束,ros2 topic pub -t count topic_name message_type message_content
    • 参数-r count以count Hz的频率循环发布,ros2 topic pub -r count topic_name message_type message_content

接口相关工具ros2 interface

  • ros2 interface list 包括话题、服务、动作。
  • ros2 interface show interface_name 显示指定接口的详细内容

服务相关工具 ros2 service

  • ros2 service list
  • ros2 service call service_name service_Type arguments 调用指定服务. ros2 service call /spawn turtlesim/srv/Spawn "{x: 2, y: 2, theta: 0.2, name: ''}"

ROS2 Rviz2使用

Rviz2就是一款机器人开发过程中的数据可视化软件,机器人模型、传感器信息、环境信息等等.

Rviz2启动

rviz2

图像数据可视化

在 docker 中操作,

  • ros2 launch astra_camera astro_pro_plus.launch.xml 一个终端开启摄像头
  • rviz2 中左下方 add -> by topic, 选择 /camera/color/image_raw/image

参考

ROS2 Rqt工具箱

rqt 可以通过多种插件来工作,比如 topic, service, param.

ROS2 Launch之Python实现

单节点 launch

  • 功能包文件夹下面增加 launch 文件夹
  • launch 下新增 single_node_launch.py 文件, launch文件命名常以 LaunchName_launch.py,其中,LaunchName自定义,_launch.py是常认为固定的。
    from launch import LaunchDescription
    from launch_ros.actions import Node
    # 定义了一个变量node作为一个节点启动的返回值,最后调用LaunchDescription函数传入node参数执行返回。
    # 返回一个launch_description
    def generate_launch_description():
    node = Node(
        package='pkg_helloworld_py',
        executable='helloworld',
    )
    return LaunchDescription([node])
  • 修改 setup.py
    import os
    from glob import glob
    # data_files=[
    (os.path.join('share',package_name,'launch'),glob(os.path.join('launch','*launch.py'))),
  • 正常编译后,使用 ros2 launch pkg_helloworld_py single_node_launch.py 来加载运行。

多节点 launch

  • pkg_topic 功能包文件夹下面增加 launch 文件夹
  • 新建 multi_node_launch.py
    from launch import LaunchDescription
    from launch_ros.actions import Node
    def generate_launch_description():
    publisher_node = Node(
        package='pkg_topic',
        executable='publisher_demo',
        output='screen'
    )
    subscriber_node = Node(
        package='pkg_topic',
        executable='subscriber_demo',
        output='screen'
    )
    return LaunchDescription([
        publisher_node,
        subscriber_node
    ])
  • ros2 launch pkg_topic multi_node_launch.py
  • ros2 node list

话题重映射

  • pkg_topic 功能包文件夹下面增加 remap_name_launch.py
    from launch import LaunchDescription
    from launch_ros.actions import Node
    def generate_launch_description():
    publisher_node = Node(
        package='pkg_topic',
        executable='publisher_demo',
        output='screen',
        remappings=[("/topic_demo", "/topic_update")]
    )
    return LaunchDescription([
        publisher_node
    ])
  • 编译后 ros2 launch pkg_topic remap_name_launch.py
  • ros2 topic list

奇怪: 有些时候,编译后 launch 没有相关的文件。 但是如果在 setup.py 中,按照单个节点的方式去修改。然后编译的时候,先不带 --allow-overriding, 然后再加上。就可以在 launch 中找到了。

launch文件启动launch文件

  • multi_node_launch.py 同级目录下新建 include_launch.py
    from launch import LaunchDescription
    from launch_ros.actions import Node 
    import os
    from launch.actions import IncludeLaunchDescription
    from launch.launch_description_sources import PythonLaunchDescriptionSource
    from ament_index_python.packages import get_package_share_directory
    def generate_launch_description():
    hello_launch = IncludeLaunchDescription(PythonLaunchDescriptionSource(
        [os.path.join(get_package_share_directory('pkg_helloworld_py'), 'launch'),
        '/single_node_launch.py']),
    )
    return LaunchDescription([
        hello_launch
    ])
  • 编译后,ros2 launch pkg_topic include_launch.py

稍复杂的launch文件

  • multi_node_launch.py 同级目录下新建 complex_launch.py
    import os
    from ament_index_python import get_package_share_directory
    from launch import LaunchDescription
    from launch.actions import DeclareLaunchArgument
    from launch.actions import IncludeLaunchDescription
    from launch.actions import GroupAction
    from launch.launch_description_sources import PythonLaunchDescriptionSource
    from launch.substitutions import LaunchConfiguration
    from launch.substitutions import TextSubstitution
    from launch_ros.actions import Node
    from launch_ros.actions import PushRosNamespace
    def generate_launch_description():
    # args that can be set from the command line or a default will be used
    background_r_launch_arg = DeclareLaunchArgument("background_r", default_value=TextSubstitution(text="0"))
    background_g_launch_arg = DeclareLaunchArgument("background_g", default_value=TextSubstitution(text="255"))
    background_b_launch_arg = DeclareLaunchArgument("background_b", default_value=TextSubstitution(text="0"))
    chatter_ns_launch_arg = DeclareLaunchArgument("chatter_ns", default_value=TextSubstitution(text="my/chatter/ns"))
    # include another launch file
    launch_include = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(
                get_package_share_directory('demo_nodes_cpp'),
                'launch/topics/talker_listener.launch.py'))
    )
    # include another launch file in the chatter_ns namespace
    launch_include_with_namespace = GroupAction(
        actions=[
            # push-ros-namespace to set namespace of included nodes
            PushRosNamespace(LaunchConfiguration('chatter_ns')),
            IncludeLaunchDescription(
                PythonLaunchDescriptionSource(
                    os.path.join(
                        get_package_share_directory('demo_nodes_cpp'),
                        'launch/topics/talker_listener.launch.py'))
            ),
        ]
    )
    # start a turtlesim_node in the turtlesim1 namespace
    turtlesim_node = Node(
            package='turtlesim',
            namespace='turtlesim1',
            executable='turtlesim_node',
            name='sim'
        )
    # start another turtlesim_node in the turtlesim2 namespace
    # and use args to set parameters
    turtlesim_node_with_parameters = Node(
            package='turtlesim',
            namespace='turtlesim2',
            executable='turtlesim_node',
            name='sim',
            parameters=[{
                "background_r": LaunchConfiguration('background_r'),
                "background_g": LaunchConfiguration('background_g'),
                "background_b": LaunchConfiguration('background_b'),
            }]
        )
    # perform remap so both turtles listen to the same command topic
    forward_turtlesim_commands_to_second_turtlesim_node = Node(
            package='turtlesim',
            executable='mimic',
            name='mimic',
            remappings=[
                ('/input/pose', '/turtlesim1/turtle1/pose'),
                ('/output/cmd_vel', '/turtlesim2/turtle1/cmd_vel'),
            ]
        )
    return LaunchDescription([
        background_r_launch_arg,
        background_g_launch_arg,
        background_b_launch_arg,
        chatter_ns_launch_arg,
        launch_include,
        launch_include_with_namespace,
        turtlesim_node,
        turtlesim_node_with_parameters,
        forward_turtlesim_commands_to_second_turtlesim_node,
    ])
  • 编译后,ros2 launch pkg_topic complex_launch.py

ROS2 Launch之Xml、Yaml实现

ROS2录制回放工具

rosbag2,这个工具用于记录话题的数据。我们就可以使用这个指令将话题数据存储为文件 ,后续我们无需启动节点,直接可以将bag文件里的话题数据发布出来。

启动要录制的话题节点

ros2 run demo_nodes_py talker

记录

# 记录单个话题
ros2 bag record /topic-name
# 记录多个话题
ros2 bag record topic-name1 topic-name2
# 记录所有话题
ros2 bag record -a
# -o name 自定义输出文件的名字
ros2 bag record -o file-name topic-name

Ctrl+C 指令打断录制即可停止录制.

查看录制出话题的信息

ros2 bag info rosbag2_2023_10_31-07_58_23

播放并查看

播放

# 播放
ros2 bag play rosbag2_2023_10_31-07_58_23
# 倍速播放 -r 选项可以修改播放速率,比如 -r 值,比如 -r 10,就是10倍速,十倍速播放话题
ros2 bag play rosbag2_2023_10_31-07_58_23 -r 10
# 循环播放 -l 
ros2 bag play rosbag2_2023_10_31-07_58_23 -l
# 播放单个话题
ros2 bag play rosbag2_2023_10_31-07_58_23 --topics /chatter

查看

ros2 topic echo /chatter

ROS2 URDF模型介绍

URDF语法

关节Joint描述

关节类型 描述
continuous 描述旋转运动,可以围绕某一个轴无限旋转,比如小车的轮子,就属于这种类型。
revolute 也是旋转关节,和continuous类型的区别在于不能无限旋转,而是带有角度限制,比如机械臂的两个连杆,就属于这种运动。
prismatic 是滑动关节,可以沿某一个轴平移,也带有位置的极限,一般直线电机就是这种运动方式。
fixed 固定关节,是唯一一种不允许运动的关节,不过使用还是比较频繁的,比如相机这个连杆,安装在机器人上,相对位置是不会变化的,此时使用的连接方式就是Fixed。
Floating 是浮动关节,允许进行平移、旋转活动。使用相对较少。
planar 是平面关节,允许在平面正交方向上平移或者旋转。使用相对较少。

完整机器人模型

所有的link和joint标签完成了对机器人每个部分的描述和组合,全都放在一个robot标签中,就形成了完整的机器人模型。在看某一个URDF模型时,先不着急看每一块代码的细节,先来找link和joint,看下这个机器人是由哪些部分组成的,了解完全局之后,再看细节。

创建机器人模型

  • urdf:存放机器人模型的URDF或xacro文件
  • meshes:放置URDF中引用的模型渲染文件
  • launch:保存相关启动文件
  • rviz:保存rviz的配置文件

模型可视化效果

  • docker中确保开启了GUI显示,终端启动:ros2 launch yahboomcar_description display.launch.py
  • 在宿主机的vnc中可以看到开启了Rviz,并显示了机器人模型

模型文件解析

具体URDF模型 /root/yahboomcar_ros2_ws/yahboomcar_ws/src/yahboomcar_description/urdf/Muto.urdf

ROS2 Gazebo介绍

Gazebo是ROS系统中最为常用的三维物理仿真平台,支持动力学引擎,可以实现高质量的图形渲染,不仅可以模拟机器人及周边环境,还可以加入摩擦力、弹性系数等物理属性。随着技术的进步,Gazebo仿真平台也在不断迭代,新一代的Gazebo命名为Ignition,从渲染效果和仿真流畅度上都有较大的变化,我们不妨也来试一下。

ROS2坐标变换TF2简介

机器人中的坐标系

关于坐标系变换关系的基本理论,在每一本机器人学的教材中都会有讲解,可以分解为平移和旋转两个部分,通过一个四乘四的矩阵进行描述,在空间中画出坐标系,那两者之间的变换关系,其实就是向量的数学描述。

TF命令行操作

环境

sudo apt install ros-foxy-turtle-tf2-py ros-foxy-tf2-tools
sudo pip3 install transforms3d

启动

# 一个终端,开启海龟
ros2 launch turtle_tf2_py turtle_tf2_demo.launch.py
# 另一个终端,开启键盘接收
ros2 run turtlesim turtle_teleop_key

查看 TF 树,导出 pdf

ros2 run tf2_tools view_frames.py 在当前终端路径下生成了一个frames.pdf文件,就可以看到系统中各个坐标系的关系了。

查询坐标变换信息

只看到坐标系的结构还不行,如果我们想要知道某两个坐标系之间的具体关系,可以通过tf2_echo这个工具查看:ros2 run tf2_ros tf2_echo turtle2 turtle1

坐标系可视化

rviz2

  • Global options/Fixed frame, 修改为 world
  • 左下 add,添加 TF 显示,并勾选 show names

静态坐标变换

发布A到B的位姿

ros2 run tf2_ros static_transform_publisher 0 0 3 0 0 3.14 A B

监听/获取TF关系

ros2 run tf2_ros tf2_echo A B

rivz可视化

rivz 中,把 Global options/Fixed frame 修改为 A,就可以看到 A,B。

发表评论