Chuyển tới nội dung chính

Complete Agents Reference

Overview

Comprehensive documentation for all 30+ agents in the HCMC Traffic Management System, organized by category with full technical specifications, configurations, and integration examples.


Table of Contents

Data Collection Agents

  1. Camera Image Fetch Agent
  2. Weather Integration Agent
  3. Air Quality Agent

Analytics Agents

  1. Accident Detection Agent
  2. Pattern Recognition Agent
  3. Congestion Detection Agent
  4. Computer Vision Analysis Agent
  5. Citizen Report Agent

Transformation Agents

  1. NGSI-LD Transformer Agent
  2. SOSA/SSN Mapper Agent

RDF & Linked Data Agents

  1. NGSI-LD to RDF Agent
  2. Triplestore Loader Agent
  3. LOD Linkset Enrichment Agent
  4. Content Negotiation Agent
  5. Smart Data Models Validation Agent

Context Management Agents

  1. Entity Publisher Agent
  2. Stellio State Query Agent
  3. Temporal Data Manager Agent
  4. State Updater Agent

Notification Agents

  1. Alert Dispatcher Agent
  2. Subscription Manager Agent
  3. Incident Report Generator Agent
  4. Email Notification Handler Agent
  5. Webhook Notification Handler Agent

Graph Database Agents

  1. Neo4j Sync Agent
  2. Neo4j Query Agent

Integration Agents

  1. API Gateway Agent
  2. Cache Manager Agent

Monitoring Agents

  1. Health Check Agent
  2. Performance Monitor Agent
  3. Data Quality Validator Agent

State Management Agents

  1. State Manager Agent
  2. Accident State Manager Agent
  3. Congestion State Manager Agent
  4. Temporal State Tracker Agent

Additional Agents

  1. Cache Invalidator Agent
  2. Kafka Entity Publisher Agent

Data Collection Agents

Camera Image Fetch Agent

Overview

Fetches images from 150+ traffic cameras across HCMC, processes with YOLOX-X for vehicle detection, and implements multi-tier caching for performance optimization.

Features

  • Multi-Camera Support: Parallel processing of 150+ cameras
  • Caching Strategy: 3-tier (Memory → Redis → MongoDB)
  • Image Processing: Automatic resizing, normalization, quality checks
  • Error Handling: Retry logic, fallback mechanisms, circuit breaker
  • CV Integration: YOLOX-X vehicle detection preprocessing
  • Scheduling: Configurable fetch intervals (1s - 60s)

Architecture

graph TB
A[Camera Sources] --> B[Image Fetch Agent]
B --> C{Cache Check}
C -->|Hit| D[Return Cached]
C -->|Miss| E[Fetch Image]
E --> F[Process Image]
F --> G[YOLOX Detection]
G --> H[Update Caches]
H --> I[Memory Cache]
H --> J[Redis Cache]
H --> K[MongoDB]

Configuration

# config/camera_image_config.yaml
camera_image_fetch:
enabled: true
fetch_interval: 5 # seconds

cameras:
source_type: "rtsp"
connection_timeout: 10
read_timeout: 15
max_retries: 3

image_processing:
target_size: [1280, 720]
quality: 85
format: "JPEG"
normalize: true

caching:
memory_ttl: 60
redis_ttl: 300
mongo_ttl: 86400
max_memory_items: 1000

yolox:
model_path: "yolox_x.pt"
confidence: 0.5
iou: 0.45
device: "cuda"

Usage

from src.agents.data_collection.camera_image_fetch_agent import CameraImageFetchAgent

# Initialize
agent = CameraImageFetchAgent()

# Fetch single camera
image = agent.fetch_camera("CAM_001")

# Fetch multiple cameras (parallel)
images = agent.fetch_cameras_batch(["CAM_001", "CAM_002", "CAM_003"])

# With CV detection
result = agent.fetch_and_detect("CAM_001")
print(f"Vehicles detected: {len(result.detections)}")

API Reference

class CameraImageFetchAgent:
def fetch_camera(self, camera_id: str) -> CameraImage
def fetch_cameras_batch(self, camera_ids: List[str]) -> List[CameraImage]
def fetch_and_detect(self, camera_id: str) -> DetectionResult
def get_cache_stats(self) -> CacheStatistics
def clear_cache(self, cache_type: str = "all")

Data Models

@dataclass
class CameraImage:
camera_id: str
timestamp: datetime
image: np.ndarray
metadata: ImageMetadata

@dataclass
class DetectionResult:
camera_id: str
timestamp: datetime
detections: List[Detection]
confidence: float

Weather Integration Agent

Overview

Integrates multiple weather data sources (OpenWeather, WeatherAPI) to provide real-time weather conditions correlated with traffic patterns.

Features

  • Multi-Source Integration: 2+ weather APIs with fallback
  • Traffic Correlation: Weather impact analysis on traffic flow
  • Alert System: Severe weather alerts affecting traffic
  • Forecasting: 24-hour weather predictions
  • Historical Data: Weather pattern analysis
  • Caching: Optimized API call reduction

Configuration

# config/weather_config.yaml
weather_integration:
enabled: true
update_interval: 300 # 5 minutes

sources:
openweather:
api_key: "${OPENWEATHER_API_KEY}"
endpoint: "https://api.openweathermap.org/data/2.5"
priority: 1

weatherapi:
api_key: "${WEATHERAPI_KEY}"
endpoint: "https://api.weatherapi.com/v1"
priority: 2

correlation:
enabled: true
traffic_threshold: 0.7
weather_impact_factors:
rain: 0.8
heavy_rain: 0.5
fog: 0.6

