
Data Quality Validator Agent

The Data Quality Validator Agent validates the integrity and quality of data across all data sources and pipelines.

📋 Overview

| Property | Value |
|----------|-------|
| Module | src.agents.monitoring.data_quality_validator_agent |
| Class | DataQualityValidatorAgent |
| Author | UIP Team |
| Version | 1.0.0 |

🎯 Purpose

The Data Quality Validator Agent provides:

  • Schema validation for incoming data
  • Completeness checks for required fields
  • Consistency validation across data sources
  • Freshness monitoring for real-time data
  • Anomaly detection for unexpected values

📊 Validation Types

Schema Validation

| Check | Description |
|-------|-------------|
| Type validation | Correct data types |
| Required fields | All mandatory fields present |
| Format validation | Proper formatting (dates, IDs) |
| Range validation | Values within acceptable ranges |
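
The four checks in the table can be sketched in plain Python. This is a minimal illustration and not the agent's implementation: `FieldRule` and `check_entity` are hypothetical names, and the example rules only mirror the Camera ID pattern and the vehicleCount range used elsewhere on this page.

```python
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FieldRule:
    """Hypothetical rule for one field; not part of the agent's API."""
    name: str
    py_type: type
    required: bool = False
    pattern: Optional[str] = None     # format validation (regular expression)
    minimum: Optional[float] = None   # range validation, lower bound
    maximum: Optional[float] = None   # range validation, upper bound


def check_entity(entity: Dict[str, Any], rules: List[FieldRule]) -> List[str]:
    """Return a list of violations; an empty list means the entity passed."""
    errors = []
    for rule in rules:
        value = entity.get(rule.name)
        # Required fields: a missing or null value fails only if required.
        if value is None:
            if rule.required:
                errors.append(f"{rule.name}: required field is missing")
            continue
        # Type validation. bool is a subclass of int, so reject it explicitly.
        wrong_type = not isinstance(value, rule.py_type) or (
            isinstance(value, bool) and rule.py_type is not bool
        )
        if wrong_type:
            errors.append(f"{rule.name}: expected {rule.py_type.__name__}")
            continue
        # Format validation: the whole string must match the pattern.
        if rule.pattern and isinstance(value, str):
            if not re.fullmatch(rule.pattern, value):
                errors.append(f"{rule.name}: does not match {rule.pattern}")
        # Range validation.
        if rule.minimum is not None and value < rule.minimum:
            errors.append(f"{rule.name}: below minimum {rule.minimum}")
        if rule.maximum is not None and value > rule.maximum:
            errors.append(f"{rule.name}: above maximum {rule.maximum}")
    return errors


rules = [
    FieldRule("id", str, required=True, pattern=r"^CAM_\d+$"),
    FieldRule("vehicleCount", int, required=True, minimum=0, maximum=1000),
]
good = {"id": "CAM_001", "vehicleCount": 12}
bad = {"id": "cam-1", "vehicleCount": 5000}

print(check_entity(good, rules))  # no violations
print(check_entity(bad, rules))   # one format and one range violation
```

Returning every violation instead of stopping at the first one lets a caller report all problems with an entity in a single pass.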

Data Quality Metrics

| Metric | Formula | Target |
|--------|---------|--------|
| Completeness | (non-null values / total) × 100 | > 95% |
| Accuracy | (valid values / total) × 100 | > 99% |
| Consistency | (matching records / total) × 100 | > 98% |
| Timeliness | Age of latest record | < 5 min |
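
As a worked example of the formulas above, the following sketch computes each metric over small in-memory record sets. The function names, the `observedAt` timestamp field as the freshness source, and the choice of denominator for each ratio are assumptions made for illustration; the agent may define them differently.

```python
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def completeness(records: List[Record], fields: List[str]) -> float:
    """(non-null values / total) x 100 over the given fields."""
    total = len(records) * len(fields)
    if total == 0:
        return 0.0  # assumption: no data scores 0% rather than raising
    non_null = sum(1 for r in records for f in fields if r.get(f) is not None)
    return non_null / total * 100


def accuracy(records: List[Record], is_valid: Callable[[Record], bool]) -> float:
    """(valid values / total) x 100, with validity decided per record."""
    if not records:
        return 0.0
    return sum(1 for r in records if is_valid(r)) / len(records) * 100


def consistency(source_a: Dict[str, Record], source_b: Dict[str, Record]) -> float:
    """(matching records / total) x 100, comparing records with the same ID.

    Assumption: total is the number of records in source_a.
    """
    if not source_a:
        return 0.0
    matching = sum(1 for key, rec in source_a.items() if source_b.get(key) == rec)
    return matching / len(source_a) * 100


def timeliness_minutes(records: List[Record], now: datetime) -> float:
    """Age of the latest record, in minutes."""
    latest = max(r["observedAt"] for r in records)
    return (now - latest).total_seconds() / 60


observations = [
    {"id": "OBS_1", "vehicleCount": 12, "avgSpeed": 48.5,
     "observedAt": datetime(2025, 11, 29, 9, 52, tzinfo=timezone.utc)},
    {"id": "OBS_2", "vehicleCount": None, "avgSpeed": 51.0,
     "observedAt": datetime(2025, 11, 29, 9, 57, tzinfo=timezone.utc)},
]
now = datetime(2025, 11, 29, 10, 0, tzinfo=timezone.utc)

pct_complete = completeness(observations, ["id", "vehicleCount", "avgSpeed"])
pct_accurate = accuracy(observations, lambda r: r["vehicleCount"] is not None)
age = timeliness_minutes(observations, now)
print(f"completeness={pct_complete:.1f}% accuracy={pct_accurate:.1f}% age={age:.1f} min")
```

With five of six values present, completeness here is about 83.3%, which would fall below the 95% target in the table.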

🔧 Architecture

┌─────────────────────────────────────────────┐
│        Data Quality Validator Agent         │
├─────────────────────────────────────────────┤
│                                             │
│  ┌─────────┐  ┌─────────┐  ┌─────────────┐  │
│  │ Schema  │  │Complete-│  │ Consistency │  │
│  │Validator│  │  ness   │  │   Checker   │  │
│  └────┬────┘  └────┬────┘  └──────┬──────┘  │
│       │            │              │         │
│       └────────────┼──────────────┘         │
│                    ▼                        │
│            ┌───────────────┐                │
│            │ Quality Score │                │
│            │  Calculator   │                │
│            └───────┬───────┘                │
│                    │                        │
│        ┌───────────┼───────────┐            │
│        ▼           ▼           ▼            │
│   ┌─────────┐ ┌─────────┐ ┌─────────┐       │
│   │ Report  │ │  Alert  │ │ Metrics │       │
│   │Generator│ │ Trigger │ │ Export  │       │
│   └─────────┘ └─────────┘ └─────────┘       │
└─────────────────────────────────────────────┘
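
The data flow in the diagram (independent checks feeding one score, which then drives reporting, alerting, and metrics) might look roughly like the sketch below. Everything here is hypothetical: `run_pipeline`, `PipelineResult`, and the `alert_below` parameter are illustrative names, and combining the check scores with an unweighted mean is an assumption, since this page does not state how the Quality Score Calculator weighs its inputs.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

# A check takes the records and returns a score between 0 and 100.
Check = Callable[[List[dict]], float]


@dataclass
class PipelineResult:
    scores: Dict[str, float]   # per-check scores (input for reports and metrics)
    quality_score: float       # combined score
    alert: bool                # whether the alert trigger would fire


def run_pipeline(
    records: List[dict],
    checks: Dict[str, Check],
    alert_below: float = 95.0,  # hypothetical alert threshold
) -> PipelineResult:
    # Stage 1: run every checker independently on the same records.
    scores = {name: check(records) for name, check in checks.items()}
    # Stage 2: combine the scores. Assumption: unweighted mean.
    quality_score = sum(scores.values()) / len(scores)
    # Stage 3: one result object feeds the report, alert, and metrics outputs.
    return PipelineResult(scores, quality_score, quality_score < alert_below)


def id_present(records: List[dict]) -> float:
    """Toy completeness check: share of records that carry an 'id'."""
    return sum(1 for r in records if r.get("id")) / len(records) * 100


checks = {
    "schema": lambda records: 100.0,  # stand-in for a schema validator
    "completeness": id_present,
}
result = run_pipeline([{"id": "CAM_001"}, {"id": None}], checks)
print(result.scores, result.quality_score, result.alert)
```

Keeping the checkers as plain callables means a new check can be added to the dictionary without touching the scoring or output stages.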

🚀 Usage

Basic Validation

from src.agents.monitoring.data_quality_validator_agent import DataQualityValidatorAgent

validator = DataQualityValidatorAgent()

# Validate a single entity (camera_entity is an entity loaded elsewhere)
result = validator.validate_entity(camera_entity)
if result.is_valid:
    print("Entity is valid")
else:
    print(f"Validation errors: {result.errors}")

Batch Validation

# Validate multiple entities
entities = load_entities()
results = validator.validate_batch(entities)

print(f"Valid: {results.valid_count}")
print(f"Invalid: {results.invalid_count}")
print(f"Quality Score: {results.quality_score}%")

Schema Definition

from src.validation import Schema, Field

camera_schema = Schema(
    name="Camera",
    fields=[
        Field("id", type="string", required=True, pattern=r"^CAM_\d+$"),
        Field("name", type="string", required=True, max_length=100),
        Field("location", type="object", required=True),
        Field("status", type="string", enum=["active", "inactive", "maintenance"]),
        Field("lastObservation", type="datetime", max_age_minutes=10),
    ],
)

validator.register_schema("Camera", camera_schema)

⚙️ Configuration

# config/data_quality_config.yaml
data_quality:
  enabled: true

  # Validation rules
  schemas:
    Camera:
      required_fields:
        - id
        - name
        - location
        - status
      field_types:
        id: string
        name: string
        location: object
        status: string
      patterns:
        id: "^CAM_\\d+$"
      enums:
        status: ["active", "inactive", "maintenance"]

    Observation:
      required_fields:
        - id
        - observedAt
        - vehicleCount
        - avgSpeed
      field_types:
        vehicleCount: integer
        avgSpeed: number
      ranges:
        vehicleCount:
          min: 0
          max: 1000
        avgSpeed:
          min: 0
          max: 200

  # Quality thresholds
  thresholds:
    completeness_warning: 95
    completeness_critical: 90
    freshness_warning_minutes: 5
    freshness_critical_minutes: 15

  # Alerting
  alerts:
    enabled: true
    on_quality_drop: true
    on_schema_violation: true
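
To illustrate how the warning and critical thresholds could be applied, here is a minimal sketch that maps a measured value to a severity level. The two functions are hypothetical, and the comparison direction and boundary handling (completeness strictly below a threshold, freshness strictly above it) are assumptions; the configuration itself does not specify them.

```python
from typing import Dict

# Values copied from the thresholds section of the configuration above.
THRESHOLDS: Dict[str, float] = {
    "completeness_warning": 95,
    "completeness_critical": 90,
    "freshness_warning_minutes": 5,
    "freshness_critical_minutes": 15,
}


def completeness_level(percent: float, t: Dict[str, float] = THRESHOLDS) -> str:
    """Lower completeness is worse, so test the critical threshold first."""
    if percent < t["completeness_critical"]:
        return "critical"
    if percent < t["completeness_warning"]:
        return "warning"
    return "ok"


def freshness_level(age_minutes: float, t: Dict[str, float] = THRESHOLDS) -> str:
    """Older data is worse, so the comparison runs the other way."""
    if age_minutes > t["freshness_critical_minutes"]:
        return "critical"
    if age_minutes > t["freshness_warning_minutes"]:
        return "warning"
    return "ok"


for percent in (97.0, 93.0, 85.0):
    print(percent, completeness_level(percent))
for age in (3.0, 7.0, 20.0):
    print(age, freshness_level(age))
```

Checking the critical threshold before the warning threshold matters, because any value that breaches the critical limit also breaches the warning limit.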

📈 Quality Reports

Generate Report

# Generate quality report
report = validator.generate_report(
    time_range="24h",
    entity_types=["Camera", "Observation", "Accident"]
)

print(f"Overall Quality Score: {report.overall_score}%")
for entity_type, score in report.by_entity_type.items():
    print(f"  {entity_type}: {score}%")

Report Format

{
  "timestamp": "2025-11-29T10:00:00Z",
  "overall_quality_score": 97.5,
  "metrics": {
    "completeness": 98.2,
    "accuracy": 99.1,
    "consistency": 96.5,
    "timeliness": 95.8
  },
  "by_entity_type": {
    "Camera": {
      "score": 99.0,
      "records_validated": 150,
      "errors": 2
    },
    "Observation": {
      "score": 96.5,
      "records_validated": 45000,
      "errors": 1580
    }
  },
  "issues": [
    {
      "type": "missing_field",
      "entity": "CAM_045",
      "field": "lastObservation",
      "severity": "warning"
    }
  ]
}
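
A report in this format is straightforward to post-process. The sketch below parses a trimmed copy of the example and derives an error rate per entity type. `summarize_report` and its `min_score` parameter are hypothetical, and flagging entity types that score below a fixed target is an illustrative choice, not documented agent behavior.

```python
import json
from typing import Dict, List

# Trimmed copy of the example report above.
REPORT_JSON = """
{
  "overall_quality_score": 97.5,
  "by_entity_type": {
    "Camera": {"score": 99.0, "records_validated": 150, "errors": 2},
    "Observation": {"score": 96.5, "records_validated": 45000, "errors": 1580}
  }
}
"""


def summarize_report(report: Dict, min_score: float = 98.0) -> List[Dict]:
    """Return one summary row per entity type."""
    rows = []
    for entity_type, stats in report["by_entity_type"].items():
        validated = stats["records_validated"]
        # Guard against division by zero when nothing was validated.
        error_rate = stats["errors"] / validated * 100 if validated else 0.0
        rows.append({
            "entity_type": entity_type,
            "score": stats["score"],
            "error_rate": round(error_rate, 2),
            "below_target": stats["score"] < min_score,
        })
    return rows


summary = summarize_report(json.loads(REPORT_JSON))
for row in summary:
    print(row)
```

For the Observation entry, 1580 errors out of 45000 records is an error rate of about 3.51%, which agrees with its reported score of 96.5.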

🛡️ Error Handling

import logging

logger = logging.getLogger(__name__)

# Handle validation errors.
# quarantine_entity() and flag_for_review() are application-defined helpers.
for entity in entities:
    result = validator.validate_entity(entity)

    if not result.is_valid:
        for error in result.errors:
            if error.severity == "critical":
                logger.error(f"Critical: {error.message}")
                quarantine_entity(entity)
            elif error.severity == "warning":
                logger.warning(f"Warning: {error.message}")
                flag_for_review(entity)

See the complete agents reference for all available agents.