Chuyển tới nội dung chính

Computer Vision Analysis Agent

Overview

The Computer Vision Analysis Agent provides comprehensive video analytics capabilities, including vehicle counting, classification, speed estimation, lane monitoring, and behavior analysis using state-of-the-art deep learning models.

Features

  • Vehicle Detection & Counting: Real-time vehicle detection and counting across multiple lanes
  • Vehicle Classification: 8-class classification (car, truck, bus, motorcycle, bicycle, etc.)
  • Speed Estimation: Calculate vehicle speeds using optical flow and calibration
  • Lane Occupancy: Monitor lane-level traffic distribution
  • Traffic Flow Metrics: Calculate flow rate, density, and occupancy
  • Behavior Analysis: Detect lane violations, wrong-way driving, illegal parking

Architecture

graph TB
A[Video Stream] --> B[Frame Extractor]
B --> C[YOLOX Detector]
C --> D[Vehicle Tracker]
D --> E[Classification]
D --> F[Speed Estimator]
D --> G[Lane Analyzer]
E --> H[Counter]
F --> I[Flow Calculator]
G --> J[Occupancy Monitor]
H --> K[Metrics Aggregator]
I --> K
J --> K
K --> L[MongoDB Storage]
K --> M[Real-time Dashboard]

Configuration

File: config/cv_config.yaml

cv_analysis:
detection:
model: "yolox_x"
weights: "yolox_x.pth"
confidence_threshold: 0.5
iou_threshold: 0.45
device: "cuda"

vehicle_classes:
- id: 0
name: "car"
size_category: "small"

- id: 1
name: "truck"
size_category: "large"

- id: 2
name: "bus"
size_category: "large"

- id: 3
name: "motorcycle"
size_category: "small"

- id: 4
name: "bicycle"
size_category: "small"

tracking:
method: "strongsort"
max_age: 30 # frames
min_hits: 3
iou_threshold: 0.3

speed_estimation:
enabled: true
calibration_required: true
units: "km/h"
smoothing_window: 5 # frames

lane_detection:
enabled: true
num_lanes: 4
lane_width: 3.5 # meters

metrics:
calculation_interval: 60 # seconds
aggregation_window: 300 # 5 minutes

performance:
target_fps: 15
skip_frames: 2
batch_size: 1

Usage

Basic Vehicle Detection

from src.agents.analytics.cv_analysis_agent import CVAnalysisAgent

# Initialize agent
agent = CVAnalysisAgent()

# Analyze single frame
result = agent.analyze_frame(
camera_id="CAM_001",
frame=cv2.imread("traffic_frame.jpg")
)

print(f"Vehicles detected: {result.vehicle_count}")
print(f"Cars: {result.class_counts['car']}")
print(f"Trucks: {result.class_counts['truck']}")
print(f"Motorcycles: {result.class_counts['motorcycle']}")

Real-time Video Stream Analysis

# Process video stream
def analyze_stream(camera_id):
agent = CVAnalysisAgent()

for frame in video_stream(camera_id):
analysis = agent.analyze_frame(
camera_id=camera_id,
frame=frame
)

# Display results
print(f"Frame {analysis.frame_number}:")
print(f" Vehicles: {analysis.vehicle_count}")
print(f" Avg Speed: {analysis.avg_speed} km/h")
print(f" Flow Rate: {analysis.flow_rate} veh/h")

Speed Estimation

# Calibrate camera for speed estimation
agent.calibrate_camera(
camera_id="CAM_001",
reference_distance=50, # meters
pixel_distance=500 # pixels
)

# Get vehicle speeds
speeds = agent.get_vehicle_speeds(
camera_id="CAM_001",
time_window=60 # seconds
)

for vehicle in speeds:
print(f"Vehicle {vehicle.id}: {vehicle.speed} km/h")
if vehicle.speed > 60: # Speed limit
print(f" ⚠️ Speeding detected!")

Lane Occupancy Analysis

# Analyze lane-level traffic
lane_stats = agent.analyze_lane_occupancy(
camera_id="CAM_001"
)

for lane in lane_stats:
print(f"Lane {lane.number}:")
print(f" Occupancy: {lane.occupancy_pct}%")
print(f" Vehicle Count: {lane.vehicle_count}")
print(f" Average Speed: {lane.avg_speed} km/h")
print(f" Flow Rate: {lane.flow_rate} veh/h")

Traffic Flow Metrics

# Calculate comprehensive flow metrics
metrics = agent.calculate_flow_metrics(
camera_id="CAM_001",
time_window=300 # 5 minutes
)

