
Agent System Overview

The HCMC Traffic Monitoring System is powered by 30+ specialized agents that work together to collect, process, analyze, and publish traffic data.

🤖 What is an Agent?

An agent is a self-contained Python module that performs a specific task in the data pipeline. Each agent:

  • ✅ Has a single responsibility (e.g., fetch camera images, detect accidents, publish to Kafka)
  • ✅ Can run independently or as part of an orchestrated workflow
  • ✅ Implements the BaseAgent interface
  • ✅ Returns structured AgentResult objects
  • ✅ Handles errors gracefully with retry logic
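This page does not spell out the retry behaviour listed above. Below is a minimal sketch of one common approach, exponential backoff, assuming a fixed number of extra attempts. `run_with_retry` and its parameters are hypothetical names, not part of the project's API:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_retry(
    operation: Callable[[], Awaitable[T]],
    retries: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Await `operation`, retrying up to `retries` extra times on any exception.

    The wait before retry n (1-based) is base_delay * 2 ** (n - 1) seconds.
    """
    for attempt in range(retries + 1):
        try:
            return await operation()
        except Exception:
            if attempt == retries:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable when retries >= 0")
```

An orchestrator could wrap a call as `await run_with_retry(lambda: agent.execute(context), retries=3)`, which matches a `retry: 3` setting in a workflow configuration. Catching bare `Exception` retries every failure. A production agent would more likely retry only transient errors such as timeouts.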

🏗️ Agent Architecture

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum


class AgentStatus(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
    SKIPPED = "skipped"


@dataclass
class AgentResult:
    status: AgentStatus
    data: dict
    errors: list
    execution_time: float


class BaseAgent(ABC):
    @abstractmethod
    async def execute(self, context: dict) -> AgentResult:
        """Execute agent logic"""
        pass
```
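A concrete agent only needs to implement `execute` and fill in an `AgentResult`. The sketch below re-declares the types above in compact form so that it runs on its own. It makes a few assumptions:

- `data` and `errors` get empty defaults, for brevity.
- `execution_time` is measured in seconds with `time.perf_counter()`.
- `EchoAgent` is a hypothetical example.

```python
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum


class AgentStatus(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
    SKIPPED = "skipped"


@dataclass
class AgentResult:
    status: AgentStatus
    data: dict = field(default_factory=dict)
    errors: list = field(default_factory=list)
    execution_time: float = 0.0  # seconds


class BaseAgent(ABC):
    @abstractmethod
    async def execute(self, context: dict) -> AgentResult:
        """Execute agent logic"""


class EchoAgent(BaseAgent):
    """Hypothetical agent: copies `context['items']` into its result."""

    async def execute(self, context: dict) -> AgentResult:
        start = time.perf_counter()
        items = context.get("items")
        if not items:
            # Nothing to process: report SKIPPED rather than FAILURE
            return AgentResult(AgentStatus.SKIPPED,
                               execution_time=time.perf_counter() - start)
        return AgentResult(
            status=AgentStatus.SUCCESS,
            data={"items": list(items)},
            execution_time=time.perf_counter() - start,
        )


result = asyncio.run(EchoAgent().execute({"items": [1, 2]}))
print(result.status, result.data)  # AgentStatus.SUCCESS {'items': [1, 2]}
```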

📊 Agent Categories

1. Data Collection Agents (3 agents)

Fetch external data from APIs and sensors.

| Agent | Purpose | Frequency | Output | Status |
|---|---|---|---|---|
| CameraImageFetchAgent | Fetch images from 1,000+ cameras | Every 30 s | Raw images + metadata | 🟢 Active |
| WeatherIntegrationAgent | Fetch weather data from OpenWeather API | Every 5 min | Temperature, humidity, wind, precipitation | 🟢 Active |
| AirQualityAgent | Fetch AQI data from AQICN API | Every 10 min | PM2.5, PM10, NO2, O3, AQI index | 🟢 Active |
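The WeatherIntegrationAgent's output fields map fairly directly onto OpenWeather's current-weather response. Below is a sketch of the parsing step, assuming the agent calls the `/data/2.5/weather` endpoint with `units=metric`. The function name and output keys are hypothetical:

```python
from typing import Any


def parse_openweather(payload: dict[str, Any]) -> dict[str, Any]:
    """Flatten an OpenWeather current-weather response (units=metric)."""
    main = payload.get("main", {})
    wind = payload.get("wind", {})
    # "rain"/"snow" are present only when precipitation was recorded;
    # their "1h" field is the amount in mm over the last hour.
    precipitation = (payload.get("rain", {}).get("1h", 0.0)
                     + payload.get("snow", {}).get("1h", 0.0))
    return {
        "temperature_c": main.get("temp"),
        "humidity_pct": main.get("humidity"),
        "wind_speed_ms": wind.get("speed"),
        "precipitation_mm_1h": precipitation,
    }
```

Missing fields come back as `None` rather than raising, so one incomplete response does not abort a collection cycle.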

2. Analytics Agents (5 agents)

Process and analyze data for insights.

| Agent | Purpose | Frequency | Technology |
|---|---|---|---|
| AccidentDetectionAgent | Detect accidents using YOLOX-X | Real-time | Computer vision (YOLOX-X) |
| CongestionAnalysisAgent | Analyze traffic congestion patterns | Every 2 min | Statistical analysis |
| PatternRecognitionAgent | Identify recurring traffic patterns | Hourly | Machine learning |
| CVAnalysisAgent | Advanced computer vision analysis | Real-time | OpenCV + PyTorch |
| CitizenReportAgent | Process citizen-submitted reports | On demand | NLP + geo-analysis |

3. Transformation Agents (2 agents)

Convert data between formats.

| Agent | Input Format | Output Format | Standard |
|---|---|---|---|
| NGSILDTransformerAgent | Python objects | NGSI-LD (JSON-LD) | ETSI GS CIM 009 |
| SOSASSNMapperAgent | Traffic data | RDF/Turtle | W3C SOSA/SSN |

4. RDF & Linked Data Agents (5 agents)

Manage semantic web and linked data.

| Agent | Purpose | Technology | Endpoint |
|---|---|---|---|
| NGSILDToRDFAgent | Convert NGSI-LD to RDF | RDFLib | - |
| TriplestoreLoaderAgent | Load RDF into Fuseki | SPARQL UPDATE | http://fuseki:3030 |
| LODLinksetEnrichmentAgent | Enrich with LOD links | DBpedia, Wikidata | - |
| ContentNegotiationAgent | Serve multiple RDF formats | HTTP Content-Type | application/ld+json, text/turtle |
| SmartDataModelsValidationAgent | Validate FIWARE models | JSON Schema | FIWARE Smart Data Models |
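TriplestoreLoaderAgent writes to Fuseki with SPARQL Update. Under the SPARQL 1.1 Protocol, an update can be POSTed directly with the `application/sparql-update` media type. The sketch below only builds the request, using the standard library. The dataset name `traffic` and the helper name are assumptions:

```python
import urllib.request


def build_insert_request(endpoint: str, triples: str) -> urllib.request.Request:
    """Wrap triples in INSERT DATA and POST them as a SPARQL 1.1 update.

    Uses the protocol's "update via POST directly" form: the body is the raw
    update string and the media type is application/sparql-update.
    """
    update = f"INSERT DATA {{\n{triples}\n}}"
    return urllib.request.Request(
        endpoint,
        data=update.encode("utf-8"),
        headers={"Content-Type": "application/sparql-update"},
        method="POST",
    )


req = build_insert_request(
    "http://fuseki:3030/traffic/update",  # dataset name "traffic" is hypothetical
    '<urn:ngsi-ld:Accident:001> <http://www.w3.org/2000/01/rdf-schema#label> "Accident 001" .',
)
```

Passing `req` to `urllib.request.urlopen` sends it, and a 2xx status means the update was applied. The triples are interpolated verbatim, so values from untrusted sources such as citizen reports should be escaped, or serialized with RDFLib, before they reach this function.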

5. Context Management Agents (4 agents)

Manage NGSI-LD Context Broker.

| Agent | Operation | Broker | Protocol |
|---|---|---|---|
| EntityPublisherAgent | Create/update entities | Stellio | NGSI-LD API |
| StellioStateQueryAgent | Query current state | Stellio | NGSI-LD Query |
| TemporalDataManagerAgent | Manage temporal data | Stellio | NGSI-LD Temporal |
| StateUpdaterAgent | Update entity state | Stellio | NGSI-LD PATCH |
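In NGSI-LD's temporal API, an entity's attribute history is retrieved from `/ngsi-ld/v1/temporal/entities/{id}` with a time relation such as `between`. Below is a sketch of how TemporalDataManagerAgent might build such a query. The Stellio base URL and port in the usage are assumptions, as is the helper name:

```python
from urllib.parse import quote, urlencode


def temporal_query_url(base_url: str, entity_id: str, attrs: list[str],
                       start: str, end: str) -> str:
    """Build an NGSI-LD temporal retrieval URL for one entity between two instants."""
    params = urlencode({
        "attrs": ",".join(attrs),
        "timerel": "between",
        "timeAt": start,    # ISO 8601 DateTime, start of the interval
        "endTimeAt": end,   # required when timerel=between
    })
    return (f"{base_url}/ngsi-ld/v1/temporal/entities/"
            f"{quote(entity_id, safe=':')}?{params}")


url = temporal_query_url("http://stellio:8080", "urn:ngsi-ld:Accident:001",
                         ["severity"], "2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z")
```

`urlencode` percent-encodes the colons in the timestamps, which NGSI-LD servers decode as usual.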

6. Notification Agents (5 agents)

Handle alerts and notifications.

| Agent | Channel | Priority | Latency |
|---|---|---|---|
| AlertDispatcherAgent | Email, SMS, Webhook | High | <1 s |
| EmailNotificationAgent | SMTP | Medium | <5 s |
| SMSNotificationAgent | Twilio API | High | <2 s |
| WebhookNotificationAgent | HTTP POST | Medium | <3 s |
| PushNotificationAgent | Firebase | High | <1 s |
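One way the AlertDispatcherAgent could pick channels is by comparing an alert's priority with the Priority column above. The routing rule here is an assumption for illustration: an alert goes to every channel whose priority does not exceed its own. All names are also hypothetical:

```python
from enum import IntEnum


class Priority(IntEnum):
    MEDIUM = 1
    HIGH = 2


# Channel priorities taken from the table above.
CHANNEL_PRIORITY = {
    "email": Priority.MEDIUM,
    "webhook": Priority.MEDIUM,
    "sms": Priority.HIGH,
    "push": Priority.HIGH,
}


def route_alert(alert_priority: Priority) -> list[str]:
    """Return the channels an alert of the given priority fans out to."""
    return sorted(ch for ch, p in CHANNEL_PRIORITY.items() if p <= alert_priority)
```

The dispatcher could then call the selected channel agents concurrently, for example with `asyncio.gather`. That way the slower email path (<5 s) does not hold back SMS or push delivery.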

7. Graph Database Agents (2 agents)

Manage Neo4j graph relationships.

| Agent | Purpose | Query Language | Database |
|---|---|---|---|
| Neo4jSyncAgent | Sync relationships | Cypher | Neo4j 5.11 |
| Neo4jQueryAgent | Query graph patterns | Cypher | Neo4j 5.11 |

8. Integration Agents (2 agents)

API gateway and caching.

| Agent | Purpose | Technology | Port |
|---|---|---|---|
| APIGatewayAgent | REST API gateway | FastAPI | 8000 |
| CacheManagerAgent | Multi-tier caching | Redis | 6379 |

9. Monitoring Agents (3 agents)

System health and performance.

| Agent | Metrics | Frequency | Alerts |
|---|---|---|---|
| HealthCheckAgent | Service status | Every 30 s | |
| PerformanceMonitorAgent | CPU, memory, latency | Every 10 s | |
| DataQualityValidatorAgent | Data completeness, accuracy | Every 5 min | |
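For DataQualityValidatorAgent, completeness can be measured as the fraction of required fields that are actually populated. The sketch below assumes "populated" means present and not `None`, and treats an empty batch as complete. The function name is hypothetical:

```python
from typing import Any, Iterable


def completeness(records: Iterable[dict[str, Any]], required: list[str]) -> float:
    """Fraction of required fields that are present and non-null across records."""
    total = filled = 0
    for record in records:
        for name in required:
            total += 1
            if record.get(name) is not None:
                filled += 1
    return filled / total if total else 1.0
```

For example, two camera records where one is missing its image give 3 filled fields out of 4, a completeness of 0.75.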

10. State Management Agents (4 agents)

Manage application state.

| Agent | Scope | Storage | Persistence |
|---|---|---|---|
| StateManagerAgent | Global state | Redis | In-memory |
| AccidentStateManagerAgent | Accident lifecycle | MongoDB | Persistent |
| CongestionStateManagerAgent | Congestion zones | MongoDB | Persistent |
| TemporalStateTrackerAgent | Historical states | TimescaleDB | Time-series |
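
1. Data Collection Agents (3 agents)

| Agent | Purpose | Frequency | Output |
|---|---|---|---|
| CameraImageFetchAgent | Fetch images from 1,000+ cameras | Every 30 s | Raw images + metadata |
| WeatherIntegrationAgent | Get weather data (OpenWeatherMap) | Every 5 min | Temperature, humidity, conditions |
| AirQualityAgent | Fetch AQI data from sensors | Every 10 min | PM2.5, PM10, AQI index |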

Example: CameraImageFetchAgent

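```python
import time
from datetime import datetime, timezone


class CameraImageFetchAgent(BaseAgent):
    async def execute(self, context):
        start_time = time.perf_counter()
        # get_camera_list, fetch_image and logger are defined elsewhere on the agent
        cameras = await self.get_camera_list()
        results, errors = [], []

        for camera in cameras:
            try:
                image = await self.fetch_image(camera.url)
                results.append({
                    'camera_id': camera.id,
                    'image': image,
                    'timestamp': datetime.now(timezone.utc),
                })
            except Exception as e:
                # One failing camera must not abort the whole batch
                self.logger.error(f"Failed to fetch {camera.id}: {e}")
                errors.append(f"{camera.id}: {e}")

        # SUCCESS if every camera answered, PARTIAL if some did, FAILURE if none did
        if not errors:
            status = AgentStatus.SUCCESS
        elif results:
            status = AgentStatus.PARTIAL
        else:
            status = AgentStatus.FAILURE

        return AgentResult(
            status=status,
            data={'images': results},
            errors=errors,
            execution_time=time.perf_counter() - start_time,
        )
```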
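The loop above fetches cameras one at a time. With 1,000+ cameras and a 30 s cycle, that leaves roughly 30 ms per camera, so a sequential pass can easily overrun the interval. Below is a sketch of bounded concurrent fetching with `asyncio.Semaphore`. `fetch` stands in for a hypothetical per-camera coroutine, and the limit of 50 is an arbitrary example:

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def fetch_all(
    camera_ids: Iterable[str],
    fetch: Callable[[str], Awaitable[bytes]],
    max_concurrency: int = 50,
) -> tuple[dict[str, bytes], dict[str, str]]:
    """Fetch every camera concurrently, at most `max_concurrency` at a time.

    Returns (images by camera id, error messages by camera id).
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def one(cam_id: str):
        async with semaphore:  # bound the number of in-flight requests
            return cam_id, await fetch(cam_id)

    ids = list(camera_ids)
    # return_exceptions=True: one failing camera does not cancel the others
    outcomes = await asyncio.gather(*(one(c) for c in ids), return_exceptions=True)

    images, errors = {}, {}
    for cam_id, outcome in zip(ids, outcomes):  # gather preserves input order
        if isinstance(outcome, BaseException):
            errors[cam_id] = str(outcome)
        else:
            images[cam_id] = outcome[1]
    return images, errors
```

The returned error map can be passed straight into `AgentResult.errors`.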

2. Ingestion Agents (2 agents)

Accept data from external sources.

| Agent | Purpose | Protocol | Endpoint |
|---|---|---|---|
| CitizenIngestionAgent | Accept citizen-submitted reports | FastAPI REST | POST /citizen-reports |
| RealTimeStreamAgent | Ingest streaming data | Kafka consumer | traffic-events topic |

Example: CitizenIngestionAgent

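```python
from fastapi import FastAPI, HTTPException

app = FastAPI()
# Assumed to be defined elsewhere: `db` (an async MongoDB handle such as Motor),
# the `CitizenReport` Pydantic model and the `verify_with_yolo` helper.


@app.post("/citizen-reports")
async def submit_report(report: CitizenReport):
    # Validate report: location and type are mandatory
    if not report.location or not report.type:
        raise HTTPException(status_code=400, detail="Missing required fields")

    # Store in MongoDB (.dict() is Pydantic v1; use .model_dump() on Pydantic v2)
    result = await db.citizen_reports.insert_one(report.dict())

    # Verify with YOLOX if an image was provided
    if report.image:
        verification = await verify_with_yolo(report.image)
        await db.citizen_reports.update_one(
            {'_id': result.inserted_id},
            {'$set': {'verification': verification}},
        )

    return {'id': str(result.inserted_id), 'status': 'pending'}
```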

3. Analytics Agents (4 agents)

Analyze data for insights.

| Agent | Purpose | Input | Output |
|---|---|---|---|
| AccidentDetectionAgent | Detect accidents from images | Camera images | Accident events with severity |
| PatternRecognitionAgent | Identify traffic patterns | Historical data | Pattern types, locations |
| CongestionAnalysisAgent | Analyze traffic congestion | Vehicle counts | Congestion level (0-100) |
| PredictiveAnalyticsAgent | Forecast future traffic | Time-series data | Predictions with confidence |
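The table gives CongestionAnalysisAgent a 0-100 output derived from vehicle counts, but it does not define the scale. Below is one simple reading. It assumes a per-camera `capacity` (vehicles per frame at saturation) and equal-width buckets that match the low/medium/high/critical labels used for congestion state. The formula, thresholds and function names are all hypothetical:

```python
def congestion_level(vehicle_count: int, capacity: int) -> int:
    """Scale a vehicle count to 0-100 relative to an assumed saturation capacity."""
    if capacity <= 0:
        raise ValueError("capacity must be positive")
    return max(0, min(100, round(100 * vehicle_count / capacity)))


def congestion_category(level: int) -> str:
    """Bucket a 0-100 level into four equal-width bands."""
    if level < 25:
        return "low"
    if level < 50:
        return "medium"
    if level < 75:
        return "high"
    return "critical"
```

For example, 30 vehicles against a capacity of 60 gives a level of 50, which falls in the "high" band. Counts above capacity are clamped to 100.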

Example: AccidentDetectionAgent (YOLOX)

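```python
import time

import torch
from yolox.data.data_augment import ValTransform
from yolox.exp import get_exp
from yolox.utils import postprocess

# Assumption: the model is a YOLOX-X fine-tuned so that class ids 0-2 mean
# crash / fire / damaged vehicle. Stock COCO weights use other classes, so a
# custom exp file (with a matching num_classes) would replace "yolox-x" here.
ACCIDENT_CLASSES = {0, 1, 2}


class AccidentDetectionAgent(BaseAgent):
    def __init__(self, ckpt_path, conf_thre=0.5, nms_thre=0.45):
        self.exp = get_exp(None, "yolox-x")
        self.model = self.exp.get_model()
        # get_model() builds an untrained network; load the trained weights
        ckpt = torch.load(ckpt_path, map_location="cpu")
        self.model.load_state_dict(ckpt["model"])
        self.model.eval()
        self.preproc = ValTransform(legacy=False)
        self.conf_thre, self.nms_thre = conf_thre, nms_thre

    async def execute(self, context):
        start_time = time.perf_counter()
        images = context.get('images', [])
        accidents = []

        for img_data in images:
            img = img_data['image']  # assumed: BGR numpy array (H, W, 3)
            ratio = min(self.exp.test_size[0] / img.shape[0],
                        self.exp.test_size[1] / img.shape[1])
            tensor, _ = self.preproc(img, None, self.exp.test_size)
            tensor = torch.from_numpy(tensor).unsqueeze(0).float()

            # Inference is blocking; heavy loads may warrant a thread executor
            with torch.no_grad():
                outputs = postprocess(self.model(tensor), self.exp.num_classes,
                                      self.conf_thre, self.nms_thre)
            if outputs[0] is None:  # nothing above the confidence threshold
                continue

            # Each row: x1, y1, x2, y2, obj_conf, class_conf, class_id
            for x1, y1, x2, y2, obj_conf, cls_conf, cls_id in outputs[0].tolist():
                if int(cls_id) in ACCIDENT_CLASSES:
                    accidents.append({
                        'camera_id': img_data['camera_id'],
                        'type': int(cls_id),
                        'confidence': obj_conf * cls_conf,
                        # undo the resize so the box is in original pixels
                        'bbox': [v / ratio for v in (x1, y1, x2, y2)],
                        'timestamp': img_data['timestamp'],
                    })

        return AgentResult(
            status=AgentStatus.SUCCESS,
            data={'accidents': accidents},
            errors=[],
            execution_time=time.perf_counter() - start_time,
        )
```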
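A single-frame detection is a weak signal, and the accident lifecycle includes a `false_alarm` state. One way to cut false alarms is to confirm an accident only after it appears in several consecutive frames from the same camera. This page does not describe that approach, so treat it as an assumption; the class name is hypothetical:

```python
from collections import defaultdict


class ConsecutiveFrameFilter:
    """Confirm an accident only after it is seen in `required` consecutive frames."""

    def __init__(self, required: int = 3):
        self.required = required
        self.streaks: dict[str, int] = defaultdict(int)

    def update(self, camera_id: str, detected: bool) -> bool:
        """Record one frame's result; return True exactly when the streak is reached."""
        if not detected:
            self.streaks[camera_id] = 0  # any clean frame resets the streak
            return False
        self.streaks[camera_id] += 1
        # "==" rather than ">=": emit one confirmation, not one per later frame
        return self.streaks[camera_id] == self.required
```

With images fetched every 30 s, `required=3` delays confirmation by about 60 s after the first detection. The threshold trades detection latency against false alarms.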

4. Transformation Agents (2 agents)

Transform data between formats.

| Agent | Purpose | Input Format | Output Format |
|---|---|---|---|
| NGSILDTransformerAgent | Convert to NGSI-LD | Raw JSON | NGSI-LD entities |
| SOSASSNMapperAgent | Map to SOSA/SSN ontology | NGSI-LD entities | RDF triples |

Example: NGSILDTransformerAgent

```python
import time


class NGSILDTransformerAgent(BaseAgent):
    async def execute(self, context):
        start_time = time.perf_counter()
        raw_data = context.get('data', [])
        entities = []

        for item in raw_data:
            entity = {
                'id': f"urn:ngsi-ld:Accident:{item['id']}",
                'type': 'Accident',
                '@context': [
                    'https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld',
                    'https://raw.githubusercontent.com/smart-data-models/dataModel.Transportation/master/context.jsonld'
                ],
                'location': {
                    'type': 'GeoProperty',
                    'value': {
                        'type': 'Point',
                        # GeoJSON order is [longitude, latitude]
                        'coordinates': [item['lon'], item['lat']]
                    }
                },
                'severity': {
                    'type': 'Property',
                    'value': item['severity'],
                    # observedAt is a reserved NGSI-LD member attached to a
                    # property (an ISO 8601 string such as "2024-05-01T08:00:00Z"),
                    # not an attribute of its own
                    'observedAt': item['timestamp']
                }
            }
            entities.append(entity)

        return AgentResult(
            status=AgentStatus.SUCCESS,
            data={'entities': entities},
            errors=[],
            execution_time=time.perf_counter() - start_time,
        )
```
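SOSASSNMapperAgent maps NGSI-LD data onto the W3C SOSA vocabulary. The sketch below describes one entity property as a `sosa:Observation` in JSON-LD. It assumes the property carries its timestamp in the standard NGSI-LD `observedAt` member. The observation and observed-property IRI patterns and the function name are hypothetical:

```python
def to_sosa_observation(entity: dict, prop: str) -> dict:
    """Describe one NGSI-LD property of `entity` as a SOSA Observation (JSON-LD)."""
    attr = entity[prop]
    observation = {
        "@context": {
            "sosa": "http://www.w3.org/ns/sosa/",
            "xsd": "http://www.w3.org/2001/XMLSchema#",
        },
        # Hypothetical IRI patterns derived from the entity id
        "@id": f"{entity['id']}/observation/{prop}",
        "@type": "sosa:Observation",
        "sosa:hasFeatureOfInterest": {"@id": entity["id"]},
        "sosa:observedProperty": {"@id": f"{entity['id']}#{prop}"},
        "sosa:hasSimpleResult": attr["value"],
    }
    # NGSI-LD keeps the observation time on the property itself
    if "observedAt" in attr:
        observation["sosa:resultTime"] = {
            "@value": attr["observedAt"],
            "@type": "xsd:dateTime",
        }
    return observation
```

The result is plain JSON-LD, so RDFLib can parse it with `format="json-ld"` and re-serialize it as Turtle.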

5. Context Management Agents (4 agents)

Manage NGSI-LD context information.

| Agent | Purpose | Technology | Endpoint |
|---|---|---|---|
| EntityPublisherAgent | Publish entities to Stellio | Stellio | `POST /ngsi-ld/v1/entities` |
| StellioStateQueryAgent | Query current state | Stellio | `GET /ngsi-ld/v1/entities` |
| TemporalDataManagerAgent | Manage temporal attributes | TimescaleDB | `INSERT INTO temporal_data` |
| StateUpdaterAgent | Update entity states | Stellio | `PATCH /ngsi-ld/v1/entities/{id}` |
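Below is a sketch of the requests behind EntityPublisherAgent and StateUpdaterAgent. It uses only the standard library and does not send anything. Several details are assumptions:

- The helper names are hypothetical.
- The Stellio base URL and port in the usage are assumed.
- Which PATCH form Stellio accepts depends on the NGSI-LD version it implements. This sketch uses the long-standing `/attrs` sub-resource for a partial attribute update.

```python
import json
import urllib.request
from urllib.parse import quote

NGSI_LD_JSONLD = "application/ld+json"


def create_entity_request(base_url: str, entity: dict) -> urllib.request.Request:
    """POST /ngsi-ld/v1/entities: the body carries its own @context."""
    return urllib.request.Request(
        f"{base_url}/ngsi-ld/v1/entities",
        data=json.dumps(entity).encode("utf-8"),
        headers={"Content-Type": NGSI_LD_JSONLD},
        method="POST",
    )


def update_attrs_request(base_url: str, entity_id: str, attrs: dict,
                         context: list[str]) -> urllib.request.Request:
    """PATCH /ngsi-ld/v1/entities/{id}/attrs: partial update of existing attributes."""
    body = {**attrs, "@context": context}
    return urllib.request.Request(
        f"{base_url}/ngsi-ld/v1/entities/{quote(entity_id, safe=':')}/attrs",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": NGSI_LD_JSONLD},
        method="PATCH",
    )
```

Creating an entity that already exists fails with 409 Conflict. A publisher can treat that as a signal to fall back to the PATCH request, which gives upsert-like behaviour.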

6. RDF & Linked Data Agents (5 agents)

Publish data as Linked Open Data.

| Agent | Purpose | Technology | Output |
|---|---|---|---|
| NGSILDToRDFAgent | Convert NGSI-LD to RDF | RDFLib | RDF/XML, Turtle |
| TriplestoreLoaderAgent | Load triples into Fuseki | SPARQL Update | HTTP 200 |
| LODLinksetEnrichmentAgent | Enrich with external LOD | DBpedia, GeoNames | owl:sameAs links |
| ContentNegotiationAgent | Serve data in multiple formats | Flask | JSON-LD, RDF/XML, Turtle |
| SmartDataModelsValidationAgent | Validate against schemas | JSON Schema | Validation errors |
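ContentNegotiationAgent has to choose a serialization from the client's `Accept` header. Below is a simplified sketch that honours q-values and `*/*` but ignores partial wildcards such as `text/*`. The mapping values are RDFLib `Graph.serialize` format names; the function name is hypothetical:

```python
from typing import Optional

# Media types served, mapped to RDFLib serializer format names
SUPPORTED = {
    "application/ld+json": "json-ld",
    "text/turtle": "turtle",
    "application/rdf+xml": "xml",
}


def negotiate(accept: Optional[str], default: str = "application/ld+json") -> Optional[str]:
    """Return the supported media type with the highest q-value, or None (-> 406)."""
    if not accept:
        return default  # no Accept header: the client takes anything
    best, best_q = None, 0.0
    for part in accept.split(","):
        fields = [f.strip() for f in part.split(";")]
        media, q = fields[0].lower(), 1.0
        for param in fields[1:]:
            name, _, value = param.partition("=")
            if name.strip().lower() == "q":
                try:
                    q = float(value)
                except ValueError:
                    q = 0.0  # malformed q-value: treat as not acceptable
        candidate = default if media == "*/*" else media
        # Strictly greater: on a tie the earlier entry in the header wins
        if candidate in SUPPORTED and q > best_q:
            best, best_q = candidate, q
    return best
```

Returning `None` lets the caller answer 406 Not Acceptable. Under Flask, `request.accept_mimetypes.best_match([...])` offers similar matching out of the box.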

Example: LODLinksetEnrichmentAgent

```python
import time


class LODLinksetEnrichmentAgent(BaseAgent):
    async def execute(self, context):
        start_time = time.perf_counter()
        entities = context.get('entities', [])
        enriched = []

        # find_geonames_match / find_dbpedia_match are defined elsewhere on the agent
        for entity in entities:
            # Link to GeoNames via the entity's [lon, lat] coordinates
            if 'location' in entity:
                geonames_uri = await self.find_geonames_match(
                    entity['location']['value']['coordinates']
                )
                if geonames_uri:
                    entity['sameAs'] = {
                        'type': 'Relationship',
                        'object': geonames_uri
                    }

            # Link to DBpedia via the entity's name
            if 'name' in entity:
                dbpedia_uri = await self.find_dbpedia_match(
                    entity['name']['value']
                )
                if dbpedia_uri:
                    entity['seeAlso'] = {
                        'type': 'Relationship',
                        'object': dbpedia_uri
                    }

            enriched.append(entity)

        return AgentResult(
            status=AgentStatus.SUCCESS,
            data={'enriched_entities': enriched},
            errors=[],
            execution_time=time.perf_counter() - start_time,
        )
```

7. Graph Database Agents (2 agents)

Manage Neo4j graph database.

| Agent | Purpose | Query Type | Example |
|---|---|---|---|
| Neo4jSyncAgent | Sync entities to Neo4j | CREATE/MERGE | `MERGE (c:Camera {id: $id})` |
| Neo4jQueryAgent | Execute Cypher queries | MATCH | `MATCH (c:Camera)-[:DETECTED]->(a:Accident)` |
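Neo4jSyncAgent's MERGE pattern can be written as a parameterized transaction function. The property names `severity` and `detectedAt` and the function name are assumptions. The function expects a transaction object with a `run(query, **params)` method, as provided by the official `neo4j` Python driver:

```python
SYNC_DETECTION = """
MERGE (c:Camera {id: $camera_id})
MERGE (a:Accident {id: $accident_id})
SET a.severity = $severity, a.detectedAt = datetime($detected_at)
MERGE (c)-[:DETECTED]->(a)
"""


def sync_detection(tx, camera_id: str, accident_id: str, severity: str,
                   detected_at: str) -> None:
    """Idempotently upsert a Camera-[:DETECTED]->Accident pattern.

    MERGE matches existing nodes and relationships instead of duplicating
    them, so replaying the same event leaves the graph unchanged.
    """
    tx.run(SYNC_DETECTION, camera_id=camera_id, accident_id=accident_id,
           severity=severity, detected_at=detected_at)
```

With driver 5.x this could be invoked as `session.execute_write(sync_detection, "cam-01", "acc-001", "high", "2024-05-01T08:00:00Z")`. Passing values as parameters, instead of formatting them into the query string, avoids Cypher injection and lets Neo4j reuse the query plan.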

8. Integration Agents (2 agents)

Infrastructure integration.

| Agent | Purpose | Technology | Features |
|---|---|---|---|
| APIGatewayAgent | API gateway with rate limiting | FastAPI | Rate limiting, circuit breaker |
| CacheManagerAgent | Multi-level caching | Redis | L1 (memory), L2 (Redis), L3 (disk) |
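The gateway's rate limiting is not specified further. A token bucket is one common choice. This single-process sketch uses hypothetical names and parameters, and takes an injectable clock so it can be tested deterministically:

```python
import time


class TokenBucket:
    """Allow `rate` requests per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full so an initial burst is allowed
        self.clock = clock
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A FastAPI dependency or middleware could keep one bucket per API key or client IP and answer 429 Too Many Requests when `allow()` returns False. Several gateway replicas would need shared state, for example in Redis, to enforce a global limit.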

Example: CacheManagerAgent (3-Level Cache)

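```python
import redis.asyncio as redis   # asyncio client shipped with redis-py 4.2+
from diskcache import Cache      # assumption: L3 backed by the `diskcache` package


class CacheManagerAgent(BaseAgent):
    def __init__(self, l3_dir="/tmp/traffic-cache"):  # directory is illustrative
        self.l1_cache = {}                                        # L1: in-process memory
        self.l2_cache = redis.Redis(host="localhost", port=6379)  # L2: Redis
        self.l3_cache = Cache(l3_dir)                             # L3: disk

    async def get(self, key):
        # Try L1 (fastest)
        if key in self.l1_cache:
            return self.l1_cache[key]

        # Try L2 (fast); redis.asyncio calls must be awaited.
        # Compare with None so falsy values such as b"" still count as hits.
        value = await self.l2_cache.get(key)
        if value is not None:
            self.l1_cache[key] = value
            return value

        # Try L3 (slow but persistent); diskcache's API is synchronous
        value = self.l3_cache.get(key)
        if value is not None:
            # Redis stores only bytes/str/numbers, so L3 values must be serialized
            await self.l2_cache.set(key, value, ex=3600)  # keep in L2 for 1 hour
            self.l1_cache[key] = value
            return value

        return None
```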
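The L1 dictionary in the example grows without bound and never expires, so it can keep serving a value after the one-hour L2 copy has expired. A bounded LRU map is one way to cap L1. This sketch (class name hypothetical) handles eviction only, not expiry:

```python
from collections import OrderedDict
from typing import Any, Hashable, Optional


class LRUCache:
    """Bounded in-memory cache that evicts the least recently used key."""

    def __init__(self, max_items: int = 1024):
        self.max_items = max_items
        self._data: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable) -> Optional[Any]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def set(self, key: Hashable, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.max_items:
            self._data.popitem(last=False)  # drop the least recently used entry

    def __contains__(self, key: Hashable) -> bool:
        return key in self._data
```

Swapping `self.l1_cache = {}` for an instance of this class would keep the `in`, lookup and store pattern of the example, with `l1_cache.set(key, value)` in place of item assignment.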

9. Monitoring Agents (3 agents)

System health and performance.

| Agent | Purpose | Metrics | Alerts |
|---|---|---|---|
| HealthCheckAgent | Service health checks | Up/down status, latency | Email, Slack |
| PerformanceMonitorAgent | Performance metrics | CPU, memory, throughput | Grafana dashboard |
| DataQualityValidatorAgent | Data quality checks | Completeness, accuracy | Data quality reports |
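HealthCheckAgent reports up/down status and latency. Below is a minimal sketch that probes services at the TCP level with `asyncio.open_connection`. A successful connection only shows that the port accepts connections, so HTTP services would ideally be checked at a health endpoint instead. The function names and the 2 s timeout are assumptions:

```python
import asyncio
import time


async def check_tcp(host: str, port: int, timeout: float = 2.0) -> dict:
    """Report whether a TCP connection to host:port succeeds, and how fast."""
    start = time.perf_counter()
    try:
        _, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout)
    except (OSError, asyncio.TimeoutError) as exc:
        return {"target": f"{host}:{port}", "up": False, "error": type(exc).__name__}
    latency_ms = (time.perf_counter() - start) * 1000
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # the peer may already have torn the connection down
    return {"target": f"{host}:{port}", "up": True, "latency_ms": round(latency_ms, 1)}


async def check_all(targets: list[tuple[str, int]]) -> list[dict]:
    """Probe every service concurrently so one slow target doesn't delay the rest."""
    return await asyncio.gather(*(check_tcp(h, p) for h, p in targets))
```

For example, `await check_all([("redis", 6379), ("fuseki", 3030)])` would cover two of the services named on this page.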

10. State Management Agents (4 agents)

Track and manage state.

| Agent | Purpose | State Type | Storage |
|---|---|---|---|
| StateManagerAgent | General state management | Global state | Redis |
| AccidentStateManagerAgent | Track accident lifecycle | active/resolved/false_alarm | MongoDB |
| CongestionStateManagerAgent | Track congestion events | low/medium/high/critical | TimescaleDB |
| TemporalStateTrackerAgent | Historical state tracking | Time-series state | TimescaleDB |
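AccidentStateManagerAgent tracks the states active, resolved and false_alarm. The allowed transitions are not listed here. This sketch assumes an accident starts active and ends in exactly one of the two terminal states:

```python
ALLOWED_TRANSITIONS: dict[str, frozenset[str]] = {
    "active": frozenset({"resolved", "false_alarm"}),
    "resolved": frozenset(),     # terminal
    "false_alarm": frozenset(),  # terminal
}


def transition(current: str, new: str) -> str:
    """Validate an accident state change and return the new state."""
    if current not in ALLOWED_TRANSITIONS:
        raise ValueError(f"unknown state {current!r}")
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current!r} -> {new!r}")
    return new
```

When persisting to MongoDB, a conditional update such as `update_one({"_id": accident_id, "state": current}, {"$set": {"state": new}})` applies the change only if no other worker has moved the accident in the meantime. A `matched_count` of 0 signals that conflict.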

🔄 Agent Orchestration

Execution Flow

```mermaid
graph LR
    A[Orchestrator] -->|1. Schedule| B[Data Collection Agents]
    B -->|2. Images| C[Analytics Agents]
    C -->|3. Detected Accidents| D[Transformation Agents]
    D -->|4. NGSI-LD Entities| E[Context Management Agents]
    E -->|5. Published| F[RDF/LOD Agents]
    F -->|6. RDF Triples| G[Graph Database Agents]
    C -->|7. Metrics| H[Monitoring Agents]
```

Scheduling Strategies

  1. Time-based: Run every N seconds/minutes
  2. Event-driven: Trigger on Kafka messages
  3. Dependency-based: Run after another agent completes
  4. Manual: Triggered by API call
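The time-based strategy can be sketched with asyncio alone. This version schedules at a fixed rate, so a slow run does not push later runs back; if a run overruns the interval, the next one starts immediately. The function name and the `max_runs` parameter are hypothetical:

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_every(interval_s: float, job: Callable[[], Awaitable[None]],
                    max_runs: Optional[int] = None) -> int:
    """Start `job` every `interval_s` seconds; stop after `max_runs` if given."""
    loop = asyncio.get_running_loop()
    next_start = loop.time()
    runs = 0
    while True:
        await job()
        runs += 1
        if max_runs is not None and runs >= max_runs:
            return runs
        # Fixed-rate: aim for the next slot, not "interval after this run ended"
        next_start += interval_s
        await asyncio.sleep(max(0.0, next_start - loop.time()))
```

For example, `asyncio.create_task(run_every(30, camera_job))` would give the 30 s camera cadence, where `camera_job` is a hypothetical coroutine function wrapping the agent.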

Example: Orchestrator Configuration

```yaml
# config/workflow.yaml
agents:
  - name: CameraImageFetchAgent
    schedule: "*/30 * * * * *"  # Every 30 seconds
    timeout: 60
    retry: 3
    dependencies: []

  - name: AccidentDetectionAgent
    schedule: null  # Event-driven
    trigger: CameraImageFetchAgent.SUCCESS
    dependencies: [CameraImageFetchAgent]

  - name: NGSILDTransformerAgent
    schedule: null
    trigger: AccidentDetectionAgent.SUCCESS
    dependencies: [AccidentDetectionAgent]
```
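For dependency-based scheduling, the `dependencies` lists in the configuration form a directed graph, and a topological sort gives a valid run order. Below is a sketch using the standard library's `graphlib`. It assumes the YAML has already been loaded into a list of dicts, for example with PyYAML's `yaml.safe_load`; the function name is hypothetical:

```python
from graphlib import TopologicalSorter  # Python 3.9+


def execution_order(agents: list[dict]) -> list[str]:
    """Order agents so each runs after everything listed in its `dependencies`."""
    # TopologicalSorter expects {node: predecessors}, i.e. {agent: its dependencies}
    graph = {a["name"]: set(a.get("dependencies", [])) for a in agents}
    return list(TopologicalSorter(graph).static_order())
```

A cycle in the configuration raises `graphlib.CycleError`. `TopologicalSorter` also offers `prepare()`, `get_ready()` and `done()`, which would let independent agents run in parallel instead of strictly one after another.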

📖 Agent Documentation

Each agent has its own detailed documentation page.

🛠️ Creating Your Own Agent

See the guide: Adding a New Agent


Next: Explore individual agents in the Data Collection section.