Neo4j Sync Agent

The Neo4j Sync Agent synchronizes traffic entities and their relationships to a Neo4j graph database for advanced relationship queries and graph-based analytics.

📋 Overview

Property | Value
Module   | src.agents.graph_database.neo4j_sync_agent
Class    | Neo4jSyncAgent
Author   | Nguyen Viet Hoang
Version  | 1.0.0

🎯 Purpose

The Neo4j Sync Agent enables:

  • Graph-based relationships between traffic entities
  • Advanced pattern queries using Cypher
  • Spatial analytics with geographic relationships
  • Real-time graph updates from streaming data

📊 Graph Schema

Node Types

Node Label  | Description         | Properties
Camera      | Traffic camera      | id, name, location, status
Observation | Traffic observation | id, timestamp, vehicle_count, speed
Accident    | Accident event      | id, severity, location, timestamp
Congestion  | Congestion zone     | id, level, duration, affected_roads
Pattern     | Traffic pattern     | id, type, frequency, time_of_day

Relationship Types

Relationship    | From     | To          | Description
OBSERVES        | Camera   | Observation | Camera captures observation
DETECTS         | Camera   | Accident    | Camera detects accident
CAUSES          | Accident | Congestion  | Accident causes congestion
CORRELATES_WITH | Pattern  | Pattern     | Pattern correlation
NEAR_TO         | Camera   | Camera      | Spatial proximity
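
The relationship table can also be encoded as a lookup so that an edge is checked against the schema before it is written. The following is only a sketch: `RELATIONSHIP_SCHEMA` and `is_valid_relationship` are hypothetical names introduced for illustration and are not part of `Neo4jSyncAgent`.

```python
# Hypothetical encoding of the relationship table above:
# relationship type -> (source node label, target node label).
RELATIONSHIP_SCHEMA = {
    "OBSERVES": ("Camera", "Observation"),
    "DETECTS": ("Camera", "Accident"),
    "CAUSES": ("Accident", "Congestion"),
    "CORRELATES_WITH": ("Pattern", "Pattern"),
    "NEAR_TO": ("Camera", "Camera"),
}


def is_valid_relationship(rel_type: str, from_label: str, to_label: str) -> bool:
    """Return True if the (from, type, to) triple matches the schema table."""
    return RELATIONSHIP_SCHEMA.get(rel_type) == (from_label, to_label)
```

A check like this rejects edges whose direction is reversed, for example an `OBSERVES` edge that starts at an Observation node.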

🔧 Architecture

┌─────────────────┐    ┌─────────────────┐
│     NGSI-LD     │───▶│   Neo4j Sync    │
│    Entities     │    │      Agent      │
└─────────────────┘    └────────┬────────┘
                                │
                   ┌────────────┼────────────┐
                   ▼            ▼            ▼
             ┌──────────┐ ┌──────────┐ ┌──────────┐
             │  Create  │ │  Update  │ │  Delete  │
             │  Nodes   │ │  Props   │ │  Nodes   │
             └──────────┘ └──────────┘ └──────────┘
                   │            │            │
                   └────────────┼────────────┘
                                ▼
                        ┌────────────────┐
                        │     Neo4j      │
                        │    Database    │
                        └────────────────┘
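
The three branches in the diagram (create nodes, update properties, delete nodes) could each map to one parameterized Cypher statement. The sketch below is an assumption about how such a dispatch might look, not the agent's actual implementation; `build_statement` and `ALLOWED_LABELS` are hypothetical names. Cypher cannot take a node label as a query parameter, so the label is checked against an allowlist before it is interpolated into the query text.

```python
from typing import Any, Dict, Optional, Tuple

# Labels taken from the Node Types table; anything else is rejected
# because the label is interpolated into the query text.
ALLOWED_LABELS = {"Camera", "Observation", "Accident", "Congestion", "Pattern"}


def build_statement(
    operation: str,
    label: str,
    entity_id: str,
    props: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Dict[str, Any]]:
    """Return (cypher, parameters) for a create, update, or delete operation."""
    if label not in ALLOWED_LABELS:
        raise ValueError(f"Unknown node label: {label!r}")
    params = {"id": entity_id, "props": props or {}}
    if operation == "create":
        # MERGE keeps the write idempotent if the node already exists.
        return f"MERGE (n:{label} {{id: $id}}) SET n += $props", params
    if operation == "update":
        return f"MATCH (n:{label} {{id: $id}}) SET n += $props", params
    if operation == "delete":
        # DETACH DELETE also removes the node's relationships.
        return f"MATCH (n:{label} {{id: $id}}) DETACH DELETE n", {"id": entity_id}
    raise ValueError(f"Unknown operation: {operation!r}")
```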

🚀 Usage

Basic Synchronization

from src.agents.graph_database.neo4j_sync_agent import Neo4jSyncAgent

config = {
    "enabled": True,
    "neo4j_uri": "bolt://localhost:7687",
    "username": "neo4j",
    "password": "password"
}
sync_agent = Neo4jSyncAgent(config)

# Sync camera entity
camera = {
    "id": "urn:ngsi-ld:Camera:CAM_001",
    "type": "Camera",
    "name": {"value": "Camera 001"},
    "location": {
        "value": {
            "coordinates": [106.660172, 10.762622]
        }
    }
}
sync_agent.sync_entity(camera)
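
Neo4j node properties are flat, while the camera entity wraps each attribute in a `{"value": ...}` object, so some flattening has to happen before the node is written. How `sync_entity` does this internally is not documented here; the sketch below shows one plausible approach. `flatten_entity` is a hypothetical helper, and splitting the coordinates into `lon` and `lat` is an assumption chosen to match the property names used by the spatial query later on this page.

```python
from typing import Any, Dict


def flatten_entity(entity: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten an NGSI-LD style entity into scalar node properties."""
    props: Dict[str, Any] = {"id": entity["id"]}
    for key, attr in entity.items():
        if key in ("id", "type", "@context"):
            continue
        # Unwrap {"value": ...}; plain values are passed through as-is.
        value = attr.get("value") if isinstance(attr, dict) else attr
        if value is None:
            continue  # e.g. an attribute object that carries no "value"
        if key == "location" and isinstance(value, dict):
            # Coordinates are ordered [longitude, latitude].
            lon, lat = value["coordinates"][:2]
            props["lon"], props["lat"] = lon, lat
        else:
            props[key] = value
    return props
```

Applied to the `camera` dictionary above, this yields the properties `id`, `name`, `lon`, and `lat`; the `type` field would become the node label rather than a property.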

Batch Synchronization

# Sync multiple entities
entities = [camera1, camera2, camera3]
sync_agent.sync_entities(entities)

# Sync with relationships
sync_agent.sync_with_relationships(
    entity=observation,
    relationships=[
        ("OBSERVED_BY", camera_id),
        ("PART_OF", pattern_id)
    ]
)
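
Batch writes to Neo4j are commonly done by splitting the input into fixed-size chunks and sending each chunk as the parameter of a single `UNWIND` statement. Whether `sync_entities` works this way is an assumption; the sketch below only illustrates the technique. `chunked` and `BATCH_MERGE_CAMERAS` are hypothetical names, and the default of 100 mirrors the `batch_size` value in the YAML configuration.

```python
from typing import Any, Dict, Iterator, List


def chunked(items: List[Dict[str, Any]], batch_size: int = 100) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# One statement per batch: each element of $rows is a flat property map
# that must contain an "id" key.
BATCH_MERGE_CAMERAS = (
    "UNWIND $rows AS row "
    "MERGE (c:Camera {id: row.id}) "
    "SET c += row"
)
```

Each chunk would then be passed as `{"rows": chunk}` alongside `BATCH_MERGE_CAMERAS`, so 250 cameras result in three round trips instead of 250.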

Query Graph

# Find the 10 most recent severe accidents detected by a camera
results = sync_agent.query("""
    MATCH (c:Camera {id: $camera_id})-[:DETECTS]->(a:Accident)
    WHERE a.severity = 'severe'
    RETURN a
    ORDER BY a.timestamp DESC
    LIMIT 10
""", {"camera_id": "CAM_001"})

⚙️ Configuration

Environment Variables

NEO4J_URL=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=your_password
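
This page does not say whether the agent reads these variables itself. If it does not, they could be mapped onto the constructor's config dictionary as sketched below. `config_from_env` is a hypothetical helper; the dictionary keys are the ones used in the Basic Synchronization example, and the fallback values are the ones listed above.

```python
import os
from typing import Dict, Mapping, Optional


def config_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Build a Neo4jSyncAgent-style config dict from environment variables."""
    env = os.environ if env is None else env
    password = env.get("NEO4J_PASSWORD")
    if not password:
        # Fail early instead of silently connecting with a default password.
        raise RuntimeError("NEO4J_PASSWORD is not set")
    return {
        "enabled": True,
        "neo4j_uri": env.get("NEO4J_URL", "bolt://localhost:7687"),
        "username": env.get("NEO4J_USER", "neo4j"),
        "password": password,
    }
```

Accepting an optional mapping keeps the helper easy to test without touching the real process environment.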

YAML Configuration

# config/neo4j_sync.yaml
neo4j:
  enabled: true
  uri: bolt://localhost:7687
  username: neo4j
  password: ${NEO4J_PASSWORD}

  pool:
    max_size: 50
    acquisition_timeout: 30

  sync:
    batch_size: 100
    create_indexes: true

  indexes:
    - label: Camera
      property: id
    - label: Observation
      property: timestamp
    - label: Accident
      property: severity
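
When `create_indexes` is enabled, each entry in the `indexes` list presumably turns into one index on the given label and property. The sketch below shows how such entries could be translated into Cypher `CREATE INDEX` statements; `index_statements` and the generated index names are hypothetical and may differ from what the agent does.

```python
from typing import Dict, List


def index_statements(indexes: List[Dict[str, str]]) -> List[str]:
    """Translate label/property pairs into idempotent CREATE INDEX statements."""
    statements = []
    for spec in indexes:
        label, prop = spec["label"], spec["property"]
        # Both values are interpolated into the query, so restrict them
        # to plain identifiers.
        if not (label.isidentifier() and prop.isidentifier()):
            raise ValueError(f"Invalid index spec: {spec!r}")
        name = f"{label.lower()}_{prop}"
        statements.append(
            f"CREATE INDEX {name} IF NOT EXISTS FOR (n:{label}) ON (n.{prop})"
        )
    return statements
```

The `IF NOT EXISTS` clause makes the statements safe to run on every startup.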

📈 Cypher Query Examples

Find Traffic Patterns

// Find cameras that have detected more than 5 accidents
MATCH (c:Camera)-[:DETECTS]->(a:Accident)
WITH c, count(a) as accident_count
WHERE accident_count > 5
RETURN c.name, accident_count
ORDER BY accident_count DESC

Spatial Queries

// Find pairs of cameras within 1 km of each other (distance is in meters)
MATCH (c1:Camera), (c2:Camera)
WHERE c1.id <> c2.id
  AND point.distance(
    point({longitude: c1.lon, latitude: c1.lat}),
    point({longitude: c2.lon, latitude: c2.lat})
  ) < 1000
RETURN c1.name, c2.name
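
To sanity-check the 1000 m threshold outside the database, the great-circle distance between two cameras can be computed with the haversine formula. This is a standalone sketch; `haversine_m` is a hypothetical helper, it assumes a spherical Earth with a mean radius of 6,371 km, and its results may differ slightly from the value Neo4j computes.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius, spherical approximation


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in meters between two lon/lat points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def within_radius(cam_a: dict, cam_b: dict, radius_m: float = 1000.0) -> bool:
    """Mirror the query's predicate for two dicts with 'lon' and 'lat' keys."""
    return haversine_m(cam_a["lon"], cam_a["lat"], cam_b["lon"], cam_b["lat"]) < radius_m
```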

Pattern Correlation

// Find correlated congestion patterns
MATCH (p1:Pattern)-[:CORRELATES_WITH]->(p2:Pattern)
WHERE p1.type = 'morning_rush'
RETURN p1, p2, p1.correlation_score

🛡️ Error Handling

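import logging

# Assumption: the agent lets the Neo4j driver's ConstraintError propagate.
from neo4j.exceptions import ConstraintError

logger = logging.getLogger(__name__)

try:
    sync_agent.sync_entity(entity)
except ConnectionError:
    logger.error("Neo4j connection failed")
    # Fall back to a queue for a later retry
    retry_queue.add(entity)
except ConstraintError:
    logger.warning("Duplicate entity, updating instead")
    sync_agent.update_entity(entity)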
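
The retry queue itself is not described on this page. One way to drain such a queue is to retry each entity a bounded number of times with exponential backoff, as sketched below. `drain_retry_queue` is a hypothetical helper, the use of `collections.deque` is an assumption, and `sync_fn` stands in for a callable such as `sync_agent.sync_entity`.

```python
import time
from collections import deque
from typing import Any, Callable, Deque, Dict, List


def drain_retry_queue(
    queue: Deque[Dict[str, Any]],
    sync_fn: Callable[[Dict[str, Any]], None],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Dict[str, Any]]:
    """Retry every queued entity; return the ones that still fail."""
    failed = []
    while queue:
        entity = queue.popleft()
        for attempt in range(max_attempts):
            try:
                sync_fn(entity)
                break  # synced successfully, move on to the next entity
            except ConnectionError:
                if attempt < max_attempts - 1:
                    # Wait 0.5 s, 1 s, 2 s, ... between attempts.
                    sleep(base_delay * 2 ** attempt)
        else:
            # The loop finished without a break: every attempt failed.
            failed.append(entity)
    return failed
```

Entities returned by the function could be logged or written to a dead-letter store rather than retried indefinitely.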

See the complete agents reference for all available agents.