Pattern Recognition Agent
Overview
The Pattern Recognition Agent employs machine learning algorithms to identify recurring traffic patterns, predict congestion, detect anomalies, and provide actionable insights for traffic management optimization.
Features
- Traffic Pattern Detection: Identify daily, weekly, and seasonal patterns
- Anomaly Detection: Real-time detection of unusual traffic behavior
- Predictive Analytics: Forecast traffic conditions up to 24 hours ahead
- Hotspot Identification: Locate recurring congestion points
- Correlation Analysis: Link patterns with events, weather, accidents
- Adaptive Learning: Continuous model improvement from new data
Architecture
graph TB
A[Historical Traffic Data] --> B[Feature Extractor]
C[Real-time Data Stream] --> B
B --> D[Pattern Detector]
D --> E[ML Models]
E --> F{Pattern Type}
F -->|Normal| G[Baseline Update]
F -->|Anomaly| H[Anomaly Handler]
F -->|Congestion| I[Prediction Model]
H --> J[Alert System]
I --> K[Forecast Generator]
G --> L[MongoDB Storage]
K --> M[Visualization Service]
Configuration
File: config/pattern_recognition.yaml
pattern_recognition:
models:
time_series:
type: "prophet"
seasonality_mode: "multiplicative"
changepoint_prior_scale: 0.05
anomaly_detection:
type: "isolation_forest"
contamination: 0.1
n_estimators: 100
clustering:
type: "dbscan"
eps: 0.5
min_samples: 5
feature_extraction:
temporal_features:
- "hour_of_day"
- "day_of_week"
- "month"
- "is_holiday"
- "is_weekend"
traffic_features:
- "vehicle_count"
- "average_speed"
- "congestion_level"
- "flow_rate"
environmental_features:
- "weather_condition"
- "temperature"
- "precipitation"
- "visibility"
pattern_types:
daily_patterns:
time_window: "24h"
min_occurrences: 5
weekly_patterns:
time_window: "7d"
min_occurrences: 3
event_patterns:
look_back: "30d"
correlation_threshold: 0.7
thresholds:
anomaly_score: 0.75
prediction_confidence: 0.80
pattern_strength: 0.65
Usage
Basic Pattern Detection
from src.agents.analytics.pattern_recognition_agent import PatternRecognitionAgent
# Initialize agent
agent = PatternRecognitionAgent()
# Detect patterns in historical data
patterns = agent.detect_patterns(
location="District 1",
time_range="30d"
)
print(f"Patterns found: {len(patterns)}")
for pattern in patterns:
print(f"Type: {pattern.type}")
print(f"Strength: {pattern.strength}")
print(f"Occurrences: {pattern.occurrences}")
Anomaly Detection
# Real-time anomaly detection
anomaly = agent.detect_anomaly(
camera_id="CAM_001",
current_metrics={
"vehicle_count": 150,
"average_speed": 15,
"congestion_level": "high"
}
)
if anomaly.is_anomalous:
print(f"Anomaly detected!")
print(f"Severity: {anomaly.severity}")
print(f"Confidence: {anomaly.confidence}")
print(f"Expected: {anomaly.expected_values}")
print(f"Actual: {anomaly.actual_values}")
Traffic Prediction
# Predict traffic for next 24 hours
predictions = agent.predict_traffic(
location="District 1",
forecast_hours=24
)
for hour in predictions:
print(f"{hour.timestamp}: {hour.vehicle_count} vehicles")
print(f" Congestion Level: {hour.congestion_level}")
print(f" Confidence: {hour.confidence}")
Hotspot Analysis
# Identify congestion hotspots
hotspots = agent.identify_hotspots(
time_range="30d",
min_occurrences=10
)
for hotspot in hotspots:
print(f"Location: {hotspot.location}")
print(f"Frequency: {hotspot.frequency}")
print(f"Peak Hours: {hotspot.peak_hours}")
print(f"Average Duration: {hotspot.avg_duration}min")
API Reference
Class: PatternRecognitionAgent
Methods
detect_patterns(location: str, time_range: str) -> List[Pattern]
Detect recurring traffic patterns.
Parameters:
location(str): Geographic locationtime_range(str): Analysis period (e.g., "7d", "30d")
Returns:
- List[Pattern]: Detected patterns
Example:
patterns = agent.detect_patterns(
location="District 1",
time_range="30d"
)
detect_anomaly(camera_id: str, current_metrics: dict) -> AnomalyResult
Detect traffic anomalies in real-time.
Parameters:
camera_id(str): Camera identifiercurrent_metrics(dict): Current traffic metrics
Returns:
- AnomalyResult: Anomaly detection result
predict_traffic(location: str, forecast_hours: int) -> List[Prediction]
Predict future traffic conditions.
Parameters:
location(str): Location to predictforecast_hours(int): Forecast horizon (1-24 hours)
Returns:
- List[Prediction]: Hourly predictions
Example:
forecast = agent.predict_traffic(
location="District 1",
forecast_hours=12
)
identify_hotspots(time_range: str, min_occurrences: int = 5) -> List[Hotspot]
Identify recurring congestion hotspots.
Parameters:
time_range(str): Analysis periodmin_occurrences(int): Minimum occurrence threshold
Returns:
- List[Hotspot]: Identified hotspots
analyze_correlation(event_type: str, time_range: str) -> CorrelationAnalysis
Analyze correlation between events and traffic patterns.
Parameters:
event_type(str): Event type (weather, accidents, holidays)time_range(str): Analysis period
Returns:
- CorrelationAnalysis: Correlation results
Example:
correlation = agent.analyze_correlation(
event_type="heavy_rain",
time_range="90d"
)
print(f"Correlation coefficient: {correlation.coefficient}")
Data Models
Pattern
@dataclass
class Pattern:
id: str
type: str # daily, weekly, seasonal, event-based
location: dict
strength: float # 0.0-1.0
occurrences: int
time_windows: List[dict] # When pattern occurs
characteristics: dict
confidence: float
first_detected: datetime
last_updated: datetime
AnomalyResult
@dataclass
class AnomalyResult:
timestamp: datetime
is_anomalous: bool
severity: str # low, medium, high, critical
confidence: float
anomaly_score: float # 0.0-1.0
expected_values: dict
actual_values: dict
deviation_percentage: float
possible_causes: List[str]
Prediction
@dataclass
class Prediction:
timestamp: datetime
location: dict
vehicle_count: int
average_speed: float
congestion_level: str
confidence: float
prediction_interval: dict # upper/lower bounds
factors: List[str]
Hotspot
@dataclass
class Hotspot:
location: dict
frequency: int # Occurrences in time_range
peak_hours: List[int]
avg_duration: int # minutes
severity_distribution: dict
contributing_factors: List[str]
historical_trend: str # increasing, stable, decreasing
Pattern Types
1. Daily Patterns
# Rush hour patterns
daily_patterns = agent.detect_patterns(
location="District 1",
time_range="30d",
pattern_type="daily"
)
for pattern in daily_patterns:
if pattern.type == "morning_rush":
print(f"Morning rush: {pattern.time_windows}")
elif pattern.type == "evening_rush":
print(f"Evening rush: {pattern.time_windows}")
2. Weekly Patterns
# Weekend vs weekday patterns
weekly_patterns = agent.detect_patterns(
time_range="90d",
pattern_type="weekly"
)
# Compare weekend and weekday traffic
comparison = agent.compare_patterns(
pattern1=weekly_patterns["weekend"],
pattern2=weekly_patterns["weekday"]
)
3. Event-Based Patterns
# Patterns related to specific events
event_patterns = agent.detect_event_patterns(
event_types=["concerts", "sports", "holidays"],
time_range="1y"
)
4. Seasonal Patterns
# Monsoon season traffic patterns
seasonal = agent.detect_patterns(
pattern_type="seasonal",
season="monsoon",
time_range="3y"
)
Machine Learning Models
Time Series Forecasting (Prophet)
# Configure Prophet model
agent.configure_prophet_model(
seasonality_mode="multiplicative",
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=True
)
Anomaly Detection (Isolation Forest)
# Train anomaly detection model
agent.train_anomaly_detector(
training_data=historical_data,
contamination=0.1,
n_estimators=100
)
Clustering (DBSCAN)
# Cluster similar traffic patterns
clusters = agent.cluster_patterns(
eps=0.5,
min_samples=5,
metric="euclidean"
)
Integration Examples
Integration with Accident Detection
from src.agents.analytics.accident_detection_agent import AccidentDetectionAgent
pattern_agent = PatternRecognitionAgent()
accident_agent = AccidentDetectionAgent()
# Analyze accident patterns
accident_patterns = pattern_agent.analyze_accident_patterns(
time_range="90d"
)
print(f"High-risk locations: {accident_patterns.hotspots}")
print(f"Peak accident hours: {accident_patterns.peak_hours}")
# Predict high-risk periods
risk_forecast = pattern_agent.predict_accident_risk(
location="Highway 1",
forecast_hours=24
)
Integration with Weather Data
from src.agents.data_collection.weather_agent import WeatherAgent
weather_agent = WeatherAgent()
# Analyze weather-traffic correlation
correlation = pattern_agent.analyze_weather_correlation(
weather_data=weather_agent.get_historical_weather("30d"),
traffic_data=pattern_agent.get_traffic_history("30d")
)
print(f"Rain impact on traffic: {correlation.rain_impact}%")
print(f"Temperature correlation: {correlation.temp_coefficient}")
Integration with Route Planning
# Use patterns for intelligent route planning
def get_optimal_route(origin, destination, departure_time):
# Get predicted traffic for departure time
prediction = pattern_agent.predict_traffic(
location=origin,
timestamp=departure_time
)
# Factor in predicted congestion
route = route_planner.calculate_route(
origin=origin,
destination=destination,
avoid_congestion=prediction.congestion_hotspots
)
return route
Monitoring & Metrics
Model Performance
metrics = agent.get_model_metrics()
print(f"Prediction Accuracy: {metrics.accuracy}%")
print(f"Mean Absolute Error: {metrics.mae}")
print(f"Anomaly Detection Precision: {metrics.precision}")
print(f"Anomaly Detection Recall: {metrics.recall}")
Pattern Statistics
stats = agent.get_pattern_statistics(time_range="30d")
print(f"Total Patterns Detected: {stats.total_patterns}")
print(f"Active Patterns: {stats.active_patterns}")
print(f"Pattern Types Distribution: {stats.type_distribution}")
Performance Optimization
Feature Caching
# Cache computed features
agent.enable_feature_caching(
cache_ttl=3600, # 1 hour
cache_size=1000
)
Parallel Processing
# Process multiple locations in parallel
agent.configure_parallel_processing(
num_workers=4,
batch_size=50
)
Model Optimization
# Use lightweight models for real-time detection
agent.configure_realtime_mode(
model_complexity="low",
max_latency_ms=100
)
Testing
Unit Tests
import pytest
def test_pattern_detection():
agent = PatternRecognitionAgent()
# Test with known pattern data
patterns = agent.detect_patterns(
location="Test Location",
time_range="7d"
)
assert len(patterns) > 0
assert all(p.strength >= 0 and p.strength <= 1 for p in patterns)
def test_anomaly_detection():
agent = PatternRecognitionAgent()
# Test with normal and anomalous data
normal_result = agent.detect_anomaly("CAM_001", normal_metrics)
anomalous_result = agent.detect_anomaly("CAM_001", anomalous_metrics)
assert normal_result.is_anomalous == False
assert anomalous_result.is_anomalous == True
Best Practices
1. Regular Model Retraining
# Retrain models with new data monthly
agent.schedule_retraining(
frequency="monthly",
min_new_samples=1000
)
2. Adaptive Thresholds
# Adjust thresholds based on location characteristics
agent.set_adaptive_thresholds(
location_type="urban_center",
traffic_density="high"
)
3. Multi-Model Ensemble
# Combine multiple models for better accuracy
agent.enable_ensemble(
models=["prophet", "lstm", "arima"],
voting_strategy="weighted"
)
Troubleshooting
Issue: Poor Prediction Accuracy
Solution: Increase training data and add more features
agent.add_features([
"holiday_indicator",
"special_events",
"construction_activity"
])
agent.retrain(min_training_days=90)
Issue: Too Many False Anomalies
Solution: Adjust contamination parameter
agent.configure_anomaly_detector(contamination=0.05)
Related Documentation
License
MIT License - Copyright (c) 2025 UIP Contributors (Nguyễn Nhật Quang, Nguyễn Việt Hoàng, Nguyễn Đình Anh Tuấn)
See LICENSE for details.