Project 4: Advanced Line Following Robot

Overview

Build a complete autonomous line-following system with advanced PID control, intersection detection, and path decision making. This project combines computer vision, control theory, and state machine design to create a robust navigation system.

Learning Objectives

By completing this project, you will:

  • Implement robust line detection with color segmentation
  • Design and tune multi-stage PID controllers
  • Build state machines for intersection handling
  • Create a web dashboard for monitoring and tuning
  • Understand control system performance metrics

Prerequisites

Lab Topic Skills Used
Project 3 Autonomous Navigator YOLO, tracking, PID basics
ROS2 Lab 6 Line Following Basic line detection
ROS2 Lab 7 Services ROS2 service architecture
Docker Lab 2 ROS2 in Docker Containerized deployment

Hardware Requirements

  • Mini Pupper v2 with camera
  • Line track (tape on floor)
  • Color markers for intersections (optional)

Part 1: System Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                 Advanced Line Following System                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Vision Pipeline                          │   │
│  │  ┌──────────┐  ┌──────────────┐  ┌───────────────────┐     │   │
│  │  │  Camera  │─▶│  Color       │─▶│  Line Detection   │     │   │
│  │  │  Input   │  │  Segmentation│  │  (Contours/Edge)  │     │   │
│  │  └──────────┘  └──────────────┘  └─────────┬─────────┘     │   │
│  │                                            │                │   │
│  │  ┌──────────────────┐  ┌───────────────────┴────────┐      │   │
│  │  │  Intersection    │◀─┤  Line Centroid             │      │   │
│  │  │  Detection       │  │  Calculation               │      │   │
│  │  └────────┬─────────┘  └───────────────────┬────────┘      │   │
│  └───────────┼────────────────────────────────┼────────────────┘   │
│              │                                │                     │
│              ▼                                ▼                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Control System                           │   │
│  │  ┌──────────────────┐  ┌────────────────────────────┐      │   │
│  │  │  State Machine   │  │  PID Controllers           │      │   │
│  │  │  - Following     │  │  - Angular (Steering)      │      │   │
│  │  │  - Intersection  │  │  - Linear (Speed)          │      │   │
│  │  │  - Lost Line     │  │  - Adaptive Gains          │      │   │
│  │  │  - Recovery      │  │                            │      │   │
│  │  └──────────┬───────┘  └──────────────┬─────────────┘      │   │
│  └─────────────┼─────────────────────────┼─────────────────────┘   │
│                │                         │                          │
│                ▼                         ▼                          │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Robot Output                             │   │
│  │  ┌──────────────────────────────────────────────────────┐  │   │
│  │  │              /cmd_vel (Twist)                        │  │   │
│  │  │  linear.x = base_speed × speed_factor                │  │   │
│  │  │  angular.z = pid_output                              │  │   │
│  │  └──────────────────────────────────────────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Part 2: Project Setup

Step 1: Create Project Structure

mkdir -p ~/line_follower/{src,config,launch,web}
cd ~/line_follower

Step 2: Install Dependencies

pip3 install opencv-python numpy flask flask-socketio

Step 3: Create Configuration File

Create config/line_config.yaml:

# Line Detection Configuration
line_detection:
  # Color thresholds (HSV)
  color_lower: [0, 0, 0]      # Black line lower bound
  color_upper: [180, 255, 80] # Black line upper bound
  
  # Alternative for colored tape
  # color_lower: [100, 100, 100]  # Blue line
  # color_upper: [130, 255, 255]
  
  # Region of Interest (% of image height)
  roi_top: 0.5      # Start at 50% height
  roi_bottom: 0.9   # End at 90% height
  
  # Minimum contour area
  min_contour_area: 500
  
  # Line width detection (pixels)
  expected_line_width: 50
  max_line_width: 200

# PID Controller Configuration
pid:
  angular:
    kp: 0.8
    ki: 0.001
    kd: 0.3
    max_output: 1.0
    deadband: 0.02
    
  linear:
    kp: 0.5
    ki: 0.0
    kd: 0.1
    max_output: 0.15
    
# Speed Settings
speed:
  base_linear: 0.08
  max_linear: 0.12
  turn_reduction: 0.7  # Slow down when turning
  
# State Machine
state:
  line_lost_timeout: 1.0      # Seconds before recovery
  intersection_pause: 0.5     # Pause at intersections
  search_rotation_speed: 0.3  # Speed during recovery

Part 3: Line Detection Module

Create src/line_detector.py:

#!/usr/bin/env python3
"""
Advanced Line Detection with Multi-Stage Processing
"""

import cv2
import numpy as np
from dataclasses import dataclass
from typing import Tuple, Optional, List
import yaml


@dataclass
class LineDetection:
    """Detection results"""
    detected: bool
    centroid_x: float          # -1.0 (left) to 1.0 (right), 0 = center
    centroid_y: float          # Normalized y position
    angle: float               # Line angle in degrees
    width: float               # Detected line width
    confidence: float          # Detection confidence 0-1
    intersection: bool         # Intersection detected
    intersection_type: str     # 'T', 'X', 'L', or None
    mask: np.ndarray          # Binary mask for visualization


