Enterprise Data Quality Framework
Production-grade data quality system implementing automated validation, monitoring, and anomaly detection for data pipelines
System Architecture
A comprehensive data quality framework that ensures data integrity through automated validation, profiling, and monitoring.
Core Components
1. Data Validation Engine
class DataValidator:
def __init__(self, config: Dict[str, Any]):
self.validators = {
'schema': SchemaValidator(config['schema_rules']),
'statistical': StatisticalValidator(config['stat_rules']),
'business': BusinessRuleValidator(config['business_rules']),
'integrity': DataIntegrityValidator(config['integrity_rules'])
}
self.alert_manager = AlertManager(config['alert_config'])
async def validate_dataset(
self,
data: pd.DataFrame
) -> ValidationReport:
validation_tasks = [
validator.validate(data)
for validator in self.validators.values()
]
results = await asyncio.gather(*validation_tasks)
report = self._compile_validation_report(results)
if report.has_critical_issues():
await self.alert_manager.send_alert(report)
return report
class SchemaValidator:
def __init__(self, rules: Dict[str, Any]):
self.required_columns = rules['required_columns']
self.column_types = rules['column_types']
self.unique_constraints = rules['unique_constraints']
async def validate(
self,
data: pd.DataFrame
) -> List[ValidationIssue]:
issues = []
# Check column presence and types
for col, expected_type in self.column_types.items():
if col not in data.columns:
issues.append(
ValidationIssue(
level='critical',
message=f"Missing required column: {col}"
)
)
elif not self._check_type(data[col], expected_type):
issues.append(
ValidationIssue(
level='error',
message=f"Invalid type for column {col}"
)
)
# Check uniqueness constraints
for constraint in self.unique_constraints:
if not self._check_uniqueness(data, constraint):
issues.append(
ValidationIssue(
level='error',
message=f"Uniqueness violation: {constraint}"
)
)
return issues
2. Data Profiling System
class DataProfiler:
def __init__(self, config: Dict[str, Any]):
self.profile_components = {
'basic_stats': BasicStatsProfiler(),
'distributions': DistributionProfiler(),
'correlations': CorrelationProfiler(),
'patterns': PatternProfiler()
}
self.history_manager = ProfileHistoryManager(
storage_config=config['storage']
)
async def generate_profile(
self,
data: pd.DataFrame
) -> DataProfile:
profile_tasks = [
profiler.profile(data)
for profiler in self.profile_components.values()
]
results = await asyncio.gather(*profile_tasks)
profile = self._merge_profile_results(results)
# Store profile history
await self.history_manager.store_profile(profile)
return profile
class DistributionProfiler:
def __init__(self):
self.distribution_tests = {
'normality': self._test_normality,
'uniformity': self._test_uniformity,
'stationarity': self._test_stationarity
}
async def profile(
self,
data: pd.DataFrame
) -> DistributionProfile:
profiles = {}
for column in data.select_dtypes(include=np.number):
column_profile = {}
for test_name, test_func in self.distribution_tests.items():
test_result = await test_func(data[column])
column_profile[test_name] = test_result
profiles[column] = column_profile
return DistributionProfile(profiles)
3. Quality Monitoring System
class QualityMonitor:
def __init__(self, config: Dict[str, Any]):
self.metrics = QualityMetrics(config['metric_config'])
self.detector = DriftDetector(config['drift_config'])
self.time_window = config['time_window']
self.store = MetricStore(config['store_config'])
async def monitor_quality(
self,
current_data: pd.DataFrame,
reference_data: pd.DataFrame
) -> QualityReport:
# Compute current metrics
current_metrics = await self.metrics.compute_metrics(
current_data
)
# Detect data drift
drift_results = await self.detector.detect_drift(
current_data,
reference_data
)
# Store metrics
await self.store.store_metrics(current_metrics)
# Generate report
return self._generate_quality_report(
current_metrics,
drift_results
)
class DriftDetector:
def __init__(self, config: Dict[str, Any]):
self.drift_tests = {
'ks_test': self._kolmogorov_smirnov_test,
'chi_square': self._chi_square_test,
'wasserstein': self._wasserstein_distance
}
self.threshold = config['drift_threshold']
async def detect_drift(
self,
current: pd.DataFrame,
reference: pd.DataFrame
) -> DriftReport:
drift_results = {}
for column in current.columns:
column_results = {}
for test_name, test_func in self.drift_tests.items():
p_value = await test_func(
current[column],
reference[column]
)
column_results[test_name] = {
'p_value': p_value,
'has_drift': p_value < self.threshold
}
drift_results[column] = column_results
return DriftReport(drift_results)
4. Automated Remediation
class QualityRemediation:
def __init__(self, config: Dict[str, Any]):
self.cleaners = {
'missing_values': MissingValueCleaner(config['mv_config']),
'outliers': OutlierCleaner(config['outlier_config']),
'duplicates': DuplicateCleaner(config['duplicate_config']),
'format': FormatCleaner(config['format_config'])
}
self.validation = DataValidator(config['validation_config'])
async def clean_data(
self,
data: pd.DataFrame
) -> Tuple[pd.DataFrame, RemediationReport]:
cleaned_data = data.copy()
report = RemediationReport()
# Apply cleaners sequentially
for cleaner_name, cleaner in self.cleaners.items():
cleaned_data, cleaner_report = await cleaner.clean(
cleaned_data
)
report.add_cleaner_report(cleaner_name, cleaner_report)
# Validate cleaned data
validation_report = await self.validation.validate_dataset(
cleaned_data
)
report.add_validation_report(validation_report)
return cleaned_data, report
Usage Example
# Initialize quality framework
config = {
'schema_rules': {
'required_columns': ['id', 'timestamp', 'value'],
'column_types': {
'id': 'string',
'timestamp': 'datetime',
'value': 'float'
},
'unique_constraints': [['id']]
},
'stat_rules': {
'value': {
'min': 0,
'max': 100,
'std_dev_threshold': 3
}
},
'monitoring_config': {
'drift_threshold': 0.05,
'time_window': '1d'
}
}
quality_framework = DataQualityFramework(config)
# Run quality checks
validation_report = await quality_framework.validate_dataset(data)
profile = await quality_framework.profile_dataset(data)
quality_report = await quality_framework.monitor_quality(
current_data,
reference_data
)
# Handle quality issues
if validation_report.has_issues():
cleaned_data, remediation_report = await quality_framework.clean_data(
data
)