Usage

from src.agents.data_collection.weather_integration_agent import WeatherIntegrationAgent

agent = WeatherIntegrationAgent()

# Get current weather
weather = agent.get_current_weather(lat=10.7769, lon=106.7009)
print(f"Temperature: {weather.temperature}°C, Conditions: {weather.conditions}")

# Get forecast
forecast = agent.get_forecast(lat=10.7769, lon=106.7009, hours=24)

# Weather-traffic correlation
correlation = agent.analyze_weather_traffic_correlation()
print(f"Rain impact on traffic: {correlation.rain_impact}")

Air Quality Agent

Overview

Monitors air quality across HCMC using AQI data from multiple stations, providing health impact assessments and pollution analysis.

Features

  • Multi-Station Monitoring: Data from 20+ AQI stations
  • Pollutant Analysis: PM2.5, PM10, NO2, SO2, CO, O3
  • Health Impact: AQI category and health recommendations
  • Traffic Correlation: Analyze pollution from traffic sources
  • Alert System: Air quality alerts
  • Historical Trends: Long-term pollution patterns

Configuration

# config/air_quality_config.yaml
air_quality:
enabled: true
update_interval: 300

data_sources:
aqicn:
api_key: "${AQICN_API_KEY}"
endpoint: "https://api.waqi.info"

stations:
- id: "hcm-district1"
location: [10.7769, 106.7009]
- id: "hcm-district3"
location: [10.7868, 106.6897]

thresholds:
pm25_good: 12
pm25_moderate: 35.4
pm25_unhealthy: 55.4

Usage

from src.agents.data_collection.air_quality_agent import AirQualityAgent

agent = AirQualityAgent()

# Get current AQI
aqi_data = agent.get_current_aqi(lat=10.7769, lon=106.7009)
print(f"AQI: {aqi_data.aqi}, Category: {aqi_data.category}")

# Get pollutant breakdown
pollutants = agent.get_pollutant_details()
print(f"PM2.5: {pollutants.pm25} µg/m³")

# Health recommendations
health = agent.get_health_recommendations(aqi_data.aqi)

Analytics Agents

Accident Detection Agent

Overview

Real-time accident detection using YOLOX-X computer vision model with 4-level severity classification and automatic emergency response triggers.

Features

  • YOLOX-X Detection: 95%+ accuracy accident detection
  • Severity Classification: Minor, Moderate, Severe, Critical
  • Real-time Alerts: Sub-second emergency notifications
  • Multi-Camera Fusion: Cross-camera validation
  • Historical Analysis: Accident pattern identification
  • Emergency Integration: Automatic 911/emergency dispatch

Architecture

graph TB
A[Camera Feed] --> B[YOLOX-X Model]
B --> C[Accident Classifier]
C --> D{Severity}
D -->|Minor| E[Log Event]
D -->|Moderate| F[Alert Traffic Ops]
D -->|Severe| G[Notify Emergency]
D -->|Critical| H[Emergency Dispatch]
G --> I[Alert Dispatcher]
H --> I

Configuration

# config/accident_config.yaml
accident_detection:
enabled: true

yolox:
model_path: "yolox_x.pt"
confidence_threshold: 0.75
iou_threshold: 0.45
classes: [2, 3, 5, 7] # car, motorcycle, bus, truck

severity_classification:
minor:
vehicles: [1, 2]
blocking_lanes: 0
injuries: false

moderate:
vehicles: [2, 3]
blocking_lanes: [1, 2]
injuries: false

severe:
vehicles: [3, 5]
blocking_lanes: [2, 3]
injuries: true

critical:
vehicles: "5+"
blocking_lanes: "3+"
injuries: true
fire: true

emergency_response:
minor_delay: 300
moderate_delay: 60
severe_delay: 10
critical_delay: 0

Usage

from src.agents.analytics.accident_detection_agent import AccidentDetectionAgent

agent = AccidentDetectionAgent()

# Detect from camera
result = agent.detect_accident(camera_id="CAM_001")

if result.accident_detected:
print(f"Severity: {result.severity}")
print(f"Confidence: {result.confidence}")
print(f"Vehicles involved: {result.vehicles_count}")
print(f"Emergency notified: {result.emergency_notified}")

# Batch detection
results = agent.detect_accidents_batch(["CAM_001", "CAM_002", "CAM_003"])

API Reference

class AccidentDetectionAgent:
def detect_accident(self, camera_id: str) -> AccidentDetectionResult
def detect_accidents_batch(self, camera_ids: List[str]) -> List[AccidentDetectionResult]
def classify_severity(self, detection: Detection) -> SeverityLevel
def trigger_emergency_response(self, accident_id: str)

Data Models

@dataclass
class AccidentDetectionResult:
accident_id: str
camera_id: str
timestamp: datetime
accident_detected: bool
severity: SeverityLevel
confidence: float
vehicles_count: int
location: Location
bounding_boxes: List[BoundingBox]
emergency_notified: bool

Pattern Recognition Agent

Overview

Machine learning-based pattern detection for recurring traffic issues, anomaly detection, and predictive analytics.

Features

  • Pattern Detection: Identify recurring congestion patterns
  • Anomaly Detection: Detect unusual traffic behavior
  • Hotspot Identification: High-accident/congestion zones
  • Temporal Analysis: Time-based pattern recognition
  • Predictive Analytics: Forecast traffic patterns
  • Correlation Analysis: Multi-factor correlation

Configuration

# config/pattern_recognition.yaml
pattern_recognition:
enabled: true

