Agent System Overview

The HCMC Traffic Monitoring System is powered by 30+ specialized agents that work together to collect, process, analyze, and publish traffic data.

🤖 What is an Agent?

An agent is a self-contained Python module that performs a specific task in the data pipeline. Each agent:

  • ✅ Has a single responsibility (e.g., fetch camera images, detect accidents, publish to Kafka)
  • ✅ Can run independently or as part of an orchestrated workflow
  • ✅ Implements the BaseAgent interface
  • ✅ Returns structured AgentResult objects
  • ✅ Handles errors gracefully with retry logic

🏗️ Agent Architecture

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum

class AgentStatus(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
    SKIPPED = "skipped"

@dataclass
class AgentResult:
    status: AgentStatus
    data: dict
    errors: list
    execution_time: float

class BaseAgent(ABC):
    @abstractmethod
    async def execute(self, context: dict) -> AgentResult:
        """Execute agent logic"""
        pass
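To make the interface and the "retry logic" bullet concrete, here is a minimal sketch of a concrete agent and a retry wrapper. `FlakyAgent`, `run_with_retry`, the backoff schedule, and the default values on `AgentResult` are illustrative assumptions, not part of the documented system.

```python
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum


class AgentStatus(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
    SKIPPED = "skipped"


@dataclass
class AgentResult:
    status: AgentStatus
    data: dict
    errors: list = field(default_factory=list)  # default added for this sketch
    execution_time: float = 0.0                 # default added for this sketch


class BaseAgent(ABC):
    @abstractmethod
    async def execute(self, context: dict) -> AgentResult:
        """Execute agent logic."""


class FlakyAgent(BaseAgent):
    """Hypothetical agent that fails a fixed number of times, then succeeds."""

    def __init__(self, failures_before_success: int):
        self.remaining_failures = failures_before_success

    async def execute(self, context: dict) -> AgentResult:
        start = time.perf_counter()
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise ConnectionError("simulated transient failure")
        return AgentResult(
            status=AgentStatus.SUCCESS,
            data={"echo": context},
            execution_time=time.perf_counter() - start,
        )


async def run_with_retry(agent, context, retries=3, base_delay=0.01):
    """Run an agent, retrying with exponential backoff on any exception."""
    errors = []
    start = time.perf_counter()
    for attempt in range(retries + 1):
        try:
            result = await agent.execute(context)
            # Keep the errors from earlier attempts on the final result.
            result.errors = errors + result.errors
            return result
        except Exception as exc:
            errors.append(f"attempt {attempt + 1}: {exc}")
            if attempt < retries:
                await asyncio.sleep(base_delay * 2 ** attempt)
    return AgentResult(
        status=AgentStatus.FAILURE,
        data={},
        errors=errors,
        execution_time=time.perf_counter() - start,
    )


result = asyncio.run(run_with_retry(FlakyAgent(2), {"camera_id": "cam-1"}))
print(result.status, len(result.errors))
```

In this sketch the wrapper, not the agent, owns the retry policy, so every agent can stay focused on its single responsibility.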

📊 Agent Categories

1. Data Collection Agents (3 agents)

Fetch external data from APIs and sensors.

| Agent | Purpose | Frequency | Output | Status |
|---|---|---|---|---|
| CameraImageFetchAgent | Fetch images from 1,000+ cameras | Every 30 s | Raw images + metadata | 🟢 Active |
| WeatherIntegrationAgent | Fetch weather data from OpenWeather API | Every 5 min | Temperature, humidity, wind, precipitation | 🟢 Active |
| AirQualityAgent | Fetch AQI data from AQICN API | Every 10 min | PM2.5, PM10, NO2, O3, AQI index | 🟢 Active |

2. Analytics Agents (5 agents)

Process and analyze data for insights.

| Agent | Purpose | Frequency | Technology |
|---|---|---|---|
| AccidentDetectionAgent | Detect accidents using YOLOX-X | Real-time | Computer Vision (YOLOX-X) |
| CongestionAnalysisAgent | Analyze traffic congestion patterns | Every 2 min | Statistical analysis |
| PatternRecognitionAgent | Identify recurring traffic patterns | Hourly | Machine Learning |
| CVAnalysisAgent | Advanced computer vision analysis | Real-time | OpenCV + PyTorch |
| CitizenReportAgent | Process citizen-submitted reports | On-demand | NLP + Geo-analysis |

3. Transformation Agents (2 agents)

Convert data between formats.

| Agent | Input Format | Output Format | Standard |
|---|---|---|---|
| NGSILDTransformerAgent | Python objects | NGSI-LD JSON-LD | ETSI CIM 009 |
| SOSASSNMapperAgent | Traffic data | RDF/Turtle | W3C SOSA/SSN |

4. RDF & Linked Data Agents (5 agents)

Manage semantic web and linked data.

| Agent | Purpose | Technology | Endpoint |
|---|---|---|---|
| NGSILDToRDFAgent | Convert NGSI-LD to RDF | RDFLib | - |
| TriplestoreLoaderAgent | Load RDF to Fuseki | SPARQL UPDATE | http://fuseki:3030 |
| LODLinksetEnrichmentAgent | Enrich with LOD links | DBpedia, Wikidata | - |
| ContentNegotiationAgent | Serve multiple RDF formats | HTTP Content-Type | application/ld+json, text/turtle |
| SmartDataModelsValidationAgent | Validate FIWARE models | JSON Schema | FIWARE Smart Data Models |

5. Context Management Agents (4 agents)

Manage the NGSI-LD Context Broker.

| Agent | Operation | Broker | Protocol |
|---|---|---|---|
| EntityPublisherAgent | Create/Update entities | Stellio | NGSI-LD API |
| StellioStateQueryAgent | Query current state | Stellio | NGSI-LD Query |
| TemporalDataManagerAgent | Manage temporal data | Stellio | NGSI-LD Temporal |
| StateUpdaterAgent | Update entity state | Stellio | NGSI-LD PATCH |

6. Notification Agents (5 agents)

Handle alerts and notifications.

| Agent | Channel | Priority | Latency |
|---|---|---|---|
| AlertDispatcherAgent | Email, SMS, Webhook | High | <1 s |
| EmailNotificationAgent | SMTP | Medium | <5 s |
| SMSNotificationAgent | Twilio API | High | <2 s |
| WebhookNotificationAgent | HTTP POST | Medium | <3 s |
| PushNotificationAgent | Firebase | High | <1 s |

7. Graph Database Agents (2 agents)

Manage Neo4j graph relationships.

| Agent | Purpose | Query Language | Database |
|---|---|---|---|
| Neo4jSyncAgent | Sync relationships | Cypher | Neo4j 5.11 |
| Neo4jQueryAgent | Query graph patterns | Cypher | Neo4j 5.11 |

8. Integration Agents (2 agents)

API gateway and caching.

| Agent | Purpose | Technology | Port |
|---|---|---|---|
| APIGatewayAgent | REST API gateway | FastAPI | 8000 |
| CacheManagerAgent | Multi-tier caching | Redis | 6379 |

9. Monitoring Agents (3 agents)

System health and performance.

| Agent | Metrics | Frequency | Alerts |
|---|---|---|---|
| HealthCheckAgent | Service status | Every 30 s | ✅ |
| PerformanceMonitorAgent | CPU, Memory, Latency | Every 10 s | ✅ |
| DataQualityValidatorAgent | Data completeness, accuracy | Every 5 min | ✅ |

10. State Management Agents (4 agents)

Manage application state.

| Agent | Scope | Storage | Persistence |
|---|---|---|---|
| StateManagerAgent | Global state | Redis | In-memory |
| AccidentStateManagerAgent | Accident lifecycle | MongoDB | Persistent |
| CongestionStateManagerAgent | Congestion zones | MongoDB | Persistent |
| TemporalStateTrackerAgent | Historical states | TimescaleDB | Time-series |

| Agent | Purpose | Frequency | Output |
|---|---|---|---|
| CameraImageFetchAgent | Fetch images from 1,000+ cameras | Every 30 s | Raw images + metadata |
| WeatherIntegrationAgent | Get weather data (OpenWeatherMap) | Every 5 min | Temperature, humidity, conditions |
| AirQualityAgent | Fetch AQI data from sensors | Every 10 min | PM2.5, PM10, AQI index |

Example: CameraImageFetchAgent

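import time
from datetime import datetime

class CameraImageFetchAgent(BaseAgent):
    async def execute(self, context):
        start_time = time.time()  # reference point for execution_time
        cameras = await self.get_camera_list()
        results = []
        errors = []

        for camera in cameras:
            try:
                image = await self.fetch_image(camera.url)
                results.append({
                    'camera_id': camera.id,
                    'image': image,
                    'timestamp': datetime.now()
                })
            except Exception as e:
                # Record the failure and continue with the remaining cameras
                self.logger.error(f"Failed to fetch {camera.id}: {e}")
                errors.append(f"{camera.id}: {e}")

        # SUCCESS only if every camera was fetched; FAILURE if none were
        if not errors:
            status = AgentStatus.SUCCESS
        elif results:
            status = AgentStatus.PARTIAL
        else:
            status = AgentStatus.FAILURE

        return AgentResult(
            status=status,
            data={'images': results},
            errors=errors,
            execution_time=time.time() - start_time
        )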
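The loop above fetches cameras one at a time, which may be too slow for 1,000+ cameras on a 30-second schedule. A possible alternative, sketched here under assumptions, is to fetch concurrently with a bounded number of in-flight requests. `fetch_all`, `fake_fetch`, and the concurrency limit are hypothetical names and values chosen for illustration.

```python
import asyncio


async def fetch_all(items, fetch_one, max_concurrency=50):
    """Fetch every item concurrently, at most max_concurrency at a time.

    Returns (results, errors): results holds (item, value) pairs and errors
    holds (item, exception) pairs, so one failure never aborts the batch.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        async with semaphore:
            return await fetch_one(item)

    outcomes = await asyncio.gather(
        *(guarded(item) for item in items), return_exceptions=True
    )
    results, errors = [], []
    for item, outcome in zip(items, outcomes):
        if isinstance(outcome, Exception):
            errors.append((item, outcome))
        else:
            results.append((item, outcome))
    return results, errors


async def fake_fetch(camera_id):
    """Stand-in for a real HTTP call; camera 'cam-3' always fails."""
    await asyncio.sleep(0.01)
    if camera_id == "cam-3":
        raise TimeoutError("camera unreachable")
    return f"image-bytes-from-{camera_id}"


cameras = [f"cam-{i}" for i in range(5)]
results, errors = asyncio.run(fetch_all(cameras, fake_fetch, max_concurrency=2))
print(len(results), "fetched,", len(errors), "failed")
```

The semaphore keeps the agent from opening a connection to every camera at once, and `return_exceptions=True` preserves the per-camera error handling of the sequential version.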

2. Ingestion Agents (2 agents)

Accept data from external sources.

| Agent | Purpose | Protocol | Endpoint |
|---|---|---|---|
| CitizenIngestionAgent | Accept citizen-submitted reports | FastAPI REST | POST /citizen-reports |
| RealTimeStreamAgent | Ingest streaming data | Kafka Consumer | traffic-events topic |

Example: CitizenIngestionAgent

from fastapi import FastAPI, HTTPException

# CitizenReport (request model), db (async MongoDB handle), and
# verify_with_yolo are assumed to be defined elsewhere in the agent module.
app = FastAPI()

@app.post("/citizen-reports")
async def submit_report(report: CitizenReport):
    # Validate report
    if not report.location or not report.type:
        raise HTTPException(400, "Missing required fields")

    # Store in MongoDB
    result = await db.citizen_reports.insert_one(report.dict())

    # Verify with YOLOX if image provided
    if report.image:
        verification = await verify_with_yolo(report.image)
        await db.citizen_reports.update_one(
            {'_id': result.inserted_id},
            {'$set': {'verification': verification}}
        )

    return {'id': str(result.inserted_id), 'status': 'pending'}

3. Analytics Agents (4 agents)

Analyze data for insights.

| Agent | Purpose | Input | Output |
|---|---|---|---|
| AccidentDetectionAgent | Detect accidents from images | Camera images | Accident events with severity |
| PatternRecognitionAgent | Identify traffic patterns | Historical data | Pattern types, locations |
| CongestionAnalysisAgent | Analyze traffic congestion | Vehicle counts | Congestion level (0-100) |
| PredictiveAnalyticsAgent | Forecast future traffic | Time-series data | Predictions with confidence |
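The table lists a 0-100 congestion level derived from vehicle counts but does not give the formula. As a rough illustration only, one simple way to produce such a score is to scale the count by an assumed road capacity and cap the result. The linear model, the `road_capacity` parameter, and the function name are assumptions, not the documented algorithm of CongestionAnalysisAgent.

```python
def congestion_level(vehicle_count: int, road_capacity: int) -> int:
    """Map a vehicle count to a 0-100 congestion level.

    Assumes congestion scales linearly with occupancy and saturates at 100.
    """
    if road_capacity <= 0:
        raise ValueError("road_capacity must be positive")
    if vehicle_count < 0:
        raise ValueError("vehicle_count must be non-negative")
    occupancy = vehicle_count / road_capacity
    return min(100, round(occupancy * 100))


print(congestion_level(30, 120))   # a quarter of capacity -> 25
print(congestion_level(200, 120))  # over capacity is capped -> 100
```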

Example: AccidentDetectionAgent (YOLOX)

from yolox.exp import get_exp
from yolox.utils import get_model_info

class AccidentDetectionAgent(BaseAgent):
    def __init__(self):
        self.exp = get_exp(None, "yolox-x")
        self.model = self.exp.get_model()

    async def execute(self, context):
        images = context.get('images', [])
        accidents = []

        for img_data in images:
            results = self.model(img_data['image'])

            # Detect crashes, fires, damaged vehicles
            for detection in results:
                if detection.cls in [0, 1, 2]:  # Accident classes
                    accidents.append({
                        'camera_id': img_data['camera_id'],
                        'type': detection.cls,
                        'confidence': detection.conf,
                        'bbox': detection.bbox,
                        'timestamp': img_data['timestamp']
                    })

        return AgentResult(
            status=AgentStatus.SUCCESS,
            data={'accidents': accidents},
            errors=[],
            execution_time=...
        )

4. Transformation Agents (2 agents)

Transform data between formats.

| Agent | Purpose | Input Format | Output Format |
|---|---|---|---|
| NGSILDTransformerAgent | Convert to NGSI-LD | Raw JSON | NGSI-LD entities |
| SOSASSNMapperAgent | Map to SOSA/SSN ontology | NGSI-LD entities | RDF triples |

Example: NGSILDTransformerAgent

class NGSILDTransformerAgent(BaseAgent):
    async def execute(self, context):
        raw_data = context.get('data', [])
        entities = []

        for item in raw_data:
            entity = {
                'id': f"urn:ngsi-ld:Accident:{item['id']}",
                'type': 'Accident',
                '@context': [
                    'https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld',
                    'https://raw.githubusercontent.com/smart-data-models/dataModel.Transportation/master/context.jsonld'
                ],
                'location': {
                    'type': 'GeoProperty',
                    'value': {
                        'type': 'Point',
                        # GeoJSON order: longitude first, then latitude
                        'coordinates': [item['lon'], item['lat']]
                    }
                },
                'severity': {
                    'type': 'Property',
                    'value': item['severity'],
                    # In NGSI-LD, observedAt is a timestamp attached to an
                    # attribute, not a standalone Property of the entity
                    'observedAt': item['timestamp']
                }
            }
            entities.append(entity)

        return AgentResult(
            status=AgentStatus.SUCCESS,
            data={'entities': entities},
            errors=[],
            execution_time=...
        )

5. Context Management Agents (4 agents)

Manage NGSI-LD context information.

| Agent | Purpose | Technology | Endpoint |
|---|---|---|---|
| EntityPublisherAgent | Publish entities to Stellio | Stellio | POST /ngsi-ld/v1/entities |
| StellioStateQueryAgent | Query current state | Stellio | GET /ngsi-ld/v1/entities |
| TemporalDataManagerAgent | Manage temporal attributes | TimescaleDB | INSERT INTO temporal_data |
| StateUpdaterAgent | Update entity states | Stellio | PATCH /ngsi-ld/v1/entities/{id} |
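To show what a call to the `POST /ngsi-ld/v1/entities` endpoint listed above might look like, the sketch below builds the HTTP request with the standard library and stops short of sending it. The broker URL, the helper name, and the sample entity are assumptions; the real EntityPublisherAgent may use a different HTTP client.

```python
import json
import urllib.request

# Assumption: broker base URL; replace with your Stellio deployment.
BROKER_URL = "http://localhost:8080"


def build_create_entity_request(entity: dict, broker_url: str = BROKER_URL):
    """Build (but do not send) a POST /ngsi-ld/v1/entities request."""
    body = json.dumps(entity).encode("utf-8")
    # An entity that embeds its own @context is sent as JSON-LD.
    if "@context" in entity:
        content_type = "application/ld+json"
    else:
        content_type = "application/json"
    return urllib.request.Request(
        url=f"{broker_url}/ngsi-ld/v1/entities",
        data=body,
        method="POST",
        headers={"Content-Type": content_type},
    )


entity = {
    "id": "urn:ngsi-ld:Accident:001",
    "type": "Accident",
    "@context": ["https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"],
    "severity": {"type": "Property", "value": "high"},
}
request = build_create_entity_request(entity)
print(request.get_method(), request.full_url)
# Sending is left out on purpose: urllib.request.urlopen(request) would
# perform the call against a running broker.
```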

6. RDF & Linked Data Agents (5 agents)

Publish data as Linked Open Data.

| Agent | Purpose | Technology | Output |
|---|---|---|---|
| NGSILDToRDFAgent | Convert NGSI-LD to RDF | RDFLib | RDF/XML, Turtle |
| TriplestoreLoaderAgent | Load triples to Fuseki | SPARQL Update | HTTP 200 |
| LODLinksetEnrichmentAgent | Enrich with external LOD | DBpedia, GeoNames | owl:sameAs links |
| ContentNegotiationAgent | Serve data in multiple formats | Flask | JSON-LD, RDF/XML, Turtle |
| SmartDataModelsValidationAgent | Validate against schemas | JSON Schema | Validation errors |
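Serving JSON-LD, RDF/XML, and Turtle from one endpoint usually means choosing a format from the client's `Accept` header. The following is a minimal, framework-independent sketch of that selection. The function name, the JSON-LD default, and the fallback behaviour for unsupported types are assumptions; the format labels on the right are the serializer names RDFLib accepts in `serialize(format=...)`.

```python
SUPPORTED = {
    "application/ld+json": "json-ld",
    "application/rdf+xml": "xml",
    "text/turtle": "turtle",
}
DEFAULT_MEDIA_TYPE = "application/ld+json"


def negotiate(accept_header: str) -> str:
    """Return the supported media type the client prefers most."""
    candidates = []
    for position, part in enumerate(accept_header.split(",")):
        fields = [f.strip() for f in part.split(";")]
        media_type = fields[0].lower()
        quality = 1.0
        for param in fields[1:]:
            name, _, value = param.partition("=")
            if name.strip().lower() == "q":
                try:
                    quality = float(value)
                except ValueError:
                    quality = 0.0
        if media_type in ("*/*", ""):
            media_type = DEFAULT_MEDIA_TYPE
        if media_type in SUPPORTED and quality > 0:
            # Higher q wins; earlier position breaks ties.
            candidates.append((-quality, position, media_type))
    if not candidates:
        # Assumption: fall back to the default instead of answering 406.
        return DEFAULT_MEDIA_TYPE
    return min(candidates)[2]


chosen = negotiate("text/turtle;q=0.8, application/rdf+xml")
print(chosen, "->", SUPPORTED[chosen])
```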

Example: LODLinksetEnrichmentAgent

class LODLinksetEnrichmentAgent(BaseAgent):
    async def execute(self, context):
        entities = context.get('entities')
        enriched = []

        for entity in entities:
            # Link to GeoNames
            if 'location' in entity:
                geonames_uri = await self.find_geonames_match(
                    entity['location']['value']['coordinates']
                )
                if geonames_uri:
                    entity['sameAs'] = {
                        'type': 'Relationship',
                        'object': geonames_uri
                    }

            # Link to DBpedia
            if 'name' in entity:
                dbpedia_uri = await self.find_dbpedia_match(
                    entity['name']['value']
                )
                if dbpedia_uri:
                    entity['seeAlso'] = {
                        'type': 'Relationship',
                        'object': dbpedia_uri
                    }

            enriched.append(entity)

        return AgentResult(
            status=AgentStatus.SUCCESS,
            data={'enriched_entities': enriched},
            errors=[],
            execution_time=...
        )

7. Graph Database Agents (2 agents)

Manage Neo4j graph database.

| Agent | Purpose | Query Type | Example |
|---|---|---|---|
| Neo4jSyncAgent | Sync entities to Neo4j | CREATE/MERGE | MERGE (c:Camera {id: $id}) |
| Neo4jQueryAgent | Execute Cypher queries | MATCH | MATCH (c:Camera)-[:DETECTED]->(a:Accident) |

8. Integration Agents (2 agents)

Infrastructure integration.

| Agent | Purpose | Technology | Features |
|---|---|---|---|
| APIGatewayAgent | API gateway with rate limiting | FastAPI | Rate limit, circuit breaker |
| CacheManagerAgent | Multi-level caching | Redis | L1 (memory), L2 (Redis), L3 (disk) |

Example: CacheManagerAgent (3-Level Cache)

import redis.asyncio as redis  # async client, so get/set can be awaited

class CacheManagerAgent(BaseAgent):
    def __init__(self):
        self.l1_cache = {}  # In-memory
        self.l2_cache = redis.Redis()  # Redis
        self.l3_cache = DiskCache()  # Disk (project class with an async get)

    async def get(self, key):
        # Try L1 (fastest)
        if key in self.l1_cache:
            return self.l1_cache[key]

        # Try L2 (fast)
        value = await self.l2_cache.get(key)
        if value is not None:
            self.l1_cache[key] = value
            return value

        # Try L3 (slow but persistent)
        value = await self.l3_cache.get(key)
        if value is not None:
            # Promote to the faster tiers; the L2 copy expires after one hour
            await self.l2_cache.set(key, value, ex=3600)
            self.l1_cache[key] = value
            return value

        return None
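The lookup-and-promote behaviour can be tried without Redis or a disk store. The sketch below models each tier as a plain dictionary; `TieredCache` and the tier names are illustrative stand-ins, not the project's implementation.

```python
class TieredCache:
    """Read-through lookup across ordered tiers, fastest first."""

    def __init__(self, tiers):
        # tiers: list of (name, dict-like store), ordered fastest to slowest.
        self.tiers = tiers

    def get(self, key):
        for index, (name, store) in enumerate(self.tiers):
            if key in store:
                value = store[key]
                # Promote the value into every faster tier that missed.
                for _, faster_store in self.tiers[:index]:
                    faster_store[key] = value
                return value, name
        return None, None

    def set(self, key, value):
        # Write-through: keep all tiers consistent on writes.
        for _, store in self.tiers:
            store[key] = value


l1, l2, l3 = {}, {}, {"camera:42": "cached-frame"}
cache = TieredCache([("L1", l1), ("L2", l2), ("L3", l3)])

print(cache.get("camera:42"))  # first read is served by L3
print(cache.get("camera:42"))  # second read is served by L1
```

After the first read the value lives in all three tiers, so later reads never reach the slow tier until the faster copies are evicted.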

9. Monitoring Agents (3 agents)

System health and performance.

| Agent | Purpose | Metrics | Alerts |
|---|---|---|---|
| HealthCheckAgent | Service health checks | Up/down status, latency | Email, Slack |
| PerformanceMonitorAgent | Performance metrics | CPU, memory, throughput | Grafana dashboard |
| DataQualityValidatorAgent | Data quality checks | Completeness, accuracy | Data quality reports |

10. State Management Agents (4 agents)

Track and manage state.

| Agent | Purpose | State Type | Storage |
|---|---|---|---|
| StateManagerAgent | General state management | Global state | Redis |
| AccidentStateManagerAgent | Track accident lifecycle | active/resolved/false_alarm | MongoDB |
| CongestionStateManagerAgent | Track congestion events | low/medium/high/critical | TimescaleDB |
| TemporalStateTrackerAgent | Historical state tracking | Time-series state | TimescaleDB |
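The accident lifecycle states in the table (active, resolved, false_alarm) lend themselves to a small state machine. The sketch below is one possible encoding; the allowed transitions are an assumption, since the table names the states but not the rules for moving between them.

```python
from enum import Enum


class AccidentState(Enum):
    ACTIVE = "active"
    RESOLVED = "resolved"
    FALSE_ALARM = "false_alarm"


# Assumption: an accident starts ACTIVE and both other states are terminal.
ALLOWED_TRANSITIONS = {
    AccidentState.ACTIVE: {AccidentState.RESOLVED, AccidentState.FALSE_ALARM},
    AccidentState.RESOLVED: set(),
    AccidentState.FALSE_ALARM: set(),
}


def transition(current: AccidentState, target: AccidentState) -> AccidentState:
    """Return the new state, or raise if the move is not allowed."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


state = transition(AccidentState.ACTIVE, AccidentState.RESOLVED)
print(state.value)
```

Rejecting illegal moves at this level keeps a late or duplicated event from reopening an accident that was already closed.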

🔄 Agent Orchestration

Execution Flow

graph LR
    A[Orchestrator] -->|1. Schedule| B[Data Collection Agents]
    B -->|2. Images| C[Analytics Agents]
    C -->|3. Detected Accidents| D[Transformation Agents]
    D -->|4. NGSI-LD Entities| E[Context Management Agents]
    E -->|5. Published| F[RDF/LOD Agents]
    F -->|6. RDF Triples| G[Graph Database Agents]
    C -->|7. Metrics| H[Monitoring Agents]

Scheduling Strategies

  1. Time-based: Run every N seconds/minutes
  2. Event-driven: Trigger on Kafka messages
  3. Dependency-based: Run after another agent completes
  4. Manual: Triggered by API call
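As an illustration of the time-based strategy, a fixed-interval loop can be written with `asyncio` alone. This is a simplified sketch: `run_periodically`, the bounded run count, and the `sample_agent` stand-in are assumptions made so the example terminates, and a production scheduler would also handle timeouts and overlapping runs.

```python
import asyncio


async def run_periodically(task, interval_seconds, max_runs):
    """Await task() every interval_seconds, max_runs times; return results."""
    results = []
    for run in range(max_runs):
        results.append(await task())
        if run < max_runs - 1:
            # Sleep between runs, but not after the last one.
            await asyncio.sleep(interval_seconds)
    return results


run_counter = {"count": 0}


async def sample_agent():
    """Stand-in for an agent's execute(); returns its run number."""
    run_counter["count"] += 1
    return run_counter["count"]


print(asyncio.run(run_periodically(sample_agent, interval_seconds=0.01, max_runs=3)))
```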

Example: Orchestrator Configuration

# config/workflow.yaml
agents:
  - name: CameraImageFetchAgent
    schedule: "*/30 * * * * *" # Every 30 seconds
    timeout: 60
    retry: 3
    dependencies: []

  - name: AccidentDetectionAgent
    schedule: null # Event-driven
    trigger: CameraImageFetchAgent.SUCCESS
    dependencies: [CameraImageFetchAgent]

  - name: NGSILDTransformerAgent
    schedule: null
    trigger: AccidentDetectionAgent.SUCCESS
    dependencies: [AccidentDetectionAgent]
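One way an orchestrator could turn the `dependencies` lists into a run order is a topological sort. The sketch below uses the standard-library `graphlib` module (Python 3.9+) on a dictionary that mirrors the YAML above; the `execution_order` helper is hypothetical, and how the real orchestrator resolves dependencies is not specified here.

```python
from graphlib import TopologicalSorter

# Mirrors config/workflow.yaml, reduced to the fields needed for ordering.
workflow = {
    "agents": [
        {"name": "CameraImageFetchAgent", "dependencies": []},
        {"name": "AccidentDetectionAgent", "dependencies": ["CameraImageFetchAgent"]},
        {"name": "NGSILDTransformerAgent", "dependencies": ["AccidentDetectionAgent"]},
    ]
}


def execution_order(config: dict) -> list:
    """Return agent names ordered so each runs after its dependencies."""
    graph = {
        agent["name"]: set(agent.get("dependencies", []))
        for agent in config["agents"]
    }
    # static_order() raises graphlib.CycleError on circular dependencies.
    return list(TopologicalSorter(graph).static_order())


print(execution_order(workflow))
```

A circular dependency in the configuration surfaces as a `CycleError` at load time rather than as a workflow that never starts.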

📖 Agent Documentation

Each agent has detailed documentation:

🛠️ Creating Your Own Agent

See the guide: Adding a New Agent


Next: Explore individual agents in the Data Collection section.