# Advanced Anomaly Detection System

An enterprise-grade anomaly detection system that uses deep learning, statistical methods, and real-time stream processing to detect anomalies in multivariate time series data.
## System Architecture

The system combines multiple detection strategies with real-time processing and automated response: a deep learning detector and a statistical ensemble score each window of the stream, and their combined verdict drives alert handling.
## Core Components
### 1. Deep Learning Detector
```python
# Shared imports for every snippet in this document
import asyncio
from datetime import datetime
from typing import Any, AsyncIterator, Dict, List, Tuple

import numpy as np
import torch
import torch.nn as nn
from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import RobustScaler
from sklearn.svm import OneClassSVM


class DeepAnomalyDetector:
    def __init__(self, config: Dict[str, Any]):
        self.model = VariationalAutoencoder(
            input_dim=config['input_dim'],
            latent_dim=config['latent_dim'],
            hidden_layers=config['hidden_layers']
        )
        self.scaler = RobustScaler()
        # Set by fit(); until then nothing is flagged as anomalous
        self.threshold = float('inf')

    def fit(self, data: np.ndarray, percentile: float = 99.0) -> None:
        """Fit the scaler and derive a dynamic threshold from the
        distribution of reconstruction errors on (mostly normal) data."""
        scaled = self.scaler.fit_transform(data)
        errors = self._reconstruction_error(scaled)
        self.threshold = float(np.percentile(errors, percentile))

    def detect(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        scaled_data = self.scaler.transform(data)
        reconstruction_error = self._reconstruction_error(scaled_data)
        return reconstruction_error > self.threshold, reconstruction_error

    def _reconstruction_error(self, scaled: np.ndarray) -> np.ndarray:
        self.model.eval()
        with torch.no_grad():  # inference only, no gradients needed
            x = torch.as_tensor(scaled, dtype=torch.float32)
            reconstructed = self.model(x).numpy()
        return np.mean(np.square(scaled - reconstructed), axis=1)


class VariationalAutoencoder(nn.Module):
    def __init__(
        self,
        input_dim: int,
        latent_dim: int,
        hidden_layers: List[int]
    ):
        super().__init__()
        # Encoder: Linear -> LayerNorm -> ReLU -> Dropout per layer
        encoder_layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_layers:
            encoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.LayerNorm(hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2)
            ])
            prev_dim = hidden_dim
        self.encoder = nn.Sequential(*encoder_layers)
        self.fc_mu = nn.Linear(hidden_layers[-1], latent_dim)
        self.fc_var = nn.Linear(hidden_layers[-1], latent_dim)

        # Decoder mirrors the encoder; iterate over a reversed copy so
        # the caller's hidden_layers list is not mutated in place
        decoder_layers = []
        prev_dim = latent_dim
        for hidden_dim in reversed(hidden_layers):
            decoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.LayerNorm(hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2)
            ])
            prev_dim = hidden_dim
        decoder_layers.append(nn.Linear(prev_dim, input_dim))
        self.decoder = nn.Sequential(*decoder_layers)

    def encode(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        hidden = self.encoder(x)
        return self.fc_mu(hidden), self.fc_var(hidden)

    def reparameterize(
        self,
        mu: torch.Tensor,
        log_var: torch.Tensor
    ) -> torch.Tensor:
        # z = mu + sigma * eps, with eps ~ N(0, I)
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        mu, log_var = self.encode(x)
        z = self.reparameterize(mu, log_var)
        return self.decoder(z)
```
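The training objective is not shown above. A minimal sketch of the standard VAE loss (reconstruction error plus KL divergence against a standard normal prior) that such a detector would typically be trained with; the `beta` weight is an assumption, not part of the original config:

```python
def vae_loss(
    model: VariationalAutoencoder,
    x: torch.Tensor,
    beta: float = 1.0  # assumed KL weight; tune per dataset
) -> torch.Tensor:
    mu, log_var = model.encode(x)
    z = model.reparameterize(mu, log_var)
    reconstructed = model.decoder(z)
    # Reconstruction term: squared error summed per sample, averaged over batch
    recon = torch.mean(torch.sum((x - reconstructed) ** 2, dim=1))
    # KL divergence between q(z|x) and N(0, I), in closed form
    kl = -0.5 * torch.mean(
        torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim=1)
    )
    return recon + beta * kl
```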
### 2. Statistical Detector
```python
class StatisticalDetector:
    def __init__(self, config: Dict[str, Any]):
        self.methods = {
            'isolation_forest': IsolationForest(
                contamination=config['contamination'],
                random_state=42
            ),
            'robust_covariance': EllipticEnvelope(
                contamination=config['contamination'],
                random_state=42
            ),
            'one_class_svm': OneClassSVM(
                kernel='rbf',
                nu=config['contamination']
            )
        }
        # Weights must follow the insertion order of self.methods
        self.ensemble_weights = config['ensemble_weights']

    def fit(self, data: np.ndarray) -> None:
        for detector in self.methods.values():
            detector.fit(data)

    def detect(self, data: np.ndarray) -> np.ndarray:
        predictions = np.zeros((len(data), len(self.methods)))
        for i, detector in enumerate(self.methods.values()):
            # sklearn convention: -1 for anomaly, +1 for normal
            predictions[:, i] = detector.predict(data)
        # Weighted voting across the three detectors
        weighted_pred = np.average(
            predictions,
            weights=self.ensemble_weights,
            axis=1
        )
        return weighted_pred < 0  # anomaly if the weighted vote tips negative
```
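A quick usage sketch on synthetic data; the Gaussian inliers and injected outliers here are illustrative, not from the original:

```python
rng = np.random.default_rng(0)
normal = rng.normal(0.0, 1.0, size=(1000, 4))    # inlier cluster
outliers = rng.uniform(-8.0, 8.0, size=(20, 4))  # injected anomalies

detector = StatisticalDetector({
    'contamination': 0.05,
    'ensemble_weights': [0.4, 0.3, 0.3]
})
detector.fit(normal)
flags = detector.detect(np.vstack([normal[:50], outliers]))
print(f"{flags.sum()} of {len(flags)} points flagged as anomalous")
```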
### 3. Real-time Processing Engine
```python
class RealTimeAnomalyDetector:
    def __init__(self, config: Dict[str, Any]):
        self.deep_detector = DeepAnomalyDetector(config['deep_config'])
        self.statistical_detector = StatisticalDetector(config['stat_config'])
        self.stream_processor = StreamProcessor(
            window_size=config['window_size'],
            stride=config['stride']
        )
        self.alert_manager = AlertManager(config['alert_config'])

    async def process_stream(
        self,
        data_stream: AsyncIterator[np.ndarray]
    ) -> AsyncIterator[Dict[str, Any]]:
        async for batch in self.stream_processor.process(data_stream):
            # Run both detectors concurrently in worker threads so the
            # CPU-bound detect() calls do not block the event loop
            deep_results, stat_results = await asyncio.gather(
                self._run_deep_detector(batch),
                self._run_statistical_detector(batch)
            )
            # Combine results
            combined_results = self._combine_detector_results(
                deep_results,
                stat_results
            )
            # Handle alerts if necessary
            if combined_results['is_anomaly']:
                await self.alert_manager.handle_anomaly(combined_results)
            yield combined_results

    async def _run_deep_detector(
        self,
        batch: np.ndarray
    ) -> Dict[str, Any]:
        is_anomaly, scores = await asyncio.to_thread(
            self.deep_detector.detect, batch
        )
        return {
            'method': 'deep',
            'is_anomaly': is_anomaly,
            'scores': scores
        }

    async def _run_statistical_detector(
        self,
        batch: np.ndarray
    ) -> Dict[str, Any]:
        is_anomaly = await asyncio.to_thread(
            self.statistical_detector.detect, batch
        )
        return {'method': 'statistical', 'is_anomaly': is_anomaly}

    def _combine_detector_results(
        self,
        deep_results: Dict[str, Any],
        stat_results: Dict[str, Any]
    ) -> Dict[str, Any]:
        # One simple OR policy: flag the window if either detector
        # flags any point in it
        return {
            'is_anomaly': bool(
                deep_results['is_anomaly'].any()
                or stat_results['is_anomaly'].any()
            ),
            'scores': deep_results['scores'],
            'detail': {'deep': deep_results, 'statistical': stat_results}
        }
```
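`StreamProcessor` is referenced above but never defined in this document. A minimal sliding-window sketch of the interface the engine assumes (window of `window_size` samples, advanced by `stride`):

```python
class StreamProcessor:
    """Buffers an async stream of samples into overlapping windows."""

    def __init__(self, window_size: int, stride: int):
        self.window_size = window_size
        self.stride = stride

    async def process(
        self,
        data_stream: AsyncIterator[np.ndarray]
    ) -> AsyncIterator[np.ndarray]:
        buffer: List[np.ndarray] = []
        async for sample in data_stream:
            buffer.append(sample)
            if len(buffer) >= self.window_size:
                # Emit a full window, then slide forward by `stride`
                yield np.stack(buffer[:self.window_size])
                buffer = buffer[self.stride:]
```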
### 4. Alert Management
```python
class AlertManager:
    def __init__(self, config: Dict[str, Any]):
        self.alert_levels = {
            'critical': self._handle_critical,
            'warning': self._handle_warning,
            'info': self._handle_info
        }
        self.notification_service = NotificationService(
            config['notification']
        )
        self.alert_store = AlertStore(config['storage'])

    async def handle_anomaly(
        self,
        anomaly_data: Dict[str, Any]
    ) -> None:
        severity = self._determine_severity(anomaly_data)
        # Store alert
        await self.alert_store.store_alert({
            'timestamp': datetime.now(),
            'severity': severity,
            'data': anomaly_data
        })
        # Handle based on severity
        handler = self.alert_levels[severity]
        await handler(anomaly_data)

    def _determine_severity(
        self,
        anomaly_data: Dict[str, Any]
    ) -> str:
        # Assumes scores have been normalized to [0, 1]; raw VAE
        # reconstruction errors must be rescaled before this comparison
        score = anomaly_data['scores'].max()
        if score > 0.9:
            return 'critical'
        elif score > 0.7:
            return 'warning'
        return 'info'

    # Severity handlers. NotificationService's API is not shown in this
    # document, so the send() calls below are an assumed interface.
    async def _handle_critical(self, anomaly_data: Dict[str, Any]) -> None:
        await self.notification_service.send('critical', anomaly_data)

    async def _handle_warning(self, anomaly_data: Dict[str, Any]) -> None:
        await self.notification_service.send('warning', anomaly_data)

    async def _handle_info(self, anomaly_data: Dict[str, Any]) -> None:
        pass  # informational anomalies are stored but not pushed
```
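The severity thresholds above assume scores in [0, 1], while the deep detector emits raw reconstruction errors. One way to bridge that gap is a percentile-based rescaling fitted on training errors; this helper is an illustration, not part of the original design:

```python
def normalize_scores(
    errors: np.ndarray,
    train_errors: np.ndarray
) -> np.ndarray:
    """Map raw reconstruction errors into [0, 1] using the 1st/99th
    percentiles of training errors as the reference range."""
    lo, hi = np.percentile(train_errors, [1, 99])
    return np.clip((errors - lo) / (hi - lo + 1e-12), 0.0, 1.0)
```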
## Usage Example
```python
async def main(data_stream: AsyncIterator[np.ndarray]) -> None:
    # Initialize detector
    config = {
        'deep_config': {
            'input_dim': 100,
            'latent_dim': 20,
            'hidden_layers': [64, 32]
        },
        'stat_config': {
            'contamination': 0.1,
            'ensemble_weights': [0.4, 0.3, 0.3]
        },
        'window_size': 100,
        'stride': 10,
        'alert_config': {
            'notification': {
                'email': ['alerts@company.com'],
                'slack_webhook': 'https://hooks.slack.com/...'
            },
            'storage': {
                'type': 'elasticsearch',
                'host': 'localhost',
                'port': 9200
            }
        }
    }
    detector = RealTimeAnomalyDetector(config)
    # Both detectors must be fitted on historical data before streaming;
    # see DeepAnomalyDetector.fit and StatisticalDetector.fit above.

    # Process data stream
    async for result in detector.process_stream(data_stream):
        if result['is_anomaly']:
            print(f"Anomaly detected: {result}")
```