models:
clustering:
algorithm: "DBSCAN"
eps: 0.5
min_samples: 5

anomaly_detection:
algorithm: "IsolationForest"
contamination: 0.1

analysis:
time_windows: [1, 6, 24, 168] # hours
confidence_threshold: 0.7
min_occurrences: 3

Usage

from src.agents.analytics.pattern_recognition_agent import PatternRecognitionAgent

agent = PatternRecognitionAgent()

# Detect patterns
patterns = agent.detect_patterns(
start_time=datetime(2025, 11, 20),
end_time=datetime(2025, 12, 5)
)

for pattern in patterns:
print(f"Pattern: {pattern.type}")
print(f"Locations: {pattern.locations}")
print(f"Time: {pattern.time_pattern}")
print(f"Confidence: {pattern.confidence}")

# Anomaly detection
anomalies = agent.detect_anomalies(camera_id="CAM_001")

# Hotspot analysis
hotspots = agent.identify_hotspots()

Congestion Detection Agent

Overview

Real-time congestion monitoring with 5-level classification, zone management, and impact assessment across HCMC.

Features

  • 5-Level Classification: Free flow → Severe congestion
  • Zone Management: 50+ traffic zones
  • Real-time Updates: Sub-minute latency
  • Speed Estimation: Per-camera speed analysis
  • Queue Length: Vehicle queue measurement
  • Impact Assessment: Delay estimation, alternate routes

Configuration

# config/congestion_config.yaml
congestion_detection:
enabled: true
update_interval: 30

classification:
free_flow: {speed_ratio: ">0.85", density: "<0.3"}
light: {speed_ratio: "0.7-0.85", density: "0.3-0.5"}
moderate: {speed_ratio: "0.5-0.7", density: "0.5-0.7"}
heavy: {speed_ratio: "0.3-0.5", density: "0.7-0.85"}
severe: {speed_ratio: "<0.3", density: ">0.85"}

zones:
- id: "ZONE_001"
name: "District 1 Central"
cameras: ["CAM_001", "CAM_002", "CAM_003"]
free_flow_speed: 40

Usage

from src.agents.analytics.congestion_detection_agent import CongestionDetectionAgent

agent = CongestionDetectionAgent()

# Detect congestion
congestion = agent.detect_congestion(zone_id="ZONE_001")
print(f"Level: {congestion.level}")
print(f"Average speed: {congestion.avg_speed} km/h")
print(f"Delay: {congestion.estimated_delay} minutes")

# Get all zones status
zones = agent.get_all_zones_status()

Computer Vision Analysis Agent

Overview

Advanced computer vision for vehicle detection, classification, speed estimation, and lane monitoring using YOLOX-X and tracking algorithms.

Features

  • Vehicle Detection: Cars, motorcycles, trucks, buses
  • Vehicle Classification: 10+ vehicle types
  • Speed Estimation: Calibrated speed measurement
  • Lane Monitoring: Lane occupation analysis
  • Vehicle Tracking: Multi-object tracking
  • Counting: Directional vehicle counting

Configuration

# config/cv_config.yaml
cv_analysis:
enabled: true

yolox:
model: "yolox_x.pt"
confidence: 0.6
iou: 0.45

tracking:
algorithm: "DeepSORT"
max_age: 30
min_hits: 3

speed_estimation:
calibration_required: true
pixel_to_meter_ratio: 0.05

Usage

from src.agents.analytics.cv_analysis_agent import CVAnalysisAgent

agent = CVAnalysisAgent()

# Analyze image
analysis = agent.analyze_image(camera_id="CAM_001")
print(f"Vehicles: {analysis.vehicle_count}")
print(f"Types: {analysis.vehicle_types}")
print(f"Average speed: {analysis.avg_speed} km/h")

# Track vehicles
tracks = agent.track_vehicles(camera_id="CAM_001", duration=60)

Citizen Report Agent

Overview

Processes citizen-submitted traffic reports with AI validation, image analysis, and priority classification for rapid response.

Features

  • AI Validation: Automatic report verification
  • Image Analysis: Computer vision on submitted photos
  • Priority Classification: Urgent, High, Medium, Low
  • Duplicate Detection: Prevent duplicate reports
  • Sentiment Analysis: Extract urgency from text
  • Geo-tagging: Automatic location extraction

Configuration

# config/citizen_report_config.yaml
citizen_report:
enabled: true

validation:
require_image: true
min_description_length: 20
auto_verify_threshold: 0.8

priority:
accident_keywords: ["accident", "crash", "collision"]
urgency_keywords: ["emergency", "urgent", "critical"]

moderation:
profanity_filter: true
spam_detection: true

Usage

from src.agents.analytics.citizen_report_agent import CitizenReportAgent

agent = CitizenReportAgent()

# Process report
report = {
"category": "accident",
"description": "Multi-vehicle accident on Nguyen Hue St",
"location": {"lat": 10.7769, "lon": 106.7009},
"images": [image_data]
}

result = agent.process_report(report)
print(f"Priority: {result.priority}")
print(f"Verified: {result.verified}")
print(f"Confidence: {result.confidence}")

Transformation Agents

NGSI-LD Transformer Agent

Overview

Transforms raw traffic data into FIWARE NGSI-LD entities compliant with Smart Data Models for context broker integration.

Features

  • FIWARE Compliance: Smart Data Models alignment
  • Multi-Entity Support: 15+ entity types
  • Batch Processing: Bulk transformations
  • Validation: Schema validation
  • Relationship Management: Entity relationships
  • Temporal Properties: Time-series support

Configuration

