Lesson 4.3 – Complete System Integration
Learning Objectives
By the end of this lesson, you will be able to:
- Implement complete perception-to-action pipelines for elementary tasks
- Perform end-to-end system validation
- Create complete integrated systems with validation tests
- Validate simulation compatibility with real hardware interfaces
- Implement time synchronization between real and simulation time
- Create hardware abstraction layers for simulation compatibility
Concept Overview and Scope
This lesson focuses on integrating all components developed in previous chapters into a complete, functioning system. We'll implement complete perception-to-action pipelines, validate the entire system, and ensure compatibility between simulation and real hardware. This represents the culmination of Module 1, where we connect the communication infrastructure, robot description, Python AI agents, and simulation environment into a unified system.
Complete Perception-to-Action Pipeline Implementation
In this section, we'll implement a complete perception-to-action pipeline that integrates all the components we've developed:
1. Complete Pipeline Architecture
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan, Image, JointState, Imu
from geometry_msgs.msg import Twist
from std_msgs.msg import String, Float64MultiArray
from cv_bridge import CvBridge
import cv2
import numpy as np
import math
from collections import deque
class CompletePerceptionActionPipeline(Node):
def __init__(self):
super().__init__('complete_perception_action_pipeline')
# Subscriptions for all sensor modalities
self.laser_sub = self.create_subscription(
LaserScan, '/scan', self.laser_callback, 10)
self.camera_sub = self.create_subscription(
Image, '/camera/image_raw', self.camera_callback, 10)
self.joint_sub = self.create_subscription(
JointState, '/joint_states', self.joint_callback, 10)
self.imu_sub = self.create_subscription(
Imu, '/imu/data', self.imu_callback, 10)
# Publishers for all action modalities
self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.joint_cmd_pub = self.create_publisher(Float64MultiArray, '/position_controller/commands', 10)
self.status_pub = self.create_publisher(String, '/system_status', 10)
# CV Bridge for image processing
self.cv_bridge = CvBridge()
# State variables and buffers
self.laser_buffer = deque(maxlen=5) # Store last 5 laser scans
self.image_buffer = deque(maxlen=3) # Store last 3 images
self.joint_buffer = deque(maxlen=10) # Store last 10 joint states
self.imu_buffer = deque(maxlen=10) # Store last 10 IMU readings
# System state
self.current_behavior = "IDLE"
self.system_health = {"sensors": True, "controllers": True, "communication": True}
self.last_sensor_update = self.get_clock().now()
# Pipeline processing timer (20Hz)
self.pipeline_timer = self.create_timer(0.05, self.pipeline_loop)
# Health monitoring timer (1Hz)
self.health_timer = self.create_timer(1.0, self.health_check)
self.get_logger().info('Complete Perception-to-Action Pipeline has been initialized')
def laser_callback(self, msg):
self.laser_buffer.append(msg)
self.last_sensor_update = self.get_clock().now()
def camera_callback(self, msg):
try:
cv_image = self.cv_bridge.imgmsg_to_cv2(msg, "bgr8")
self.image_buffer.append(cv_image)
except Exception as e:
self.get_logger().error(f'Error converting image: {e}')
def joint_callback(self, msg):
self.joint_buffer.append(msg)
def imu_callback(self, msg):
self.imu_buffer.append(msg)
def pipeline_loop(self):
"""Main pipeline processing loop"""
# Check if we have recent sensor data
time_since_update = self.get_clock().now() - self.last_sensor_update
if time_since_update.nanoseconds / 1e9 > 2.0: # No data for 2 seconds
self.current_behavior = "NO_SENSOR_DATA"
self.system_health["sensors"] = False
self.publish_status(f"ERROR: No sensor data for {time_since_update.nanoseconds / 1e9:.1f}s")
return
self.system_health["sensors"] = True
# Process sensor data and make decisions
action_command = self.process_sensors_and_decide()
# Execute action
self.execute_action(action_command)
# Publish system status
self.publish_status(f"BEHAVIOR: {self.current_behavior} | HEALTH: {self.system_health}")
def process_sensors_and_decide(self):
"""Process all sensor data and decide on action"""
# Get latest sensor data
latest_laser = self.laser_buffer[-1] if self.laser_buffer else None
latest_image = self.image_buffer[-1] if self.image_buffer else None
latest_joints = self.joint_buffer[-1] if self.joint_buffer else None
latest_imu = self.imu_buffer[-1] if self.imu_buffer else None
# Initialize command
command = {
"cmd_vel": Twist(),
"joint_cmd": Float64MultiArray(),
"behavior": "IDLE"
}
if latest_laser is None:
return command
# Multi-sensor fusion for decision making
# 1. Obstacle detection from laser
obstacles_detected = self.analyze_obstacles(latest_laser)
# 2. Visual processing
visual_features = self.analyze_visual_data(latest_image) if latest_image is not None else {}
# 3. Joint state analysis
joint_analysis = self.analyze_joints(latest_joints) if latest_joints is not None else {}
# 4. IMU analysis
imu_analysis = self.analyze_imu(latest_imu) if latest_imu is not None else {}
# Decision logic based on all sensor inputs
if obstacles_detected["front"] < 0.5:
# Emergency stop for close obstacles
command["cmd_vel"].linear.x = 0.0
command["cmd_vel"].angular.z = 0.0
command["behavior"] = "EMERGENCY_STOP"
elif "red_object" in visual_features and visual_features["red_object"]["distance"] < 2.0:
# Approach red object
command["cmd_vel"].linear.x = 0.3
command["cmd_vel"].angular.z = visual_features["red_object"]["angular_correction"]
command["behavior"] = "APPROACH_RED_OBJECT"
elif obstacles_detected["front"] < 1.0:
# Avoid obstacles
command["cmd_vel"].linear.x = 0.0
if obstacles_detected["left"] > obstacles_detected["right"]:
command["cmd_vel"].angular.z = 0.5 # Turn left
else:
command["cmd_vel"].angular.z = -0.5 # Turn right
command["behavior"] = "AVOIDING_OBSTACLE"
else:
# Normal exploration
command["cmd_vel"].linear.x = 0.5
command["cmd_vel"].angular.z = 0.0
command["behavior"] = "EXPLORING"
self.current_behavior = command["behavior"]
return command
def analyze_obstacles(self, laser_msg):
"""Analyze laser scan for obstacles"""
ranges = laser_msg.ranges
n = len(ranges)
# Divide scan into sectors
front_ranges = ranges[:n//8] + ranges[-n//8:]
left_ranges = ranges[n*3//8:n*5//8]
right_ranges = ranges[n*5//8:n*7//8]
# Calculate minimum distances in each sector
front_min = min(front_ranges) if front_ranges else float('inf')
left_min = min(left_ranges) if left_ranges else float('inf')
right_min = min(right_ranges) if right_ranges else float('inf')
return {
"front": front_min,
"left": left_min,
"right": right_min
}
def analyze_visual_data(self, image):
"""Analyze visual data for features"""
if image is None:
return {}
features = {}
# Look for red objects
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
red_mask = cv2.inRange(hsv, (0, 100, 100), (10, 255, 255))
red_mask = cv2.inRange(hsv, (170, 100, 100), (180, 255, 255)) | red_mask
# Find contours of red objects
contours, _ = cv2.findContours(red_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
# Find largest red contour
largest_contour = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest_contour) > 500: # Only consider significant objects
# Calculate center of the object
M = cv2.moments(largest_contour)
if M["m00"] != 0:
cx = int(M["m10"] / M["m00"])
cy = int(M["m01"] / M["m00"])
# Calculate angular correction needed to center the object
img_center_x = image.shape[1] / 2
angular_correction = (cx - img_center_x) / img_center_x * 0.5 # Scale to reasonable range
features["red_object"] = {
"distance": 1.0, # Placeholder - would use depth in real system
"angular_correction": angular_correction
}
return features
def analyze_joints(self, joint_msg):
"""Analyze joint states"""
if joint_msg is None:
return {}
analysis = {
"positions": list(joint_msg.position),
"velocities": list(joint_msg.velocity),
"efforts": list(joint_msg.effort)
}
return analysis
def analyze_imu(self, imu_msg):
"""Analyze IMU data"""
if imu_msg is None:
return {}
analysis = {
"orientation": {
"x": imu_msg.orientation.x,
"y": imu_msg.orientation.y,
"z": imu_msg.orientation.z,
"w": imu_msg.orientation.w
},
"angular_velocity": {
"x": imu_msg.angular_velocity.x,
"y": imu_msg.angular_velocity.y,
"z": imu_msg.angular_velocity.z
},
"linear_acceleration": {
"x": imu_msg.linear_acceleration.x,
"y": imu_msg.linear_acceleration.y,
"z": imu_msg.linear_acceleration.z
}
}
return analysis
def execute_action(self, command):
"""Execute the decided action"""
# Publish velocity command
self.cmd_vel_pub.publish(command["cmd_vel"])
# Publish joint commands if needed
if command["joint_cmd"].data:
self.joint_cmd_pub.publish(command["joint_cmd"])
def publish_status(self, status_msg):
"""Publish system status"""
msg = String()
msg.data = status_msg
self.status_pub.publish(msg)
def health_check(self):
"""Perform system health check"""
# Check if we have recent joint data
if self.joint_buffer:
latest_joint_time = self.get_clock().now() # In a real system, we'd check the actual timestamp
# This is a simplified check - in practice, we'd verify communication with real joints
self.system_health["controllers"] = True
# Check communication health
self.system_health["communication"] = True # Simplified check
def main(args=None):
rclpy.init(args=args)
node = CompletePerceptionActionPipeline()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Shutting down complete pipeline...')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
End-to-End System Validation
In this section, we'll implement comprehensive validation techniques for our integrated system:
1. Validation Framework
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from geometry_msgs.msg import Twist
from std_msgs.msg import String
from builtin_interfaces.msg import Time
import time
import json
class SystemValidator(Node):
def __init__(self):
super().__init__('system_validator')
# Subscriptions for monitoring system behavior
self.joint_sub = self.create_subscription(JointState, '/joint_states', self.joint_callback, 10)
self.cmd_sub = self.create_subscription(Twist, '/cmd_vel', self.cmd_callback, 10)
self.status_sub = self.create_subscription(String, '/system_status', self.status_callback, 10)
# Validation parameters
self.validation_results = {
"sensor_data_validity": True,
"control_response_time": 0.0,
"communication_stability": True,
"behavior_consistency": True
}
# Validation timer (check every 5 seconds)
self.validation_timer = self.create_timer(5.0, self.run_validation)
# Performance monitoring
self.cmd_timestamps = []
self.joint_timestamps = []
self.get_logger().info('System Validator has been initialized')
def joint_callback(self, msg):
# Record joint state timestamp for performance analysis
self.joint_timestamps.append(self.get_clock().now())
def cmd_callback(self, msg):
# Record command timestamp for response time analysis
self.cmd_timestamps.append(self.get_clock().now())
def status_callback(self, msg):
# Monitor system status for behavioral consistency
if "ERROR" in msg.data:
self.validation_results["behavior_consistency"] = False
def run_validation(self):
"""Run comprehensive system validation"""
self.get_logger().info('Starting system validation...')
# 1. Sensor data validity check
self.check_sensor_validity()
# 2. Control response time check
self.check_control_response_time()
# 3. Communication stability check
self.check_communication_stability()
# 4. Behavior consistency check
self.check_behavior_consistency()
# Report validation results
self.report_validation_results()
def check_sensor_validity(self):
"""Validate that sensor data is reasonable"""
# This is a placeholder - in practice, we'd check for:
# - Data ranges (e.g., joint positions within limits)
# - Update frequency (sensors publishing at expected rate)
# - Data quality (no NaN or inf values)
self.validation_results["sensor_data_validity"] = True
def check_control_response_time(self):
"""Check that control commands are responded to in time"""
if len(self.cmd_timestamps) > 1 and len(self.joint_timestamps) > 1:
# Calculate response time between command and joint movement
cmd_time = self.cmd_timestamps[-1]
joint_time = self.joint_timestamps[-1]
response_time = (joint_time.nanoseconds - cmd_time.nanoseconds) / 1e9
self.validation_results["control_response_time"] = response_time
if response_time > 0.5: # More than 500ms is too slow
self.get_logger().warning(f'Control response time is too slow: {response_time:.3f}s')
self.validation_results["communication_stability"] = False
def check_communication_stability(self):
"""Check for communication stability"""
# Check if topics are publishing at expected rates
# This is a simplified check
self.validation_results["communication_stability"] = True
def check_behavior_consistency(self):
"""Check that system behaves consistently"""
# This would involve more complex behavioral validation
# For now, we just ensure no error messages in status
pass
def report_validation_results(self):
"""Report validation results"""
self.get_logger().info('--- System Validation Results ---')
for test, result in self.validation_results.items():
status = "PASS" if result else "FAIL"
self.get_logger().info(f'{test}: {status}')
self.get_logger().info('--- End Validation Results ---')
def main(args=None):
rclpy.init(args=args)
validator = SystemValidator()
# Run validation for 30 seconds
start_time = time.time()
end_time = start_time + 30
try:
while time.time() < end_time:
rclpy.spin_once(validator, timeout_sec=0.1)
except KeyboardInterrupt:
pass
finally:
validator.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Hardware Abstraction Layer Implementation
To ensure compatibility between simulation and real hardware, we need to implement hardware abstraction:
1. Hardware Interface Abstraction
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState, Imu
from geometry_msgs.msg import Twist
from std_msgs.msg import Float64MultiArray
import threading
import time
class HardwareInterface(Node):
def __init__(self, is_simulation=True):
super().__init__('hardware_interface')
self.is_simulation = is_simulation
self.hardware_connected = False
self.lock = threading.Lock()
# Publishers and subscribers
self.joint_state_pub = self.create_publisher(JointState, '/joint_states', 10)
self.imu_pub = self.create_publisher(Imu, '/imu/data', 10)
self.cmd_vel_sub = self.create_subscription(Twist, '/cmd_vel', self.cmd_vel_callback, 10)
self.joint_cmd_sub = self.create_subscription(Float64MultiArray, '/joint_commands', self.joint_cmd_callback, 10)
# Initialize hardware interface
self.initialize_hardware()
def initialize_hardware(self):
"""Initialize connection to hardware or simulation"""
if self.is_simulation:
self.get_logger().info('Initializing in simulation mode')
# In simulation, we rely on Gazebo plugins for hardware simulation
self.hardware_connected = True
else:
self.get_logger().info('Initializing in real hardware mode')
# Connect to real hardware (simplified example)
try:
# Connect to actual hardware controller
# self.hardware_controller = RealHardwareController()
# self.hardware_connected = self.hardware_controller.connect()
self.hardware_connected = True # Placeholder
self.get_logger().info('Hardware connected successfully')
except Exception as e:
self.get_logger().error(f'Failed to connect to hardware: {e}')
self.hardware_connected = False
def cmd_vel_callback(self, msg):
"""Handle velocity commands"""
with self.lock:
if self.hardware_connected:
if self.is_simulation:
# In simulation, commands are handled by Gazebo
# We just validate the command
self.validate_and_publish_cmd_vel(msg)
else:
# Send command to real hardware
self.send_cmd_vel_to_hardware(msg)
def joint_cmd_callback(self, msg):
"""Handle joint commands"""
with self.lock:
if self.hardware_connected:
if self.is_simulation:
# In simulation, joint commands go through controller manager
self.publish_joint_commands(msg)
else:
# Send joint commands to real hardware
self.send_joint_commands_to_hardware(msg)
def validate_and_publish_cmd_vel(self, cmd_vel):
"""Validate and publish velocity commands (simulation mode)"""
# Apply safety limits
max_linear = 1.0 # m/s
max_angular = 1.0 # rad/s
cmd_vel.linear.x = max(min(cmd_vel.linear.x, max_linear), -max_linear)
cmd_vel.angular.z = max(min(cmd_vel.angular.z, max_angular), -max_angular)
# In simulation, the command is handled by the simulation environment
# We just validate and potentially log
self.get_logger().debug(f'Validated cmd_vel: linear={cmd_vel.linear.x}, angular={cmd_vel.angular.z}')
def send_cmd_vel_to_hardware(self, cmd_vel):
"""Send velocity commands to real hardware"""
# This would send commands to real hardware controller
# For example: self.hardware_controller.send_velocity_command(cmd_vel)
self.get_logger().info(f'Sent cmd_vel to hardware: linear={cmd_vel.linear.x}, angular={cmd_vel.angular.z}')
def publish_joint_commands(self, joint_cmd):
"""Publish joint commands (simulation mode)"""
# In simulation, commands are sent to controller manager topics
# This would be handled by the controller manager
self.get_logger().debug(f'Published joint commands: {len(joint_cmd.data)} joints')
def send_joint_commands_to_hardware(self, joint_cmd):
"""Send joint commands to real hardware"""
# Send commands to real hardware controller
# For example: self.hardware_controller.send_joint_commands(joint_cmd.data)
self.get_logger().info(f'Sent joint commands to hardware: {len(joint_cmd.data)} joints')
def get_sensor_data(self):
"""Get sensor data from hardware or simulation"""
with self.lock:
if self.is_simulation:
# In simulation, sensor data comes from Gazebo plugins
# This is handled by the simulation environment
return None
else:
# Get data from real hardware sensors
# For example: return self.hardware_controller.get_sensor_data()
return None
def main(args=None):
rclpy.init(args=args)
# Create both simulation and real hardware interfaces for testing
sim_interface = HardwareInterface(is_simulation=True)
try:
rclpy.spin(sim_interface)
except KeyboardInterrupt:
pass
finally:
sim_interface.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Time Synchronization Implementation
Proper time synchronization is crucial for both simulation and real hardware:
1. Time Synchronization Node
import rclpy
from rclpy.node import Node
from builtin_interfaces.msg import Time
from std_msgs.msg import Header
import time
class TimeSynchronizer(Node):
def __init__(self):
super().__init__('time_synchronizer')
# Time synchronization parameters
self.use_sim_time = self.declare_parameter('use_sim_time', False).value
self.time_offset = 0.0 # Offset between real and simulation time
self.time_scale = 1.0 # Time scaling factor for simulation
# Time synchronization publisher
self.time_sync_pub = self.create_publisher(Time, '/time_sync', 10)
# Time sync timer (10Hz)
self.sync_timer = self.create_timer(0.1, self.synchronize_time)
self.get_logger().info(f'Time Synchronizer initialized (use_sim_time: {self.use_sim_time})')
def synchronize_time(self):
"""Synchronize time between real and simulation environments"""
if self.use_sim_time:
# In simulation, use Gazebo's simulation time
# This is typically handled automatically by ROS2 parameters
sim_time_msg = self.get_clock().now().to_msg()
self.time_sync_pub.publish(sim_time_msg)
else:
# In real hardware, use system time
real_time_msg = self.get_clock().now().to_msg()
self.time_sync_pub.publish(real_time_msg)
def main(args=None):
rclpy.init(args=args)
synchronizer = TimeSynchronizer()
try:
rclpy.spin(synchronizer)
except KeyboardInterrupt:
pass
finally:
synchronizer.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Complete Integration Launch File
Let's create a comprehensive launch file that brings all components together:
Python Launch File
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription, RegisterEventHandler
from launch.event_handlers import OnProcessStart
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import Command, LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
# Package names
pkg_share = FindPackageShare("my_robot_description").find("my_robot_description")
gazebo_pkg_share = FindPackageShare("gazebo_ros").find("gazebo_ros")
# Arguments
use_sim_time = LaunchConfiguration('use_sim_time', default='true')
is_simulation = LaunchConfiguration('is_simulation', default='true')
# Robot description
robot_description_content = Command([
'xacro ',
PathJoinSubstitution([pkg_share, 'urdf', 'my_robot.urdf.xacro'])
])
# Robot state publisher
robot_state_publisher_node = Node(
package='robot_state_publisher',
executable='robot_state_publisher',
name='robot_state_publisher',
parameters=[{
'use_sim_time': use_sim_time,
'robot_description': robot_description_content
}]
)
# Joint state publisher
joint_state_publisher_node = Node(
package='joint_state_publisher',
executable='joint_state_publisher',
name='joint_state_publisher',
parameters=[{'use_sim_time': use_sim_time}]
)
# Complete perception-action pipeline
perception_action_node = Node(
package='my_robot_control',
executable='complete_perception_action_pipeline',
name='complete_perception_action_pipeline',
parameters=[{'use_sim_time': use_sim_time}],
remappings=[
('/scan', '/scan'),
('/camera/image_raw', '/camera/image_raw'),
('/joint_states', '/joint_states'),
('/imu/data', '/imu/data'),
('/cmd_vel', '/cmd_vel'),
]
)
# System validator
system_validator_node = Node(
package='my_robot_control',
executable='system_validator',
name='system_validator',
parameters=[{'use_sim_time': use_sim_time}]
)
# Hardware interface
hardware_interface_node = Node(
package='my_robot_control',
executable='hardware_interface',
name='hardware_interface',
parameters=[
{'use_sim_time': use_sim_time},
{'is_simulation': is_simulation}
]
)
# Time synchronizer
time_synchronizer_node = Node(
package='my_robot_control',
executable='time_synchronizer',
name='time_synchronizer',
parameters=[{'use_sim_time': use_sim_time}]
)
# Gazebo launch (only if simulation)
gazebo_launch = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
os.path.join(gazebo_pkg_share, 'launch', 'gazebo.launch.py')
),
condition=lambda context: LaunchConfiguration('is_simulation').perform(context) == 'true'
)
# Spawn robot in Gazebo
spawn_entity_node = Node(
package='gazebo_ros',
executable='spawn_entity.py',
arguments=[
'-topic', 'robot_description',
'-entity', 'my_robot',
'-x', '0', '-y', '0', '-z', '1.0'
],
output='screen',
condition=lambda context: LaunchConfiguration('is_simulation').perform(context) == 'true'
)
return LaunchDescription([
DeclareLaunchArgument(
'use_sim_time',
default_value='true',
description='Use simulation time if true'
),
DeclareLaunchArgument(
'is_simulation',
default_value='true',
description='Run in simulation mode if true'
),
gazebo_launch,
robot_state_publisher_node,
joint_state_publisher_node,
spawn_entity_node,
perception_action_node,
system_validator_node,
hardware_interface_node,
time_synchronizer_node
])
Validation and Testing Procedures
1. Automated Validation Tests
import unittest
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from geometry_msgs.msg import Twist
from std_msgs.msg import String
from std_msgs.msg import Float64MultiArray
class TestCompleteSystemIntegration(unittest.TestCase):
def setUp(self):
rclpy.init()
self.node = Node('test_complete_system')
# Create test publishers
self.cmd_vel_pub = self.node.create_publisher(Twist, '/cmd_vel', 10)
self.joint_cmd_pub = self.node.create_publisher(Float64MultiArray, '/joint_commands', 10)
# Create test subscribers
self.status_sub = self.node.create_subscription(String, '/system_status', self.status_callback, 10)
self.joint_state_sub = self.node.create_subscription(JointState, '/joint_states', self.joint_state_callback, 10)
self.status_received = False
self.joint_state_received = False
def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()
def status_callback(self, msg):
self.status_received = True
def joint_state_callback(self, msg):
self.joint_state_received = True
def test_system_responds_to_commands(self):
"""Test that the system responds to velocity commands"""
cmd = Twist()
cmd.linear.x = 0.5
cmd.angular.z = 0.2
# Publish command
self.cmd_vel_pub.publish(cmd)
# Wait for response
timeout = 0
while not self.status_received and timeout < 100: # 10 seconds
rclpy.spin_once(self.node, timeout_sec=0.1)
timeout += 1
self.assertTrue(self.status_received, "System did not respond to velocity command")
def test_joint_state_updates(self):
"""Test that joint states are being published"""
timeout = 0
while not self.joint_state_received and timeout < 100: # 10 seconds
rclpy.spin_once(self.node, timeout_sec=0.1)
timeout += 1
self.assertTrue(self.joint_state_received, "Joint states not received")
if __name__ == '__main__':
unittest.main()
Step-by-Step Integration Exercise
Follow these steps to integrate and validate your complete system:
- Launch the complete system using the integrated launch file
- Monitor system status through the status topics
- Send test commands to verify the perception-action pipeline works
- Run validation tests to ensure all components are functioning
- Test hardware abstraction by switching between simulation and real hardware modes
- Validate time synchronization between real and simulation environments
Summary
In this lesson, you learned:
- How to implement complete perception-to-action pipelines for elementary tasks
- How to perform end-to-end system validation
- How to create complete integrated systems with validation tests
- How to validate simulation compatibility with real hardware interfaces
- How to implement time synchronization between real and simulation time
- How to create hardware abstraction layers for simulation compatibility
This completes Module 1 of the Physical AI & Humanoid Robotics course. You now have a complete understanding of how to create a "robotic nervous system" using ROS2, from basic communication patterns through complete AI integration and simulation readiness.
Next Steps
With Module 1 complete, you're now ready to advance to Module 2, where you'll explore more advanced AI integration, vision-language-action systems, and complex decision-making algorithms that build upon the foundation you've established here.