# Advanced Time Series Anomaly Detection
An enterprise-grade anomaly detection system implementing deep learning, statistical methods, and real-time monitoring for time series data.
## System Architecture
A comprehensive time series anomaly detection system that combines multiple detection strategies with real-time processing and automated response capabilities.
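The components below pass a few lightweight containers between them whose fields this document never spells out. Here is a minimal sketch, assuming dataclasses with only the attributes the code actually touches; the criticality rule in `has_critical` is a placeholder, not the system's real policy:

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd


@dataclass
class ProcessedTimeSeries:
    data: np.ndarray
    features: pd.DataFrame
    decomposition: Dict[str, np.ndarray]


@dataclass
class AnomalyResults:
    scores: np.ndarray
    individual_predictions: Dict[str, Any]
    threshold: float

    def has_anomalies(self) -> bool:
        return bool((self.scores > self.threshold).any())

    def has_critical(self, factor: float = 2.0) -> bool:
        # Placeholder rule: "critical" = score far beyond the threshold
        return bool((self.scores > factor * self.threshold).any())


@dataclass
class MonitoringResult:
    timestamp: pd.Timestamp
    anomalies: AnomalyResults
    metrics: Dict[str, float]
    data: Optional[ProcessedTimeSeries] = None
```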
## Core Components
### 1. Time Series Preprocessor
```python
from typing import Any, Dict

import numpy as np
import pandas as pd
from sklearn.impute import KNNImputer
from sklearn.preprocessing import RobustScaler


class TimeSeriesPreprocessor:
    def __init__(self, config: Dict[str, Any]):
        self.scaler = RobustScaler()
        self.imputer = KNNImputer(n_neighbors=5)
        self.decomposer = TimeSeriesDecomposer(
            config['decomposition_config']
        )
        self.feature_extractor = TSFeatureExtractor(
            config['feature_config']
        )

    async def preprocess(
        self,
        data: pd.DataFrame
    ) -> ProcessedTimeSeries:
        # Handle missing values
        imputed_data = await self._handle_missing_values(data)

        # Decompose time series
        decomposition = await self.decomposer.decompose(imputed_data)

        # Extract features
        features = await self.feature_extractor.extract_features(
            imputed_data,
            decomposition
        )

        # Scale data
        scaled_data = self.scaler.fit_transform(imputed_data)

        return ProcessedTimeSeries(
            data=scaled_data,
            features=features,
            decomposition=decomposition
        )

    async def _handle_missing_values(
        self,
        data: pd.DataFrame
    ) -> pd.DataFrame:
        # KNN imputation preserves local structure better than
        # simple forward-fill for multivariate series
        imputed = self.imputer.fit_transform(data)
        return pd.DataFrame(imputed, index=data.index, columns=data.columns)


class TimeSeriesDecomposer:
    def __init__(self, config: Dict[str, Any]):
        self.methods = {
            'stl': self._stl_decomposition,
            'seasonal': self._seasonal_decomposition,
            'wavelet': self._wavelet_decomposition
        }
        self.selected_method = config['method']

    async def decompose(
        self,
        data: pd.DataFrame
    ) -> Dict[str, np.ndarray]:
        decomposition_func = self.methods[self.selected_method]
        return await decomposition_func(data)
```
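The three decomposition methods are registered above but not shown. A minimal sketch of the STL path using statsmodels (the helper name and the univariate assumption are mine, not part of the original design):

```python
import pandas as pd
from statsmodels.tsa.seasonal import STL


def stl_decompose(series: pd.Series, period: int = 24) -> dict:
    """Split a univariate series into trend, seasonal, and residual parts."""
    result = STL(series, period=period).fit()
    return {
        'trend': result.trend.to_numpy(),
        'seasonal': result.seasonal.to_numpy(),
        # The residual is where anomalies stand out once trend and
        # seasonality are removed, so detectors often score it directly.
        'residual': result.resid.to_numpy(),
    }
```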
### 2. Anomaly Detection Models
```python
class AnomalyDetector:
    def __init__(self, config: Dict[str, Any]):
        self.models = {
            'deep': DeepAnomalyDetector(config['deep_config']),
            'statistical': StatisticalDetector(config['stat_config']),
            'isolation': IsolationForestDetector(config['if_config'])
        }
        self.ensemble = AnomalyEnsemble(config['ensemble_config'])

    async def detect_anomalies(
        self,
        data: ProcessedTimeSeries
    ) -> AnomalyResults:
        # Get predictions from all models
        predictions = {}
        for name, model in self.models.items():
            predictions[name] = await model.predict(data)

        # Combine predictions using ensemble
        ensemble_scores = await self.ensemble.combine_predictions(
            predictions,
            data.features
        )

        return AnomalyResults(
            scores=ensemble_scores,
            individual_predictions=predictions,
            threshold=self.ensemble.get_threshold(ensemble_scores)
        )
```
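`AnomalyEnsemble` is not defined in this document. One common fusion strategy it might implement is a weighted average of min-max-normalised per-model scores; the function name and dict layout below are assumptions:

```python
from typing import Dict

import numpy as np


def combine_scores(
    predictions: Dict[str, Dict[str, np.ndarray]],
    weights: Dict[str, float]
) -> np.ndarray:
    """Weighted average of per-model scores, normalised to [0, 1] first
    so models with different score scales contribute comparably."""
    combined = None
    for name, pred in predictions.items():
        scores = np.asarray(pred['scores'], dtype=float)
        lo, hi = scores.min(), scores.max()
        norm = (scores - lo) / (hi - lo) if hi > lo else np.zeros_like(scores)
        term = weights[name] * norm
        combined = term if combined is None else combined + term
    return combined
```

Normalising before averaging matters here: reconstruction errors, z-scores, and isolation-forest scores live on different scales, and an unnormalised average would let one model dominate.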
```python
import torch


class DeepAnomalyDetector:
    def __init__(self, config: Dict[str, Any]):
        self.model = TransformerAE(
            input_dim=config['input_dim'],
            hidden_dim=config['hidden_dim'],
            num_layers=config['num_layers']
        )
        self.threshold_estimator = AdaptiveThreshold(
            config['threshold_config']
        )

    async def predict(
        self,
        data: ProcessedTimeSeries
    ) -> Dict[str, Any]:
        inputs = torch.as_tensor(data.data, dtype=torch.float32)

        # Generate reconstructions without tracking gradients
        with torch.no_grad():
            reconstructions = self.model(inputs)

            # Compute per-sample reconstruction errors
            errors = torch.mean(
                torch.square(inputs - reconstructions),
                dim=1
            )

        # Estimate a threshold from the error distribution
        threshold = self.threshold_estimator.estimate(errors)

        return {
            'scores': errors.numpy(),
            'threshold': threshold
        }
```
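Neither `TransformerAE` nor `AdaptiveThreshold` is defined here. A minimal sketch of both, assuming a `(batch, seq_len, input_dim)` tensor layout, an arbitrary `nhead=4`, and a median-absolute-deviation rule that `AdaptiveThreshold.estimate` could apply:

```python
import torch
import torch.nn as nn


class TransformerAE(nn.Module):
    """Transformer-encoder autoencoder: anomalous windows reconstruct
    poorly, so reconstruction error serves as the anomaly score."""

    def __init__(self, input_dim: int, hidden_dim: int, num_layers: int):
        super().__init__()
        self.embed = nn.Linear(input_dim, hidden_dim)
        layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim, nhead=4, batch_first=True
        )
        self.encoder = nn.TransformerEncoder(layer, num_layers=num_layers)
        self.decoder = nn.Linear(hidden_dim, input_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: (batch, seq_len, input_dim)
        return self.decoder(self.encoder(self.embed(x)))


def mad_threshold(errors: torch.Tensor, k: float = 3.0) -> float:
    """Robust threshold: median plus k median absolute deviations,
    far less sensitive to outliers than mean + k * std."""
    med = errors.median()
    mad = (errors - med).abs().median()
    return float(med + k * mad)
```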
### 3. Real-time Monitoring
```python
from typing import AsyncIterator


class AnomalyMonitor:
    def __init__(self, config: Dict[str, Any]):
        self.preprocessor = TimeSeriesPreprocessor(
            config['preprocessing_config']
        )
        self.detector = AnomalyDetector(config['detector_config'])
        self.alert_manager = AlertManager(config['alert_config'])
        self.stream_processor = StreamProcessor(
            config['stream_config']
        )

    async def monitor_stream(
        self,
        data_stream: AsyncIterator[pd.DataFrame]
    ) -> AsyncIterator[MonitoringResult]:
        async for batch in self.stream_processor.process(data_stream):
            # Preprocess batch
            processed_batch = await self.preprocessor.preprocess(batch)

            # Detect anomalies
            anomalies = await self.detector.detect_anomalies(
                processed_batch
            )

            # Handle alerts
            if anomalies.has_anomalies():
                await self.alert_manager.handle_anomalies(
                    anomalies,
                    batch
                )

            yield MonitoringResult(
                timestamp=batch.index[-1],
                anomalies=anomalies,
                metrics=self._compute_metrics(anomalies),
                data=processed_batch  # carried along for downstream analysis
            )

    def _compute_metrics(
        self,
        anomalies: AnomalyResults
    ) -> Dict[str, float]:
        # Simple per-window summary metrics
        flagged = anomalies.scores > anomalies.threshold
        return {
            'anomaly_rate': float(flagged.mean()),
            'max_score': float(anomalies.scores.max())
        }
```
```python
class StreamProcessor:
    def __init__(self, config: Dict[str, Any]):
        self.window_size = config['window_size']
        self.stride = config['stride']
        self.buffer = TimeSeriesBuffer(
            max_size=config['buffer_size']
        )

    async def process(
        self,
        data_stream: AsyncIterator[pd.DataFrame]
    ) -> AsyncIterator[pd.DataFrame]:
        async for data in data_stream:
            # Update buffer
            await self.buffer.add(data)

            # Generate windows
            if self.buffer.is_ready():
                windows = self.buffer.get_windows(
                    self.window_size,
                    self.stride
                )
                for window in windows:
                    yield window
```
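`TimeSeriesBuffer` is referenced but not shown. A minimal sketch, assuming data arrives as row-indexed DataFrames and that windows are materialised eagerly:

```python
from typing import List

import pandas as pd


class TimeSeriesBuffer:
    """Bounded buffer that accumulates frames and slices fixed-size
    windows from its contents."""

    def __init__(self, max_size: int):
        self.max_size = max_size
        self._frames: List[pd.DataFrame] = []
        self._length = 0

    async def add(self, data: pd.DataFrame) -> None:
        self._frames.append(data)
        self._length += len(data)
        # Evict the oldest frames, but never drop below max_size rows
        while self._frames and self._length - len(self._frames[0]) >= self.max_size:
            self._length -= len(self._frames.pop(0))

    def is_ready(self) -> bool:
        return self._length >= self.max_size

    def get_windows(self, window_size: int, stride: int) -> List[pd.DataFrame]:
        merged = pd.concat(self._frames)
        return [
            merged.iloc[start:start + window_size]
            for start in range(0, len(merged) - window_size + 1, stride)
        ]
```

A production buffer would also track the last emitted offset so overlapping calls do not re-yield windows that were already scored.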
### 4. Anomaly Analysis
```python
class AnomalyAnalyzer:
    def __init__(self, config: Dict[str, Any]):
        self.root_cause_analyzer = RootCauseAnalyzer(
            config['root_cause_config']
        )
        self.pattern_detector = PatternDetector(
            config['pattern_config']
        )
        self.impact_analyzer = ImpactAnalyzer(
            config['impact_config']
        )

    async def analyze_anomalies(
        self,
        anomalies: AnomalyResults,
        data: ProcessedTimeSeries
    ) -> AnomalyAnalysis:
        # Analyze root causes
        root_causes = await self.root_cause_analyzer.analyze(
            anomalies,
            data
        )

        # Detect patterns
        patterns = await self.pattern_detector.detect_patterns(
            anomalies,
            data
        )

        # Analyze impact
        impact = await self.impact_analyzer.analyze_impact(
            anomalies,
            data
        )

        return AnomalyAnalysis(
            root_causes=root_causes,
            patterns=patterns,
            impact=impact
        )
```
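The three sub-analyzers are out of scope here. As one illustration of a first-pass root-cause signal for the deep detector, a per-feature attribution of the reconstruction error (the helper name and array layout are assumptions, not the system's actual method):

```python
import numpy as np


def feature_attribution(
    window: np.ndarray,
    reconstruction: np.ndarray
) -> np.ndarray:
    """Share of each input dimension in the squared reconstruction error;
    the largest shares are the first root-cause candidates."""
    per_feature = np.square(window - reconstruction).mean(axis=0)
    total = per_feature.sum()
    return per_feature / total if total > 0 else per_feature
```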
## Usage Example
```python
# Initialize the anomaly detection system. The components above also
# read 'stat_config', 'if_config', 'threshold_config', 'stream_config',
# and 'alert_config'; the values below are illustrative.
config = {
    'preprocessing_config': {
        'decomposition_config': {
            'method': 'stl',
            'period': 24
        },
        'feature_config': {
            'window_sizes': [12, 24, 48]
        }
    },
    'detector_config': {
        'deep_config': {
            'input_dim': 24,
            'hidden_dim': 64,
            'num_layers': 3,
            'threshold_config': {'k': 3.0}
        },
        'stat_config': {},
        'if_config': {},
        'ensemble_config': {
            'weights': [0.4, 0.3, 0.3]
        }
    },
    'stream_config': {
        'window_size': 48,
        'stride': 24,
        'buffer_size': 480
    },
    'alert_config': {},
    'analyzer_config': {
        'root_cause_config': {},
        'pattern_config': {},
        'impact_config': {}
    }
}

monitor = AnomalyMonitor(config)
analyzer = AnomalyAnalyzer(config['analyzer_config'])

# Start monitoring
async for result in monitor.monitor_stream(data_stream):
    if result.anomalies.has_critical():
        analysis = await analyzer.analyze_anomalies(
            result.anomalies,
            result.data
        )
        await handle_critical_anomalies(analysis)
```
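The monitoring loop uses `async for`, so it must run inside an event loop, and `data_stream` and `handle_critical_anomalies` are placeholders. A hypothetical driver, with a synthetic stream standing in for a real feed:

```python
import asyncio

import numpy as np
import pandas as pd


async def synthetic_stream(n_batches: int = 100, batch_size: int = 24):
    """Stand-in for a real feed: hourly batches of four random channels."""
    index = pd.date_range('2024-01-01', periods=batch_size, freq='h')
    for _ in range(n_batches):
        yield pd.DataFrame(np.random.randn(batch_size, 4), index=index)
        index = index + pd.Timedelta(hours=batch_size)
        await asyncio.sleep(0)  # hand control back to the event loop


async def main():
    monitor = AnomalyMonitor(config)
    async for result in monitor.monitor_stream(synthetic_stream()):
        print(result.timestamp, result.metrics)


asyncio.run(main())
```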