工作空间
创建工作空间
cd yahboomcar_ros2_ws
mkdir -p yahboomcar_ws_ptz/src
cd yahboomcar_ws_ptz
编译工作空间
colcon build
source install/setup.bash
echo "source ~/yahboomcar_ros2_ws/yahboomcar_ws/install/setup.bash" >> ~/.bashrc
ROS2 功能包
创建功能包命令
ros2 pkg create <package_name> --build-type <build-type> --dependencies <dependencies> --node-name <node-name>
- cpp,
ros2 pkg create pkg_helloworld_cpp --build-type ament_cmake --dependencies rclcpp --node-name helloworld
- py,
ros2 pkg create pkg_helloworld_py --build-type ament_python --dependencies rclpy --node-name helloworld
功能包简单流程
cd ~/yahboomcar_ros2_ws/yahboomcar_ws/src
ros2 pkg create pkg_helloworld_py --build-type ament_python --dependencies rclpy --node-name helloworld
cd ~/yahboomcar_ros2_ws/yahboomcar_ws
colcon build --packages-select pkg_helloworld_py
source install/setup.bash
ros2 run pkg_helloworld_py helloworld
自动生成的 helloworld.py 中,main 里面只有一条 print, 所以运行的时候,也只打印了一条字符串。
ros2 节点
把 helloworld.py 中引入 rclpy 的节点,再使用 logger 打印,然后 build, source, run,就可以看到循环打印了。
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
import time
"""
创建一个HelloWorld节点, 初始化时输出“hello world”日志
"""
class HelloWorldNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
while rclpy.ok(): # ROS2系统是否正常运行
self.get_logger().info("Hello World") # ROS2日志输出
time.sleep(0.5) # 休眠控制循环时间
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = HelloWorldNode("helloworld") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
- import
rclpy
from rclpy.node import Node
- 开关 ros2 接口,
rclpy.init(args=args)
rclpy.shutdown()
- 创建和销毁节点,
node = HelloWorldNode("helloworld")
node.destroy_node()
- 等待节点退出,
rclpy.spin(node)
- 日志输出
self.get_logger().info("Hello World")
- 检查 rclpy 正常,
while rclpy.ok()
ros2 话题通信 publish
#导入rclpy库
import rclpy
from rclpy.node import Node
#导入String字符串消息
from std_msgs.msg import String
#创建一个继承于Node基类的Topic_Pub节点子类 传入一个参数name
class Topic_Pub(Node):
def __init__(self,name):
super().__init__(name)
#创建一个发布者,使用create_publisher的函数,传入的参数分别是:
#话题数据类型、话题名称、保存消息的队列长度
self.pub = self.create_publisher(String,"/topic_demo",1)
#创建一个定时器,间隔1s进入中断处理函数,传入的参数分别是:
#中断函数执行的间隔时间,中断处理函数
self.timer = self.create_timer(1,self.pub_msg)
#定义中断处理函数
def pub_msg(self):
msg = String() #创建一个String类型的变量msg
msg.data = "Hi,I send a message." #给msg里边的data赋值
self.pub.publish(msg) #发布话题数据
#主函数
def main():
rclpy.init() #初始化
pub_demo = Topic_Pub("publisher_node") #创建Topic_Pub类对象,传入的参数就是节点的名字
rclpy.spin(pub_demo) #执行rclpy.spin函数,里边传入一个参数,参数是刚才创建好的Topic_Pub类对象
pub_demo.destroy_node() #销毁节点对象
rclpy.shutdown() #关闭ROS2 Python接口
- 创建 publisher,
self.pub = self.create_publisher(String,"/topic_demo",1)
- 发布话题数据,
self.pub.publish(msg) #发布话题数据
- 创建定时器,
self.timer = self.create_timer(1,self.pub_msg)
调试:
ros2 run pkg_topic publisher_demo
ros2 topic list
ros2 topic echo /topic_demo
ros2 话题通信 subscriber
#导入相关的库
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Topic_Sub(Node):
def __init__(self,name):
super().__init__(name)
#创建订阅者使用的是create_subscription,传入的参数分别是:话题数据类型,话题名称,回调函数名称,队列长度
self.sub = self.create_subscription(String,"/topic_demo",self.sub_callback,1)
#回调函数执行程序:打印接收的到信息
def sub_callback(self,msg):
print(msg.data)
def main():
rclpy.init() #ROS2 Python接口初始化
sub_demo = Topic_Sub("subscriber_node") # 创建对象并进行初始化
rclpy.spin(sub_demo)
sub_demo.destroy_node() #销毁节点对象
rclpy.shutdown() #关闭ROS2 Python接口
// setup.py
'subscriber_demo = pkg_topic.subscriber_demo:main'
- 创建 subscriber,
self.sub = self.create_subscription(String,"/topic_demo",self.sub_callback,1)
调试:
ros2 run pkg_topic publisher_demo
ros2 run pkg_topic subscriber_demo
ros2 服务通信 server
#导入相关的库文件
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class Service_Server(Node):
def __init__(self,name):
super().__init__(name)
#创建一个服务端,使用的是create_service函数,传入的参数分别是:
#服务数据的数据类型、服务的名称,服务回调函数(也就是服务的内容)
self.srv = self.create_service(AddTwoInts, '/add_two_ints', self.Add2Ints_callback)
#这里的服务回调函数的内容是把两个整型数相加,然后返回相加的结果
def Add2Ints_callback(self,request,response):
response.sum = request.a + request.b
print("response.sum = ",response.sum)
return response
def main():
rclpy.init()
server_demo = Service_Server("publisher_node")
rclpy.spin(server_demo)
server_demo.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
- 创建服务
self.srv = self.create_service(AddTwoInts, '/add_two_ints', self.Add2Ints_callback)
服务相关:
- 查看服务接口,
ros2 interface show example_interfaces/srv/AddTwoInts
- 查看服务路径,
ros2 pkg prefix example_interfaces
- 服务 AddTwoInts 在
/opt/ros/foxy/share/example_interfaces/srv/AddTwoInts.srv
中
调试:
ros2 run pkg_service server_demo
ros2 service list
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 1,b: 4}"
注意,:
后面需要空格,否则报错。
ros2 服务通信 client
#导入相关的库
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class Service_Client(Node):
def __init__(self,name):
super().__init__(name)
#创建客户端,使用的是create_client函数,传入的参数是服务数据的数据类型、服务的话题名称
self.client = self.create_client(AddTwoInts,'/add_two_ints')
# 循环等待服务器端成功启动
while not self.client.wait_for_service(timeout_sec=1.0):
print("service not available, waiting again...")
# 创建服务请求的数据对象
self.request = AddTwoInts.Request()
def send_request(self):
self.request.a = 10
self.request.b = 90
#发送服务请求
self.future = self.client.call_async(self.request)
def main():
rclpy.init() #节点初始化
service_client = Service_Client("client_node") #创建对象
service_client.send_request() #发送服务请求
while rclpy.ok():
rclpy.spin_once(service_client)
#判断数据是否处理完成
if service_client.future.done():
try:
#获得服务反馈的信息并且打印
response = service_client.future.result()
print("service_client.request.a = ",service_client.request.a)
print("service_client.request.b = ",service_client.request.b)
print("Result = ",response.sum)
except Exception as e:
service_client.get_logger().info('Service call failed %r' % (e,))
break
service_client.destroy_node()
rclpy.shutdown()
- 等待服务启动成功,
self.client.wait_for_service(timeout_sec=1.0)
self.request = AddTwoInts.Request()
self.future = self.client.call_async(self.request)
if service_client.future.done():
response = service_client.future.result()
调试:
ros2 run pkg_service server_demo
ros2 run pkg_service client_demo
ros2 动作通讯 server
接口功能包
cd ~/yahboomcar_ros2_ws/yahboomcar_ws/src
ros2 pkg create --build-type ament_cmake pkg_interfaces
mkdir pkg_interfaces/action
vim pkg_interfaces/action/Progress.action
新建# 目标(Goal)部分 int64 num --- # 结果(Result)部分 int64 sum --- # 反馈(Feedback)部分 float64 progress
vim pkg_interfaces/package.xml
添加<buildtool_depend>rosidl_default_generators</buildtool_depend> <depend>action_msgs</depend> <member_of_group>rosidl_interface_packages</member_of_group>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
:用于生成接口代码的工具。rosidl_default_generators 是 ROS 2 的一个工具包,用于从接口定义文件(如 .msg、.srv、.action)生成相应的 Python、C++ 等语言的代码。<depend>action_msgs</depend>
:动作接口的运行时依赖。action_msgs 是 ROS 2 提供的一个标准包,包含了动作(Action)接口的基本消息类型(如 GoalStatus、GoalInfo 等)。<member_of_group>rosidl_interface_packages</member_of_group>
:标记当前包为接口包。rosidl_interface_packages 是 ROS 2 中的一个组,用于标识包含接口定义(如 .msg、.srv、.action)的包。
vim pkg_interfaces/CMakeLists.txt
添加find_package(rosidl_default_generators REQUIRED) rosidl_generate_interfaces(${PROJECT_NAME} "action/Progress.action" )
find_package(rosidl_default_generators REQUIRED)
查找并加载 rosidl_default_generators 包。rosidl_default_generators 是 ROS 2 提供的一个工具包,用于从接口定义文件(如 .msg、.srv、.action)生成相应的 Python、C++ 等语言的代码。rosidl_generate_interfaces(${PROJECT_NAME} "action/Progress.action")
生成接口代码。它会根据指定的接口定义文件(如 Progress.action),生成相应的 Python、C++ 等语言的代码。生成的代码会被放置在 install 目录中,供其他包使用。
colcon build --packages-select pkg_interfaces
source install/setup.bash
ros2 interface show pkg_interfaces/action/Progress
服务端
ros2 pkg create pkg_action --build-type ament_python --dependencies rclpy pkg_interfaces --node-name action_server_demo
注意--dependencies
后面需要添加pkg_interfaces
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from pkg_interfaces.action import Progress
class Action_Server(Node):
def __init__(self):
super().__init__('progress_action_server')
# 创建动作服务端
self._action_server = ActionServer(self, Progress, 'get_sum', self.execute_callback)
self.get_logger().info('动作服务已经启动!')
def execute_callback(self, goal_handle):
self.get_logger().info('开始执行任务....')
# 生成连续反馈;
feedback_msg = Progress.Feedback()
sum = 0
for i in range(1, goal_handle.request.num + 1):
sum += i
feedback_msg.progress = i / goal_handle.request.num
self.get_logger().info('连续反馈: %.2f' % feedback_msg.progress)
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
# 生成最终响应。
goal_handle.succeed()
result = Progress.Result()
result.sum = sum
self.get_logger().info('任务完成!')
return result
def main(args=None):
rclpy.init(args=args)
# 调用spin函数,并传入节点对象
Progress_action_server = Action_Server()
rclpy.spin(Progress_action_server)
Progress_action_server.destroy_node()
# 释放资源
rclpy.shutdown()
from rclpy.action import ActionServer
from pkg_interfaces.action import Progress
self._action_server = ActionServer(self, Progress, 'get_sum', self.execute_callback)
feedback_msg = Progress.Feedback()
用于存储动作执行过程中的反馈数据。Progress.Feedback() 是根据动作接口文件 Progress.action 生成的反馈消息类型。goal_handle.request.num
客户端发送的目标数据。goal_handle 是动作服务器的目标句柄,request 是客户端发送的目标请求对象。num 是目标请求中的一个字段,表示客户端发送的整数。feedback_msg.progress = i / goal_handle.request.num
计算当前任务的进度,并将其存储在反馈消息中。feedback_msg.progress 是反馈消息中的一个字段,表示任务完成的进度(通常是百分比)。goal_handle.publish_feedback(feedback_msg)
发布反馈消息给客户端。客户端可以通过回调函数接收到这些反馈消息,用于实时监控任务进度。goal_handle.succeed()
标记动作执行成功。调用 succeed() 后,动作服务器会向客户端返回结果,并结束动作执行。result = Progress.Result()
创建一个 Result 对象,用于存储动作执行的结果数据。 Progress.Result() 是根据动作接口文件 Progress.action 生成的结果消息类型。
调试:
colcon build --packages-select pkg_action
ros2 run pkg_action action_server_demo
ros2 action list
ros2 action send_goal /get_sum pkg_interfaces/action/Progress num:\ 10\
客户端
- 在
action_server_demo.py
同级目录下新建文件action_client_demo.py
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from pkg_interfaces.action import Progress
class Action_Client(Node):
def __init__(self):
super().__init__('progress_action_client')
# 创建动作客户端;
self._action_client = ActionClient(self, Progress, 'get_sum')
def send_goal(self, num):
# 发送请求;
goal_msg = Progress.Goal()
goal_msg.num = num
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
# 处理目标发送后的反馈;
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('请求被拒绝')
return
self.get_logger().info('请求被接收,开始执行任务!')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
# 处理最终响应。
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('最终计算结果:sum = %d' % result.sum)
# 5.释放资源。
rclpy.shutdown()
# 处理连续反馈;
def feedback_callback(self, feedback_msg):
feedback = (int)(feedback_msg.feedback.progress * 100)
self.get_logger().info('当前进度: %d%%' % feedback)
def main(args=None):
rclpy.init(args=args)
action_client = Action_Client()
action_client.send_goal(10)
rclpy.spin(action_client)
self._action_client = ActionClient(self, Progress, 'get_sum')
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
- 异步发送目标请求给动作服务器。
- send_goal_async 是动作客户端的方法,用于发送目标请求并返回一个 Future 对象。
- goal_msg 是客户端发送的目标消息。
- feedback_callback=self.feedback_callback 是可选参数,用于指定接收反馈消息的回调函数。
self._send_goal_future.add_done_callback(self.goal_response_callback)
- 为目标发送操作添加一个回调函数,当目标发送完成时调用。
- self.goal_response_callback 是回调函数,用于处理目标发送的结果(例如目标是否被服务器接受)。
feedback = (int)(feedback_msg.feedback.progress * 100)
- 从反馈消息中提取进度信息,并将其转换为整数百分比。
- feedback_msg.feedback.progress 是反馈消息中的进度字段(通常是 0 到 1 之间的浮点数)。
goal_handle = future.result()
- future 是目标发送操作(send_goal_async)返回的 Future 对象。
- goal_handle 是目标句柄,用于跟踪目标的状态(例如是否被接受、是否完成)。
if not goal_handle.accepted
检查目标是否被动作服务器接受。self._get_result_future = goal_handle.get_result_async()
- 异步获取动作执行的结果。
- get_result_async 是目标句柄的方法,用于异步获取结果并返回一个 Future 对象。
self._get_result_future.add_done_callback(self.get_result_callback)
为获取结果操作添加一个回调函数,当结果获取完成时调用。result = future.result().result
从 Future 对象中提取动作执行的结果。future.result() 返回一个包含结果的对象,result 字段是实际的结果数据。
调试:
ros2 run pkg_action action_server_demo
ros2 run pkg_action action_client_demo
自定义接口
话题通信接口
mkdir pkg_interfaces/msg
vim pkg_interfaces/msg/Person.msg
string name int32 age float64 height
pkg_interfaces/package.xml
<buildtool_depend>rosidl_default_generators</buildtool_depend> <exec_depend>rosidl_default_runtime</exec_depend> <depend>action_msgs</depend> <member_of_group>rosidl_interface_packages</member_of_group>
pkg_interfaces/CMakeLists.txt
find_package(rosidl_default_generators REQUIRED) rosidl_generate_interfaces(${PROJECT_NAME} "action/Progress.action" "msg/Person.msg" )
colcon build --packages-select pkg_interfaces
source install/setup.bash
ros2 interface show pkg_interfaces/msg/Person
服务通信接口
mkdir -p pkg_interfaces/srv
vim pkg_interfaces/srv/Add.srv
int32 num1 int32 num2 --- int32 sum
colcon build --packages-select pkg_interfaces
source ../install/setup.bash
ros2 interface show pkg_interfaces/srv/Add
参数服务
介绍
ros2 param list
- 参数查询与修改
ros2 param describe turtlesim background_b # 查看某个参数的描述信息 ros2 param get turtlesim background_b # 查询某个参数的值 ros2 param set turtlesim background_b 10 # 修改某个参数的值
- 参数文件保存与加载
ros2 param dump turtlesim >> turtlesim.yaml # 将某个节点的参数保存到参数文件中 ros2 param load turtlesim turtlesim.yaml # 一次性加载某一个文件中的所有参数
参数实例
新建功能包
ros2 pkg create pkg_param --build-type ament_python --dependencies rclpy --node-name param_demo
代码
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
class ParameterNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.timer = self.create_timer(2, self.timer_callback) # 创建一个定时器(单位为秒的周期,定时执行的回调函数)
self.declare_parameter('robot_name', 'muto') # 创建一个参数,并设置参数的默认值
def timer_callback(self): # 创建定时器周期执行的回调函数
robot_name_param = self.get_parameter('robot_name').get_parameter_value().string_value # 从ROS2系统中读取参数的值
self.get_logger().info('Hello %s!' % robot_name_param) # 输出日志信息,打印读取到的参数值
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = ParameterNode("param_declare") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
self.declare_parameter('robot_name', 'muto')
创建参数robot_name_param = self.get_parameter('robot_name').get_parameter_value().string_value
读取参数
编译运行
colcon build --packages-select pkg_param
source install/setup.bash
ros2 run pkg_param param_demo
ros2 param set param_declare robot_name robot
元功能包
ros2 pkg create pkg_metapackage
- 修改
package.xml
<exec_depend>pkg_interfaces</exec_depend> <exec_depend>pkg_helloworld_py</exec_depend> <exec_depend>pkg_topic</exec_depend> <exec_depend>pkg_service</exec_depend> <exec_depend>pkg_action</exec_depend> <exec_depend>pkg_param</exec_depend>
colcon build --packages-select pkg_metapackage
分布式
实现
- 小车
./run_docker.sh
,ros2 run demo_nodes_py talker
- 虚拟机
ros2 run demo_nodes_py listener
- 需要确保
bashrc
中的ROS_DOMAIN_ID
相同。
取值
- 建议ROS_DOMAIN_ID的取值在[0,101] 之间,包含0和101
- 每个域ID内的节点总数是有限制的,需要小于等于120个
- 如果域ID为101,那么该域的节点总数需要小于等于54个
计算规则
- DDS协议规定以7400作为起始端口,也即可用端口为[7400,65535],又已知按照DDS协议默认情况下,每个域ID占用250个端口,那么域ID的个数为:(65535-7400)/250 = 232(个),对应的其取值范围为[0,231];
- 在Linux下,可用的域ID为[0,101]与[215-231],在Windows和Mac中可用的域ID为[0,166],综上,为了兼容多平台,建议域ID在[0,101] 范围内取值。
- 每个域ID默认占用250个端口,且每个ROS2节点需要占用两个端口,另外,按照DDS协议每个域ID的端口段内,第1、2个端口是Discovery Multicast端口与User Multicast端口,从第11、12个端口开始是域内第一个节点的Discovery Unicast端口与User Unicast,后续节点所占用端口依次顺延,那么一个域ID中的最大节点个数为:(250-10)/2 = 120(个)
- 特殊情况:域ID值为101时,其后半段端口属于操作系统的预留端口,其节点最大个数为54个。
DDS
QoS
- DEADLINE策略,表示通信数据必须要在每次截止时间内完成一次通信;
- HISTORY策略,表示针对历史数据的一个缓存大小;
- RELIABILITY策略,表示数据通信的模式,配置成BEST_EFFORT,就是尽力传输模式,网络情况不好的时候,也要保证数据流畅,此时可能会导致数据丢失,配置成RELIABLE,就是可信赖模式,可以在通信中尽量保证图像的完整性,我们可以根据应用功能场景选择合适的通信模式;
- DURABILITY策略,可以配置针对晚加入的节点,也保证有一定的历史数据发送过去,可以让新节点快速适应系统。
ROS2时间相关API
rate
rclpy 中的 Rate 对象可以通过节点创建,Rate 对象的 sleep() 函数需要在子线程中执行,否咋会阻塞程序。
import rclpy
import threading
from rclpy.timer import Rate
rate = None
node = None
def do_some():
global rate
global node
while rclpy.ok():
node.get_logger().info("hello ---------")
# 休眠
rate.sleep()
def main():
global rate
global node
rclpy.init()
node = rclpy.create_node("rate_demo")
# 创建 Rate 对象
rate = node.create_rate(1.0)
# 创建子线程
thread = threading.Thread(target=do_some)
thread.start()
rclpy.spin(node) # 进入事件循环
rclpy.shutdown()
if __name__ == '__main__':
main()
Time
node = rclpy.create_node("time_demo")
right_now = node.get_clock().now()
t1 = Time(seconds=10,nanoseconds=500000000)
node.get_logger().info("s = %.2f, ns = %d" % (right_now.seconds_nanoseconds()[0], right_now.seconds_nanoseconds()[1]))
node.get_logger().info("s = %.2f, ns = %d" % (t1.seconds_nanoseconds()[0], t1.seconds_nanoseconds()[1]))
node.get_logger().info("ns = %d" % right_now.nanoseconds)
node.get_logger().info("ns = %d" % t1.nanoseconds)
Duration
du1 = Duration(seconds = 2,nanoseconds = 500000000)
Time 与 Duration 运算
Time 和 Duration 之间可以直接进行数值计算。
t1 = Time(seconds=10)
t2 = Time(seconds=4)
du1 = Duration(seconds=3)
du2 = Duration(seconds=5)
# 比较
node.get_logger().info("t1 >= t2 ? %d" % (t1 >= t2))
node.get_logger().info("t1 < t2 ? %d" % (t1 < t2))
# 数学运算
t3 = t1 + du1
t4 = t1 - t2
t5 = t1 - du1
node.get_logger().info("t3 = %d" % t3.nanoseconds)
node.get_logger().info("t4 = %d" % t4.nanoseconds)
node.get_logger().info("t5 = %d" % t5.nanoseconds)
# 比较
node.get_logger().info("-" * 80)
node.get_logger().info("du1 >= du2 ? %d" % (du1 >= du2))
node.get_logger().info("du1 < du2 ? %d" % (du1 < du2))
ROS2 常用命令工具
包管理工具ros2 pkg
ros2 pkg create PKG_NAME --build-type ament_python --dependencies rclpy --node-name NODE_NAME
ros2 pkg list
ros2 pkg executables PKG_NAME
节点运行 ros2 run
ros2 run PKG_NAME NODE_NAM
节点相关工具 ros2 node
ros2 node list
ros2 node info
查看节点详细信息,包括订阅、发布的消息,开启的服务和动作等
主题相关工具 ros2 topic
ros2 topic list
ros2 topic info topic_name
显示主题消息类型,订阅者/发布者数量ros2 topic type topic_name
查看话题的消息类型ros2 topic hz topic_name
显示主题平均发布频率ros2 topic echo topic_name
在终端打印主题消息,类似于一个订阅者ros2 topic pub topic_name message_type message_content
在终端发布指定话题消息ros2 topic pub turtle1/cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.2}}"
- 参数-1只发布一次,
ros2 topic pub -1 topic_name message_type message_content
- 参数-t count循环发布count次结束,
ros2 topic pub -t count topic_name message_type message_content
- 参数-r count以count Hz的频率循环发布,
ros2 topic pub -r count topic_name message_type message_content
- 参数-1只发布一次,
接口相关工具ros2 interface
ros2 interface list
包括话题、服务、动作。ros2 interface show interface_name
显示指定接口的详细内容
服务相关工具 ros2 service
ros2 service list
ros2 service call service_name service_Type arguments
调用指定服务.ros2 service call /spawn turtlesim/srv/Spawn "{x: 2, y: 2, theta: 0.2, name: ''}"
ROS2 Rviz2使用
Rviz2就是一款机器人开发过程中的数据可视化软件,机器人模型、传感器信息、环境信息等等.
Rviz2启动
rviz2
图像数据可视化
在 docker 中操作,
ros2 launch astra_camera astro_pro_plus.launch.xml
一个终端开启摄像头rviz2
中左下方 add -> by topic, 选择/camera/color/image_raw/image
参考
ROS2 Rqt工具箱
rqt
可以通过多种插件来工作,比如 topic, service, param.
ROS2 Launch之Python实现
单节点 launch
- 功能包文件夹下面增加
launch
文件夹 launch
下新增single_node_launch.py
文件, launch文件命名常以LaunchName_launch.py
,其中,LaunchName自定义,_launch.py
是常认为固定的。from launch import LaunchDescription from launch_ros.actions import Node # 定义了一个变量node作为一个节点启动的返回值,最后调用LaunchDescription函数传入node参数执行返回。 # 返回一个launch_description def generate_launch_description(): node = Node( package='pkg_helloworld_py', executable='helloworld', ) return LaunchDescription([node])
- 修改
setup.py
import os from glob import glob # data_files=[ (os.path.join('share',package_name,'launch'),glob(os.path.join('launch','*launch.py'))),
- 正常编译后,使用
ros2 launch pkg_helloworld_py single_node_launch.py
来加载运行。
多节点 launch
pkg_topic
功能包文件夹下面增加launch
文件夹- 新建
multi_node_launch.py
from launch import LaunchDescription from launch_ros.actions import Node def generate_launch_description(): publisher_node = Node( package='pkg_topic', executable='publisher_demo', output='screen' ) subscriber_node = Node( package='pkg_topic', executable='subscriber_demo', output='screen' ) return LaunchDescription([ publisher_node, subscriber_node ])
ros2 launch pkg_topic multi_node_launch.py
ros2 node list
话题重映射
pkg_topic
功能包文件夹下面增加remap_name_launch.py
from launch import LaunchDescription from launch_ros.actions import Node def generate_launch_description(): publisher_node = Node( package='pkg_topic', executable='publisher_demo', output='screen', remappings=[("/topic_demo", "/topic_update")] ) return LaunchDescription([ publisher_node ])
- 编译后
ros2 launch pkg_topic remap_name_launch.py
ros2 topic list
奇怪: 有些时候,编译后 launch 没有相关的文件。 但是如果在 setup.py 中,按照单个节点的方式去修改。然后编译的时候,先不带 --allow-overriding, 然后再加上。就可以在 launch 中找到了。
launch文件启动launch文件
- 在
multi_node_launch.py
同级目录下新建include_launch.py
from launch import LaunchDescription from launch_ros.actions import Node import os from launch.actions import IncludeLaunchDescription from launch.launch_description_sources import PythonLaunchDescriptionSource from ament_index_python.packages import get_package_share_directory def generate_launch_description(): hello_launch = IncludeLaunchDescription(PythonLaunchDescriptionSource( [os.path.join(get_package_share_directory('pkg_helloworld_py'), 'launch'), '/single_node_launch.py']), ) return LaunchDescription([ hello_launch ])
- 编译后,
ros2 launch pkg_topic include_launch.py
稍复杂的launch文件
- 在
multi_node_launch.py
同级目录下新建complex_launch.py
import os from ament_index_python import get_package_share_directory from launch import LaunchDescription from launch.actions import DeclareLaunchArgument from launch.actions import IncludeLaunchDescription from launch.actions import GroupAction from launch.launch_description_sources import PythonLaunchDescriptionSource from launch.substitutions import LaunchConfiguration from launch.substitutions import TextSubstitution from launch_ros.actions import Node from launch_ros.actions import PushRosNamespace def generate_launch_description(): # args that can be set from the command line or a default will be used background_r_launch_arg = DeclareLaunchArgument("background_r", default_value=TextSubstitution(text="0")) background_g_launch_arg = DeclareLaunchArgument("background_g", default_value=TextSubstitution(text="255")) background_b_launch_arg = DeclareLaunchArgument("background_b", default_value=TextSubstitution(text="0")) chatter_ns_launch_arg = DeclareLaunchArgument("chatter_ns", default_value=TextSubstitution(text="my/chatter/ns")) # include another launch file launch_include = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join( get_package_share_directory('demo_nodes_cpp'), 'launch/topics/talker_listener.launch.py')) ) # include another launch file in the chatter_ns namespace launch_include_with_namespace = GroupAction( actions=[ # push-ros-namespace to set namespace of included nodes PushRosNamespace(LaunchConfiguration('chatter_ns')), IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join( get_package_share_directory('demo_nodes_cpp'), 'launch/topics/talker_listener.launch.py')) ), ] ) # start a turtlesim_node in the turtlesim1 namespace turtlesim_node = Node( package='turtlesim', namespace='turtlesim1', executable='turtlesim_node', name='sim' ) # start another turtlesim_node in the turtlesim2 namespace # and use args to set parameters turtlesim_node_with_parameters = Node( package='turtlesim', namespace='turtlesim2', executable='turtlesim_node', name='sim', parameters=[{ "background_r": LaunchConfiguration('background_r'), "background_g": LaunchConfiguration('background_g'), "background_b": LaunchConfiguration('background_b'), }] ) # perform remap so both turtles listen to the same command topic forward_turtlesim_commands_to_second_turtlesim_node = Node( package='turtlesim', executable='mimic', name='mimic', remappings=[ ('/input/pose', '/turtlesim1/turtle1/pose'), ('/output/cmd_vel', '/turtlesim2/turtle1/cmd_vel'), ] ) return LaunchDescription([ background_r_launch_arg, background_g_launch_arg, background_b_launch_arg, chatter_ns_launch_arg, launch_include, launch_include_with_namespace, turtlesim_node, turtlesim_node_with_parameters, forward_turtlesim_commands_to_second_turtlesim_node, ])
- 编译后,
ros2 launch pkg_topic complex_launch.py
ROS2 Launch之Xml、Yaml实现
ROS2录制回放工具
rosbag2,这个工具用于记录话题的数据。我们就可以使用这个指令将话题数据存储为文件 ,后续我们无需启动节点,直接可以将bag文件里的话题数据发布出来。
启动要录制的话题节点
ros2 run demo_nodes_py talker
记录
# 记录单个话题
ros2 bag record /topic-name
# 记录多个话题
ros2 bag record topic-name1 topic-name2
# 记录所有话题
ros2 bag record -a
# -o name 自定义输出文件的名字
ros2 bag record -o file-name topic-name
Ctrl+C
指令打断录制即可停止录制.
查看录制出话题的信息
ros2 bag info rosbag2_2023_10_31-07_58_23
播放并查看
播放
# 播放
ros2 bag play rosbag2_2023_10_31-07_58_23
# 倍速播放 -r 选项可以修改播放速率,比如 -r 值,比如 -r 10,就是10倍速,十倍速播放话题
ros2 bag play rosbag2_2023_10_31-07_58_23 -r 10
# 循环播放 -l
ros2 bag play rosbag2_2023_10_31-07_58_23 -l
# 播放单个话题
ros2 bag play rosbag2_2023_10_31-07_58_23 --topics /chatter
查看
ros2 topic echo /chatter
ROS2 URDF模型介绍
URDF语法
关节Joint描述
关节类型 | 描述 |
---|---|
continuous | 描述旋转运动,可以围绕某一个轴无限旋转,比如小车的轮子,就属于这种类型。 |
revolute | 也是旋转关节,和continuous类型的区别在于不能无限旋转,而是带有角度限制,比如机械臂的两个连杆,就属于这种运动。 |
prismatic | 是滑动关节,可以沿某一个轴平移,也带有位置的极限,一般直线电机就是这种运动方式。 |
fixed | 固定关节,是唯一一种不允许运动的关节,不过使用还是比较频繁的,比如相机这个连杆,安装在机器人上,相对位置是不会变化的,此时使用的连接方式就是Fixed。 |
Floating | 是浮动关节,允许进行平移、旋转活动。使用相对较少。 |
planar | 是平面关节,允许在平面正交方向上平移或者旋转。使用相对较少。 |
完整机器人模型
所有的link和joint标签完成了对机器人每个部分的描述和组合,全都放在一个robot标签中,就形成了完整的机器人模型。在看某一个URDF模型时,先不着急看每一块代码的细节,先来找link和joint,看下这个机器人是由哪些部分组成的,了解完全局之后,再看细节。
创建机器人模型
- urdf:存放机器人模型的URDF或xacro文件
- meshes:放置URDF中引用的模型渲染文件
- launch:保存相关启动文件
- rviz:保存rviz的配置文件
模型可视化效果
- docker中确保开启了GUI显示,终端启动:
ros2 launch yahboomcar_description display.launch.py
- 在宿主机的vnc中可以看到开启了Rviz,并显示了机器人模型
模型文件解析
具体URDF模型 /root/yahboomcar_ros2_ws/yahboomcar_ws/src/yahboomcar_description/urdf/Muto.urdf
ROS2 Gazebo介绍
Gazebo是ROS系统中最为常用的三维物理仿真平台,支持动力学引擎,可以实现高质量的图形渲染,不仅可以模拟机器人及周边环境,还可以加入摩擦力、弹性系数等物理属性。随着技术的进步,Gazebo仿真平台也在不断迭代,新一代的Gazebo命名为Ignition,从渲染效果和仿真流畅度上都有较大的变化,我们不妨也来试一下。
ROS2坐标变换TF2简介
机器人中的坐标系
关于坐标系变换关系的基本理论,在每一本机器人学的教材中都会有讲解,可以分解为平移和旋转两个部分,通过一个四乘四的矩阵进行描述,在空间中画出坐标系,那两者之间的变换关系,其实就是向量的数学描述。
TF命令行操作
环境
sudo apt install ros-foxy-turtle-tf2-py ros-foxy-tf2-tools
sudo pip3 install transforms3d
启动
# 一个终端,开启海龟
ros2 launch turtle_tf2_py turtle_tf2_demo.launch.py
# 另一个终端,开启键盘接收
ros2 run turtlesim turtle_teleop_key
查看 TF 树,导出 pdf
ros2 run tf2_tools view_frames.py
在当前终端路径下生成了一个frames.pdf文件,就可以看到系统中各个坐标系的关系了。
查询坐标变换信息
只看到坐标系的结构还不行,如果我们想要知道某两个坐标系之间的具体关系,可以通过tf2_echo这个工具查看:ros2 run tf2_ros tf2_echo turtle2 turtle1
坐标系可视化
rviz2
- Global options/Fixed frame, 修改为 world
- 左下 add,添加 TF 显示,并勾选 show names
静态坐标变换
发布A到B的位姿
ros2 run tf2_ros static_transform_publisher 0 0 3 0 0 3.14 A B
监听/获取TF关系
ros2 run tf2_ros tf2_echo A B
rivz可视化
rivz 中,把 Global options/Fixed frame 修改为 A,就可以看到 A,B。