# config/ngsi_ld_mappings.yaml
ngsi_ld_transformer:
context: "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"

entity_types:
- TrafficFlowObserved
- AccidentDetected
- CongestionDetected
- WeatherObserved
- AirQualityObserved

validation:
strict_mode: true
allow_custom_properties: false

Usage

from src.agents.transformation.ngsi_ld_transformer_agent import NGSILDTransformerAgent

agent = NGSILDTransformerAgent()

# Transform single entity
raw_data = {...}
entity = agent.transform_to_ngsi_ld(raw_data, "TrafficFlowObserved")

# Batch transform
entities = agent.batch_transform(raw_data_list)

# Validate
is_valid = agent.validate_entity(entity)

SOSA/SSN Mapper Agent

Overview

Maps sensor observations to W3C SOSA/SSN ontology for semantic interoperability and RDF-based data integration.

Features

  • W3C Compliance: SOSA/SSN standards
  • Sensor Modeling: Complete sensor descriptions
  • Observation Mapping: Sensor observations
  • RDF Serialization: Turtle, JSON-LD, RDF/XML
  • Ontology Alignment: Domain-specific extensions
  • SPARQL Ready: Queryable RDF graphs

Configuration

# config/sosa_mappings.yaml
sosa_ssn_mapper:
namespaces:
sosa: "http://www.w3.org/ns/sosa/"
ssn: "http://www.w3.org/ns/ssn/"
ex: "http://hcmc-traffic.example.org/"

sensor_properties:
accuracy: true
precision: true
frequency: true

Usage

from src.agents.transformation.sosa_ssn_mapper_agent import SOSASSNMapperAgent

agent = SOSASSNMapperAgent()

# Map observation
observation = agent.map_to_sosa_observation(raw_data)

# Serialize to RDF
turtle = observation.serialize(format="turtle")
jsonld = observation.serialize(format="json-ld")

# Create sensor description
sensor = agent.create_sensor_description(camera_id="CAM_001")

RDF & Linked Data Agents

NGSI-LD to RDF Agent

Overview

Converts NGSI-LD entities to RDF triples for semantic web integration and linked data publishing.

Features

  • RDF Conversion: NGSI-LD → RDF transformation
  • Multiple Formats: Turtle, JSON-LD, RDF/XML, N-Triples
  • Named Graphs: Graph organization
  • URI Generation: Stable URI schemes
  • Property Mapping: Attribute → RDF property
  • Relationship Handling: Entity relationships as RDF links

Configuration

# config/rdf_conversion.yaml
ngsi_ld_to_rdf:
base_uri: "http://hcmc-traffic.example.org/"
output_format: "turtle"

namespaces:
ex: "http://hcmc-traffic.example.org/"
ngsi-ld: "https://uri.etsi.org/ngsi-ld/"

Usage

from src.agents.rdf_linked_data.ngsi_ld_to_rdf_agent import NGSILDToRDFAgent

agent = NGSILDToRDFAgent()

# Convert entity
ngsi_entity = {...}
rdf_graph = agent.convert_to_rdf(ngsi_entity)

# Serialize
turtle = rdf_graph.serialize(format="turtle")
jsonld = rdf_graph.serialize(format="json-ld")

Triplestore Loader Agent

Overview

Manages loading, updating, and querying RDF triples in Apache Jena Fuseki triplestore with SPARQL support.

Features

  • Bulk Loading: Efficient batch triple loading
  • Named Graphs: Organized graph management
  • SPARQL Queries: SELECT, CONSTRUCT, ASK, DESCRIBE
  • SPARQL Updates: INSERT, DELETE operations
  • Incremental Updates: Delta updates
  • Transaction Support: ACID compliance

Usage

from src.agents.rdf_linked_data.triplestore_loader_agent import TriplestoreLoaderAgent

agent = TriplestoreLoaderAgent()

# Load triples
agent.load_triples(
file_path="data/observations.ttl",
graph_name="http://hcmc-traffic.example.org/graph/traffic"
)

# Execute query
results = agent.execute_query("""
PREFIX sosa: <http://www.w3.org/ns/sosa/>
SELECT ?sensor ?value WHERE {
?obs sosa:madeBySensor ?sensor ;
sosa:hasSimpleResult ?value .
}
""")

LOD Linkset Enrichment Agent

Overview

Enriches local data with Linked Open Data from DBpedia, Wikidata, and GeoNames for semantic enrichment.

Features

  • Entity Linking: Link to LOD resources
  • DBpedia Integration: Extract entity information
  • Wikidata Alignment: Cross-reference entities
  • GeoNames: Location enrichment
  • Automatic Linking: Entity resolution
  • Confidence Scoring: Link quality assessment

Configuration

# config/lod_enrichment.yaml
lod_linkset:
sources:
dbpedia:
endpoint: "https://dbpedia.org/sparql"
enabled: true

wikidata:
endpoint: "https://query.wikidata.org/sparql"
enabled: true

geonames:
api_key: "${GEONAMES_KEY}"
enabled: true

Usage

from src.agents.rdf_linked_data.lod_linkset_enrichment_agent import LODLinksetAgent

agent = LODLinksetAgent()

# Enrich location
enriched = agent.enrich_location(
name="Ho Chi Minh City",
lat=10.7769,
lon=106.7009
)

print(f"DBpedia: {enriched.dbpedia_uri}")
print(f"Wikidata: {enriched.wikidata_id}")
print(f"GeoNames: {enriched.geonames_id}")

Content Negotiation Agent

Overview

Handles HTTP content negotiation for serving RDF data in multiple formats based on client Accept headers.

