Complete Agents Reference
Overview
Comprehensive documentation for all 30+ agents in the HCMC Traffic Management System, organized by category with full technical specifications, configurations, and integration examples.
Table of Contents
Data Collection Agents
Analytics Agents
- Accident Detection Agent
- Pattern Recognition Agent
- Congestion Detection Agent
- Computer Vision Analysis Agent
- Citizen Report Agent
Transformation Agents
RDF & Linked Data Agents
- NGSI-LD to RDF Agent
- Triplestore Loader Agent
- LOD Linkset Enrichment Agent
- Content Negotiation Agent
- Smart Data Models Validation Agent
Context Management Agents
Notification Agents
- Alert Dispatcher Agent
- Subscription Manager Agent
- Incident Report Generator Agent
- Email Notification Handler Agent
- Webhook Notification Handler Agent
Graph Database Agents
Integration Agents
Monitoring Agents
State Management Agents
- State Manager Agent
- Accident State Manager Agent
- Congestion State Manager Agent
- Temporal State Tracker Agent
Additional Agents
Data Collection Agents
Camera Image Fetch Agent
Overview
Fetches images from 150+ traffic cameras across HCMC, runs them through YOLOX-X for vehicle detection, and uses three-tier caching to avoid repeated fetches.
Features
- Multi-Camera Support: Parallel processing of 150+ cameras
- Caching Strategy: 3-tier (Memory → Redis → MongoDB)
- Image Processing: Automatic resizing, normalization, quality checks
- Error Handling: Retry logic, fallback mechanisms, circuit breaker
- CV Integration: YOLOX-X vehicle detection preprocessing
- Scheduling: Configurable fetch intervals (1s - 60s)
Architecture
graph TB
A[Camera Sources] --> B[Image Fetch Agent]
B --> C{Cache Check}
C -->|Hit| D[Return Cached]
C -->|Miss| E[Fetch Image]
E --> F[Process Image]
F --> G[YOLOX Detection]
G --> H[Update Caches]
H --> I[Memory Cache]
H --> J[Redis Cache]
H --> K[MongoDB]
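The cache flow in the diagram can be sketched with plain Python. This is a minimal illustration, not the agent's actual implementation: `TTLCache`, `tiered_get`, and `fake_fetch` are hypothetical names, each tier is an in-memory stand-in for the memory, Redis, and MongoDB layers, and the TTLs (60 s, 300 s, 86400 s) are the values from this agent's configuration.

```python
import time
from typing import Any, Callable, Dict, List, Optional, Tuple


class TTLCache:
    """In-memory stand-in for one cache tier (memory, Redis, or MongoDB)."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl = ttl_seconds
        self._items: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str, now: Optional[float] = None) -> Optional[Any]:
        now = time.time() if now is None else now
        entry = self._items.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if now - stored_at > self.ttl:
            del self._items[key]  # entry has expired
            return None
        return value

    def set(self, key: str, value: Any, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        self._items[key] = (now, value)


def tiered_get(tiers: List[TTLCache], key: str,
               fetch: Callable[[str], Any], now: Optional[float] = None):
    """Check tiers fastest-first; on a full miss, fetch and fill every tier.

    Returns (value, index_of_tier_that_hit) or (value, None) after a fetch.
    """
    for index, tier in enumerate(tiers):
        value = tier.get(key, now)
        if value is not None:
            for faster in tiers[:index]:  # promote into the tiers that missed
                faster.set(key, value, now)
            return value, index
    value = fetch(key)
    for tier in tiers:
        tier.set(key, value, now)
    return value, None


tiers = [TTLCache(60), TTLCache(300), TTLCache(86400)]
calls: List[str] = []


def fake_fetch(camera_id: str) -> str:
    calls.append(camera_id)
    return f"image-bytes-for-{camera_id}"


first = tiered_get(tiers, "CAM_001", fake_fetch, now=0)     # miss in every tier
second = tiered_get(tiers, "CAM_001", fake_fetch, now=30)   # hit in tier 0
third = tiered_get(tiers, "CAM_001", fake_fetch, now=120)   # tier 0 expired, tier 1 hits
```

Passing `now` explicitly keeps the example deterministic; a real deployment would rely on the expiry features of Redis and MongoDB instead of tracking timestamps by hand.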
Configuration
# config/camera_image_config.yaml
camera_image_fetch:
  enabled: true
  fetch_interval: 5 # seconds
  cameras:
    source_type: "rtsp"
    connection_timeout: 10
    read_timeout: 15
    max_retries: 3
  image_processing:
    target_size: [1280, 720]
    quality: 85
    format: "JPEG"
    normalize: true
  caching:
    memory_ttl: 60
    redis_ttl: 300
    mongo_ttl: 86400
    max_memory_items: 1000
  yolox:
    model_path: "yolox_x.pt"
    confidence: 0.5
    iou: 0.45
    device: "cuda"
Usage
from src.agents.data_collection.camera_image_fetch_agent import CameraImageFetchAgent
# Initialize
agent = CameraImageFetchAgent()
# Fetch single camera
image = agent.fetch_camera("CAM_001")
# Fetch multiple cameras (parallel)
images = agent.fetch_cameras_batch(["CAM_001", "CAM_002", "CAM_003"])
# With CV detection
result = agent.fetch_and_detect("CAM_001")
print(f"Vehicles detected: {len(result.detections)}")
API Reference
class CameraImageFetchAgent:
    def fetch_camera(self, camera_id: str) -> CameraImage: ...
    def fetch_cameras_batch(self, camera_ids: List[str]) -> List[CameraImage]: ...
    def fetch_and_detect(self, camera_id: str) -> DetectionResult: ...
    def get_cache_stats(self) -> CacheStatistics: ...
    def clear_cache(self, cache_type: str = "all") -> None: ...
Data Models
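from dataclasses import dataclass
from datetime import datetime
from typing import List

import numpy as np

# ImageMetadata and Detection are project types not shown in this reference.
@dataclass
class CameraImage:
    camera_id: str
    timestamp: datetime
    image: np.ndarray
    metadata: ImageMetadata

@dataclass
class DetectionResult:
    camera_id: str
    timestamp: datetime
    detections: List[Detection]
    confidence: float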
Weather Integration Agent
Overview
Integrates multiple weather data sources (OpenWeather, WeatherAPI) to provide real-time weather conditions correlated with traffic patterns.
Features
- Multi-Source Integration: 2+ weather APIs with fallback
- Traffic Correlation: Weather impact analysis on traffic flow
- Alert System: Severe weather alerts affecting traffic
- Forecasting: 24-hour weather predictions
- Historical Data: Weather pattern analysis
- Caching: Cached responses to reduce API calls
Configuration
# config/weather_config.yaml
weather_integration:
  enabled: true
  update_interval: 300 # 5 minutes
  sources:
    openweather:
      api_key: "${OPENWEATHER_API_KEY}"
      endpoint: "https://api.openweathermap.org/data/2.5"
      priority: 1
    weatherapi:
      api_key: "${WEATHERAPI_KEY}"
      endpoint: "https://api.weatherapi.com/v1"
      priority: 2
  correlation:
    enabled: true
    traffic_threshold: 0.7
    weather_impact_factors:
      rain: 0.8
      heavy_rain: 0.5
      fog: 0.6
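The `priority` values suggest that sources are tried in order until one succeeds. A minimal sketch of that fallback, assuming each source is wrapped in a callable that either returns a reading or raises; `fetch_with_fallback` and the two stub fetchers are hypothetical names, and no real weather API is called here.

```python
from typing import Callable, Dict, List, Tuple

Source = Tuple[int, str, Callable[[], Dict]]  # (priority, name, fetch function)


def fetch_with_fallback(sources: List[Source]) -> Dict:
    """Try sources in ascending priority order and return the first success."""
    errors = []
    for _priority, name, fetch in sorted(sources, key=lambda s: s[0]):
        try:
            reading = fetch()
        except Exception as exc:  # any failure moves on to the next source
            errors.append(f"{name}: {exc}")
            continue
        return {"source": name, **reading}
    raise RuntimeError("all weather sources failed: " + "; ".join(errors))


def openweather_down() -> Dict:
    # Stub that simulates the primary source being unreachable.
    raise TimeoutError("connection timed out")


def weatherapi_ok() -> Dict:
    # Stub that simulates a healthy secondary source.
    return {"temperature": 31.0, "conditions": "rain"}


reading = fetch_with_fallback([
    (2, "weatherapi", weatherapi_ok),
    (1, "openweather", openweather_down),
])
print(reading["source"])
```

Collecting the error messages makes the final exception useful when every source is down.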
Usage
from src.agents.data_collection.weather_integration_agent import WeatherIntegrationAgent
agent = WeatherIntegrationAgent()
# Get current weather
weather = agent.get_current_weather(lat=10.7769, lon=106.7009)
print(f"Temperature: {weather.temperature}°C, Conditions: {weather.conditions}")
# Get forecast
forecast = agent.get_forecast(lat=10.7769, lon=106.7009, hours=24)
# Weather-traffic correlation
correlation = agent.analyze_weather_traffic_correlation()
print(f"Rain impact on traffic: {correlation.rain_impact}")
Air Quality Agent
Overview
Monitors air quality across HCMC using AQI data from multiple stations, providing health impact assessments and pollution analysis.
Features
- Multi-Station Monitoring: Data from 20+ AQI stations
- Pollutant Analysis: PM2.5, PM10, NO2, SO2, CO, O3
- Health Impact: AQI category and health recommendations
- Traffic Correlation: Analyze pollution from traffic sources
- Alert System: Air quality alerts
- Historical Trends: Long-term pollution patterns
Configuration
# config/air_quality_config.yaml
air_quality:
  enabled: true
  update_interval: 300
  data_sources:
    aqicn:
      api_key: "${AQICN_API_KEY}"
      endpoint: "https://api.waqi.info"
  stations:
    - id: "hcm-district1"
      location: [10.7769, 106.7009]
    - id: "hcm-district3"
      location: [10.7868, 106.6897]
  thresholds:
    pm25_good: 12
    pm25_moderate: 35.4
    pm25_unhealthy: 55.4
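One plausible reading of the `thresholds` block is that each value is the upper bound (in µg/m³) of its category. The sketch below applies that reading; the function name and the label used above the last threshold are assumptions, not part of the agent's API.

```python
from typing import Dict

# Values copied from the thresholds block of the configuration.
THRESHOLDS: Dict[str, float] = {
    "pm25_good": 12,
    "pm25_moderate": 35.4,
    "pm25_unhealthy": 55.4,
}


def pm25_category(concentration: float, thresholds: Dict[str, float] = THRESHOLDS) -> str:
    """Map a PM2.5 concentration to a category, treating thresholds as upper bounds."""
    if concentration < 0:
        raise ValueError("concentration cannot be negative")
    if concentration <= thresholds["pm25_good"]:
        return "good"
    if concentration <= thresholds["pm25_moderate"]:
        return "moderate"
    if concentration <= thresholds["pm25_unhealthy"]:
        return "unhealthy"
    # Hypothetical label: the configuration defines no category above 55.4.
    return "above_unhealthy_threshold"


for value in (8.0, 20.0, 40.0, 80.0):
    print(value, pm25_category(value))
```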
Usage
from src.agents.data_collection.air_quality_agent import AirQualityAgent
agent = AirQualityAgent()
# Get current AQI
aqi_data = agent.get_current_aqi(lat=10.7769, lon=106.7009)
print(f"AQI: {aqi_data.aqi}, Category: {aqi_data.category}")
# Get pollutant breakdown
pollutants = agent.get_pollutant_details()
print(f"PM2.5: {pollutants.pm25} µg/m³")
# Health recommendations
health = agent.get_health_recommendations(aqi_data.aqi)
Analytics Agents
Accident Detection Agent
Overview
Real-time accident detection using YOLOX-X computer vision model with 4-level severity classification and automatic emergency response triggers.
Features
- YOLOX-X Detection: 95%+ accuracy accident detection
- Severity Classification: Minor, Moderate, Severe, Critical
- Real-time Alerts: Sub-second emergency notifications
- Multi-Camera Fusion: Cross-camera validation
- Historical Analysis: Accident pattern identification
- Emergency Integration: Automatic emergency-services dispatch (113/114/115 in Vietnam)
Architecture
graph TB
A[Camera Feed] --> B[YOLOX-X Model]
B --> C[Accident Classifier]
C --> D{Severity}
D -->|Minor| E[Log Event]
D -->|Moderate| F[Alert Traffic Ops]
D -->|Severe| G[Notify Emergency]
D -->|Critical| H[Emergency Dispatch]
G --> I[Alert Dispatcher]
H --> I
Configuration
# config/accident_config.yaml
accident_detection:
  enabled: true
  yolox:
    model_path: "yolox_x.pt"
    confidence_threshold: 0.75
    iou_threshold: 0.45
    classes: [2, 3, 5, 7] # car, motorcycle, bus, truck
  severity_classification:
    minor:
      vehicles: [1, 2]
      blocking_lanes: 0
      injuries: false
    moderate:
      vehicles: [2, 3]
      blocking_lanes: [1, 2]
      injuries: false
    severe:
      vehicles: [3, 5]
      blocking_lanes: [2, 3]
      injuries: true
    critical:
      vehicles: "5+"
      blocking_lanes: "3+"
      injuries: true
      fire: true
  emergency_response:
    minor_delay: 300
    moderate_delay: 60
    severe_delay: 10
    critical_delay: 0
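The vehicle and lane ranges in `severity_classification` overlap (two vehicles fits both minor and moderate), so some tie-breaking rule is needed. The sketch below assumes one possible rule: treat the lower bound of each range as a minimum and pick the most severe level whose minimums are all met. `Rule`, `RULES`, and this `classify_severity` signature are hypothetical and may differ from the agent's own method.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Rule:
    level: str
    min_vehicles: int
    min_blocked_lanes: int
    needs_injuries: bool
    needs_fire: bool
    response_delay: int  # value from emergency_response, unit as configured


# Ordered from most to least severe; numbers are the lower bounds in the config.
RULES: List[Rule] = [
    Rule("critical", 5, 3, True, True, 0),
    Rule("severe", 3, 2, True, False, 10),
    Rule("moderate", 2, 1, False, False, 60),
    Rule("minor", 1, 0, False, False, 300),
]


def classify_severity(vehicles: int, blocked_lanes: int,
                      injuries: bool = False, fire: bool = False) -> Rule:
    """Return the most severe rule whose minimum requirements are all satisfied."""
    for rule in RULES:
        if (vehicles >= rule.min_vehicles
                and blocked_lanes >= rule.min_blocked_lanes
                and (injuries or not rule.needs_injuries)
                and (fire or not rule.needs_fire)):
            return rule
    raise ValueError("an accident needs at least one vehicle")


result = classify_severity(vehicles=3, blocked_lanes=2, injuries=True)
print(result.level, result.response_delay)
```

Under this rule a six-vehicle pile-up with injuries but no fire is classified as severe, because the critical level in the configuration also requires `fire: true`.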
Usage
from src.agents.analytics.accident_detection_agent import AccidentDetectionAgent
agent = AccidentDetectionAgent()
# Detect from camera
result = agent.detect_accident(camera_id="CAM_001")
if result.accident_detected:
    print(f"Severity: {result.severity}")
    print(f"Confidence: {result.confidence}")
    print(f"Vehicles involved: {result.vehicles_count}")
    print(f"Emergency notified: {result.emergency_notified}")
# Batch detection
results = agent.detect_accidents_batch(["CAM_001", "CAM_002", "CAM_003"])
API Reference
class AccidentDetectionAgent:
    def detect_accident(self, camera_id: str) -> AccidentDetectionResult: ...
    def detect_accidents_batch(self, camera_ids: List[str]) -> List[AccidentDetectionResult]: ...
    def classify_severity(self, detection: Detection) -> SeverityLevel: ...
    def trigger_emergency_response(self, accident_id: str) -> None: ...
Data Models
@dataclass
class AccidentDetectionResult:
    accident_id: str
    camera_id: str
    timestamp: datetime
    accident_detected: bool
    severity: SeverityLevel
    confidence: float
    vehicles_count: int
    location: Location
    bounding_boxes: List[BoundingBox]
    emergency_notified: bool
Pattern Recognition Agent
Overview
Machine learning-based pattern detection for recurring traffic issues, anomaly detection, and predictive analytics.
Features
- Pattern Detection: Identify recurring congestion patterns
- Anomaly Detection: Detect unusual traffic behavior
- Hotspot Identification: High-accident/congestion zones
- Temporal Analysis: Time-based pattern recognition
- Predictive Analytics: Forecast traffic patterns
- Correlation Analysis: Multi-factor correlation
Configuration
# config/pattern_recognition.yaml
pattern_recognition:
  enabled: true
  models:
    clustering:
      algorithm: "DBSCAN"
      eps: 0.5
      min_samples: 5
    anomaly_detection:
      algorithm: "IsolationForest"
      contamination: 0.1
  analysis:
    time_windows: [1, 6, 24, 168] # hours
    confidence_threshold: 0.7
    min_occurrences: 3
Usage
from datetime import datetime

from src.agents.analytics.pattern_recognition_agent import PatternRecognitionAgent
agent = PatternRecognitionAgent()
# Detect patterns
patterns = agent.detect_patterns(
    start_time=datetime(2025, 11, 20),
    end_time=datetime(2025, 12, 5)
)
for pattern in patterns:
    print(f"Pattern: {pattern.type}")
    print(f"Locations: {pattern.locations}")
    print(f"Time: {pattern.time_pattern}")
    print(f"Confidence: {pattern.confidence}")
# Anomaly detection
anomalies = agent.detect_anomalies(camera_id="CAM_001")
# Hotspot analysis
hotspots = agent.identify_hotspots()
Congestion Detection Agent
Overview
Real-time congestion monitoring with 5-level classification, zone management, and impact assessment across HCMC.
Features
- 5-Level Classification: Free flow → Severe congestion
- Zone Management: 50+ traffic zones
- Real-time Updates: Sub-minute latency
- Speed Estimation: Per-camera speed analysis
- Queue Length: Vehicle queue measurement
- Impact Assessment: Delay estimation, alternate routes
Configuration
# config/congestion_config.yaml
congestion_detection:
  enabled: true
  update_interval: 30
  classification:
    free_flow: {speed_ratio: ">0.85", density: "<0.3"}
    light: {speed_ratio: "0.7-0.85", density: "0.3-0.5"}
    moderate: {speed_ratio: "0.5-0.7", density: "0.5-0.7"}
    heavy: {speed_ratio: "0.3-0.5", density: "0.7-0.85"}
    severe: {speed_ratio: "<0.3", density: ">0.85"}
  zones:
    - id: "ZONE_001"
      name: "District 1 Central"
      cameras: ["CAM_001", "CAM_002", "CAM_003"]
      free_flow_speed: 40
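The classification table can be read as two independent scales, one on speed ratio (average speed divided by the zone's `free_flow_speed`) and one on density. The sketch below assumes that the worse of the two determines the level and that shared boundaries belong to the band listed in the comments; both choices are assumptions, as are the function names.

```python
LEVELS = ["free_flow", "light", "moderate", "heavy", "severe"]


def level_from_speed_ratio(ratio: float) -> int:
    """Index into LEVELS based on average speed / free-flow speed."""
    if ratio > 0.85:
        return 0
    if ratio >= 0.7:   # 0.7-0.85
        return 1
    if ratio >= 0.5:   # 0.5-0.7
        return 2
    if ratio >= 0.3:   # 0.3-0.5
        return 3
    return 4           # below 0.3


def level_from_density(density: float) -> int:
    """Index into LEVELS based on normalized density (0.0 to 1.0)."""
    if density < 0.3:
        return 0
    if density < 0.5:
        return 1
    if density < 0.7:
        return 2
    if density <= 0.85:
        return 3
    return 4


def classify_congestion(avg_speed_kmh: float, free_flow_speed_kmh: float,
                        density: float) -> str:
    """Return the worse of the speed-based and density-based levels."""
    if free_flow_speed_kmh <= 0:
        raise ValueError("free-flow speed must be positive")
    ratio = avg_speed_kmh / free_flow_speed_kmh
    return LEVELS[max(level_from_speed_ratio(ratio), level_from_density(density))]


# ZONE_001 has free_flow_speed 40 in the configuration.
print(classify_congestion(avg_speed_kmh=24, free_flow_speed_kmh=40, density=0.6))
```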
Usage
from src.agents.analytics.congestion_detection_agent import CongestionDetectionAgent
agent = CongestionDetectionAgent()
# Detect congestion
congestion = agent.detect_congestion(zone_id="ZONE_001")
print(f"Level: {congestion.level}")
print(f"Average speed: {congestion.avg_speed} km/h")
print(f"Delay: {congestion.estimated_delay} minutes")
# Get all zones status
zones = agent.get_all_zones_status()
Computer Vision Analysis Agent
Overview
Advanced computer vision for vehicle detection, classification, speed estimation, and lane monitoring using YOLOX-X and tracking algorithms.
Features
- Vehicle Detection: Cars, motorcycles, trucks, buses
- Vehicle Classification: 10+ vehicle types
- Speed Estimation: Calibrated speed measurement
- Lane Monitoring: Lane occupation analysis
- Vehicle Tracking: Multi-object tracking
- Counting: Directional vehicle counting
Configuration
# config/cv_config.yaml
cv_analysis:
  enabled: true
  yolox:
    model: "yolox_x.pt"
    confidence: 0.6
    iou: 0.45
  tracking:
    algorithm: "DeepSORT"
    max_age: 30
    min_hits: 3
  speed_estimation:
    calibration_required: true
    pixel_to_meter_ratio: 0.05
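As a worked example of how `pixel_to_meter_ratio` could feed speed estimation, the sketch below assumes the ratio means meters per pixel and is constant across the frame (which ignores perspective, hence `calibration_required`). The function name and the tracked positions are hypothetical.

```python
import math
from typing import Tuple


def estimate_speed_kmh(start_px: Tuple[float, float], end_px: Tuple[float, float],
                       elapsed_seconds: float,
                       pixel_to_meter_ratio: float = 0.05) -> float:
    """Convert a tracked displacement in pixels into a speed in km/h."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    pixels = math.hypot(end_px[0] - start_px[0], end_px[1] - start_px[1])
    meters = pixels * pixel_to_meter_ratio
    return meters / elapsed_seconds * 3.6  # m/s to km/h


# A vehicle moves 100 px (60 px right, 80 px down) in 0.5 s:
# 100 px * 0.05 m/px = 5 m, 5 m / 0.5 s = 10 m/s = 36 km/h.
speed = estimate_speed_kmh((100, 200), (160, 280), elapsed_seconds=0.5)
print(round(speed, 1))
```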
Usage
from src.agents.analytics.cv_analysis_agent import CVAnalysisAgent
agent = CVAnalysisAgent()
# Analyze image
analysis = agent.analyze_image(camera_id="CAM_001")
print(f"Vehicles: {analysis.vehicle_count}")
print(f"Types: {analysis.vehicle_types}")
print(f"Average speed: {analysis.avg_speed} km/h")
# Track vehicles
tracks = agent.track_vehicles(camera_id="CAM_001", duration=60)
Citizen Report Agent
Overview
Processes citizen-submitted traffic reports with AI validation, image analysis, and priority classification for rapid response.
Features
- AI Validation: Automatic report verification
- Image Analysis: Computer vision on submitted photos
- Priority Classification: Urgent, High, Medium, Low
- Duplicate Detection: Prevent duplicate reports
- Sentiment Analysis: Extract urgency from text
- Geo-tagging: Automatic location extraction
Configuration
# config/citizen_report_config.yaml
citizen_report:
  enabled: true
  validation:
    require_image: true
    min_description_length: 20
    auto_verify_threshold: 0.8
  priority:
    accident_keywords: ["accident", "crash", "collision"]
    urgency_keywords: ["emergency", "urgent", "critical"]
  moderation:
    profanity_filter: true
    spam_detection: true
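The keyword lists could drive a simple first-pass priority before any AI validation runs. The mapping below (both keyword groups give urgent, one gives high, a too-short description gives low, anything else medium) is an assumption made for illustration; the configuration only supplies the keywords and `min_description_length`.

```python
import re

# Values copied from the priority and validation blocks of the configuration.
ACCIDENT_KEYWORDS = {"accident", "crash", "collision"}
URGENCY_KEYWORDS = {"emergency", "urgent", "critical"}
MIN_DESCRIPTION_LENGTH = 20


def classify_priority(description: str) -> str:
    """Assign a coarse priority from keyword matches in the description."""
    words = set(re.findall(r"[a-z]+", description.lower()))
    has_accident = bool(words & ACCIDENT_KEYWORDS)
    has_urgency = bool(words & URGENCY_KEYWORDS)
    if has_accident and has_urgency:
        return "urgent"
    if has_accident or has_urgency:
        return "high"
    if len(description.strip()) < MIN_DESCRIPTION_LENGTH:
        return "low"
    return "medium"


print(classify_priority("Multi-vehicle accident on Nguyen Hue St"))
```

Matching on whole words avoids false positives from substrings, at the cost of missing inflected forms such as "crashed".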
Usage
from src.agents.analytics.citizen_report_agent import CitizenReportAgent
agent = CitizenReportAgent()
# Process report
report = {
    "category": "accident",
    "description": "Multi-vehicle accident on Nguyen Hue St",
    "location": {"lat": 10.7769, "lon": 106.7009},
    "images": [image_data]
}
result = agent.process_report(report)
print(f"Priority: {result.priority}")
print(f"Verified: {result.verified}")
print(f"Confidence: {result.confidence}")
Transformation Agents
NGSI-LD Transformer Agent
Overview
Transforms raw traffic data into FIWARE NGSI-LD entities compliant with Smart Data Models for context broker integration.
Features
- FIWARE Compliance: Smart Data Models alignment
- Multi-Entity Support: 15+ entity types
- Batch Processing: Bulk transformations
- Validation: Schema validation
- Relationship Management: Entity relationships
- Temporal Properties: Time-series support
Configuration
# config/ngsi_ld_mappings.yaml
ngsi_ld_transformer:
  context: "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"
  entity_types:
    - TrafficFlowObserved
    - AccidentDetected
    - CongestionDetected
    - WeatherObserved
    - AirQualityObserved
  validation:
    strict_mode: true
    allow_custom_properties: false
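To make the target format concrete, here is a minimal sketch of building an NGSI-LD entity by hand. The raw field names (`camera_id`, `vehicle_count`, `avg_speed_kmh`, `lat`, `lon`, `timestamp`) and the `to_ngsi_ld` function are hypothetical; the attribute names `intensity`, `averageVehicleSpeed`, and `dateObserved` follow the TrafficFlowObserved Smart Data Model, and the agent's own mapping may differ.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

CORE_CONTEXT = "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"


def to_ngsi_ld(raw: Dict[str, Any], entity_type: str) -> Dict[str, Any]:
    """Wrap raw readings in NGSI-LD Property and GeoProperty structures."""
    observed_at = raw["timestamp"].astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "id": f"urn:ngsi-ld:{entity_type}:{raw['camera_id']}",
        "type": entity_type,
        "dateObserved": {"type": "Property", "value": observed_at},
        "intensity": {
            "type": "Property",
            "value": raw["vehicle_count"],
            "observedAt": observed_at,
        },
        "averageVehicleSpeed": {
            "type": "Property",
            "value": raw["avg_speed_kmh"],
            "unitCode": "KMH",  # UN/CEFACT code for kilometres per hour
            "observedAt": observed_at,
        },
        "location": {
            "type": "GeoProperty",
            # GeoJSON order is [longitude, latitude].
            "value": {"type": "Point", "coordinates": [raw["lon"], raw["lat"]]},
        },
        "@context": [CORE_CONTEXT],
    }


raw_reading = {
    "camera_id": "CAM_001",
    "vehicle_count": 42,
    "avg_speed_kmh": 31.5,
    "lat": 10.7769,
    "lon": 106.7009,
    "timestamp": datetime(2025, 12, 5, 8, 30, tzinfo=timezone.utc),
}
entity = to_ngsi_ld(raw_reading, "TrafficFlowObserved")
print(json.dumps(entity, indent=2))
```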
Usage
from src.agents.transformation.ngsi_ld_transformer_agent import NGSILDTransformerAgent
agent = NGSILDTransformerAgent()
# Transform single entity
raw_data = {...}
entity = agent.transform_to_ngsi_ld(raw_data, "TrafficFlowObserved")
# Batch transform
entities = agent.batch_transform(raw_data_list)
# Validate
is_valid = agent.validate_entity(entity)
SOSA/SSN Mapper Agent
Overview
Maps sensor observations to W3C SOSA/SSN ontology for semantic interoperability and RDF-based data integration.
Features
- W3C Compliance: SOSA/SSN standards
- Sensor Modeling: Complete sensor descriptions
- Observation Mapping: Sensor observations
- RDF Serialization: Turtle, JSON-LD, RDF/XML
- Ontology Alignment: Domain-specific extensions
- SPARQL Ready: Queryable RDF graphs
Configuration
# config/sosa_mappings.yaml
sosa_ssn_mapper:
  namespaces:
    sosa: "http://www.w3.org/ns/sosa/"
    ssn: "http://www.w3.org/ns/ssn/"
    ex: "http://hcmc-traffic.example.org/"
  sensor_properties:
    accuracy: true
    precision: true
    frequency: true
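For orientation, the sketch below shows what a single SOSA observation might look like in Turtle, built with plain string formatting so it runs without an RDF library. The `observation_to_turtle` function and the identifiers `obs_001` and `vehicleCount` are hypothetical; the `sosa:` terms used are standard SOSA vocabulary, and the `sosa` and `ex` namespaces come from the configuration.

```python
from datetime import datetime, timezone

PREFIXES = {
    "sosa": "http://www.w3.org/ns/sosa/",
    "xsd": "http://www.w3.org/2001/XMLSchema#",
    "ex": "http://hcmc-traffic.example.org/",
}


def observation_to_turtle(obs_id: str, sensor_id: str, observed_property: str,
                          value: int, result_time: datetime) -> str:
    """Render one integer-valued observation as a Turtle document."""
    stamp = result_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    lines = [f"@prefix {prefix}: <{uri}> ." for prefix, uri in PREFIXES.items()]
    lines.append("")
    lines += [
        f"ex:{obs_id} a sosa:Observation ;",
        f"    sosa:madeBySensor ex:{sensor_id} ;",
        f"    sosa:observedProperty ex:{observed_property} ;",
        f'    sosa:hasSimpleResult "{value}"^^xsd:integer ;',
        f'    sosa:resultTime "{stamp}"^^xsd:dateTime .',
    ]
    return "\n".join(lines)


turtle_doc = observation_to_turtle(
    obs_id="obs_001",
    sensor_id="CAM_001",
    observed_property="vehicleCount",
    value=42,
    result_time=datetime(2025, 12, 5, 8, 30, tzinfo=timezone.utc),
)
print(turtle_doc)
```

String templates are enough for a fixed shape like this one; arbitrary identifiers or string literals would need proper escaping, which is what an RDF library provides.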
Usage
from src.agents.transformation.sosa_ssn_mapper_agent import SOSASSNMapperAgent
agent = SOSASSNMapperAgent()
# Map observation
observation = agent.map_to_sosa_observation(raw_data)
# Serialize to RDF
turtle = observation.serialize(format="turtle")
jsonld = observation.serialize(format="json-ld")
# Create sensor description
sensor = agent.create_sensor_description(camera_id="CAM_001")
RDF & Linked Data Agents
NGSI-LD to RDF Agent
Overview
Converts NGSI-LD entities to RDF triples for semantic web integration and linked data publishing.
Features
- RDF Conversion: NGSI-LD → RDF transformation
- Multiple Formats: Turtle, JSON-LD, RDF/XML, N-Triples
- Named Graphs: Graph organization
- URI Generation: Stable URI schemes
- Property Mapping: Attribute → RDF property
- Relationship Handling: Entity relationships as RDF links
Configuration
# config/rdf_conversion.yaml
ngsi_ld_to_rdf:
  base_uri: "http://hcmc-traffic.example.org/"
  output_format: "turtle"
  namespaces:
    ex: "http://hcmc-traffic.example.org/"
    ngsi-ld: "https://uri.etsi.org/ngsi-ld/"
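The attribute-to-property mapping can be illustrated with a small converter that emits N-Triples lines. This is a simplified sketch under stated assumptions: the entity id is used directly as the subject IRI, predicates and types are minted by appending names to `base_uri`, only scalar Property values and Relationships are handled, and GeoProperties are skipped. `literal` and `entity_to_ntriples` are hypothetical names, not the agent's API.

```python
from typing import Any, Dict, List

XSD = "http://www.w3.org/2001/XMLSchema#"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"


def literal(value: Any) -> str:
    """Serialize a scalar as an N-Triples literal with an XSD datatype."""
    if isinstance(value, bool):  # check bool first, since bool is a subclass of int
        return f'"{str(value).lower()}"^^<{XSD}boolean>'
    if isinstance(value, int):
        return f'"{value}"^^<{XSD}integer>'
    if isinstance(value, float):
        return f'"{value}"^^<{XSD}double>'
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
    return f'"{escaped}"'


def entity_to_ntriples(entity: Dict[str, Any],
                       base_uri: str = "http://hcmc-traffic.example.org/") -> List[str]:
    """Turn Properties into literals and Relationships into IRI links."""
    subject = f"<{entity['id']}>"
    triples = [f"{subject} <{RDF_TYPE}> <{base_uri}{entity['type']}> ."]
    for name, attr in entity.items():
        if not isinstance(attr, dict):
            continue  # skips id, type, and @context
        predicate = f"<{base_uri}{name}>"
        if attr.get("type") == "Property":
            triples.append(f"{subject} {predicate} {literal(attr['value'])} .")
        elif attr.get("type") == "Relationship":
            triples.append(f"{subject} {predicate} <{attr['object']}> .")
    return triples


sample_entity = {
    "id": "urn:ngsi-ld:TrafficFlowObserved:001",
    "type": "TrafficFlowObserved",
    "intensity": {"type": "Property", "value": 42},
    "refRoadSegment": {"type": "Relationship", "object": "urn:ngsi-ld:RoadSegment:12"},
}
triples = entity_to_ntriples(sample_entity)
print("\n".join(triples))
```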
Usage
from src.agents.rdf_linked_data.ngsi_ld_to_rdf_agent import NGSILDToRDFAgent
agent = NGSILDToRDFAgent()
# Convert entity
ngsi_entity = {...}
rdf_graph = agent.convert_to_rdf(ngsi_entity)
# Serialize
turtle = rdf_graph.serialize(format="turtle")
jsonld = rdf_graph.serialize(format="json-ld")
Triplestore Loader Agent
Overview
Manages loading, updating, and querying RDF triples in Apache Jena Fuseki triplestore with SPARQL support.
Features
- Bulk Loading: Efficient batch triple loading
- Named Graphs: Organized graph management
- SPARQL Queries: SELECT, CONSTRUCT, ASK, DESCRIBE
- SPARQL Updates: INSERT, DELETE operations
- Incremental Updates: Delta updates
- Transaction Support: ACID compliance
Usage
from src.agents.rdf_linked_data.triplestore_loader_agent import TriplestoreLoaderAgent
agent = TriplestoreLoaderAgent()
# Load triples
agent.load_triples(
    file_path="data/observations.ttl",
    graph_name="http://hcmc-traffic.example.org/graph/traffic"
)
# Execute query
results = agent.execute_query("""
    PREFIX sosa: <http://www.w3.org/ns/sosa/>
    SELECT ?sensor ?value WHERE {
        ?obs sosa:madeBySensor ?sensor ;
             sosa:hasSimpleResult ?value .
    }
""")
LOD Linkset Enrichment Agent
Overview
Enriches local data with Linked Open Data from DBpedia, Wikidata, and GeoNames for semantic enrichment.
Features
- Entity Linking: Link to LOD resources
- DBpedia Integration: Extract entity information
- Wikidata Alignment: Cross-reference entities
- GeoNames: Location enrichment
- Automatic Linking: Entity resolution
- Confidence Scoring: Link quality assessment
Configuration
# config/lod_enrichment.yaml
lod_linkset:
  sources:
    dbpedia:
      endpoint: "https://dbpedia.org/sparql"
      enabled: true
    wikidata:
      endpoint: "https://query.wikidata.org/sparql"
      enabled: true
    geonames:
      api_key: "${GEONAMES_KEY}"
      enabled: true
Usage
from src.agents.rdf_linked_data.lod_linkset_enrichment_agent import LODLinksetAgent
agent = LODLinksetAgent()
# Enrich location
enriched = agent.enrich_location(
    name="Ho Chi Minh City",
    lat=10.7769,
    lon=106.7009
)
print(f"DBpedia: {enriched.dbpedia_uri}")
print(f"Wikidata: {enriched.wikidata_id}")
print(f"GeoNames: {enriched.geonames_id}")
Content Negotiation Agent
Overview
Handles HTTP content negotiation for serving RDF data in multiple formats based on client Accept headers.
Features
- Format Negotiation: Turtle, JSON-LD, RDF/XML, N-Triples
- HTTP Compliance: RFC 7231 content negotiation (now specified in RFC 9110)
- Quality Values: Support q-values
- Default Fallback: Turtle as default
- Compression: Gzip support
- Caching: ETag and Last-Modified headers
Configuration
# config/content_negotiation_config.yaml
content_negotiation:
  formats:
    - mime_type: "text/turtle"
      extension: "ttl"
      priority: 1
    - mime_type: "application/ld+json"
      extension: "jsonld"
      priority: 2
    - mime_type: "application/rdf+xml"
      extension: "rdf"
      priority: 3
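Server-side negotiation over these formats can be sketched as follows. The code parses q-values from an Accept header, prefers the most specific matching media range for each supported type, and breaks ties using the configured priority order. Returning Turtle for an empty header and `None` when nothing is acceptable is an assumption about the fallback policy; `parse_accept`, `quality_for`, and `negotiate` are hypothetical names.

```python
from typing import List, Optional, Tuple

# MIME types in the priority order given by the configuration.
SUPPORTED = ["text/turtle", "application/ld+json", "application/rdf+xml"]


def parse_accept(header: str) -> List[Tuple[str, float]]:
    """Split an Accept header into (media_range, q) pairs; q defaults to 1.0."""
    ranges = []
    for part in header.split(","):
        pieces = [piece.strip() for piece in part.split(";")]
        media_range = pieces[0].lower()
        if not media_range:
            continue
        q = 1.0
        for param in pieces[1:]:
            name, _, raw = param.partition("=")
            if name.strip().lower() == "q":
                try:
                    q = float(raw)
                except ValueError:
                    q = 0.0  # treat a malformed q-value as "not acceptable"
        ranges.append((media_range, q))
    return ranges


def quality_for(mime: str, ranges: List[Tuple[str, float]]) -> float:
    """Return the q-value of the most specific range that matches mime."""
    main_type = mime.partition("/")[0]
    best_specificity, best_q = -1, 0.0
    for media_range, q in ranges:
        if media_range == mime:
            specificity = 2
        elif media_range == f"{main_type}/*":
            specificity = 1
        elif media_range == "*/*":
            specificity = 0
        else:
            continue
        if specificity > best_specificity:
            best_specificity, best_q = specificity, q
    return best_q


def negotiate(header: str, supported: List[str] = SUPPORTED,
              default: str = "text/turtle") -> Optional[str]:
    """Pick the supported type with the highest q; earlier entries win ties."""
    ranges = parse_accept(header)
    if not ranges:
        return default
    best_mime, best_q = None, 0.0
    for mime in supported:
        q = quality_for(mime, ranges)
        if q > best_q:
            best_mime, best_q = mime, q
    return best_mime


print(negotiate("application/ld+json, text/turtle;q=0.8"))
```

A caller would typically translate a `None` result into an HTTP 406 response or fall back to the default format.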
Usage
from src.agents.rdf_linked_data.content_negotiation_agent import ContentNegotiationAgent
agent = ContentNegotiationAgent()
# Negotiate format
accept_header = "application/ld+json, text/turtle;q=0.8"
negotiated_format = agent.negotiate_format(accept_header)  # avoids shadowing the built-in format()
# Serve data
response = agent.serve_entity(
    entity_id="urn:ngsi-ld:TrafficFlowObserved:001",
    accept_header=accept_header
)
Smart Data Models Validation Agent
Overview
Validates NGSI-LD entities against FIWARE Smart Data Models schemas for standards compliance.
Features
- Schema Validation: JSON Schema validation
- FIWARE Compliance: Smart Data Models
- Custom Schemas: Domain-specific extensions
- Validation Reports: Detailed error reports
- Batch Validation: Multiple entities
- Auto-correction: Suggest fixes
Configuration
# config/smart_data_models_validation.yaml
validation:
  schemas_path: "schemas/fiware/"
  strict_mode: true
  models:
    - TrafficFlowObserved
    - OffStreetParking
    - WeatherObserved
Usage
from src.agents.rdf_linked_data.smart_data_models_validation_agent import ValidationAgent
agent = ValidationAgent()
# Validate entity
result = agent.validate_entity(entity, "TrafficFlowObserved")
if not result.is_valid:
    print(f"Errors: {result.errors}")
# Batch validate
results = agent.batch_validate(entities)
Context Management Agents
Entity Publisher Agent
Overview
Manages NGSI-LD entity lifecycle in Stellio Context Broker with CRUD operations, batch processing, and subscriptions.