Project 4: Advanced Line Following Robot
Overview
Build a complete autonomous line-following system with advanced PID control, intersection detection, and path decision making. This project combines computer vision, control theory, and state machine design to create a robust navigation system.
Learning Objectives
By completing this project, you will:
- Implement robust line detection with color segmentation
- Design and tune multi-stage PID controllers
- Build state machines for intersection handling
- Create a web dashboard for monitoring and tuning
- Understand control system performance metrics
Prerequisites
| Lab | Topic | Skills Used |
|---|---|---|
| Project 3 | Autonomous Navigator | YOLO, tracking, PID basics |
| ROS2 Lab 6 | Line Following | Basic line detection |
| ROS2 Lab 7 | Services | ROS2 service architecture |
| Docker Lab 2 | ROS2 in Docker | Containerized deployment |
Hardware Requirements
- Mini Pupper v2 with camera
- Line track (tape on floor)
- Color markers for intersections (optional)
Part 1: System Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ Advanced Line Following System │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Vision Pipeline │ │
│ │ ┌──────────┐ ┌──────────────┐ ┌───────────────────┐ │ │
│ │ │ Camera │─▶│ Color │─▶│ Line Detection │ │ │
│ │ │ Input │ │ Segmentation│ │ (Contours/Edge) │ │ │
│ │ └──────────┘ └──────────────┘ └─────────┬─────────┘ │ │
│ │ │ │ │
│ │ ┌──────────────────┐ ┌───────────────────┴────────┐ │ │
│ │ │ Intersection │◀─┤ Line Centroid │ │ │
│ │ │ Detection │ │ Calculation │ │ │
│ │ └────────┬─────────┘ └───────────────────┬────────┘ │ │
│ └───────────┼────────────────────────────────┼────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Control System │ │
│ │ ┌──────────────────┐ ┌────────────────────────────┐ │ │
│ │ │ State Machine │ │ PID Controllers │ │ │
│ │ │ - Following │ │ - Angular (Steering) │ │ │
│ │ │ - Intersection │ │ - Linear (Speed) │ │ │
│ │ │ - Lost Line │ │ - Adaptive Gains │ │ │
│ │ │ - Recovery │ │ │ │ │
│ │ └──────────┬───────┘ └──────────────┬─────────────┘ │ │
│ └─────────────┼─────────────────────────┼─────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Robot Output │ │
│ │ ┌──────────────────────────────────────────────────────┐ │ │
│ │ │ /cmd_vel (Twist) │ │ │
│ │ │ linear.x = base_speed × speed_factor │ │ │
│ │ │ angular.z = pid_output │ │ │
│ │ └──────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Part 2: Project Setup
Step 1: Create Project Structure
mkdir -p ~/line_follower/{src,config,launch,web}
cd ~/line_follower
Step 2: Install Dependencies
pip3 install opencv-python numpy flask flask-socketio
Step 3: Create Configuration File
Create config/line_config.yaml:
# Line Detection Configuration
line_detection:
# Color thresholds (HSV)
color_lower: [0, 0, 0] # Black line lower bound
color_upper: [180, 255, 80] # Black line upper bound
# Alternative for colored tape
# color_lower: [100, 100, 100] # Blue line
# color_upper: [130, 255, 255]
# Region of Interest (% of image height)
roi_top: 0.5 # Start at 50% height
roi_bottom: 0.9 # End at 90% height
# Minimum contour area
min_contour_area: 500
# Line width detection (pixels)
expected_line_width: 50
max_line_width: 200
# PID Controller Configuration
pid:
angular:
kp: 0.8
ki: 0.001
kd: 0.3
max_output: 1.0
deadband: 0.02
linear:
kp: 0.5
ki: 0.0
kd: 0.1
max_output: 0.15
# Speed Settings
speed:
base_linear: 0.08
max_linear: 0.12
turn_reduction: 0.7 # Slow down when turning
# State Machine
state:
line_lost_timeout: 1.0 # Seconds before recovery
intersection_pause: 0.5 # Pause at intersections
search_rotation_speed: 0.3 # Speed during recovery
Part 3: Line Detection Module
Create src/line_detector.py:
#!/usr/bin/env python3
"""
Advanced Line Detection with Multi-Stage Processing
"""
import cv2
import numpy as np
from dataclasses import dataclass
from typing import Tuple, Optional, List
import yaml
@dataclass
class LineDetection:
"""Detection results"""
detected: bool
centroid_x: float # -1.0 (left) to 1.0 (right), 0 = center
centroid_y: float # Normalized y position
angle: float # Line angle in degrees
width: float # Detected line width
confidence: float # Detection confidence 0-1
intersection: bool # Intersection detected
intersection_type: str # 'T', 'X', 'L', or None
mask: np.ndarray # Binary mask for visualization
class LineDetector:
"""Multi-stage line detection pipeline"""
def __init__(self, config_path: str = "config/line_config.yaml"):
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
self.config = config['line_detection']
# Color thresholds
self.color_lower = np.array(self.config['color_lower'])
self.color_upper = np.array(self.config['color_upper'])
# ROI settings
self.roi_top = self.config['roi_top']
self.roi_bottom = self.config['roi_bottom']
# Line parameters
self.min_area = self.config['min_contour_area']
self.expected_width = self.config['expected_line_width']
self.max_width = self.config['max_line_width']
# History for smoothing
self.centroid_history = []
self.history_size = 5
def detect(self, frame: np.ndarray) -> LineDetection:
"""Main detection pipeline"""
height, width = frame.shape[:2]
# Step 1: Extract ROI
roi_y_start = int(height * self.roi_top)
roi_y_end = int(height * self.roi_bottom)
roi = frame[roi_y_start:roi_y_end, :]
# Step 2: Color segmentation
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv, self.color_lower, self.color_upper)
# Step 3: Morphological operations
kernel = np.ones((5, 5), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
# Step 4: Find contours
contours, _ = cv2.findContours(
mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
if not contours:
return self._no_detection(mask)
# Step 5: Find the main line contour
line_contour = self._find_line_contour(contours, width)
if line_contour is None:
return self._no_detection(mask)
# Step 6: Calculate centroid and properties
centroid_x, centroid_y = self._calculate_centroid(line_contour, width)
angle = self._calculate_angle(line_contour)
line_width = self._calculate_width(line_contour)
confidence = self._calculate_confidence(line_contour, width)
# Step 7: Intersection detection
intersection, int_type = self._detect_intersection(mask, width)
# Step 8: Smooth centroid
smoothed_x = self._smooth_centroid(centroid_x)
return LineDetection(
detected=True,
centroid_x=smoothed_x,
centroid_y=centroid_y,
angle=angle,
width=line_width,
confidence=confidence,
intersection=intersection,
intersection_type=int_type,
mask=mask
)
def _find_line_contour(self, contours: List,
image_width: int) -> Optional[np.ndarray]:
"""Find the most likely line contour"""
valid_contours = []
for contour in contours:
area = cv2.contourArea(contour)
if area < self.min_area:
continue
# Check aspect ratio
x, y, w, h = cv2.boundingRect(contour)
# Line should be taller than wide (vertical in ROI)
# or appropriately wide for horizontal segments
if w < self.max_width or h > w * 0.3:
valid_contours.append((contour, area))
if not valid_contours:
return None
# Return largest valid contour
valid_contours.sort(key=lambda x: x[1], reverse=True)
return valid_contours[0][0]
def _calculate_centroid(self, contour: np.ndarray,
image_width: int) -> Tuple[float, float]:
"""Calculate normalized centroid position"""
M = cv2.moments(contour)
if M['m00'] == 0:
return 0.0, 0.0
cx = int(M['m10'] / M['m00'])
cy = int(M['m01'] / M['m00'])
# Normalize to [-1, 1] where 0 is center
normalized_x = (cx - image_width / 2) / (image_width / 2)
normalized_y = cy / contour.shape[0] if len(contour) > 0 else 0.5
return normalized_x, normalized_y
def _calculate_angle(self, contour: np.ndarray) -> float:
"""Calculate line angle from contour"""
if len(contour) < 5:
return 0.0
# Fit line to contour
[vx, vy, x, y] = cv2.fitLine(contour, cv2.DIST_L2, 0, 0.01, 0.01)
angle = np.arctan2(vy, vx) * 180 / np.pi
return float(angle[0])
def _calculate_width(self, contour: np.ndarray) -> float:
"""Calculate line width"""
x, y, w, h = cv2.boundingRect(contour)
return float(min(w, h))
def _calculate_confidence(self, contour: np.ndarray,
image_width: int) -> float:
"""Calculate detection confidence"""
area = cv2.contourArea(contour)
max_area = image_width * (image_width * 0.3) * 0.5
# Confidence based on area
area_score = min(area / max_area, 1.0)
# Penalty for extreme positions
M = cv2.moments(contour)
if M['m00'] > 0:
cx = M['m10'] / M['m00']
position_score = 1.0 - abs(cx - image_width/2) / (image_width/2)
else:
position_score = 0.5
return 0.7 * area_score + 0.3 * position_score
def _detect_intersection(self, mask: np.ndarray,
width: int) -> Tuple[bool, str]:
"""Detect intersections in the mask"""
height = mask.shape[0]
# Divide mask into horizontal slices
slice_height = height // 4
slice_widths = []
for i in range(4):
y_start = i * slice_height
y_end = (i + 1) * slice_height
slice_mask = mask[y_start:y_end, :]
# Find width of line in this slice
cols = np.sum(slice_mask > 0, axis=0)
if np.max(cols) > 0:
line_pixels = np.where(cols > slice_height * 0.3)[0]
if len(line_pixels) > 0:
slice_widths.append(line_pixels[-1] - line_pixels[0])
else:
slice_widths.append(0)
else:
slice_widths.append(0)
# Detect intersection types
if len(slice_widths) >= 2:
top_width = np.mean(slice_widths[:2])
bottom_width = np.mean(slice_widths[2:])
# Significant widening indicates intersection
if bottom_width > top_width * 1.8 and bottom_width > self.expected_width * 2:
# Count branches
bottom_slice = mask[height*3//4:, :]
contours, _ = cv2.findContours(
bottom_slice, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
if len(contours) >= 3:
return True, 'X'
elif len(contours) >= 2:
return True, 'T'
elif bottom_width > self.expected_width * 3:
return True, 'T'
return False, None
def _smooth_centroid(self, centroid_x: float) -> float:
"""Apply smoothing to centroid"""
self.centroid_history.append(centroid_x)
if len(self.centroid_history) > self.history_size:
self.centroid_history.pop(0)
# Weighted average (more recent = higher weight)
weights = np.linspace(0.5, 1.0, len(self.centroid_history))
return np.average(self.centroid_history, weights=weights)
def _no_detection(self, mask: np.ndarray) -> LineDetection:
"""Return no detection result"""
return LineDetection(
detected=False,
centroid_x=0.0,
centroid_y=0.0,
angle=0.0,
width=0.0,
confidence=0.0,
intersection=False,
intersection_type=None,
mask=mask
)
def visualize(self, frame: np.ndarray,
detection: LineDetection) -> np.ndarray:
"""Create visualization of detection"""
vis = frame.copy()
height, width = vis.shape[:2]
# Draw ROI
roi_y_start = int(height * self.roi_top)
roi_y_end = int(height * self.roi_bottom)
cv2.rectangle(vis, (0, roi_y_start), (width, roi_y_end), (0, 255, 255), 2)
if detection.detected:
# Draw centroid line
cx = int((detection.centroid_x + 1) / 2 * width)
cv2.line(vis, (cx, roi_y_start), (cx, roi_y_end), (0, 255, 0), 3)
# Draw center reference
cv2.line(vis, (width//2, roi_y_start), (width//2, roi_y_end),
(255, 0, 0), 1)
# Status text
status = f"Line: {detection.centroid_x:.2f} | Conf: {detection.confidence:.2f}"
cv2.putText(vis, status, (10, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.7, (0, 255, 0), 2)
if detection.intersection:
cv2.putText(vis, f"INTERSECTION: {detection.intersection_type}",
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
else:
cv2.putText(vis, "LINE LOST", (10, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.7, (0, 0, 255), 2)
return vis
Part 4: PID Controller Module
Create src/pid_controller.py:
#!/usr/bin/env python3
"""
Advanced PID Controller with Anti-Windup and Adaptive Gains
"""
import time
from dataclasses import dataclass
from typing import Optional, Tuple
import yaml
import numpy as np
@dataclass
class PIDState:
"""PID controller state for monitoring"""
error: float
p_term: float
i_term: float
d_term: float
output: float
setpoint: float
measurement: float
class PIDController:
"""PID controller with advanced features"""
def __init__(self, kp: float, ki: float, kd: float,
max_output: float = 1.0,
min_output: float = None,
deadband: float = 0.0,
anti_windup: float = None):
"""
Initialize PID controller
Args:
kp: Proportional gain
ki: Integral gain
kd: Derivative gain
max_output: Maximum output value
min_output: Minimum output (default: -max_output)
deadband: Error deadband (no output within deadband)
anti_windup: Integral anti-windup limit
"""
self.kp = kp
self.ki = ki
self.kd = kd
self.max_output = max_output
self.min_output = min_output if min_output is not None else -max_output
self.deadband = deadband
self.anti_windup = anti_windup if anti_windup else max_output
# State variables
self.integral = 0.0
self.last_error = 0.0
self.last_time = None
self.last_measurement = None
# State for monitoring
self.state = PIDState(0, 0, 0, 0, 0, 0, 0)
def update(self, setpoint: float, measurement: float) -> float:
"""
Calculate PID output
Args:
setpoint: Desired value
measurement: Current value
Returns:
Control output
"""
current_time = time.time()
# Calculate dt
if self.last_time is None:
dt = 0.02 # Default 50Hz
else:
dt = current_time - self.last_time
# Prevent division by zero
dt = max(dt, 0.001)
# Calculate error
error = setpoint - measurement
# Apply deadband
if abs(error) < self.deadband:
error = 0.0
# Proportional term
p_term = self.kp * error
# Integral term with anti-windup
self.integral += error * dt
self.integral = np.clip(self.integral, -self.anti_windup, self.anti_windup)
i_term = self.ki * self.integral
# Derivative term (on measurement to avoid derivative kick)
if self.last_measurement is not None:
d_measurement = (measurement - self.last_measurement) / dt
d_term = -self.kd * d_measurement # Negative because using measurement
else:
d_term = 0.0
# Calculate output
output = p_term + i_term + d_term
# Apply output limits
output = np.clip(output, self.min_output, self.max_output)
# Update state
self.last_error = error
self.last_time = current_time
self.last_measurement = measurement
# Save state for monitoring
self.state = PIDState(
error=error,
p_term=p_term,
i_term=i_term,
d_term=d_term,
output=output,
setpoint=setpoint,
measurement=measurement
)
return output
def reset(self):
"""Reset controller state"""
self.integral = 0.0
self.last_error = 0.0
self.last_time = None
self.last_measurement = None
def set_gains(self, kp: float = None, ki: float = None, kd: float = None):
"""Update PID gains"""
if kp is not None:
self.kp = kp
if ki is not None:
self.ki = ki
if kd is not None:
self.kd = kd
def get_state(self) -> PIDState:
"""Get current state for monitoring"""
return self.state
class AdaptivePIDController(PIDController):
"""PID controller with gain scheduling based on error"""
def __init__(self, *args,
gain_schedule: dict = None, **kwargs):
"""
Args:
gain_schedule: Dict mapping error thresholds to gain multipliers
Example: {0.1: 0.5, 0.3: 1.0, 0.6: 1.5}
"""
super().__init__(*args, **kwargs)
# Default gain schedule
self.gain_schedule = gain_schedule or {
0.1: 0.5, # Small error: reduce gains
0.3: 1.0, # Medium error: normal gains
0.6: 1.5, # Large error: increase gains
}
# Base gains
self.base_kp = self.kp
self.base_ki = self.ki
self.base_kd = self.kd
def update(self, setpoint: float, measurement: float) -> float:
"""Update with adaptive gains"""
error = abs(setpoint - measurement)
# Find appropriate gain multiplier
multiplier = 1.0
for threshold, mult in sorted(self.gain_schedule.items()):
if error >= threshold:
multiplier = mult
# Apply gain scheduling
self.kp = self.base_kp * multiplier
self.ki = self.base_ki * multiplier
self.kd = self.base_kd * multiplier
return super().update(setpoint, measurement)
class LineFollowingController:
"""Dual PID controller for line following"""
def __init__(self, config_path: str = "config/line_config.yaml"):
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
pid_config = config['pid']
speed_config = config['speed']
# Angular (steering) controller - tracks line center
ang = pid_config['angular']
self.angular_pid = AdaptivePIDController(
kp=ang['kp'],
ki=ang['ki'],
kd=ang['kd'],
max_output=ang['max_output'],
deadband=ang.get('deadband', 0.0)
)
# Linear (speed) controller - adjusts speed based on error
lin = pid_config['linear']
self.linear_pid = PIDController(
kp=lin['kp'],
ki=lin['ki'],
kd=lin['kd'],
max_output=lin['max_output']
)
# Speed settings
self.base_speed = speed_config['base_linear']
self.max_speed = speed_config['max_linear']
self.turn_reduction = speed_config['turn_reduction']
def compute(self, line_error: float,
confidence: float = 1.0) -> Tuple[float, float]:
"""
Compute control outputs
Args:
line_error: Normalized error (-1 to 1)
confidence: Detection confidence (0 to 1)
Returns:
Tuple of (linear_speed, angular_speed)
"""
# Angular control (steering)
angular = self.angular_pid.update(0.0, line_error)
# Speed reduction based on steering and confidence
turn_factor = 1.0 - abs(angular) * self.turn_reduction
confidence_factor = 0.5 + 0.5 * confidence
# Linear speed
linear = self.base_speed * turn_factor * confidence_factor
linear = max(0.02, min(linear, self.max_speed))
return linear, angular
def reset(self):
"""Reset both controllers"""
self.angular_pid.reset()
self.linear_pid.reset()
def get_states(self) -> Tuple[PIDState, PIDState]:
"""Get states of both controllers"""
return self.angular_pid.get_state(), self.linear_pid.get_state()
def set_angular_gains(self, kp: float = None, ki: float = None, kd: float = None):
"""Update angular PID gains"""
self.angular_pid.set_gains(kp, ki, kd)
if kp is not None:
self.angular_pid.base_kp = kp
if ki is not None:
self.angular_pid.base_ki = ki
if kd is not None:
self.angular_pid.base_kd = kd
Part 5: State Machine
Create src/state_machine.py:
#!/usr/bin/env python3
"""
State Machine for Line Following
"""
from enum import Enum, auto
from dataclasses import dataclass
from typing import Optional, Callable
import time
class State(Enum):
"""Robot states"""
IDLE = auto()
FOLLOWING = auto()
INTERSECTION = auto()
LINE_LOST = auto()
RECOVERY = auto()
STOPPED = auto()
@dataclass
class StateData:
"""Data associated with current state"""
state: State
duration: float # Time in current state
line_error: float # Last known line error
last_detection: float # Time of last detection
intersection_type: str # Type of intersection if any
recovery_direction: int # -1 = left, 1 = right
class LineFollowingStateMachine:
"""State machine for line following behavior"""
def __init__(self,
lost_timeout: float = 1.0,
intersection_pause: float = 0.5,
recovery_speed: float = 0.3):
self.lost_timeout = lost_timeout
self.intersection_pause = intersection_pause
self.recovery_speed = recovery_speed
# State tracking
self.current_state = State.IDLE
self.state_start_time = time.time()
self.last_detection_time = time.time()
self.last_line_error = 0.0
self.intersection_type = None
# Callbacks
self.on_state_change: Optional[Callable[[State, State], None]] = None
def update(self, detected: bool, line_error: float = 0.0,
intersection: bool = False,
intersection_type: str = None) -> StateData:
"""
Update state machine
Args:
detected: Whether line is detected
line_error: Current line error
intersection: Whether intersection detected
intersection_type: Type of intersection
Returns:
Current state data
"""
current_time = time.time()
if detected:
self.last_detection_time = current_time
self.last_line_error = line_error
state_duration = current_time - self.state_start_time
time_since_detection = current_time - self.last_detection_time
# State transitions
old_state = self.current_state
if self.current_state == State.IDLE:
if detected:
self._transition_to(State.FOLLOWING)
elif self.current_state == State.FOLLOWING:
if intersection:
self.intersection_type = intersection_type
self._transition_to(State.INTERSECTION)
elif not detected:
if time_since_detection > self.lost_timeout:
self._transition_to(State.LINE_LOST)
elif self.current_state == State.INTERSECTION:
if state_duration > self.intersection_pause:
if detected:
self._transition_to(State.FOLLOWING)
else:
self._transition_to(State.LINE_LOST)
elif self.current_state == State.LINE_LOST:
self._transition_to(State.RECOVERY)
elif self.current_state == State.RECOVERY:
if detected:
self._transition_to(State.FOLLOWING)
elif state_duration > 5.0: # Recovery timeout
self._transition_to(State.STOPPED)
elif self.current_state == State.STOPPED:
if detected:
self._transition_to(State.FOLLOWING)
# Notify on state change
if old_state != self.current_state and self.on_state_change:
self.on_state_change(old_state, self.current_state)
# Determine recovery direction
recovery_direction = 1 if self.last_line_error > 0 else -1
return StateData(
state=self.current_state,
duration=current_time - self.state_start_time,
line_error=self.last_line_error,
last_detection=self.last_detection_time,
intersection_type=self.intersection_type,
recovery_direction=recovery_direction
)
def _transition_to(self, new_state: State):
"""Transition to new state"""
self.current_state = new_state
self.state_start_time = time.time()
def start(self):
"""Start following"""
self._transition_to(State.IDLE)
def stop(self):
"""Stop following"""
self._transition_to(State.STOPPED)
def reset(self):
"""Reset state machine"""
self.current_state = State.IDLE
self.state_start_time = time.time()
self.last_detection_time = time.time()
self.last_line_error = 0.0
Part 6: Main Line Follower Node
Create src/line_follower_node.py:
#!/usr/bin/env python3
"""
ROS2 Line Following Node
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import Twist
from std_msgs.msg import String, Float32MultiArray
from std_srvs.srv import SetBool, Trigger
from cv_bridge import CvBridge
import cv2
import json
from line_detector import LineDetector
from pid_controller import LineFollowingController
from state_machine import LineFollowingStateMachine, State
class LineFollowerNode(Node):
"""Main line following node"""
def __init__(self):
super().__init__('line_follower')
# Parameters
self.declare_parameter('config_path', 'config/line_config.yaml')
config_path = self.get_parameter('config_path').value
# Components
self.detector = LineDetector(config_path)
self.controller = LineFollowingController(config_path)
self.state_machine = LineFollowingStateMachine()
self.bridge = CvBridge()
self.enabled = False
# Publishers
self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.debug_pub = self.create_publisher(Image, '/line_follower/debug', 10)
self.status_pub = self.create_publisher(String, '/line_follower/status', 10)
self.pid_pub = self.create_publisher(Float32MultiArray, '/line_follower/pid', 10)
# Subscribers
self.image_sub = self.create_subscription(
Image, '/image_raw', self.image_callback, 10
)
# Services
self.enable_srv = self.create_service(
SetBool, '/line_follower/enable', self.enable_callback
)
self.reset_srv = self.create_service(
Trigger, '/line_follower/reset', self.reset_callback
)
# State change callback
self.state_machine.on_state_change = self.on_state_change
self.get_logger().info('Line Follower Node initialized')
def image_callback(self, msg: Image):
"""Process camera images"""
if not self.enabled:
return
# Convert image
frame = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
# Detect line
detection = self.detector.detect(frame)
# Update state machine
state_data = self.state_machine.update(
detected=detection.detected,
line_error=detection.centroid_x,
intersection=detection.intersection,
intersection_type=detection.intersection_type
)
# Compute control
twist = Twist()
if state_data.state == State.FOLLOWING:
linear, angular = self.controller.compute(
detection.centroid_x,
detection.confidence
)
twist.linear.x = linear
twist.angular.z = angular
elif state_data.state == State.INTERSECTION:
# Slow down at intersection
twist.linear.x = 0.03
twist.angular.z = 0.0
elif state_data.state == State.RECOVERY:
# Spin to find line
twist.linear.x = 0.0
twist.angular.z = state_data.recovery_direction * 0.3
elif state_data.state in [State.STOPPED, State.IDLE]:
twist.linear.x = 0.0
twist.angular.z = 0.0
# Publish command
self.cmd_vel_pub.publish(twist)
# Publish debug image
debug_frame = self.detector.visualize(frame, detection)
self.add_state_overlay(debug_frame, state_data)
debug_msg = self.bridge.cv2_to_imgmsg(debug_frame, 'bgr8')
self.debug_pub.publish(debug_msg)
# Publish status
self.publish_status(state_data, detection)
# Publish PID state
self.publish_pid_state()
def add_state_overlay(self, frame, state_data):
"""Add state information to debug frame"""
state_colors = {
State.IDLE: (128, 128, 128),
State.FOLLOWING: (0, 255, 0),
State.INTERSECTION: (0, 255, 255),
State.LINE_LOST: (0, 165, 255),
State.RECOVERY: (255, 165, 0),
State.STOPPED: (0, 0, 255),
}
color = state_colors.get(state_data.state, (255, 255, 255))
cv2.putText(frame, f"State: {state_data.state.name}",
(10, frame.shape[0] - 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
cv2.putText(frame, f"Duration: {state_data.duration:.1f}s",
(10, frame.shape[0] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
def publish_status(self, state_data, detection):
"""Publish status as JSON"""
status = {
'state': state_data.state.name,
'detected': detection.detected,
'error': detection.centroid_x,
'confidence': detection.confidence,
'intersection': detection.intersection,
'intersection_type': detection.intersection_type,
}
msg = String()
msg.data = json.dumps(status)
self.status_pub.publish(msg)
def publish_pid_state(self):
"""Publish PID controller state"""
ang_state, _ = self.controller.get_states()
msg = Float32MultiArray()
msg.data = [
ang_state.error,
ang_state.p_term,
ang_state.i_term,
ang_state.d_term,
ang_state.output
]
self.pid_pub.publish(msg)
def on_state_change(self, old_state: State, new_state: State):
"""Handle state transitions"""
self.get_logger().info(f'State: {old_state.name} -> {new_state.name}')
def enable_callback(self, request, response):
"""Enable/disable service"""
self.enabled = request.data
if self.enabled:
self.state_machine.start()
self.controller.reset()
response.message = 'Line follower enabled'
else:
self.state_machine.stop()
# Stop robot
twist = Twist()
self.cmd_vel_pub.publish(twist)
response.message = 'Line follower disabled'
response.success = True
return response
def reset_callback(self, request, response):
"""Reset service"""
self.state_machine.reset()
self.controller.reset()
response.success = True
response.message = 'Line follower reset'
return response
def main():
rclpy.init()
node = LineFollowerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Part 7: Web Dashboard for Tuning
Create web/dashboard.py:
#!/usr/bin/env python3
"""
Web Dashboard for PID Tuning
"""
from flask import Flask, render_template, jsonify, request
from flask_socketio import SocketIO
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32MultiArray
from std_srvs.srv import SetBool
from threading import Thread
import json
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")
# Global state
state = {
'enabled': False,
'status': {},
'pid': {'error': 0, 'p': 0, 'i': 0, 'd': 0, 'output': 0},
'history': []
}
class DashboardNode(Node):
def __init__(self):
super().__init__('tuning_dashboard')
self.status_sub = self.create_subscription(
String, '/line_follower/status', self.status_callback, 10
)
self.pid_sub = self.create_subscription(
Float32MultiArray, '/line_follower/pid', self.pid_callback, 10
)
self.enable_client = self.create_client(
SetBool, '/line_follower/enable'
)
def status_callback(self, msg):
state['status'] = json.loads(msg.data)
socketio.emit('status', state['status'])
def pid_callback(self, msg):
data = msg.data
state['pid'] = {
'error': data[0],
'p': data[1],
'i': data[2],
'd': data[3],
'output': data[4]
}
# Keep history for plotting
state['history'].append(state['pid'].copy())
if len(state['history']) > 200:
state['history'].pop(0)
socketio.emit('pid', state['pid'])
@app.route('/')
def index():
return '''
<!DOCTYPE html>
<html>
<head>
<title>Line Follower Tuning</title>
<script src="https://cdn.socket.io/4.0.0/socket.io.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body { font-family: Arial; padding: 20px; background: #1a1a2e; color: white; }
.container { max-width: 1200px; margin: auto; }
.card { background: #16213e; padding: 20px; border-radius: 10px; margin: 10px 0; }
.grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }
.status { font-size: 1.2em; }
.status.following { color: #00ff88; }
.status.lost { color: #ff6b6b; }
button { padding: 10px 20px; margin: 5px; border: none; border-radius: 5px; cursor: pointer; }
.btn-start { background: #00ff88; color: black; }
.btn-stop { background: #ff6b6b; color: white; }
input[type="range"] { width: 100%; }
.pid-value { display: flex; justify-content: space-between; }
canvas { width: 100% !important; height: 200px !important; }
</style>
</head>
<body>
<div class="container">
<h1>🤖 Line Follower Dashboard</h1>
<div class="grid">
<div class="card">
<h2>Control</h2>
<button class="btn-start" onclick="enable(true)">Start</button>
<button class="btn-stop" onclick="enable(false)">Stop</button>
<p class="status" id="state">State: IDLE</p>
<p>Error: <span id="error">0.00</span></p>
<p>Confidence: <span id="confidence">0.00</span></p>
</div>
<div class="card">
<h2>PID Gains</h2>
<div class="pid-value">
<label>Kp: <span id="kp-val">0.8</span></label>
</div>
<input type="range" id="kp" min="0" max="2" step="0.1" value="0.8" oninput="updateGains()">
<div class="pid-value">
<label>Ki: <span id="ki-val">0.001</span></label>
</div>
<input type="range" id="ki" min="0" max="0.1" step="0.001" value="0.001" oninput="updateGains()">
<div class="pid-value">
<label>Kd: <span id="kd-val">0.3</span></label>
</div>
<input type="range" id="kd" min="0" max="1" step="0.05" value="0.3" oninput="updateGains()">
</div>
</div>
<div class="card">
<h2>PID Response</h2>
<canvas id="pidChart"></canvas>
</div>
<div class="card">
<h2>Output Components</h2>
<p>P: <span id="p-term">0.00</span></p>
<p>I: <span id="i-term">0.00</span></p>
<p>D: <span id="d-term">0.00</span></p>
<p>Output: <span id="output">0.00</span></p>
</div>
</div>
<script>
const socket = io();
// Chart setup
const ctx = document.getElementById('pidChart').getContext('2d');
const chart = new Chart(ctx, {
type: 'line',
data: {
labels: [],
datasets: [
{ label: 'Error', data: [], borderColor: '#ff6b6b', fill: false },
{ label: 'Output', data: [], borderColor: '#00ff88', fill: false }
]
},
options: {
animation: false,
scales: { y: { min: -1, max: 1 } }
}
});
socket.on('status', (data) => {
document.getElementById('state').textContent = 'State: ' + data.state;
document.getElementById('state').className = 'status ' + (data.detected ? 'following' : 'lost');
document.getElementById('error').textContent = data.error.toFixed(3);
document.getElementById('confidence').textContent = data.confidence.toFixed(2);
});
socket.on('pid', (data) => {
document.getElementById('p-term').textContent = data.p.toFixed(4);
document.getElementById('i-term').textContent = data.i.toFixed(4);
document.getElementById('d-term').textContent = data.d.toFixed(4);
document.getElementById('output').textContent = data.output.toFixed(4);
// Update chart
chart.data.labels.push('');
chart.data.datasets[0].data.push(data.error);
chart.data.datasets[1].data.push(data.output);
if (chart.data.labels.length > 100) {
chart.data.labels.shift();
chart.data.datasets[0].data.shift();
chart.data.datasets[1].data.shift();
}
chart.update('none');
});
function enable(val) {
fetch('/enable', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({enabled: val})
});
}
function updateGains() {
const kp = document.getElementById('kp').value;
const ki = document.getElementById('ki').value;
const kd = document.getElementById('kd').value;
document.getElementById('kp-val').textContent = kp;
document.getElementById('ki-val').textContent = ki;
document.getElementById('kd-val').textContent = kd;
fetch('/gains', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({kp: parseFloat(kp), ki: parseFloat(ki), kd: parseFloat(kd)})
});
}
</script>
</body>
</html>
'''
@app.route('/enable', methods=['POST'])
def enable():
data = request.json
# Call ROS2 service
return jsonify({'success': True})
@app.route('/gains', methods=['POST'])
def set_gains():
data = request.json
# Update gains via ROS2
return jsonify({'success': True})
def ros_spin(node):
rclpy.spin(node)
def main():
rclpy.init()
node = DashboardNode()
ros_thread = Thread(target=ros_spin, args=(node,), daemon=True)
ros_thread.start()
socketio.run(app, host='0.0.0.0', port=5000)
if __name__ == '__main__':
main()
Part 8: Running the Project
Start Line Follower
# Terminal 1: Camera
ros2 run v4l2_camera v4l2_camera_node
# Terminal 2: Line Follower
cd ~/line_follower
python3 src/line_follower_node.py
# Terminal 3: Dashboard
python3 web/dashboard.py
Control via Service
# Enable
ros2 service call /line_follower/enable std_srvs/srv/SetBool "{data: true}"
# Disable
ros2 service call /line_follower/enable std_srvs/srv/SetBool "{data: false}"
# Reset
ros2 service call /line_follower/reset std_srvs/srv/Trigger
View Debug Output
ros2 run rqt_image_view rqt_image_view /line_follower/debug
Deliverables
| Deliverable | Description |
|---|---|
src/line_detector.py | Line detection with intersection detection |
src/pid_controller.py | Adaptive PID controller |
src/state_machine.py | Behavior state machine |
src/line_follower_node.py | Main ROS2 node |
web/dashboard.py | Tuning dashboard |
config/line_config.yaml | Configuration file |
| PID Tuning Report | Document your tuning process |
| Video Demo | Robot following line with intersections |
Extension Ideas
- Color Differentiation: Different colors for different paths
- QR Code Waypoints: Read QR codes at intersections
- Lap Timer: Track lap times on closed course
- Obstacle Avoidance: Integrate ultrasonic sensor
- Path Memory: Remember and replay routes
- Racing Mode: Optimize for speed vs. accuracy
Grading Rubric
| Component | Points | Criteria |
|---|---|---|
| Line Detection | 20 | Robust detection, intersection handling |
| PID Controller | 25 | Smooth control, proper tuning |
| State Machine | 15 | Correct state transitions |
| Dashboard | 15 | Real-time tuning interface |
| Integration | 15 | Components work together |
| Code Quality | 10 | Clean, documented code |
| Total | 100 |