Features

  • Format Negotiation: Turtle, JSON-LD, RDF/XML, N-Triples
  • HTTP Compliance: RFC 7231 content negotiation
  • Quality Values: Support q-values
  • Default Fallback: Turtle as default
  • Compression: Gzip support
  • Caching: ETag and Last-Modified headers

Configuration

# config/content_negotiation_config.yaml
content_negotiation:
formats:
- mime_type: "text/turtle"
extension: "ttl"
priority: 1

- mime_type: "application/ld+json"
extension: "jsonld"
priority: 2

- mime_type: "application/rdf+xml"
extension: "rdf"
priority: 3

Usage

from src.agents.rdf_linked_data.content_negotiation_agent import ContentNegotiationAgent

agent = ContentNegotiationAgent()

# Negotiate format
accept_header = "application/ld+json, text/turtle;q=0.8"
format = agent.negotiate_format(accept_header)

# Serve data
response = agent.serve_entity(
entity_id="urn:ngsi-ld:TrafficFlowObserved:001",
accept_header=accept_header
)

Smart Data Models Validation Agent

Overview

Validates NGSI-LD entities against FIWARE Smart Data Models schemas for standards compliance.

Features

  • Schema Validation: JSON Schema validation
  • FIWARE Compliance: Smart Data Models
  • Custom Schemas: Domain-specific extensions
  • Validation Reports: Detailed error reports
  • Batch Validation: Multiple entities
  • Auto-correction: Suggest fixes

Configuration

# config/smart_data_models_validation.yaml
validation:
schemas_path: "schemas/fiware/"
strict_mode: true

models:
- TrafficFlowObserved
- OffStreetParking
- WeatherObserved

Usage

from src.agents.rdf_linked_data.smart_data_models_validation_agent import ValidationAgent

agent = ValidationAgent()

# Validate entity
result = agent.validate_entity(entity, "TrafficFlowObserved")

if not result.is_valid:
print(f"Errors: {result.errors}")

# Batch validate
results = agent.batch_validate(entities)

Context Management Agents

Entity Publisher Agent

Overview

Manages NGSI-LD entity lifecycle in Stellio Context Broker with CRUD operations, batch processing, and subscriptions.

Features

  • CRUD Operations: Create, Read, Update, Delete
  • Batch Processing: Bulk operations
  • Temporal Entities: Time-series management
  • Subscriptions: Event notifications
  • Query Support: Complex entity queries
  • Relationship Management: Semantic links

Usage

from src.agents.context_management.entity_publisher_agent import EntityPublisherAgent

agent = EntityPublisherAgent()

# Create entity
entity = {...}
result = agent.create_entity(entity)

# Update attributes
agent.update_entity(
entity_id="urn:ngsi-ld:TrafficFlowObserved:001",
attributes={"intensity": {"type": "Property", "value": 52}}
)

# Batch operations
agent.batch_create(entities)

# Query
entities = agent.query_entities(
entity_type="TrafficFlowObserved",
geo_query={"geometry": "Point", "coordinates": [106.7009, 10.7769]}
)

Stellio State Query Agent

Overview

Queries current and historical state from Stellio Context Broker with support for temporal and geo-spatial queries.

Features

  • Entity Queries: By type, ID, attributes
  • Temporal Queries: Historical data retrieval
  • Geo-spatial Queries: Location-based search
  • Relationship Queries: Navigate entity graph
  • Aggregations: Statistical queries
  • Pagination: Large result sets

Usage

from src.agents.context_management.stellio_state_query_agent import StellioStateQueryAgent

agent = StellioStateQueryAgent()

# Get entity
entity = agent.get_entity("urn:ngsi-ld:TrafficFlowObserved:001")

# Temporal query
history = agent.query_temporal(
entity_id="urn:ngsi-ld:TrafficFlowObserved:001",
time_rel="after",
time="2025-11-29T00:00:00Z"
)

# Geo query
nearby = agent.query_geo(
entity_type="TrafficFlowObserved",
geometry="Point",
coordinates=[106.7009, 10.7769],
georel="near;maxDistance==1000"
)

Temporal Data Manager Agent

Overview

Manages time-series entity tracking with historical data preservation and temporal property updates.

Features

  • Temporal Properties: Time-aware attributes
  • Historical Tracking: Complete change history
  • Time-series Storage: Optimized TimescaleDB
  • Temporal Queries: Time range queries
  • Aggregations: Temporal statistics
  • Data Retention: Configurable retention policies

Usage

from src.agents.context_management.temporal_data_manager_agent import TemporalDataManager

agent = TemporalDataManager()

# Store temporal data
agent.store_temporal_data(entity_id="...", attribute="intensity", value=45)

# Query time range
data = agent.query_time_range(
entity_id="...",
attribute="intensity",
start_time=datetime(2025, 11, 20),
end_time=datetime(2025, 12, 5)
)

# Aggregations
stats = agent.get_temporal_stats(entity_id="...", attribute="intensity")

State Updater Agent

Overview

Synchronizes state updates across multiple systems with conflict resolution and update strategies.

Features

  • State Sync: Multi-system synchronization
  • Conflict Resolution: Merge strategies
  • Update Strategies: Last-write-wins, merge, custom
  • Transaction Support: ACID compliance
  • Event Sourcing: Complete state history
  • Retry Logic: Fault tolerance

Usage

from src.agents.context_management.state_updater_agent import StateUpdaterAgent

agent = StateUpdaterAgent()

# Update state
agent.update_state(
entity_id="...",
updates={"intensity": 52, "speed": 35.5},
strategy="merge"
)