print(f"Flow Rate: {metrics.flow_rate} vehicles/hour")
print(f"Density: {metrics.density} vehicles/km")
print(f"Average Speed: {metrics.avg_speed} km/h")
print(f"Level of Service: {metrics.los}") # A, B, C, D, E, F

API Reference

Class: CVAnalysisAgent

Methods

analyze_frame(camera_id: str, frame: np.ndarray) -> AnalysisResult

Analyze a single video frame.

Parameters:

  • camera_id (str): Camera identifier
  • frame (np.ndarray): Video frame (BGR format)

Returns:

  • AnalysisResult: Comprehensive analysis results

Example:

result = agent.analyze_frame("CAM_001", frame)
print(f"Detected: {result.vehicle_count} vehicles")
get_vehicle_speeds(camera_id: str, time_window: int) -> List[VehicleSpeed]

Get vehicle speeds over time window.

Parameters:

  • camera_id (str): Camera identifier
  • time_window (int): Time window in seconds

Returns:

  • List[VehicleSpeed]: Vehicle speed data
analyze_lane_occupancy(camera_id: str) -> List[LaneStats]

Analyze lane-level traffic statistics.

Parameters:

  • camera_id (str): Camera identifier

Returns:

  • List[LaneStats]: Lane statistics
calculate_flow_metrics(camera_id: str, time_window: int) -> FlowMetrics

Calculate traffic flow metrics.

Parameters:

  • camera_id (str): Camera identifier
  • time_window (int): Calculation window (seconds)

Returns:

  • FlowMetrics: Flow metrics

Example:

metrics = agent.calculate_flow_metrics("CAM_001", 300)
print(f"Flow: {metrics.flow_rate} veh/h")
calibrate_camera(camera_id: str, reference_distance: float, pixel_distance: float)

Calibrate camera for accurate measurements.

Parameters:

  • camera_id (str): Camera identifier
  • reference_distance (float): Real-world distance (meters)
  • pixel_distance (float): Corresponding pixel distance

Example:

agent.calibrate_camera("CAM_001", 50, 500)

Data Models

AnalysisResult

@dataclass
class AnalysisResult:
timestamp: datetime
camera_id: str
frame_number: int
vehicle_count: int
class_counts: dict # {class_name: count}
detections: List[Detection]
avg_speed: float # km/h
flow_rate: int # vehicles/hour
lane_occupancy: List[float] # per-lane occupancy %
metadata: dict

Detection

@dataclass
class Detection:
id: int
class_name: str
confidence: float
bbox: dict # {x, y, width, height}
speed: float # km/h (if estimated)
lane: int
trajectory: List[dict] # Historical positions
first_seen: datetime
last_seen: datetime

VehicleSpeed

@dataclass
class VehicleSpeed:
vehicle_id: int
class_name: str
speed: float # km/h
timestamp: datetime
lane: int
position: dict
is_speeding: bool

LaneStats

@dataclass
class LaneStats:
lane_number: int
occupancy_pct: float # 0-100
vehicle_count: int
avg_speed: float # km/h
flow_rate: int # vehicles/hour
density: float # vehicles/km
dominant_class: str

FlowMetrics

@dataclass
class FlowMetrics:
flow_rate: int # vehicles/hour
density: float # vehicles/km
avg_speed: float # km/h
avg_headway: float # seconds
avg_spacing: float # meters
level_of_service: str # A, B, C, D, E, F
capacity_utilization: float # 0-1

Vehicle Classification

Supported Classes

Class IDNameSize CategoryTypical Count
0CarSmall~60%
1TruckLarge~10%
2BusLarge~5%
3MotorcycleSmall~20%
4BicycleSmall~3%
5PedestrianN/A~2%
6VanMedium~5%
7EmergencySpecial<1%

Speed Estimation Methods

1. Optical Flow

# Enable optical flow-based speed estimation
agent.configure_speed_estimation(
method="optical_flow",
calibration_required=False
)

2. Calibrated Tracking

# More accurate with calibration
agent.calibrate_camera("CAM_001", reference_distance=50, pixel_distance=500)
agent.configure_speed_estimation(
method="calibrated_tracking",
smoothing_window=5
)

Integration Examples

Integration with Congestion Detection

from src.agents.analytics.congestion_detection_agent import CongestionDetectionAgent

cv_agent = CVAnalysisAgent()
congestion_agent = CongestionDetectionAgent()

