Week 4 - ROS 2 Nodes and Topics
Learning Objectivesβ
By the end of this week, you will be able to:
- Create ROS 2 nodes in Python and C++
- Implement publish/subscribe communication patterns
- Define custom message types
- Use QoS policies for reliable communication
Overviewβ
Nodes and topics form the foundation of ROS 2 communication. Nodes are executable processes that perform computation, while topics are named buses for data exchange using the publish/subscribe pattern.
Node Architectureβ
Node Lifecycleβ
βββββββββββββββ
β Created β
ββββββββ¬βββββββ
β
βΌ
βββββββββββββββ
β Active β ββββββββ spin() / spin_once()
ββββββββ¬βββββββ
β
βΌ
βββββββββββββββ
β Destroyed β
βββββββββββββββ
Python Node Implementationβ
#!/usr/bin/env python3
"""
ROS 2 Node with Publisher and Subscriber
Demonstrates bidirectional communication
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Int32
from sensor_msgs.msg import Image
import numpy as np
class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')
# Declare parameters
self.declare_parameter('robot_name', 'humanoid_01')
self.declare_parameter('control_rate', 100.0) # Hz
# Get parameters
self.robot_name = self.get_parameter('robot_name').value
control_rate = self.get_parameter('control_rate').value
# Publishers
self.command_pub = self.create_publisher(
String, f'/{self.robot_name}/commands', 10)
self.status_pub = self.create_publisher(
String, f'/{self.robot_name}/status', 10)
# Subscribers
self.sensor_sub = self.create_subscription(
String, f'/{self.robot_name}/sensors',
self.sensor_callback, 10)
# Timer for periodic publishing
self.timer = self.create_timer(1.0 / control_rate, self.control_loop)
self.get_logger().info(f'Robot Controller initialized: {self.robot_name}')
def sensor_callback(self, msg: String):
"""Process incoming sensor data"""
self.get_logger().debug(f'Received sensor data: {msg.data}')
# Process and republish status
status_msg = String()
status_msg.data = f'Status: Processing {msg.data}'
self.status_pub.publish(status_msg)
def control_loop(self):
"""Main control loop"""
command_msg = String()
command_msg.data = f'Command from {self.robot_name}'
self.command_pub.publish(command_msg)
def main(args=None):
rclpy.init(args=args)
node = RobotController()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
C++ Node Implementationβ
// robot_controller.cpp
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include <memory>
#include <chrono>
class RobotController : public rclcpp::Node
{
public:
RobotController()
: Node("robot_controller"), count_(0)
{
// Declare parameters
this->declare_parameter<std::string>("robot_name", "humanoid_01");
this->declare_parameter<double>("control_rate", 100.0);
// Get parameters
std::string robot_name = this->get_parameter("robot_name").as_string();
double control_rate = this->get_parameter("control_rate").as_double();
// Publishers
command_publisher_ = this->create_publisher<std_msgs::msg::String>(
"/" + robot_name + "/commands", 10);
status_publisher_ = this->create_publisher<std_msgs::msg::String>(
"/" + robot_name + "/status", 10);
// Subscribers
sensor_subscription_ = this->create_subscription<std_msgs::msg::String>(
"/" + robot_name + "/sensors", 10,
std::bind(&RobotController::sensor_callback, this, std::placeholders::_1));
// Timer
auto period = std::chrono::milliseconds(static_cast<int>(1000.0 / control_rate));
timer_ = this->create_wall_timer(
period, std::bind(&RobotController::control_loop, this));
RCLCPP_INFO(this->get_logger(), "Robot Controller initialized: %s", robot_name.c_str());
}
private:
void sensor_callback(const std_msgs::msg::String::SharedPtr msg)
{
RCLCPP_DEBUG(this->get_logger(), "Received sensor data: %s", msg->data.c_str());
auto status_msg = std_msgs::msg::String();
status_msg.data = "Status: Processing " + msg->data;
status_publisher_->publish(status_msg);
}
void control_loop()
{
auto command_msg = std_msgs::msg::String();
command_msg.data = "Command from humanoid_01";
command_publisher_->publish(command_msg);
count_++;
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr command_publisher_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr status_publisher_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sensor_subscription_;
rclcpp::TimerBase::SharedPtr timer_;
size_t count_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<RobotController>());
rclcpp::shutdown();
return 0;
}
Custom Messagesβ
Defining Custom Message Typesβ
# msg/JointCommand.msg
# Joint command for humanoid robot actuation
string joint_name # Name of the joint
float64 position # Desired position (radians)
float64 velocity # Desired velocity (rad/s)
float64 effort # Desired effort (Nm)
float64 stiffness # Stiffness gain
float64 damping # Damping gain
Package Configuration for Custom Messagesβ
# setup.py
from setuptools import setup
from glob import glob
import os
package_name = 'humanoid_interfaces'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'msg'), glob('msg/*.msg')),
],
install_requires=['setuptools'],
zip_safe=True,
author='Your Name',
author_email='your.email@example.com',
description='Custom interfaces for humanoid robot',
license='Apache 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'controller = humanoid_interfaces.controller:main',
],
},
)
<!-- package.xml -->
<?xml version="1.0"?>
<package format="3">
<name>humanoid_interfaces</name>
<version>0.1.0</version>
<description>Custom interfaces for humanoid robot</description>
<maintainer email="your.email@example.com">Your Name</maintainer>
<license>Apache 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>ament_cmake_python</buildtool_depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
# CMakeLists.txt
cmake_minimum_required(VERSION 3.8)
project(humanoid_interfaces)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)
find_package(sensor_msgs REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/JointCommand.msg"
"msg/HumanoidState.msg"
"srv/GetJointState.srv"
DEPENDENCIES std_msgs sensor_msgs
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()
QoS Policiesβ
Quality of Service Configurationβ
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy
# Reliable communication (for critical commands)
reliable_qos = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
depth=10
)
# Best-effort communication (for sensor data)
sensor_qos = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
durability=QoSDurabilityPolicy.VOLATILE,
depth=100
)
# Create publishers with different QoS
self.command_pub = self.create_publisher(
String, '/commands', reliable_qos)
self.sensor_pub = self.create_publisher(
Image, '/camera/image', sensor_qos)
Resourcesβ
Documentationβ
Toolsβ
- ros2 interface - Interface introspection
- rqt_graph - Node graph visualization
Exercisesβ
- Publisher/Subscriber Pair: Create a node pair that communicates via custom messages
- Multi-Node System: Launch 3+ nodes that communicate through topics
- QoS Experiment: Test different QoS policies and observe behavior
- C++ Conversion: Convert the Python node to C++ (or vice versa)
Next Stepsβ
Proceed to Week 5 - ROS 2 Services and Actions for synchronous and asynchronous communication patterns.