# Batch updates
agent.batch_update_states(updates_list)

Notification Agents

Alert Dispatcher Agent

Overview

Multi-channel alert distribution system supporting email, SMS, push notifications, webhooks, and WebSocket.

Features

  • Multi-Channel: 5 delivery channels
  • Priority Routing: Urgent, High, Normal, Low
  • Template Engine: Customizable templates
  • Subscription Management: User preferences
  • Rate Limiting: Prevent alert fatigue
  • Delivery Tracking: Confirmation and retry

Usage

from src.agents.notification.alert_dispatcher_agent import AlertDispatcherAgent

agent = AlertDispatcherAgent()

# Dispatch alert
agent.dispatch_alert(
alert_type="accident",
severity="high",
title="Accident on Nguyen Hue St",
message="Multi-vehicle accident detected",
location={"lat": 10.7769, "lon": 106.7009},
channels=["email", "sms", "push"]
)

# Dispatch to specific users
agent.dispatch_to_users(
user_ids=["user1", "user2"],
alert_data={...}
)

Subscription Manager Agent

Overview

Manages user subscriptions for traffic alerts with preference management and geographic filters.

Features

  • User Subscriptions: Per-user preferences
  • Geographic Filters: Location-based subscriptions
  • Category Filters: Accident, congestion, weather
  • Frequency Control: Daily digest, real-time, hourly
  • Quiet Hours: Do-not-disturb periods
  • Multi-Device: Device-specific preferences

Configuration

# config/subscriptions.yaml
subscription_manager:
enabled: true

categories:
- accidents
- congestion
- weather
- air_quality

frequencies:
- realtime
- hourly
- daily

Usage

from src.agents.notification.subscription_manager_agent import SubscriptionManagerAgent

agent = SubscriptionManagerAgent()

# Create subscription
agent.create_subscription(
user_id="user123",
categories=["accidents", "congestion"],
location={"lat": 10.7769, "lon": 106.7009, "radius": 5000},
frequency="realtime"
)

# Update preferences
agent.update_subscription(user_id="user123", preferences={...})

# Get subscriptions
subscriptions = agent.get_user_subscriptions("user123")

Incident Report Generator Agent

Overview

Generates comprehensive incident reports with timeline reconstruction, impact analysis, and PDF export.

Features

  • Automated Reports: Incident summaries
  • Timeline Reconstruction: Event sequence
  • Impact Analysis: Traffic impact metrics
  • PDF Generation: Professional reports
  • Image Inclusion: Evidence photos
  • Multi-language: Vietnamese, English

Usage

from src.agents.notification.incident_report_generator_agent import IncidentReportGenerator

agent = IncidentReportGenerator()

# Generate report
report = agent.generate_report(incident_id="ACC_20251125_001")

# Export PDF
pdf_path = agent.export_pdf(report, output_path="reports/accident_001.pdf")

# Email report
agent.email_report(report, recipients=["ops@traffic.vn"])

Email Notification Handler Agent

Overview

Handles email notifications with SMTP integration, HTML templates, and delivery tracking.

Features

  • SMTP Integration: Multiple providers
  • HTML Templates: Rich email formatting
  • Attachment Support: Images, PDFs
  • Delivery Tracking: Open/click tracking
  • Retry Logic: Failed delivery retry
  • Batch Sending: Bulk emails

Configuration

# config/email_config.yaml
email_notification:
smtp:
host: "smtp.gmail.com"
port: 587
username: "${EMAIL_USER}"
password: "${EMAIL_PASS}"
use_tls: true

templates:
path: "templates/email/"
default: "alert.html"

Usage

from src.agents.notification.email_notification_handler_agent import EmailNotificationHandler

agent = EmailNotificationHandler()

# Send email
agent.send_email(
to="user@example.com",
subject="Traffic Alert",
template="accident_alert",
context={"accident": accident_data}
)

# Batch send
agent.send_batch_emails(recipients, template, context)

Webhook Notification Handler Agent

Overview

Delivers webhook notifications to external systems with retry logic and authentication support.

Features

  • HTTP POST: JSON payload delivery
  • Authentication: API key, Bearer token, OAuth
  • Retry Logic: Exponential backoff
  • Delivery Confirmation: Response validation
  • Timeout Handling: Configurable timeouts
  • Signature Verification: HMAC signatures

Configuration

# config/webhook_config.yaml
webhook_notification:
timeout: 30
max_retries: 3
retry_delay: 5

auth_types:
- api_key
- bearer_token
- oauth2

Usage

from src.agents.notification.webhook_notification_handler_agent import WebhookNotificationHandler

agent = WebhookNotificationHandler()

# Send webhook
result = agent.send_webhook(
url="https://api.example.com/webhooks/traffic",
payload={"event": "accident", "data": {...}},
auth={"type": "bearer_token", "token": "..."}
)

# Retry failed webhook
agent.retry_webhook(webhook_id="...")

Graph Database Agents

Neo4j Sync Agent

Overview

Synchronizes traffic data to Neo4j graph database for relationship analysis and graph queries.

Features

  • Graph Sync: Entity → Node mapping
  • Relationship Creation: Semantic edges
  • Batch Sync: Bulk operations
  • Incremental Updates: Delta sync
  • Schema Management: Graph schema
  • Index Optimization: Performance indexes

Configuration

# config/neo4j_sync.yaml
neo4j_sync:
uri: "bolt://localhost:7687"
username: "neo4j"
password: "${NEO4J_PASSWORD}"
database: "traffic"

sync_interval: 60
batch_size: 1000

