Camera Image Fetch Agent
Overview
The Camera Image Fetch Agent is responsible for collecting real-time traffic camera images from HCMC's transportation network. It implements intelligent caching with TTL (Time To Live) management to optimize bandwidth usage and ensure data freshness.
Features
- Real-time Image Collection: Fetches images from 100+ traffic cameras across HCMC
- Intelligent Caching: Redis-based caching with configurable TTL (default: 60 seconds)
- Rate Limiting: Prevents API overload with configurable request intervals
- Error Handling: Automatic retry with exponential backoff
- Image Validation: Checks image integrity and format before storage
- Metadata Enrichment: Adds timestamp, location, and camera status information
Architecture
graph LR
A[Camera API] --> B[Image Fetch Agent]
B --> C[Redis Cache]
B --> D[Image Validator]
D --> E[MongoDB Storage]
C --> F[Downstream Agents]
E --> F
Configuration
File: config/data_sources.yaml
camera_sources:
hcmc_traffic_cameras:
base_url: "https://api.hcmc-transport.gov.vn/cameras"
api_key: "${CAMERA_API_KEY}"
refresh_interval: 60 # seconds
cache_ttl: 300 # 5 minutes
timeout: 10 # seconds
retry_attempts: 3
retry_delay: 2 # seconds
camera_filters:
districts: [1, 3, 5, 7, 10, "Binh Thanh", "Phu Nhuan"]
active_only: true
min_resolution: "720p"
Usage
Basic Usage
from src.agents.data_collection.image_refresh_agent import ImageRefreshAgent
# Initialize agent
agent = ImageRefreshAgent(config_path="config/data_sources.yaml")
# Fetch all camera images
images = agent.fetch_all_cameras()
# Fetch specific camera
image = agent.fetch_camera_image(camera_id="CAM_001")
# Get cached image
cached_image = agent.get_cached_image(camera_id="CAM_001")
Advanced Usage with Caching
from src.agents.data_collection.image_refresh_agent import ImageRefreshAgent
from src.agents.cache.cache_manager_agent import CacheManagerAgent
# Initialize with custom cache
cache_manager = CacheManagerAgent()
agent = ImageRefreshAgent(cache_manager=cache_manager)
# Fetch with cache strategy
image = agent.fetch_with_strategy(
camera_id="CAM_001",
strategy="cache_first", # cache_first, network_first, cache_only
max_age=120 # seconds
)
# Batch fetch with parallel processing
images = agent.batch_fetch(
camera_ids=["CAM_001", "CAM_002", "CAM_003"],
max_workers=5,
timeout=30
)
Integration with Other Agents
from src.orchestrator import Orchestrator
# Use in orchestration workflow
orchestrator = Orchestrator()
# Register agent
orchestrator.register_agent("image_refresh", ImageRefreshAgent())
# Execute workflow
result = orchestrator.execute_workflow([
"image_refresh",
"cv_analysis",
"accident_detection"
])
API Reference
Class: ImageRefreshAgent
Methods
__init__(config_path: str, cache_manager: Optional[CacheManager] = None)
Initialize the agent with configuration.
Parameters:
config_path(str): Path to configuration filecache_manager(CacheManager, optional): Custom cache manager instance
Example:
agent = ImageRefreshAgent(
config_path="config/data_sources.yaml",
cache_manager=custom_cache
)
fetch_all_cameras() -> List[CameraImage]
Fetch images from all active cameras.
Returns:
- List[CameraImage]: List of camera image objects
Raises:
CameraAPIError: If API request failsValidationError: If image validation fails
Example:
images = agent.fetch_all_cameras()
for img in images:
print(f"Camera: {img.camera_id}, Size: {img.size_bytes}")
fetch_camera_image(camera_id: str, force_refresh: bool = False) -> CameraImage
Fetch image from specific camera.
Parameters:
camera_id(str): Unique camera identifierforce_refresh(bool): Bypass cache and fetch fresh image
Returns:
- CameraImage: Camera image object with metadata
Example:
image = agent.fetch_camera_image("CAM_001", force_refresh=True)
get_cached_image(camera_id: str) -> Optional[CameraImage]
Retrieve image from cache.
Parameters:
camera_id(str): Unique camera identifier
Returns:
- Optional[CameraImage]: Cached image or None if not found
Example:
cached = agent.get_cached_image("CAM_001")
if cached:
print(f"Cache hit! Age: {cached.age_seconds}s")
Data Models
CameraImage
from dataclasses import dataclass
from datetime import datetime
@dataclass
class CameraImage:
camera_id: str
image_data: bytes
timestamp: datetime
location: dict
resolution: tuple
format: str
size_bytes: int
metadata: dict
CameraStatus
@dataclass
class CameraStatus:
camera_id: str
is_active: bool
last_update: datetime
error_count: int
uptime_percentage: float
Performance Optimization
Cache Strategy
The agent implements a multi-tier caching strategy:
- L1 Cache (Memory): In-memory LRU cache for frequently accessed images
- L2 Cache (Redis): Distributed cache for cross-agent sharing
- L3 Cache (MongoDB): Persistent storage for historical data
# Configure cache tiers
agent.configure_cache(
l1_size=100, # Number of images in memory
l2_ttl=300, # Redis TTL in seconds
l3_retention=7 # MongoDB retention in days
)