🤖 Deep Dive vào Agent System của UIP
· 4 phút để đọc
Trong bài viết này, tôi sẽ chia sẻ chi tiết về Agent System - thành phần cốt lõi giúp UIP hoạt động mạnh mẽ và linh hoạt.
🎯 Tại sao sử dụng Agent Architecture?
Khi thiết kế UIP, chúng tôi đối mặt với nhiều thách thức:
- Xử lý song song - Cần xử lý dữ liệu từ 1000+ camera
- Modular - Dễ thêm/bớt tính năng
- Fault Tolerant - Một agent lỗi không ảnh hưởng toàn hệ thống
- Scalable - Mở rộng theo nhu cầu
Agent Architecture là giải pháp hoàn hảo!
🏗️ Cấu trúc Agent
Base Agent Interface
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
class AgentStatus(Enum):
SUCCESS = "success"
FAILURE = "failure"
PARTIAL = "partial"
SKIPPED = "skipped"
@dataclass
class AgentResult:
status: AgentStatus
data: dict
errors: list
execution_time: float
class BaseAgent(ABC):
"""Base class cho tất cả agents trong UIP"""
def __init__(self, config: dict):
self.config = config
self.logger = self._setup_logger()
@abstractmethod
async def execute(self, context: dict) -> AgentResult:
"""Execute agent logic"""
pass
async def pre_execute(self, context: dict) -> bool:
"""Hook trước khi execute"""
return True
async def post_execute(self, result: AgentResult) -> None:
"""Hook sau khi execute"""
pass
📊 Các loại Agent trong UIP
1. Data Collection Agents 📥
Thu thập dữ liệu từ nhiều nguồn:
| Agent | Chức năng | Frequency |
|---|---|---|
ImageRefreshAgent | Lấy ảnh từ camera | 5s |
WeatherIntegrationAgent | Dữ liệu thời tiết | 10m |
AirQualityAgent | Chỉ số AQI | 30m |
class ImageRefreshAgent(BaseAgent):
"""Refresh camera images from HCM traffic system"""
async def execute(self, context: dict) -> AgentResult:
cameras = await self.fetch_camera_list()
async with aiohttp.ClientSession() as session:
tasks = [
self.fetch_image(session, cam)
for cam in cameras
]
results = await asyncio.gather(*tasks)
return AgentResult(
status=AgentStatus.SUCCESS,
data={"images": results, "count": len(results)},
errors=[],
execution_time=time.time() - start
)
2. Analytics Agents 📈
Phân tích dữ liệu thông minh:
CVAnalysisAgent- Phân tích hình ảnh với YOLOXAccidentDetectionAgent- Phát hiện tai nạnCongestionDetectionAgent- Phát hiện ùn tắcPatternRecognitionAgent- Nhận dạng pattern giao thông
class AccidentDetectionAgent(BaseAgent):
"""Detect accidents using YOLOX model"""
def __init__(self, config: dict):
super().__init__(config)
self.model = self._load_yolox_model()
self.threshold = config.get("confidence_threshold", 0.7)
async def execute(self, context: dict) -> AgentResult:
images = context.get("camera_images", [])
accidents = []
for img in images:
detections = self.model.predict(img)
if self._is_accident(detections):
accidents.append({
"camera_id": img["camera_id"],
"confidence": detections["confidence"],
"location": img["location"],
"timestamp": datetime.now().isoformat()
})
# Dispatch alerts if accidents detected
if accidents:
await self.dispatch_alerts(accidents)
return AgentResult(
status=AgentStatus.SUCCESS,
data={"accidents": accidents},
errors=[],
execution_time=self.execution_time
)
3. Transformation Agents 🔄
Chuyển đổi dữ liệu sang các format chuẩn:
NGSILDTransformerAgent- Chuyển sang NGSI-LDSOSASSNMapperAgent- Map sang SOSA/SSN ontology
class NGSILDTransformerAgent(BaseAgent):
"""Transform data to NGSI-LD format"""
async def execute(self, context: dict) -> AgentResult:
entities = []
for camera in context.get("cameras", []):
entity = {
"@context": NGSI_LD_CONTEXT,
"id": f"urn:ngsi-ld:TrafficCamera:{camera['id']}",
"type": "TrafficCamera",
"location": {
"type": "GeoProperty",
"value": {
"type": "Point",
"coordinates": [camera["lon"], camera["lat"]]
}
},
"vehicleCount": {
"type": "Property",
"value": camera["vehicle_count"],
"observedAt": datetime.now().isoformat()
}
}
entities.append(entity)
return AgentResult(
status=AgentStatus.SUCCESS,
data={"entities": entities},
errors=[],
execution_time=self.execution_time
)
4. RDF/LOD Agents 🔗
Xử lý Linked Open Data:
TriplestoreLoaderAgent- Load RDF vào FusekiContentNegotiationAgent- Serve RDF với nhiều formatLODLinksetEnrichmentAgent- Liên kết với external datasets
🔄 Orchestrator - Điều phối viên
Orchestrator quản lý việc thực thi các agents:
class Orchestrator:
"""Điều phối execution của agents"""
def __init__(self, config_path: str):
self.config = load_config(config_path)
self.agents = self._load_agents()
self.scheduler = AsyncIOScheduler()
async def run_pipeline(self):
"""Execute full pipeline"""
context = {}
# Phase 1: Data Collection
for agent in self.agents["collection"]:
result = await agent.execute(context)
context.update(result.data)
# Phase 2: Analytics
analytics_tasks = [
agent.execute(context)
for agent in self.agents["analytics"]
]
results = await asyncio.gather(*analytics_tasks)
# Phase 3: Transformation & Publishing
for agent in self.agents["transformation"]:
result = await agent.execute(context)
context.update(result.data)
return context
📈 Performance Metrics
Sau 6 tháng vận hành:
| Metric | Value |
|---|---|
| Agents deployed | 30+ |
| Avg execution time | 2.3s |
| Success rate | 99.7% |
| Data processed/day | 50GB+ |
| Cameras monitored | 1000+ |
🎓 Lessons Learned
- Async is key - Sử dụng
asynciocho tất cả I/O operations - Graceful degradation - Agent lỗi không crash hệ thống
- Observability - Log và metric cho mọi agent
- Config-driven - Thay đổi behavior không cần code
🚀 Kết luận
Agent System giúp UIP:
- ✅ Xử lý 1000+ cameras real-time
- ✅ Phát hiện tai nạn trong < 3 giây
- ✅ Scale horizontally dễ dàng
- ✅ Maintain và extend đơn giản
Bạn có câu hỏi về Agent System? Comment bên dưới hoặc mở issue trên GitHub!
Nguyễn Nhật Quang - Lead Developer @ UIP Team