Usage

from src.agents.graph_database.neo4j_sync_agent import Neo4jSyncAgent

agent = Neo4jSyncAgent()

# Sync entity
agent.sync_entity(entity_id="...", entity_data={...})

# Batch sync
agent.batch_sync(entities)

# Create relationship
agent.create_relationship(
from_id="camera:CAM_001",
to_id="accident:ACC_001",
relationship_type="DETECTED"
)

Neo4j Query Agent

Overview

Executes Cypher queries on Neo4j for graph analysis, path finding, and relationship traversal.

Features

  • Cypher Queries: Full Cypher support
  • Path Finding: Shortest path, all paths
  • Relationship Traversal: Multi-hop queries
  • Pattern Matching: Graph patterns
  • Aggregations: Graph statistics
  • Subgraph Extraction: Graph projections

Usage

from src.agents.graph_database.neo4j_query_agent import Neo4jQueryAgent

agent = Neo4jQueryAgent()

# Execute query
results = agent.execute_query("""
MATCH (c:Camera)-[:DETECTED]->(a:Accident)
WHERE a.severity = 'severe'
RETURN c.id, a.timestamp
""")

# Find path
path = agent.find_shortest_path(
start_node="camera:CAM_001",
end_node="accident:ACC_001"
)

Integration Agents

API Gateway Agent

Overview

Central API gateway handling authentication, rate limiting, routing, and request validation.

Features

  • Request Routing: Path-based routing
  • Authentication: JWT, API keys
  • Rate Limiting: Per-user limits
  • Request Validation: Schema validation
  • Load Balancing: Service distribution
  • Logging: Request/response logging

Configuration

# config/api_gateway_config.yaml
api_gateway:
port: 8080
cors:
origins: ["https://traffic.example.com"]

rate_limiting:
default: 1000 # requests per hour
burst: 20 # requests per second

routes:
- path: "/api/cameras"
service: "camera_service"
methods: ["GET", "POST"]

Usage

from src.agents.integration.api_gateway_agent import APIGatewayAgent

agent = APIGatewayAgent()

# Start gateway
agent.start(port=8080)

# Add route
agent.add_route(
path="/api/custom",
handler=custom_handler,
methods=["GET", "POST"]
)

Cache Manager Agent

Overview

Manages multi-tier caching strategy with Redis and memory cache for performance optimization.

Features

  • Multi-Tier Caching: Memory → Redis → Database
  • TTL Management: Expiration policies
  • Cache Invalidation: Selective clearing
  • Cache Warming: Preload frequently accessed data
  • Hit Rate Tracking: Performance metrics
  • Distributed Cache: Redis cluster support

Configuration

# config/cache_config.yaml
cache_manager:
redis:
host: "localhost"
port: 6379
db: 0

memory:
max_size: 1000
ttl: 60

default_ttl: 300

Usage

from src.agents.integration.cache_manager_agent import CacheManagerAgent

agent = CacheManagerAgent()

# Cache data
agent.set("camera:CAM_001:image", image_data, ttl=300)

# Get cached data
data = agent.get("camera:CAM_001:image")

# Invalidate cache
agent.invalidate("camera:CAM_001:*")

# Get stats
stats = agent.get_cache_stats()

Monitoring Agents

Health Check Agent

Overview

Monitors system health with endpoint checks, service availability, and automated alerting.

Features

  • Endpoint Monitoring: HTTP health checks
  • Service Availability: Service status tracking
  • Response Time Tracking: Latency monitoring
  • Automated Alerts: Health degradation alerts
  • Dashboard Integration: Grafana integration
  • Custom Checks: Pluggable health checks

Configuration

# config/health_check_config.yaml
health_check:
enabled: true
interval: 30

endpoints:
- name: "Stellio"
url: "http://stellio:8080/health"
timeout: 5

- name: "MongoDB"
type: "database"
connection: "${MONGO_URI}"

Usage

from src.agents.monitoring.health_check_agent import HealthCheckAgent

agent = HealthCheckAgent()

# Check system health
health = agent.check_system_health()
print(f"Status: {health.status}")
print(f"Services: {health.services}")

# Check specific service
stellio_health = agent.check_service("stellio")

Performance Monitor Agent

Overview

Tracks system performance metrics including CPU, memory, request latency, and throughput.

Features

  • System Metrics: CPU, memory, disk
  • Application Metrics: Request latency, throughput
  • Agent Metrics: Per-agent performance
  • Custom Metrics: User-defined metrics
  • Prometheus Export: Metrics export
  • Alert Thresholds: Performance alerts

Configuration

# config/performance_monitor_config.yaml
performance_monitor:
enabled: true
collection_interval: 60

metrics:
- cpu_usage
- memory_usage
- request_latency
- throughput

thresholds:
cpu_warning: 70
cpu_critical: 90
memory_warning: 80
memory_critical: 95

Usage

from src.agents.monitoring.performance_monitor_agent import PerformanceMonitorAgent

agent = PerformanceMonitorAgent()

# Get current metrics
metrics = agent.get_current_metrics()

# Get agent performance
agent_perf = agent.get_agent_performance("accident_detection")

# Track custom metric
agent.track_metric("custom_metric", value=100)

Data Quality Validator Agent

Overview

Validates data quality with completeness, accuracy, consistency, and timeliness checks.

Features

  • Completeness Validation: Missing data detection
  • Accuracy Validation: Value range checks
  • Consistency Validation: Cross-field validation
  • Timeliness Validation: Freshness checks
  • Quality Reports: Detailed reports
  • Auto-correction: Data cleaning

