Camera Image Fetch Agent
Overview
The Camera Image Fetch Agent is responsible for collecting real-time traffic camera images from HCMC's transportation network. It implements intelligent caching with TTL (Time To Live) management to optimize bandwidth usage and ensure data freshness.
Features
- Real-time Image Collection: Fetches images from 100+ traffic cameras across HCMC
- Intelligent Caching: Redis-based caching with configurable TTL (default: 60 seconds)
- Rate Limiting: Prevents API overload with configurable request intervals
- Error Handling: Automatic retry with exponential backoff
- Image Validation: Checks image integrity and format before storage
- Metadata Enrichment: Adds timestamp, location, and camera status information
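The image validation step listed above can be illustrated with a signature check. This is a minimal sketch, not the agent's actual validator: the function names and the `min_bytes` threshold are hypothetical, and it only inspects the leading bytes that identify JPEG and PNG files.

```python
from typing import Optional

# Leading "magic" bytes that identify each file format.
JPEG_MAGIC = b"\xff\xd8\xff"
PNG_MAGIC = b"\x89PNG\r\n\x1a\n"


def detect_image_format(data: bytes) -> Optional[str]:
    """Return "JPEG" or "PNG" based on the file signature, else None."""
    if data.startswith(JPEG_MAGIC):
        return "JPEG"
    if data.startswith(PNG_MAGIC):
        return "PNG"
    return None


def is_valid_image(data: bytes, min_bytes: int = 128) -> bool:
    """Reject truncated payloads and unknown formats.

    min_bytes is an arbitrary illustrative threshold, not a documented value.
    """
    return len(data) >= min_bytes and detect_image_format(data) is not None
```

A signature check is cheap enough to run on every fetch. Fully decoding the image would catch more corruption but costs more per frame.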
Architecture
graph LR
A[Camera API] --> B[Image Fetch Agent]
B --> C[Redis Cache]
B --> D[Image Validator]
D --> E[MongoDB Storage]
C --> F[Downstream Agents]
E --> F
Configuration
File: config/data_sources.yaml
camera_sources:
  hcmc_traffic_cameras:
    base_url: "https://api.hcmc-transport.gov.vn/cameras"
    api_key: "${CAMERA_API_KEY}"
    refresh_interval: 60 # seconds
    cache_ttl: 300 # 5 minutes
    timeout: 10 # seconds
    retry_attempts: 3
    retry_delay: 2 # seconds
camera_filters:
  districts: [1, 3, 5, 7, 10, "Binh Thanh", "Phu Nhuan"]
  active_only: true
  min_resolution: "720p"
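The `${CAMERA_API_KEY}` placeholder implies that environment variables are substituted when the configuration is loaded. How the agent does this is not documented here, so the following is only a sketch of one way to do it after the YAML has been parsed. `expand_env` and `raw_config` are hypothetical names, and the dictionary is a hand-written stand-in for the parsed file.

```python
import os
from typing import Any


def expand_env(value: Any) -> Any:
    """Recursively expand ${VAR} placeholders in strings within a config tree."""
    if isinstance(value, str):
        # os.path.expandvars leaves unknown variables untouched.
        return os.path.expandvars(value)
    if isinstance(value, dict):
        return {key: expand_env(item) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item) for item in value]
    return value


# Hand-written stand-in for part of the parsed configuration.
raw_config = {
    "base_url": "https://api.hcmc-transport.gov.vn/cameras",
    "api_key": "${CAMERA_API_KEY}",
    "refresh_interval": 60,
    "cache_ttl": 300,
}

os.environ["CAMERA_API_KEY"] = "example-key"  # placeholder value for the demo
config = expand_env(raw_config)
print(config["api_key"])  # example-key
```

Keeping the key in the environment rather than in the YAML file means the configuration can be committed without exposing credentials.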
Usage
Basic Usage
from src.agents.data_collection.image_refresh_agent import ImageRefreshAgent
# Initialize agent
agent = ImageRefreshAgent(config_path="config/data_sources.yaml")
# Fetch all camera images
images = agent.fetch_all_cameras()
# Fetch specific camera
image = agent.fetch_camera_image(camera_id="CAM_001")
# Get cached image
cached_image = agent.get_cached_image(camera_id="CAM_001")
Advanced Usage with Caching
from src.agents.data_collection.image_refresh_agent import ImageRefreshAgent
from src.agents.cache.cache_manager_agent import CacheManagerAgent
# Initialize with custom cache
cache_manager = CacheManagerAgent()
agent = ImageRefreshAgent(cache_manager=cache_manager)
# Fetch with cache strategy
image = agent.fetch_with_strategy(
    camera_id="CAM_001",
    strategy="cache_first", # cache_first, network_first, cache_only
    max_age=120 # seconds
)
# Batch fetch with parallel processing
images = agent.batch_fetch(
    camera_ids=["CAM_001", "CAM_002", "CAM_003"],
    max_workers=5,
    timeout=30
)
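The strategy names `cache_first`, `network_first` and `cache_only` come from the example above, but what each one does is not spelled out. The sketch below shows one assumed reading of them. It uses a plain dictionary in place of the real cache and a caller-supplied `fetch` function in place of the camera API; both are hypothetical.

```python
import time
from typing import Callable, Dict, Optional, Tuple

# camera_id -> (stored_at_epoch_seconds, image_bytes); stands in for the real cache.
Cache = Dict[str, Tuple[float, bytes]]


def fetch_with_strategy(
    camera_id: str,
    strategy: str,
    max_age: float,
    cache: Cache,
    fetch: Callable[[str], bytes],
    now: Callable[[], float] = time.time,
) -> Optional[bytes]:
    """Resolve an image according to an assumed reading of each strategy name."""
    entry = cache.get(camera_id)
    fresh = entry is not None and now() - entry[0] <= max_age

    if strategy == "cache_only":
        # Never touch the network; a stale or missing entry is a miss.
        return entry[1] if fresh else None

    if strategy == "cache_first" and fresh:
        return entry[1]

    if strategy in ("cache_first", "network_first"):
        try:
            data = fetch(camera_id)
        except Exception:
            # network_first falls back to a fresh cached copy if one exists.
            if strategy == "network_first" and fresh:
                return entry[1]
            raise
        cache[camera_id] = (now(), data)
        return data

    raise ValueError(f"unknown strategy: {strategy}")
```

Under this reading, `cache_first` minimises API calls, `network_first` favours freshness but tolerates outages, and `cache_only` suits consumers that must never trigger a fetch.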
Integration with Other Agents
from src.orchestrator import Orchestrator
from src.agents.data_collection.image_refresh_agent import ImageRefreshAgent
# Use in orchestration workflow
orchestrator = Orchestrator()
# Register agent
orchestrator.register_agent("image_refresh", ImageRefreshAgent())
# Execute workflow
result = orchestrator.execute_workflow([
    "image_refresh",
    "cv_analysis",
    "accident_detection"
])
API Reference
Class: ImageRefreshAgent
Methods
__init__(config_path: str, cache_manager: Optional[CacheManager] = None)
Initialize the agent with configuration.
Parameters:
- config_path (str): Path to configuration file
- cache_manager (CacheManager, optional): Custom cache manager instance
Example:
agent = ImageRefreshAgent(
    config_path="config/data_sources.yaml",
    cache_manager=custom_cache
)
fetch_all_cameras() -> List[CameraImage]
Fetch images from all active cameras.
Returns:
- List[CameraImage]: List of camera image objects
Raises:
- CameraAPIError: If API request fails
- ValidationError: If image validation fails
Example:
images = agent.fetch_all_cameras()
for img in images:
    print(f"Camera: {img.camera_id}, Size: {img.size_bytes}")
fetch_camera_image(camera_id: str, force_refresh: bool = False) -> CameraImage
Fetch image from specific camera.
Parameters:
- camera_id (str): Unique camera identifier
- force_refresh (bool): Bypass cache and fetch fresh image
Returns:
- CameraImage: Camera image object with metadata
Example:
image = agent.fetch_camera_image("CAM_001", force_refresh=True)
get_cached_image(camera_id: str) -> Optional[CameraImage]
Retrieve image from cache.
Parameters:
- camera_id (str): Unique camera identifier
Returns:
- Optional[CameraImage]: Cached image or None if not found
Example:
cached = agent.get_cached_image("CAM_001")
if cached:
    print(f"Cache hit! Age: {cached.age_seconds}s")
Data Models
CameraImage
from dataclasses import dataclass
from datetime import datetime
@dataclass
class CameraImage:
    camera_id: str
    image_data: bytes
    timestamp: datetime
    location: dict
    resolution: tuple
    format: str
    size_bytes: int
    metadata: dict
CameraStatus
@dataclass
class CameraStatus:
    camera_id: str
    is_active: bool
    last_update: datetime
    error_count: int
    uptime_percentage: float
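Other examples in this document read `age_seconds` from a cached image and call `is_stale()` on it, but neither appears in the `CameraImage` fields listed above. The sketch below shows how such helpers could be derived from `timestamp`. `TimestampedImage` is a reduced, hypothetical stand-in. The timezone-aware UTC timestamp and the 300-second default are assumptions, the latter borrowed from `cache_ttl` in the sample configuration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class TimestampedImage:
    """Reduced stand-in for CameraImage carrying only what staleness needs."""

    camera_id: str
    timestamp: datetime  # assumed to be timezone-aware UTC

    @property
    def age_seconds(self) -> float:
        """Seconds elapsed since the image was captured."""
        return (datetime.now(timezone.utc) - self.timestamp).total_seconds()

    def is_stale(self, max_age: float = 300) -> bool:
        """True when the image is older than max_age seconds."""
        return self.age_seconds > max_age


# An image captured 400 seconds ago is stale under the assumed 300-second default.
old = TimestampedImage("CAM_001", datetime.now(timezone.utc) - timedelta(seconds=400))
print(old.is_stale())  # True
```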
Performance Optimization
Cache Strategy
The agent implements a multi-tier caching strategy:
- L1 Cache (Memory): In-memory LRU cache for frequently accessed images
- L2 Cache (Redis): Distributed cache for cross-agent sharing
- L3 Cache (MongoDB): Persistent storage for historical data
# Configure cache tiers
agent.configure_cache(
    l1_size=100, # Number of images in memory
    l2_ttl=300, # Redis TTL in seconds
    l3_retention=7 # MongoDB retention in days
)
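To make the L1 tier concrete, here is a minimal sketch of an in-memory LRU cache with per-entry expiry. It is an illustration only: `MemoryImageCache` and its parameters are hypothetical, and the fall-through to Redis and MongoDB on a miss is not shown.

```python
import time
from collections import OrderedDict
from typing import Callable, Optional, Tuple


class MemoryImageCache:
    """Tiny LRU cache with per-entry expiry, standing in for the L1 tier."""

    def __init__(self, max_items: int, ttl: float,
                 now: Callable[[], float] = time.monotonic) -> None:
        self._max_items = max_items
        self._ttl = ttl
        self._now = now
        self._items: "OrderedDict[str, Tuple[float, bytes]]" = OrderedDict()

    def get(self, camera_id: str) -> Optional[bytes]:
        entry = self._items.get(camera_id)
        if entry is None:
            return None
        stored_at, data = entry
        if self._now() - stored_at > self._ttl:
            del self._items[camera_id]  # expired: drop it and report a miss
            return None
        self._items.move_to_end(camera_id)  # mark as most recently used
        return data

    def put(self, camera_id: str, data: bytes) -> None:
        self._items[camera_id] = (self._now(), data)
        self._items.move_to_end(camera_id)
        while len(self._items) > self._max_items:
            self._items.popitem(last=False)  # evict the least recently used
```

In a tiered setup, a miss here would typically trigger a lookup in the next tier, with the result written back through `put` so the next read is served from memory.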
Parallel Processing
Optimize batch operations with parallel processing:
# Configure parallel fetching
agent.configure_parallel(
    max_workers=10,
    chunk_size=20,
    timeout_per_camera=5
)
# Execute parallel batch
images = agent.batch_fetch_parallel(
    camera_ids=all_camera_ids,
    progress_callback=lambda done, total: print(f"{done}/{total}")
)
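The parallel batch call above could be built on a thread pool, which suits I/O-bound work such as HTTP fetches. The sketch below assumes that approach. It is not the agent's implementation: `fetch` is a hypothetical per-camera function, and chunking and per-camera timeouts are left out.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List, Optional


def batch_fetch_parallel(
    camera_ids: List[str],
    fetch: Callable[[str], bytes],
    max_workers: int = 10,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> Dict[str, bytes]:
    """Fetch many cameras concurrently, skipping the ones that fail."""
    results: Dict[str, bytes] = {}
    total = len(camera_ids)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch, cid): cid for cid in camera_ids}
        for done, future in enumerate(as_completed(futures), start=1):
            camera_id = futures[future]
            try:
                results[camera_id] = future.result()
            except Exception as exc:
                # One bad camera should not abort the whole batch.
                print(f"{camera_id} failed: {exc}")
            if progress_callback is not None:
                progress_callback(done, total)
    return results
```

The progress callback runs in the calling thread as each future completes, so it can safely update a progress bar or log line without extra locking.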
Monitoring & Metrics
Health Check
# Check agent health
health = agent.health_check()
print(f"Status: {health.status}")
print(f"Active Cameras: {health.active_cameras}")
print(f"Cache Hit Rate: {health.cache_hit_rate}%")
print(f"Average Response Time: {health.avg_response_time}ms")
Metrics Export
# Export metrics for monitoring
metrics = agent.export_metrics()
# {
# "total_fetches": 1000,
# "successful_fetches": 980,
# "failed_fetches": 20,
# "cache_hits": 750,
# "cache_misses": 250,
# "avg_fetch_time_ms": 120
# }
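The raw counters above can be turned into the percentages usually shown on a dashboard. The helper below is a hypothetical sketch, with `derive_rates` as an assumed name. It uses the sample values from the comment and guards against division by zero.

```python
from typing import Dict


def derive_rates(metrics: Dict[str, int]) -> Dict[str, float]:
    """Turn raw counters into percentages, guarding against empty totals."""
    lookups = metrics["cache_hits"] + metrics["cache_misses"]
    fetches = metrics["total_fetches"]
    return {
        "cache_hit_rate": 100.0 * metrics["cache_hits"] / lookups if lookups else 0.0,
        "success_rate": 100.0 * metrics["successful_fetches"] / fetches if fetches else 0.0,
    }


sample = {
    "total_fetches": 1000,
    "successful_fetches": 980,
    "failed_fetches": 20,
    "cache_hits": 750,
    "cache_misses": 250,
    "avg_fetch_time_ms": 120,
}
print(derive_rates(sample))  # {'cache_hit_rate': 75.0, 'success_rate': 98.0}
```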
Error Handling
Common Errors
CameraAPIError
from src.core.exceptions import CameraAPIError
try:
    image = agent.fetch_camera_image("CAM_001")
except CameraAPIError as e:
    print(f"API Error: {e.message}")
    print(f"Status Code: {e.status_code}")
    print(f"Camera ID: {e.camera_id}")
CacheError
from src.core.exceptions import CacheError
try:
    cached = agent.get_cached_image("CAM_001")
except CacheError as e:
    print(f"Cache Error: {e.message}")
    # Fallback to network fetch
    image = agent.fetch_camera_image("CAM_001", force_refresh=True)
Retry Logic
# Configure retry strategy
agent.configure_retry(
    max_attempts=5,
    backoff_factor=2,
    max_delay=30,
    retry_on=[500, 502, 503, 504] # HTTP status codes
)
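To show what these parameters could mean in practice, the sketch below computes a capped exponential delay schedule and wraps a call in a retry loop. The formula (`backoff_factor ** n`, capped at `max_delay`) is an assumption about how the agent interprets its settings. Filtering by HTTP status code (`retry_on`) is omitted, and both function names are hypothetical.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(max_attempts: int, backoff_factor: float, max_delay: float) -> List[float]:
    """Waits between attempts: factor**1, factor**2, ... capped at max_delay."""
    return [min(backoff_factor ** n, max_delay) for n in range(1, max_attempts)]


def call_with_retry(
    func: Callable[[], T],
    max_attempts: int = 5,
    backoff_factor: float = 2,
    max_delay: float = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func until it succeeds or max_attempts is exhausted."""
    delays = backoff_delays(max_attempts, backoff_factor, max_delay)
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(delays[attempt])
    raise RuntimeError("max_attempts must be at least 1")


print(backoff_delays(5, 2, 30))  # [2, 4, 8, 16]
```

With the values from the configuration call above, five attempts wait 2, 4, 8 and 16 seconds between tries. The 30-second cap only takes effect from a sixth attempt onward.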
Testing
Unit Tests
import pytest
from src.agents.data_collection.image_refresh_agent import ImageRefreshAgent

def test_fetch_camera_image(mock_camera_api):
    agent = ImageRefreshAgent()
    image = agent.fetch_camera_image("CAM_TEST_001")
    assert image.camera_id == "CAM_TEST_001"
    assert len(image.image_data) > 0
    assert image.format in ["JPEG", "PNG"]

def test_cache_behavior(mock_cache):
    agent = ImageRefreshAgent(cache_manager=mock_cache)
    # First fetch - cache miss
    image1 = agent.fetch_camera_image("CAM_001")
    assert mock_cache.get_call_count() == 1
    # Second fetch - cache hit
    image2 = agent.get_cached_image("CAM_001")
    assert image2 is not None
    assert image1.camera_id == image2.camera_id
Integration Tests
@pytest.mark.integration
def test_end_to_end_workflow():
    agent = ImageRefreshAgent()
    # Fetch all cameras
    images = agent.fetch_all_cameras()
    assert len(images) > 0
    # Verify cache population
    for image in images:
        cached = agent.get_cached_image(image.camera_id)
        assert cached is not None
Best Practices
1. Cache Management
# Always check cache first
image = agent.get_cached_image(camera_id)
if not image or image.is_stale():
    image = agent.fetch_camera_image(camera_id)
2. Error Handling
# Implement graceful degradation
try:
    image = agent.fetch_camera_image(camera_id)
except CameraAPIError:
    # Use cached image even if stale
    image = agent.get_cached_image(camera_id)
    if not image:
        # Use placeholder image
        image = agent.get_placeholder_image()
3. Resource Cleanup
# Always close connections
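# Create the agent outside the try block so that `agent` is always
# defined when the finally clause runs
agent = ImageRefreshAgent()
try:
    images = agent.fetch_all_cameras()
finally:
    agent.close() # Close connections and cleanup resources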