Pattern Recognition Agent
Overview
The Pattern Recognition Agent employs machine learning algorithms to identify recurring traffic patterns, predict congestion, detect anomalies, and provide actionable insights for traffic management optimization.
Features
- Traffic Pattern Detection: Identify daily, weekly, and seasonal patterns
- Anomaly Detection: Real-time detection of unusual traffic behavior
- Predictive Analytics: Forecast traffic conditions up to 24 hours ahead
- Hotspot Identification: Locate recurring congestion points
- Correlation Analysis: Link patterns with events, weather, accidents
- Adaptive Learning: Continuous model improvement from new data
Architecture
graph TB
A[Historical Traffic Data] --> B[Feature Extractor]
C[Real-time Data Stream] --> B
B --> D[Pattern Detector]
D --> E[ML Models]
E --> F{Pattern Type}
F -->|Normal| G[Baseline Update]
F -->|Anomaly| H[Anomaly Handler]
F -->|Congestion| I[Prediction Model]
H --> J[Alert System]
I --> K[Forecast Generator]
G --> L[MongoDB Storage]
K --> M[Visualization Service]
Configuration
File: config/pattern_recognition.yaml
pattern_recognition:
models:
time_series:
type: "prophet"
seasonality_mode: "multiplicative"
changepoint_prior_scale: 0.05
anomaly_detection:
type: "isolation_forest"
contamination: 0.1
n_estimators: 100
clustering:
type: "dbscan"
eps: 0.5
min_samples: 5
feature_extraction:
temporal_features:
- "hour_of_day"
- "day_of_week"
- "month"
- "is_holiday"
- "is_weekend"
traffic_features:
- "vehicle_count"
- "average_speed"
- "congestion_level"
- "flow_rate"
environmental_features:
- "weather_condition"
- "temperature"
- "precipitation"
- "visibility"
pattern_types:
daily_patterns:
time_window: "24h"
min_occurrences: 5
weekly_patterns:
time_window: "7d"
min_occurrences: 3
event_patterns:
look_back: "30d"
correlation_threshold: 0.7
thresholds:
anomaly_score: 0.75
prediction_confidence: 0.80
pattern_strength: 0.65
Usage
Basic Pattern Detection
from src.agents.analytics.pattern_recognition_agent import PatternRecognitionAgent
# Initialize agent
agent = PatternRecognitionAgent()
# Detect patterns in historical data
patterns = agent.detect_patterns(
location="District 1",
time_range="30d"
)
print(f"Patterns found: {len(patterns)}")
for pattern in patterns:
print(f"Type: {pattern.type}")
print(f"Strength: {pattern.strength}")
print(f"Occurrences: {pattern.occurrences}")
Anomaly Detection
# Real-time anomaly detection
anomaly = agent.detect_anomaly(
camera_id="CAM_001",
current_metrics={
"vehicle_count": 150,
"average_speed": 15,
"congestion_level": "high"
}
)
if anomaly.is_anomalous:
print(f"Anomaly detected!")
print(f"Severity: {anomaly.severity}")
print(f"Confidence: {anomaly.confidence}")
print(f"Expected: {anomaly.expected_values}")
print(f"Actual: {anomaly.actual_values}")
Traffic Prediction
# Predict traffic for next 24 hours
predictions = agent.predict_traffic(
location="District 1",
forecast_hours=24
)
for hour in predictions:
print(f"{hour.timestamp}: {hour.vehicle_count} vehicles")
print(f" Congestion Level: {hour.congestion_level}")
print(f" Confidence: {hour.confidence}")
Hotspot Analysis
# Identify congestion hotspots
hotspots = agent.identify_hotspots(
time_range="30d",
min_occurrences=10
)
for hotspot in hotspots:
print(f"Location: {hotspot.location}")
print(f"Frequency: {hotspot.frequency}")
print(f"Peak Hours: {hotspot.peak_hours}")
print(f"Average Duration: {hotspot.avg_duration}min")
API Reference
Class: PatternRecognitionAgent
Methods
detect_patterns(location: str, time_range: str) -> List[Pattern]
Detect recurring traffic patterns.
Parameters:
location(str): Geographic locationtime_range(str): Analysis period (e.g., "7d", "30d")
Returns:
- List[Pattern]: Detected patterns
Example:
patterns = agent.detect_patterns(
location="District 1",
time_range="30d"
)