Configuration

# config/data_quality_config.yaml
data_quality:
enabled: true

validation_rules:
- field: "intensity"
type: "range"
min: 0
max: 200

- field: "location"
type: "required"

- field: "timestamp"
type: "freshness"
max_age: 300

Usage

from src.agents.monitoring.data_quality_validator_agent import DataQualityValidator

agent = DataQualityValidator()

# Validate data
result = agent.validate(data)

if not result.is_valid:
print(f"Issues: {result.issues}")

# Generate report
report = agent.generate_quality_report()

State Management Agents

State Manager Agent

Overview

Central state management coordinating state across all agents with event sourcing and state persistence.

Features

  • Centralized State: Single source of truth
  • Event Sourcing: Complete state history
  • State Persistence: MongoDB storage
  • State Snapshots: Point-in-time recovery
  • State Queries: Query current/historical state
  • State Subscriptions: State change notifications

Usage

from src.agents.state_management.state_manager_agent import StateManagerAgent

agent = StateManagerAgent()

# Get state
state = agent.get_state("system")

# Update state
agent.update_state("traffic", {"congestion_level": "high"})

# Subscribe to changes
agent.subscribe_to_state_changes("traffic", callback=on_change)

Accident State Manager Agent

Overview

Manages accident lifecycle state from detection through resolution with status tracking.

Features

  • Lifecycle Management: Detection → Resolution
  • Status Tracking: Active, investigating, resolved
  • Event Timeline: Complete accident timeline
  • Emergency Coordination: Emergency service status
  • Impact Assessment: Traffic impact tracking
  • Historical Records: Accident database

Usage

from src.agents.state_management.accident_state_manager_agent import AccidentStateManager

agent = AccidentStateManager()

# Create accident
agent.create_accident(accident_data)

# Update status
agent.update_accident_status(
accident_id="ACC_001",
status="investigating"
)

# Get active accidents
active = agent.get_active_accidents()

Congestion State Manager Agent

Overview

Tracks congestion state evolution with zone-level monitoring and historical trending.

Features

  • Zone Monitoring: Per-zone congestion state
  • Level Tracking: 5-level congestion tracking
  • State Transitions: Congestion evolution
  • Historical Trends: Pattern analysis
  • Prediction: Congestion forecasting
  • Alert Triggers: Threshold-based alerts

Usage

from src.agents.state_management.congestion_state_manager_agent import CongestionStateManager

agent = CongestionStateManager()

# Update zone state
agent.update_zone_state(
zone_id="ZONE_001",
level="heavy",
avg_speed=25.3
)

# Get zone state
state = agent.get_zone_state("ZONE_001")

# Get trends
trends = agent.get_congestion_trends(zone_id="ZONE_001", hours=24)

Temporal State Tracker Agent

Overview

Tracks temporal state changes with time-series analysis and state prediction.

Features

  • Temporal Tracking: Time-aware state changes
  • Change Detection: State transition detection
  • Trend Analysis: Temporal patterns
  • State Prediction: Future state forecasting
  • Anomaly Detection: Unusual state changes
  • Rollback Support: State reversion

Usage

from src.agents.state_management.temporal_state_tracker_agent import TemporalStateTracker

agent = TemporalStateTracker()

# Track state change
agent.track_state_change(
entity_id="...",
attribute="congestion_level",
value="high",
timestamp=datetime.now()
)

# Get state history
history = agent.get_state_history(
entity_id="...",
attribute="congestion_level",
start_time=...,
end_time=...
)

# Predict future state
prediction = agent.predict_state(entity_id="...", hours_ahead=2)

Additional Agents

Cache Invalidator Agent

Overview

Intelligent cache invalidation based on data changes, events, and TTL policies.

Features

  • Event-Based Invalidation: Automatic invalidation
  • Selective Invalidation: Pattern-based clearing
  • Cascade Invalidation: Related cache clearing
  • TTL Management: Expiration handling
  • Invalidation Tracking: Audit log
  • Performance Optimization: Minimal invalidation

Usage

from src.agents.cache_invalidator_agent import CacheInvalidatorAgent

agent = CacheInvalidatorAgent()

# Invalidate cache
agent.invalidate_cache("camera:CAM_001:*")

# Invalidate by event
agent.invalidate_by_event("accident_detected", event_data)

# Cascade invalidation
agent.cascade_invalidate("accident:ACC_001")

Kafka Entity Publisher Agent

Overview

Publishes NGSI-LD entities to Kafka topics for event streaming and microservices integration.

Features

  • Kafka Integration: Apache Kafka publishing
  • Topic Management: Dynamic topic creation
  • Partitioning: Partition key support
  • Serialization: JSON, Avro, Protobuf
  • Delivery Guarantees: At-least-once, exactly-once
  • Schema Registry: Schema management

Configuration

# config/kafka_config.yaml
kafka_publisher:
bootstrap_servers: ["localhost:9092"]

topics:
traffic_flow: "traffic.flow"
accidents: "traffic.accidents"
congestion: "traffic.congestion"

serialization: "json"

Usage

from src.agents.kafka_entity_publisher_agent import KafkaEntityPublisher

agent = KafkaEntityPublisher()

# Publish entity
agent.publish_entity(
topic="traffic.accidents",
entity=accident_entity,
key=accident_id
)

# Batch publish
agent.batch_publish(entities, topic="traffic.flow")

License

MIT License - Copyright (c) 2025 UIP Contributors (Nguyễn Nhật Quang, Nguyễn Việt Hoàng, Nguyễn Đình Anh Tuấn)

See LICENSE for details.