class LineDetector:
    """Multi-stage line detection pipeline"""
    
    def __init__(self, config_path: str = "config/line_config.yaml"):
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
            
        self.config = config['line_detection']
        
        # Color thresholds
        self.color_lower = np.array(self.config['color_lower'])
        self.color_upper = np.array(self.config['color_upper'])
        
        # ROI settings
        self.roi_top = self.config['roi_top']
        self.roi_bottom = self.config['roi_bottom']
        
        # Line parameters
        self.min_area = self.config['min_contour_area']
        self.expected_width = self.config['expected_line_width']
        self.max_width = self.config['max_line_width']
        
        # History for smoothing
        self.centroid_history = []
        self.history_size = 5
        
    def detect(self, frame: np.ndarray) -> LineDetection:
        """Main detection pipeline"""
        height, width = frame.shape[:2]
        
        # Step 1: Extract ROI
        roi_y_start = int(height * self.roi_top)
        roi_y_end = int(height * self.roi_bottom)
        roi = frame[roi_y_start:roi_y_end, :]
        
        # Step 2: Color segmentation
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        mask = cv2.inRange(hsv, self.color_lower, self.color_upper)
        
        # Step 3: Morphological operations
        kernel = np.ones((5, 5), np.uint8)
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
        
        # Step 4: Find contours
        contours, _ = cv2.findContours(
            mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        
        if not contours:
            return self._no_detection(mask)
            
        # Step 5: Find the main line contour
        line_contour = self._find_line_contour(contours, width)
        
        if line_contour is None:
            return self._no_detection(mask)
            
        # Step 6: Calculate centroid and properties
        centroid_x, centroid_y = self._calculate_centroid(line_contour, width)
        angle = self._calculate_angle(line_contour)
        line_width = self._calculate_width(line_contour)
        confidence = self._calculate_confidence(line_contour, width)
        
        # Step 7: Intersection detection
        intersection, int_type = self._detect_intersection(mask, width)
        
        # Step 8: Smooth centroid
        smoothed_x = self._smooth_centroid(centroid_x)
        
        return LineDetection(
            detected=True,
            centroid_x=smoothed_x,
            centroid_y=centroid_y,
            angle=angle,
            width=line_width,
            confidence=confidence,
            intersection=intersection,
            intersection_type=int_type,
            mask=mask
        )
        
    def _find_line_contour(self, contours: List, 
                           image_width: int) -> Optional[np.ndarray]:
        """Find the most likely line contour"""
        valid_contours = []
        
        for contour in contours:
            area = cv2.contourArea(contour)
            if area < self.min_area:
                continue
                
            # Check aspect ratio
            x, y, w, h = cv2.boundingRect(contour)
            
            # Line should be taller than wide (vertical in ROI)
            # or appropriately wide for horizontal segments
            if w < self.max_width or h > w * 0.3:
                valid_contours.append((contour, area))
                
        if not valid_contours:
            return None
            
        # Return largest valid contour
        valid_contours.sort(key=lambda x: x[1], reverse=True)
        return valid_contours[0][0]
        
    def _calculate_centroid(self, contour: np.ndarray, 
                            image_width: int) -> Tuple[float, float]:
        """Calculate normalized centroid position"""
        M = cv2.moments(contour)
        
        if M['m00'] == 0:
            return 0.0, 0.0
            
        cx = int(M['m10'] / M['m00'])
        cy = int(M['m01'] / M['m00'])
        
        # Normalize to [-1, 1] where 0 is center
        normalized_x = (cx - image_width / 2) / (image_width / 2)
        normalized_y = cy / contour.shape[0] if len(contour) > 0 else 0.5
        
        return normalized_x, normalized_y
        
    def _calculate_angle(self, contour: np.ndarray) -> float:
        """Calculate line angle from contour"""
        if len(contour) < 5:
            return 0.0
            
        # Fit line to contour
        [vx, vy, x, y] = cv2.fitLine(contour, cv2.DIST_L2, 0, 0.01, 0.01)
        angle = np.arctan2(vy, vx) * 180 / np.pi
        
        return float(angle[0])
        
    def _calculate_width(self, contour: np.ndarray) -> float:
        """Calculate line width"""
        x, y, w, h = cv2.boundingRect(contour)
        return float(min(w, h))
        
    def _calculate_confidence(self, contour: np.ndarray, 
                              image_width: int) -> float:
        """Calculate detection confidence"""
        area = cv2.contourArea(contour)
        max_area = image_width * (image_width * 0.3) * 0.5
        
        # Confidence based on area
        area_score = min(area / max_area, 1.0)
        
        # Penalty for extreme positions
        M = cv2.moments(contour)
        if M['m00'] > 0:
            cx = M['m10'] / M['m00']
            position_score = 1.0 - abs(cx - image_width/2) / (image_width/2)
        else:
            position_score = 0.5
            
        return 0.7 * area_score + 0.3 * position_score
        
    def _detect_intersection(self, mask: np.ndarray, 
                             width: int) -> Tuple[bool, str]:
        """Detect intersections in the mask"""
        height = mask.shape[0]
        
        # Divide mask into horizontal slices
        slice_height = height // 4
        slice_widths = []
        
        for i in range(4):
            y_start = i * slice_height
            y_end = (i + 1) * slice_height
            slice_mask = mask[y_start:y_end, :]
            
            # Find width of line in this slice
            cols = np.sum(slice_mask > 0, axis=0)
            if np.max(cols) > 0:
                line_pixels = np.where(cols > slice_height * 0.3)[0]
                if len(line_pixels) > 0:
                    slice_widths.append(line_pixels[-1] - line_pixels[0])
                else:
                    slice_widths.append(0)
            else:
                slice_widths.append(0)
                
        # Detect intersection types
        if len(slice_widths) >= 2:
            top_width = np.mean(slice_widths[:2])
            bottom_width = np.mean(slice_widths[2:])
            
            # Significant widening indicates intersection
            if bottom_width > top_width * 1.8 and bottom_width > self.expected_width * 2:
                # Count branches
                bottom_slice = mask[height*3//4:, :]
                contours, _ = cv2.findContours(
                    bottom_slice, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
                )
                
                if len(contours) >= 3:
                    return True, 'X'
                elif len(contours) >= 2:
                    return True, 'T'
                elif bottom_width > self.expected_width * 3:
                    return True, 'T'
                    
        return False, None
        
    def _smooth_centroid(self, centroid_x: float) -> float:
        """Apply smoothing to centroid"""
        self.centroid_history.append(centroid_x)
        if len(self.centroid_history) > self.history_size:
            self.centroid_history.pop(0)
            
        # Weighted average (more recent = higher weight)
        weights = np.linspace(0.5, 1.0, len(self.centroid_history))
        return np.average(self.centroid_history, weights=weights)
        
    def _no_detection(self, mask: np.ndarray) -> LineDetection:
        """Return no detection result"""
        return LineDetection(
            detected=False,
            centroid_x=0.0,
            centroid_y=0.0,
            angle=0.0,
            width=0.0,
            confidence=0.0,
            intersection=False,
            intersection_type=None,
            mask=mask
        )
        
    def visualize(self, frame: np.ndarray, 
                  detection: LineDetection) -> np.ndarray:
        """Create visualization of detection"""
        vis = frame.copy()
        height, width = vis.shape[:2]
        
        # Draw ROI
        roi_y_start = int(height * self.roi_top)
        roi_y_end = int(height * self.roi_bottom)
        cv2.rectangle(vis, (0, roi_y_start), (width, roi_y_end), (0, 255, 255), 2)
        
        if detection.detected:
            # Draw centroid line
            cx = int((detection.centroid_x + 1) / 2 * width)
            cv2.line(vis, (cx, roi_y_start), (cx, roi_y_end), (0, 255, 0), 3)
            
            # Draw center reference
            cv2.line(vis, (width//2, roi_y_start), (width//2, roi_y_end), 
                    (255, 0, 0), 1)
            
            # Status text
            status = f"Line: {detection.centroid_x:.2f} | Conf: {detection.confidence:.2f}"
            cv2.putText(vis, status, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 
                       0.7, (0, 255, 0), 2)
            
            if detection.intersection:
                cv2.putText(vis, f"INTERSECTION: {detection.intersection_type}", 
                          (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        else:
            cv2.putText(vis, "LINE LOST", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 
                       0.7, (0, 0, 255), 2)
            
        return vis

Part 4: PID Controller Module

Create src/pid_controller.py:

#!/usr/bin/env python3
"""
Advanced PID Controller with Anti-Windup and Adaptive Gains
"""

import time
from dataclasses import dataclass
from typing import Optional, Tuple
import yaml
import numpy as np


@dataclass
class PIDState:
    """PID controller state for monitoring"""
    error: float
    p_term: float
    i_term: float
    d_term: float
    output: float
    setpoint: float
    measurement: float


class PIDController:
    """PID controller with advanced features"""
    
    def __init__(self, kp: float, ki: float, kd: float,
                 max_output: float = 1.0,
                 min_output: float = None,
                 deadband: float = 0.0,
                 anti_windup: float = None):
        """
        Initialize PID controller
        
        Args:
            kp: Proportional gain
            ki: Integral gain
            kd: Derivative gain
            max_output: Maximum output value
            min_output: Minimum output (default: -max_output)
            deadband: Error deadband (no output within deadband)
            anti_windup: Integral anti-windup limit
        """
        self.kp = kp
        self.ki = ki
        self.kd = kd
        
        self.max_output = max_output
        self.min_output = min_output if min_output is not None else -max_output
        self.deadband = deadband
        self.anti_windup = anti_windup if anti_windup else max_output
        
        # State variables
        self.integral = 0.0
        self.last_error = 0.0
        self.last_time = None
        self.last_measurement = None
        
        # State for monitoring
        self.state = PIDState(0, 0, 0, 0, 0, 0, 0)
        
    def update(self, setpoint: float, measurement: float) -> float:
        """
        Calculate PID output
        
        Args:
            setpoint: Desired value
            measurement: Current value
            
        Returns:
            Control output
        """
        current_time = time.time()
        
        # Calculate dt
        if self.last_time is None:
            dt = 0.02  # Default 50Hz
        else:
            dt = current_time - self.last_time
            
        # Prevent division by zero
        dt = max(dt, 0.001)
        
        # Calculate error
        error = setpoint - measurement
        
        # Apply deadband
        if abs(error) < self.deadband:
            error = 0.0
            
        # Proportional term
        p_term = self.kp * error
        
        # Integral term with anti-windup
        self.integral += error * dt
        self.integral = np.clip(self.integral, -self.anti_windup, self.anti_windup)
        i_term = self.ki * self.integral
        
        # Derivative term (on measurement to avoid derivative kick)
        if self.last_measurement is not None:
            d_measurement = (measurement - self.last_measurement) / dt
            d_term = -self.kd * d_measurement  # Negative because using measurement
        else:
            d_term = 0.0
            
        # Calculate output
        output = p_term + i_term + d_term
        
        # Apply output limits
        output = np.clip(output, self.min_output, self.max_output)
        
        # Update state
        self.last_error = error
        self.last_time = current_time
        self.last_measurement = measurement
        
        # Save state for monitoring
        self.state = PIDState(
            error=error,
            p_term=p_term,
            i_term=i_term,
            d_term=d_term,
            output=output,
            setpoint=setpoint,
            measurement=measurement
        )
        
        return output
        
    def reset(self):
        """Reset controller state"""
        self.integral = 0.0
        self.last_error = 0.0
        self.last_time = None
        self.last_measurement = None
        
    def set_gains(self, kp: float = None, ki: float = None, kd: float = None):
        """Update PID gains"""
        if kp is not None:
            self.kp = kp
        if ki is not None:
            self.ki = ki
        if kd is not None:
            self.kd = kd
            
    def get_state(self) -> PIDState:
        """Get current state for monitoring"""
        return self.state


class AdaptivePIDController(PIDController):
    """PID controller with gain scheduling based on error"""
    
    def __init__(self, *args, 
                 gain_schedule: dict = None, **kwargs):
        """
        Args:
            gain_schedule: Dict mapping error thresholds to gain multipliers
                Example: {0.1: 0.5, 0.3: 1.0, 0.6: 1.5}
        """
        super().__init__(*args, **kwargs)
        
        # Default gain schedule
        self.gain_schedule = gain_schedule or {
            0.1: 0.5,   # Small error: reduce gains
            0.3: 1.0,   # Medium error: normal gains
            0.6: 1.5,   # Large error: increase gains
        }
        
        # Base gains
        self.base_kp = self.kp
        self.base_ki = self.ki
        self.base_kd = self.kd
        
    def update(self, setpoint: float, measurement: float) -> float:
        """Update with adaptive gains"""
        error = abs(setpoint - measurement)
        
        # Find appropriate gain multiplier
        multiplier = 1.0
        for threshold, mult in sorted(self.gain_schedule.items()):
            if error >= threshold:
                multiplier = mult
                
        # Apply gain scheduling
        self.kp = self.base_kp * multiplier
        self.ki = self.base_ki * multiplier
        self.kd = self.base_kd * multiplier
        
        return super().update(setpoint, measurement)


class LineFollowingController:
    """Dual PID controller for line following"""
    
    def __init__(self, config_path: str = "config/line_config.yaml"):
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
            
        pid_config = config['pid']
        speed_config = config['speed']
        
        # Angular (steering) controller - tracks line center
        ang = pid_config['angular']
        self.angular_pid = AdaptivePIDController(
            kp=ang['kp'],
            ki=ang['ki'],
            kd=ang['kd'],
            max_output=ang['max_output'],
            deadband=ang.get('deadband', 0.0)
        )
        
        # Linear (speed) controller - adjusts speed based on error
        lin = pid_config['linear']
        self.linear_pid = PIDController(
            kp=lin['kp'],
            ki=lin['ki'],
            kd=lin['kd'],
            max_output=lin['max_output']
        )
        
        # Speed settings
        self.base_speed = speed_config['base_linear']
        self.max_speed = speed_config['max_linear']
        self.turn_reduction = speed_config['turn_reduction']
        
    def compute(self, line_error: float, 
                confidence: float = 1.0) -> Tuple[float, float]:
        """
        Compute control outputs
        
        Args:
            line_error: Normalized error (-1 to 1)
            confidence: Detection confidence (0 to 1)
            
        Returns:
            Tuple of (linear_speed, angular_speed)
        """
        # Angular control (steering)
        angular = self.angular_pid.update(0.0, line_error)
        
        # Speed reduction based on steering and confidence
        turn_factor = 1.0 - abs(angular) * self.turn_reduction
        confidence_factor = 0.5 + 0.5 * confidence
        
        # Linear speed
        linear = self.base_speed * turn_factor * confidence_factor
        linear = max(0.02, min(linear, self.max_speed))
        
        return linear, angular
        
    def reset(self):
        """Reset both controllers"""
        self.angular_pid.reset()
        self.linear_pid.reset()
        
    def get_states(self) -> Tuple[PIDState, PIDState]:
        """Get states of both controllers"""
        return self.angular_pid.get_state(), self.linear_pid.get_state()
        
    def set_angular_gains(self, kp: float = None, ki: float = None, kd: float = None):
        """Update angular PID gains"""
        self.angular_pid.set_gains(kp, ki, kd)
        if kp is not None:
            self.angular_pid.base_kp = kp
        if ki is not None:
            self.angular_pid.base_ki = ki
        if kd is not None:
            self.angular_pid.base_kd = kd

Part 5: State Machine

Create src/state_machine.py:

#!/usr/bin/env python3
"""
State Machine for Line Following
"""

from enum import Enum, auto
from dataclasses import dataclass
from typing import Optional, Callable
import time


class State(Enum):
    """Robot states"""
    IDLE = auto()
    FOLLOWING = auto()
    INTERSECTION = auto()
    LINE_LOST = auto()
    RECOVERY = auto()
    STOPPED = auto()


@dataclass
class StateData:
    """Data associated with current state"""
    state: State
    duration: float           # Time in current state
    line_error: float         # Last known line error
    last_detection: float     # Time of last detection
    intersection_type: str    # Type of intersection if any
    recovery_direction: int   # -1 = left, 1 = right
    

class LineFollowingStateMachine:
    """State machine for line following behavior"""
    
    def __init__(self, 
                 lost_timeout: float = 1.0,
                 intersection_pause: float = 0.5,
                 recovery_speed: float = 0.3):
        
        self.lost_timeout = lost_timeout
        self.intersection_pause = intersection_pause
        self.recovery_speed = recovery_speed
        
        # State tracking
        self.current_state = State.IDLE
        self.state_start_time = time.time()
        self.last_detection_time = time.time()
        self.last_line_error = 0.0
        self.intersection_type = None
        
        # Callbacks
        self.on_state_change: Optional[Callable[[State, State], None]] = None
        
    def update(self, detected: bool, line_error: float = 0.0,
               intersection: bool = False, 
               intersection_type: str = None) -> StateData:
        """
        Update state machine
        
        Args:
            detected: Whether line is detected
            line_error: Current line error
            intersection: Whether intersection detected
            intersection_type: Type of intersection
            
        Returns:
            Current state data
        """
        current_time = time.time()
        
        if detected:
            self.last_detection_time = current_time
            self.last_line_error = line_error
            
        state_duration = current_time - self.state_start_time
        time_since_detection = current_time - self.last_detection_time
        
        # State transitions
        old_state = self.current_state
        
        if self.current_state == State.IDLE:
            if detected:
                self._transition_to(State.FOLLOWING)
                
        elif self.current_state == State.FOLLOWING:
            if intersection:
                self.intersection_type = intersection_type
                self._transition_to(State.INTERSECTION)
            elif not detected:
                if time_since_detection > self.lost_timeout:
                    self._transition_to(State.LINE_LOST)
                    
        elif self.current_state == State.INTERSECTION:
            if state_duration > self.intersection_pause:
                if detected:
                    self._transition_to(State.FOLLOWING)
                else:
                    self._transition_to(State.LINE_LOST)
                    
        elif self.current_state == State.LINE_LOST:
            self._transition_to(State.RECOVERY)
            
        elif self.current_state == State.RECOVERY:
            if detected:
                self._transition_to(State.FOLLOWING)
            elif state_duration > 5.0:  # Recovery timeout
                self._transition_to(State.STOPPED)
                
        elif self.current_state == State.STOPPED:
            if detected:
                self._transition_to(State.FOLLOWING)
                
        # Notify on state change
        if old_state != self.current_state and self.on_state_change:
            self.on_state_change(old_state, self.current_state)
            
        # Determine recovery direction
        recovery_direction = 1 if self.last_line_error > 0 else -1
        
        return StateData(
            state=self.current_state,
            duration=current_time - self.state_start_time,
            line_error=self.last_line_error,
            last_detection=self.last_detection_time,
            intersection_type=self.intersection_type,
            recovery_direction=recovery_direction
        )
        
    def _transition_to(self, new_state: State):
        """Transition to new state"""
        self.current_state = new_state
        self.state_start_time = time.time()
        
    def start(self):
        """Start following"""
        self._transition_to(State.IDLE)
        
    def stop(self):
        """Stop following"""
        self._transition_to(State.STOPPED)
        
    def reset(self):
        """Reset state machine"""
        self.current_state = State.IDLE
        self.state_start_time = time.time()
        self.last_detection_time = time.time()
        self.last_line_error = 0.0

Part 6: Main Line Follower Node

Create src/line_follower_node.py:

#!/usr/bin/env python3
"""
ROS2 Line Following Node
"""

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import Twist
from std_msgs.msg import String, Float32MultiArray
from std_srvs.srv import SetBool, Trigger
from cv_bridge import CvBridge
import cv2
import json

from line_detector import LineDetector
from pid_controller import LineFollowingController
from state_machine import LineFollowingStateMachine, State


class LineFollowerNode(Node):
    """Main line following node"""
    
    def __init__(self):
        super().__init__('line_follower')
        
        # Parameters
        self.declare_parameter('config_path', 'config/line_config.yaml')
        config_path = self.get_parameter('config_path').value
        
        # Components
        self.detector = LineDetector(config_path)
        self.controller = LineFollowingController(config_path)
        self.state_machine = LineFollowingStateMachine()
        
        self.bridge = CvBridge()
        self.enabled = False
        
        # Publishers
        self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.debug_pub = self.create_publisher(Image, '/line_follower/debug', 10)
        self.status_pub = self.create_publisher(String, '/line_follower/status', 10)
        self.pid_pub = self.create_publisher(Float32MultiArray, '/line_follower/pid', 10)
        
        # Subscribers
        self.image_sub = self.create_subscription(
            Image, '/image_raw', self.image_callback, 10
        )
        
        # Services
        self.enable_srv = self.create_service(
            SetBool, '/line_follower/enable', self.enable_callback
        )
        self.reset_srv = self.create_service(
            Trigger, '/line_follower/reset', self.reset_callback
        )
        
        # State change callback
        self.state_machine.on_state_change = self.on_state_change
        
        self.get_logger().info('Line Follower Node initialized')
        
    def image_callback(self, msg: Image):
        """Process camera images"""
        if not self.enabled:
            return
            
        # Convert image
        frame = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
        
        # Detect line
        detection = self.detector.detect(frame)
        
        # Update state machine
        state_data = self.state_machine.update(
            detected=detection.detected,
            line_error=detection.centroid_x,
            intersection=detection.intersection,
            intersection_type=detection.intersection_type
        )
        
        # Compute control
        twist = Twist()
        
        if state_data.state == State.FOLLOWING:
            linear, angular = self.controller.compute(
                detection.centroid_x,
                detection.confidence
            )
            twist.linear.x = linear
            twist.angular.z = angular
            
        elif state_data.state == State.INTERSECTION:
            # Slow down at intersection
            twist.linear.x = 0.03
            twist.angular.z = 0.0
            
        elif state_data.state == State.RECOVERY:
            # Spin to find line
            twist.linear.x = 0.0
            twist.angular.z = state_data.recovery_direction * 0.3
            
        elif state_data.state in [State.STOPPED, State.IDLE]:
            twist.linear.x = 0.0
            twist.angular.z = 0.0
            
        # Publish command
        self.cmd_vel_pub.publish(twist)
        
        # Publish debug image
        debug_frame = self.detector.visualize(frame, detection)
        self.add_state_overlay(debug_frame, state_data)
        debug_msg = self.bridge.cv2_to_imgmsg(debug_frame, 'bgr8')
        self.debug_pub.publish(debug_msg)
        
        # Publish status
        self.publish_status(state_data, detection)
        
        # Publish PID state
        self.publish_pid_state()
        
    def add_state_overlay(self, frame, state_data):
        """Add state information to debug frame"""
        state_colors = {
            State.IDLE: (128, 128, 128),
            State.FOLLOWING: (0, 255, 0),
            State.INTERSECTION: (0, 255, 255),
            State.LINE_LOST: (0, 165, 255),
            State.RECOVERY: (255, 165, 0),
            State.STOPPED: (0, 0, 255),
        }
        
        color = state_colors.get(state_data.state, (255, 255, 255))
        
        cv2.putText(frame, f"State: {state_data.state.name}", 
                   (10, frame.shape[0] - 40),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        cv2.putText(frame, f"Duration: {state_data.duration:.1f}s",
                   (10, frame.shape[0] - 10),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
                   
    def publish_status(self, state_data, detection):
        """Publish status as JSON"""
        status = {
            'state': state_data.state.name,
            'detected': detection.detected,
            'error': detection.centroid_x,
            'confidence': detection.confidence,
            'intersection': detection.intersection,
            'intersection_type': detection.intersection_type,
        }
        
        msg = String()
        msg.data = json.dumps(status)
        self.status_pub.publish(msg)
        
    def publish_pid_state(self):
        """Publish PID controller state"""
        ang_state, _ = self.controller.get_states()
        
        msg = Float32MultiArray()
        msg.data = [
            ang_state.error,
            ang_state.p_term,
            ang_state.i_term,
            ang_state.d_term,
            ang_state.output
        ]
        self.pid_pub.publish(msg)
        
    def on_state_change(self, old_state: State, new_state: State):
        """Handle state transitions"""
        self.get_logger().info(f'State: {old_state.name} -> {new_state.name}')
        
    def enable_callback(self, request, response):
        """Enable/disable service"""
        self.enabled = request.data
        
        if self.enabled:
            self.state_machine.start()
            self.controller.reset()
            response.message = 'Line follower enabled'
        else:
            self.state_machine.stop()
            # Stop robot
            twist = Twist()
            self.cmd_vel_pub.publish(twist)
            response.message = 'Line follower disabled'
            
        response.success = True
        return response
        
    def reset_callback(self, request, response):
        """Reset service"""
        self.state_machine.reset()
        self.controller.reset()
        
        response.success = True
        response.message = 'Line follower reset'
        return response


def main():
    rclpy.init()
    node = LineFollowerNode()
    
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

Part 7: Web Dashboard for Tuning

Create web/dashboard.py:

#!/usr/bin/env python3
"""
Web Dashboard for PID Tuning
"""

from flask import Flask, render_template, jsonify, request
from flask_socketio import SocketIO
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32MultiArray
from std_srvs.srv import SetBool
from threading import Thread
import json

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")

# Global state
state = {
    'enabled': False,
    'status': {},
    'pid': {'error': 0, 'p': 0, 'i': 0, 'd': 0, 'output': 0},
    'history': []
}

class DashboardNode(Node):
    def __init__(self):
        super().__init__('tuning_dashboard')
        
        self.status_sub = self.create_subscription(
            String, '/line_follower/status', self.status_callback, 10
        )
        self.pid_sub = self.create_subscription(
            Float32MultiArray, '/line_follower/pid', self.pid_callback, 10
        )
        self.enable_client = self.create_client(
            SetBool, '/line_follower/enable'
        )
        
    def status_callback(self, msg):
        state['status'] = json.loads(msg.data)
        socketio.emit('status', state['status'])
        
    def pid_callback(self, msg):
        data = msg.data
        state['pid'] = {
            'error': data[0],
            'p': data[1],
            'i': data[2],
            'd': data[3],
            'output': data[4]
        }
        
        # Keep history for plotting
        state['history'].append(state['pid'].copy())
        if len(state['history']) > 200:
            state['history'].pop(0)
            
        socketio.emit('pid', state['pid'])


@app.route('/')
def index():
    return '''
<!DOCTYPE html>
<html>
<head>
    <title>Line Follower Tuning</title>
    <script src="https://cdn.socket.io/4.0.0/socket.io.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <style>
        body { font-family: Arial; padding: 20px; background: #1a1a2e; color: white; }
        .container { max-width: 1200px; margin: auto; }
        .card { background: #16213e; padding: 20px; border-radius: 10px; margin: 10px 0; }
        .grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }
        .status { font-size: 1.2em; }
        .status.following { color: #00ff88; }
        .status.lost { color: #ff6b6b; }
        button { padding: 10px 20px; margin: 5px; border: none; border-radius: 5px; cursor: pointer; }
        .btn-start { background: #00ff88; color: black; }
        .btn-stop { background: #ff6b6b; color: white; }
        input[type="range"] { width: 100%; }
        .pid-value { display: flex; justify-content: space-between; }
        canvas { width: 100% !important; height: 200px !important; }
    </style>
</head>
<body>
    <div class="container">
        <h1>🤖 Line Follower Dashboard</h1>
        
        <div class="grid">
            <div class="card">
                <h2>Control</h2>
                <button class="btn-start" onclick="enable(true)">Start</button>
                <button class="btn-stop" onclick="enable(false)">Stop</button>
                <p class="status" id="state">State: IDLE</p>
                <p>Error: <span id="error">0.00</span></p>
                <p>Confidence: <span id="confidence">0.00</span></p>
            </div>
            
            <div class="card">
                <h2>PID Gains</h2>
                <div class="pid-value">
                    <label>Kp: <span id="kp-val">0.8</span></label>
                </div>
                <input type="range" id="kp" min="0" max="2" step="0.1" value="0.8" oninput="updateGains()">
                
                <div class="pid-value">
                    <label>Ki: <span id="ki-val">0.001</span></label>
                </div>
                <input type="range" id="ki" min="0" max="0.1" step="0.001" value="0.001" oninput="updateGains()">
                
                <div class="pid-value">
                    <label>Kd: <span id="kd-val">0.3</span></label>
                </div>
                <input type="range" id="kd" min="0" max="1" step="0.05" value="0.3" oninput="updateGains()">
            </div>
        </div>
        
        <div class="card">
            <h2>PID Response</h2>
            <canvas id="pidChart"></canvas>
        </div>
        
        <div class="card">
            <h2>Output Components</h2>
            <p>P: <span id="p-term">0.00</span></p>
            <p>I: <span id="i-term">0.00</span></p>
            <p>D: <span id="d-term">0.00</span></p>
            <p>Output: <span id="output">0.00</span></p>
        </div>
    </div>
    
    <script>
        const socket = io();
        
        // Chart setup
        const ctx = document.getElementById('pidChart').getContext('2d');
        const chart = new Chart(ctx, {
            type: 'line',
            data: {
                labels: [],
                datasets: [
                    { label: 'Error', data: [], borderColor: '#ff6b6b', fill: false },
                    { label: 'Output', data: [], borderColor: '#00ff88', fill: false }
                ]
            },
            options: {
                animation: false,
                scales: { y: { min: -1, max: 1 } }
            }
        });
        
        socket.on('status', (data) => {
            document.getElementById('state').textContent = 'State: ' + data.state;
            document.getElementById('state').className = 'status ' + (data.detected ? 'following' : 'lost');
            document.getElementById('error').textContent = data.error.toFixed(3);
            document.getElementById('confidence').textContent = data.confidence.toFixed(2);
        });
        
        socket.on('pid', (data) => {
            document.getElementById('p-term').textContent = data.p.toFixed(4);
            document.getElementById('i-term').textContent = data.i.toFixed(4);
            document.getElementById('d-term').textContent = data.d.toFixed(4);
            document.getElementById('output').textContent = data.output.toFixed(4);
            
            // Update chart
            chart.data.labels.push('');
            chart.data.datasets[0].data.push(data.error);
            chart.data.datasets[1].data.push(data.output);
            
            if (chart.data.labels.length > 100) {
                chart.data.labels.shift();
                chart.data.datasets[0].data.shift();
                chart.data.datasets[1].data.shift();
            }
            chart.update('none');
        });
        
        function enable(val) {
            fetch('/enable', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({enabled: val})
            });
        }
        
        function updateGains() {
            const kp = document.getElementById('kp').value;
            const ki = document.getElementById('ki').value;
            const kd = document.getElementById('kd').value;
            
            document.getElementById('kp-val').textContent = kp;
            document.getElementById('ki-val').textContent = ki;
            document.getElementById('kd-val').textContent = kd;
            
            fetch('/gains', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({kp: parseFloat(kp), ki: parseFloat(ki), kd: parseFloat(kd)})
            });
        }
    </script>
</body>
</html>
'''

@app.route('/enable', methods=['POST'])
def enable():
    data = request.json
    # Call ROS2 service
    return jsonify({'success': True})

@app.route('/gains', methods=['POST'])
def set_gains():
    data = request.json
    # Update gains via ROS2
    return jsonify({'success': True})

def ros_spin(node):
    rclpy.spin(node)

def main():
    rclpy.init()
    node = DashboardNode()
    
    ros_thread = Thread(target=ros_spin, args=(node,), daemon=True)
    ros_thread.start()
    
    socketio.run(app, host='0.0.0.0', port=5000)

if __name__ == '__main__':
    main()

Part 8: Running the Project

Start Line Follower

# Terminal 1: Camera
ros2 run v4l2_camera v4l2_camera_node

# Terminal 2: Line Follower
cd ~/line_follower
python3 src/line_follower_node.py

# Terminal 3: Dashboard
python3 web/dashboard.py

Control via Service

# Enable
ros2 service call /line_follower/enable std_srvs/srv/SetBool "{data: true}"

# Disable
ros2 service call /line_follower/enable std_srvs/srv/SetBool "{data: false}"

# Reset
ros2 service call /line_follower/reset std_srvs/srv/Trigger

View Debug Output

ros2 run rqt_image_view rqt_image_view /line_follower/debug

Deliverables

Deliverable Description
src/line_detector.py Line detection with intersection detection
src/pid_controller.py Adaptive PID controller
src/state_machine.py Behavior state machine
src/line_follower_node.py Main ROS2 node
web/dashboard.py Tuning dashboard
config/line_config.yaml Configuration file
PID Tuning Report Document your tuning process
Video Demo Robot following line with intersections

Extension Ideas

  1. Color Differentiation: Different colors for different paths
  2. QR Code Waypoints: Read QR codes at intersections
  3. Lap Timer: Track lap times on closed course
  4. Obstacle Avoidance: Integrate ultrasonic sensor
  5. Path Memory: Remember and replay routes
  6. Racing Mode: Optimize for speed vs. accuracy

Grading Rubric

Component Points Criteria
Line Detection 20 Robust detection, intersection handling
PID Controller 25 Smooth control, proper tuning
State Machine 15 Correct state transitions
Dashboard 15 Real-time tuning interface
Integration 15 Components work together
Code Quality 10 Clean, documented code
Total 100