# Use CV metrics for congestion detection
def detect_congestion(camera_id):
# Get flow metrics from CV
metrics = cv_agent.calculate_flow_metrics(camera_id, 300)

# Update congestion detection
congestion_agent.update_metrics(
zone_id=camera_to_zone[camera_id],
speed=metrics.avg_speed,
density=metrics.density,
flow=metrics.flow_rate
)

Integration with Pattern Recognition

from src.agents.analytics.pattern_recognition_agent import PatternRecognitionAgent

pattern_agent = PatternRecognitionAgent()

# Analyze vehicle composition patterns
composition_patterns = pattern_agent.detect_patterns(
metric="vehicle_composition",
data_source=cv_agent.get_historical_composition("30d")
)

Integration with Accident Detection

from src.agents.analytics.accident_detection_agent import AccidentDetectionAgent

accident_agent = AccidentDetectionAgent()

# Use CV tracking for accident context
def analyze_accident_context(camera_id):
# Get vehicles near accident
analysis = cv_agent.analyze_frame(camera_id, frame)

# Estimate severity based on vehicle count
involved_vehicles = len([
v for v in analysis.detections
if v.speed < 5 # Stopped vehicles
])

return {
"involved_vehicles": involved_vehicles,
"traffic_density": analysis.vehicle_count,
"lane_blockage": cv_agent.calculate_blockage(analysis)
}

Monitoring & Metrics

Health Check

health = agent.health_check()
print(f"Status: {health.status}")
print(f"Model Loaded: {health.model_loaded}")
print(f"Active Cameras: {health.active_cameras}")
print(f"Processing FPS: {health.avg_fps}")
print(f"GPU Utilization: {health.gpu_usage}%")

Performance Metrics

metrics = agent.get_performance_metrics()

print(f"Detection Latency: {metrics.avg_latency}ms")
print(f"Tracking Accuracy: {metrics.tracking_accuracy}%")
print(f"Speed Estimation Error: {metrics.speed_error_pct}%")

Performance Optimization

GPU Acceleration

# Configure GPU settings
agent.configure_device(
device="cuda",
precision="fp16",
batch_size=4
)

Frame Skipping

# Process every Nth frame
agent.configure_processing(
skip_frames=2, # Process every 3rd frame
target_fps=10
)

Region of Interest

# Only process specific region
agent.set_roi(
camera_id="CAM_001",
roi=[[100, 100], [500, 400]] # [x, y] coordinates
)

Testing

Unit Tests

import pytest

def test_vehicle_detection():
agent = CVAnalysisAgent()

# Test with known image
frame = cv2.imread("test_data/traffic_5_vehicles.jpg")
result = agent.analyze_frame("CAM_TEST", frame)

assert result.vehicle_count >= 4 # Allow ±1 tolerance
assert len(result.detections) > 0

def test_speed_estimation():
agent = CVAnalysisAgent()
agent.calibrate_camera("CAM_TEST", 50, 500)

# Test with moving vehicle sequence
speeds = []
for frame in test_frames:
result = agent.analyze_frame("CAM_TEST", frame)
if result.detections:
speeds.append(result.detections[0].speed)

assert len(speeds) > 0
assert all(s >= 0 for s in speeds)

Best Practices

1. Camera Calibration

Always calibrate cameras for accurate measurements:

agent.calibrate_all_cameras(
calibration_data="config/camera_calibrations.yaml"
)

2. Adaptive Processing

Adjust processing based on traffic density:

agent.enable_adaptive_processing(
low_traffic_fps=5,
high_traffic_fps=15
)

3. Model Ensemble

Use multiple models for better accuracy:

agent.enable_model_ensemble(
models=["yolox_x", "yolox_l"],
voting_method="confidence"
)

Troubleshooting

Issue: Low Detection Accuracy

Solution: Adjust confidence threshold and retrain on local data

agent.configure_model(confidence_threshold=0.4)
agent.fine_tune(training_images="local_traffic_data/")

Issue: Inaccurate Speed Estimation

Solution: Recalibrate camera and increase smoothing

agent.calibrate_camera("CAM_001", reference_distance=50, pixel_distance=500)
agent.configure_speed_estimation(smoothing_window=10)

Issue: High GPU Memory Usage

Solution: Reduce batch size and use FP16 precision

agent.configure_device(batch_size=1, precision="fp16")

License

MIT License - Copyright (c) 2025 UIP Contributors (Nguyễn Nhật Quang, Nguyễn Việt Hoàng, Nguyễn Đình Anh Tuấn)

See LICENSE for details.