
Neo4j Sync Agent

The Neo4j Sync Agent synchronizes traffic entities and their relationships to a Neo4j graph database, enabling relationship queries and graph-based analytics.

📋 Overview

| Property | Value |
|----------|-------|
| Module | src.agents.graph_database.neo4j_sync_agent |
| Class | Neo4jSyncAgent |
| Author | Nguyen Viet Hoang |
| Version | 1.0.0 |

🎯 Purpose

The Neo4j Sync Agent enables:

  • Graph-based relationships between traffic entities
  • Advanced pattern queries using Cypher
  • Spatial analytics with geographic relationships
  • Real-time graph updates from streaming data

📊 Graph Schema

Node Types

| Node Label | Description | Properties |
|------------|-------------|------------|
| Camera | Traffic camera | id, name, location, status |
| Observation | Traffic observation | id, timestamp, vehicle_count, speed |
| Accident | Accident event | id, severity, location, timestamp |
| Congestion | Congestion zone | id, level, duration, affected_roads |
| Pattern | Traffic pattern | id, type, frequency, time_of_day |
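To make the node model concrete, here is a minimal sketch of how an NGSI-LD entity could be flattened into a node label and a property map. The helper `ngsi_ld_to_node` is hypothetical and is not part of the documented `Neo4jSyncAgent` API; storing the location as separate `lon` and `lat` properties is also an assumption, chosen to match the property names used in the spatial query later on this page. Nested attribute values other than `location` are passed through unchanged.

```python
from typing import Any


def ngsi_ld_to_node(entity: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Return (label, properties) for an NGSI-LD entity.

    Hypothetical helper: the agent's real mapping is not shown on this page.
    """
    label = entity["type"]
    props: dict[str, Any] = {"id": entity["id"]}
    for key, attr in entity.items():
        if key in ("id", "type", "@context"):
            continue
        # NGSI-LD attributes wrap their payload in {"value": ...}
        value = attr.get("value") if isinstance(attr, dict) else attr
        if key == "location" and isinstance(value, dict):
            # GeoJSON coordinate order is [longitude, latitude]
            lon, lat = value["coordinates"][:2]
            props["lon"], props["lat"] = lon, lat
        else:
            props[key] = value
    return label, props


camera = {
    "id": "urn:ngsi-ld:Camera:CAM_001",
    "type": "Camera",
    "name": {"value": "Camera 001"},
    "location": {"value": {"coordinates": [106.660172, 10.762622]}},
}
label, props = ngsi_ld_to_node(camera)
print(label, props)
```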

Relationship Types

| Relationship | From | To | Description |
|--------------|------|----|-------------|
| OBSERVES | Camera | Observation | Camera captures observation |
| DETECTS | Camera | Accident | Camera detects accident |
| CAUSES | Accident | Congestion | Accident causes congestion |
| CORRELATES_WITH | Pattern | Pattern | Pattern correlation |
| NEAR_TO | Camera | Camera | Spatial proximity |
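The relationship table can be encoded as a small lookup that rejects edges the schema does not allow. This is only a sketch: `RELATIONSHIP_SCHEMA` and `relationship_cypher` are hypothetical names, and whether the agent performs this kind of validation is not stated here.

```python
# Relationship type -> (source label, target label), copied from the table above
RELATIONSHIP_SCHEMA = {
    "OBSERVES": ("Camera", "Observation"),
    "DETECTS": ("Camera", "Accident"),
    "CAUSES": ("Accident", "Congestion"),
    "CORRELATES_WITH": ("Pattern", "Pattern"),
    "NEAR_TO": ("Camera", "Camera"),
}


def relationship_cypher(rel_type: str, from_label: str, to_label: str) -> str:
    """Build a parameterised MERGE statement for one schema-valid relationship."""
    expected = RELATIONSHIP_SCHEMA.get(rel_type)
    if expected is None:
        raise ValueError(f"unknown relationship type: {rel_type}")
    if expected != (from_label, to_label):
        raise ValueError(
            f"{rel_type} must go from {expected[0]} to {expected[1]}, "
            f"not from {from_label} to {to_label}"
        )
    # Node ids are passed as query parameters ($from_id, $to_id)
    return (
        f"MATCH (a:{from_label} {{id: $from_id}}), (b:{to_label} {{id: $to_id}}) "
        f"MERGE (a)-[:{rel_type}]->(b)"
    )


print(relationship_cypher("DETECTS", "Camera", "Accident"))
```

Using `MERGE` rather than `CREATE` keeps repeated syncs from duplicating the same edge.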

🔧 Architecture

┌─────────────────┐    ┌─────────────────┐
│    NGSI-LD      │───▶│   Neo4j Sync    │
│    Entities     │    │     Agent       │
└─────────────────┘    └────────┬────────┘
                                │
                    ┌───────────┼───────────┐
                    ▼           ▼           ▼
              ┌──────────┐ ┌──────────┐ ┌──────────┐
              │  Create  │ │  Update  │ │  Delete  │
              │  Nodes   │ │  Props   │ │  Nodes   │
              └──────────┘ └──────────┘ └──────────┘
                    │           │           │
                    └───────────┼───────────┘
                                ▼
                        ┌────────────────┐
                        │     Neo4j      │
                        │    Database    │
                        └────────────────┘
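The three branches in the diagram (create nodes, update properties, delete nodes) could each map to one parameterised Cypher statement. The sketch below is an assumption about how such a dispatch might look; `cypher_for` is a hypothetical helper, and the statements the agent actually issues are not documented here.

```python
def cypher_for(operation: str, label: str) -> str:
    """Return a parameterised Cypher statement for one sync operation."""
    if not label.isidentifier():
        # Labels cannot be passed as query parameters, so validate before interpolating
        raise ValueError(f"invalid label: {label!r}")
    if operation == "create":
        # MERGE makes the create idempotent; properties are set only on first creation
        return f"MERGE (n:{label} {{id: $id}}) ON CREATE SET n += $props"
    if operation == "update":
        # += adds or overwrites the given properties and leaves the others untouched
        return f"MATCH (n:{label} {{id: $id}}) SET n += $props"
    if operation == "delete":
        # DETACH DELETE also removes the node's relationships
        return f"MATCH (n:{label} {{id: $id}}) DETACH DELETE n"
    raise ValueError(f"unknown operation: {operation!r}")


for op in ("create", "update", "delete"):
    print(op, "->", cypher_for(op, "Camera"))
```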

🚀 Usage

Basic Synchronization

from src.agents.graph_database.neo4j_sync_agent import Neo4jSyncAgent

config = {
    "enabled": True,
    "neo4j_uri": "bolt://localhost:7687",
    "username": "neo4j",
    "password": "password",
}
sync_agent = Neo4jSyncAgent(config)

# Sync a camera entity (coordinates are [longitude, latitude])
camera = {
    "id": "urn:ngsi-ld:Camera:CAM_001",
    "type": "Camera",
    "name": {"value": "Camera 001"},
    "location": {
        "value": {
            "coordinates": [106.660172, 10.762622]
        }
    },
}
sync_agent.sync_entity(camera)

Batch Synchronization

# Sync multiple entities
entities = [camera1, camera2, camera3]
sync_agent.sync_entities(entities)

# Sync with relationships
sync_agent.sync_with_relationships(
    entity=observation,
    relationships=[
        ("OBSERVED_BY", camera_id),
        ("PART_OF", pattern_id),
    ],
)
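Batch synchronization usually means splitting the input into fixed-size chunks and writing each chunk in a single round trip. The sketch below shows one common way to do that with `UNWIND`; the `batched` helper and the `UPSERT_CAMERAS` statement are illustrative assumptions, not the agent's documented internals. The chunk size would correspond to the `batch_size` setting in the YAML configuration.

```python
from itertools import islice
from typing import Any, Iterable, Iterator

# Illustrative statement: one round trip upserts every row passed in $rows
UPSERT_CAMERAS = (
    "UNWIND $rows AS row "
    "MERGE (n:Camera {id: row.id}) "
    "SET n += row.props"
)


def batched(items: Iterable[Any], size: int) -> Iterator[list[Any]]:
    """Yield consecutive chunks of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while chunk := list(islice(iterator, size)):
        yield chunk


rows = [{"id": f"CAM_{i:03d}", "props": {"status": "active"}} for i in range(1, 6)]
for chunk in batched(rows, 2):
    # Each chunk would be sent as the $rows parameter of UPSERT_CAMERAS
    print(len(chunk), [row["id"] for row in chunk])
```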

Query Graph

# Find the 10 most recent severe accidents detected by a camera
results = sync_agent.query("""
    MATCH (c:Camera {id: $camera_id})-[:DETECTS]->(a:Accident)
    WHERE a.severity = 'severe'
    RETURN a
    ORDER BY a.timestamp DESC
    LIMIT 10
""", {"camera_id": "CAM_001"})

βš™οΈ Configuration​

Environment Variables​

NEO4J_URL=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=your_password
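If the agent is configured from a dictionary, as in the usage example, these variables can be read into that shape. The following is a sketch under that assumption: `config_from_env` is a hypothetical helper, and the mapping from `NEO4J_URL` to the `neo4j_uri` key is inferred from the examples on this page rather than confirmed.

```python
import os
from typing import Mapping


def config_from_env(environ: Mapping[str, str] = os.environ) -> dict:
    """Build a Neo4jSyncAgent-style config dict from environment variables."""
    return {
        "enabled": True,
        "neo4j_uri": environ.get("NEO4J_URL", "bolt://localhost:7687"),
        "username": environ.get("NEO4J_USER", "neo4j"),
        # No default for the password: a missing value raises KeyError early
        "password": environ["NEO4J_PASSWORD"],
    }


print(config_from_env({"NEO4J_PASSWORD": "your_password"})["neo4j_uri"])
```

Passing the environment as an argument keeps the helper easy to test without touching the real process environment.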

YAML Configuration

# config/neo4j_sync.yaml
neo4j:
  enabled: true
  uri: bolt://localhost:7687
  username: neo4j
  password: ${NEO4J_PASSWORD}

  pool:
    max_size: 50
    acquisition_timeout: 30

  sync:
    batch_size: 100
    create_indexes: true

  indexes:
    - label: Camera
      property: id
    - label: Observation
      property: timestamp
    - label: Accident
      property: severity
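Two parts of this configuration benefit from an illustration: the `${NEO4J_PASSWORD}` placeholder and the `indexes` list. The sketch below works on an already-parsed dictionary and shows one plausible way to expand placeholders and derive index statements. Both helpers are hypothetical, and whether the agent handles these settings this way is an assumption.

```python
from string import Template
from typing import Any, Mapping


def expand_placeholders(value: Any, env: Mapping[str, str]) -> Any:
    """Recursively replace ${NAME} placeholders in strings with values from env."""
    if isinstance(value, str):
        # substitute() raises KeyError if a placeholder has no value in env
        return Template(value).substitute(env)
    if isinstance(value, dict):
        return {k: expand_placeholders(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_placeholders(v, env) for v in value]
    return value


def index_statements(indexes: list[dict[str, str]]) -> list[str]:
    """Turn the `indexes` entries into idempotent CREATE INDEX statements."""
    return [
        f"CREATE INDEX IF NOT EXISTS FOR (n:{ix['label']}) ON (n.{ix['property']})"
        for ix in indexes
    ]


parsed = {
    "password": "${NEO4J_PASSWORD}",
    "pool": {"max_size": 50},
    "indexes": [{"label": "Camera", "property": "id"}],
}
resolved = expand_placeholders(parsed, {"NEO4J_PASSWORD": "your_password"})
for statement in index_statements(resolved["indexes"]):
    print(statement)
```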

📈 Cypher Query Examples

Find Traffic Patterns

// Find cameras with high accident rates
MATCH (c:Camera)-[:DETECTS]->(a:Accident)
WITH c, count(a) as accident_count
WHERE accident_count > 5
RETURN c.name, accident_count
ORDER BY accident_count DESC

Spatial Queries

// Find pairs of cameras within 1 km of each other
// (assumes each Camera node stores lon and lat properties)
MATCH (c1:Camera), (c2:Camera)
WHERE c1.id <> c2.id
  AND point.distance(
    point({longitude: c1.lon, latitude: c1.lat}),
    point({longitude: c2.lon, latitude: c2.lat})
  ) < 1000
RETURN c1.name, c2.name
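For geographic points, `point.distance` returns a great-circle distance in meters, which is why the threshold above is 1000. The haversine sketch below reproduces that kind of calculation in Python so the threshold can be sanity-checked outside the database. The Earth radius constant is an approximation chosen for this example, so results may differ slightly from what Neo4j reports.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters (approximation)


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in meters between two longitude/latitude points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = (
        math.sin(d_phi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    )
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


# Camera CAM_001 from the usage example and a hypothetical second camera
distance = haversine_m(106.660172, 10.762622, 106.665000, 10.765000)
print(f"{distance:.0f} m, within 1 km: {distance < 1000}")
```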

Pattern Correlation

// Find correlated congestion patterns
MATCH (p1:Pattern)-[:CORRELATES_WITH]->(p2:Pattern)
WHERE p1.type = 'morning_rush'
RETURN p1, p2, p1.correlation_score

πŸ›‘οΈ Error Handling​

try:
sync_agent.sync_entity(entity)
except ConnectionError:
logger.error("Neo4j connection failed")
# Fallback to queue for retry
retry_queue.add(entity)
except ConstraintError:
logger.warning("Duplicate entity, updating instead")
sync_agent.update_entity(entity)
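The retry queue in the example above is not defined on this page. As a rough sketch of how queued entities might be retried, the function below drains a `collections.deque` with exponential backoff. `drain_retry_queue`, its parameters, and the use of a deque are all assumptions made for illustration; the `sync` argument stands in for a callable such as `sync_agent.sync_entity`.

```python
import time
from collections import deque
from typing import Any, Callable


def drain_retry_queue(
    queue: deque,
    sync: Callable[[Any], None],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> list:
    """Retry every queued entity; return the ones that still fail."""
    failed = []
    while queue:
        entity = queue.popleft()
        for attempt in range(max_attempts):
            try:
                sync(entity)
                break
            except ConnectionError:
                if attempt < max_attempts - 1:
                    # Exponential backoff between attempts: base, 2x base, 4x base, ...
                    sleep(base_delay * 2 ** attempt)
        else:
            # The loop finished without a successful sync
            failed.append(entity)
    return failed


def unreachable(entity: Any) -> None:
    """Stand-in for a sync call while the database is down."""
    raise ConnectionError("Neo4j unavailable")


pending = deque(["urn:ngsi-ld:Camera:CAM_001"])
print(drain_retry_queue(pending, unreachable, sleep=lambda seconds: None))
```

Injecting `sleep` keeps the example fast to run and makes the backoff schedule easy to verify in tests.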

See the complete agents reference for all available agents.