Advanced Recommendation Engine
Scalable recommendation system implementing collaborative filtering, content-based filtering, and deep learning approaches
System Architecture
A production-ready recommendation engine that combines multiple recommendation strategies with real-time processing capabilities.
Core Components
1. Collaborative Filtering Engine
class CollaborativeFilter:
def __init__(self, config: Dict[str, Any]):
self.model = NCF(
num_users=config['num_users'],
num_items=config['num_items'],
embedding_dim=config['embedding_dim']
)
self.optimizer = torch.optim.Adam(
self.model.parameters(),
lr=config['learning_rate']
)
def train_epoch(
self,
dataloader: DataLoader
) -> float:
self.model.train()
total_loss = 0.0
for batch in dataloader:
user_ids, item_ids, ratings = batch
self.optimizer.zero_grad()
predictions = self.model(user_ids, item_ids)
loss = F.binary_cross_entropy(predictions, ratings)
loss.backward()
self.optimizer.step()
total_loss += loss.item()
return total_loss / len(dataloader)
class NCF(nn.Module):
def __init__(
self,
num_users: int,
num_items: int,
embedding_dim: int
):
super().__init__()
self.user_embedding = nn.Embedding(num_users, embedding_dim)
self.item_embedding = nn.Embedding(num_items, embedding_dim)
self.fc_layers = nn.Sequential(
nn.Linear(embedding_dim * 2, 64),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 1),
nn.Sigmoid()
)
def forward(
self,
user_ids: torch.Tensor,
item_ids: torch.Tensor
) -> torch.Tensor:
user_embeds = self.user_embedding(user_ids)
item_embeds = self.item_embedding(item_ids)
concat = torch.cat([user_embeds, item_embeds], dim=1)
return self.fc_layers(concat).squeeze()
2. Content-Based Filtering
class ContentBasedRecommender:
def __init__(self, config: Dict[str, Any]):
self.feature_extractor = FeatureExtractor(
model_name=config['feature_extractor']
)
self.index = faiss.IndexFlatL2(config['embedding_dim'])
self.item_embeddings = {}
def build_item_embeddings(
self,
items: List[Dict[str, Any]]
) -> None:
embeddings = []
for item in items:
embedding = self.feature_extractor(item['content'])
self.item_embeddings[item['id']] = embedding
embeddings.append(embedding)
self.index.add(np.array(embeddings))
def get_recommendations(
self,
user_profile: Dict[str, Any],
k: int = 10
) -> List[str]:
query_vector = self.feature_extractor(user_profile['preferences'])
distances, indices = self.index.search(
query_vector.reshape(1, -1),
k
)
return [
list(self.item_embeddings.keys())[idx]
for idx in indices[0]
]
3. Real-time Processing Engine
class RealtimeRecommender:
def __init__(self, config: Dict[str, Any]):
self.redis_client = redis.Redis(
host=config['redis_host'],
port=config['redis_port']
)
self.update_interval = config['update_interval']
self.cache_ttl = config['cache_ttl']
async def get_recommendations(
self,
user_id: str,
context: Dict[str, Any]
) -> List[Dict[str, Any]]:
cache_key = f"recommendations:{user_id}"
# Try cache first
cached = await self.redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Generate new recommendations
recommendations = await self._generate_recommendations(
user_id,
context
)
# Cache results
await self.redis_client.setex(
cache_key,
self.cache_ttl,
json.dumps(recommendations)
)
return recommendations
4. Hybrid Recommender
class HybridRecommender:
def __init__(self, config: Dict[str, Any]):
self.collaborative = CollaborativeFilter(config['cf_config'])
self.content_based = ContentBasedRecommender(config['cb_config'])
self.realtime = RealtimeRecommender(config['rt_config'])
self.weights = {
'collaborative': 0.4,
'content_based': 0.3,
'realtime': 0.3
}
async def get_recommendations(
self,
user_id: str,
context: Dict[str, Any]
) -> List[Dict[str, Any]]:
# Get recommendations from each system
results = await asyncio.gather(
self.collaborative.get_recommendations(user_id),
self.content_based.get_recommendations(user_id),
self.realtime.get_recommendations(user_id, context)
)
# Merge and rank recommendations
return self._merge_recommendations(results)
def _merge_recommendations(
self,
results: List[List[Dict[str, Any]]]
) -> List[Dict[str, Any]]:
merged = defaultdict(float)
for recs, weight in zip(results, self.weights.values()):
for rank, item in enumerate(recs):
score = 1.0 / (rank + 1)
merged[item['id']] += score * weight
return sorted(
merged.items(),
key=lambda x: x[1],
reverse=True
)
Performance Monitoring
class RecommenderMetrics:
def __init__(self):
self.metrics = {
'precision': self._calculate_precision,
'recall': self._calculate_recall,
'ndcg': self._calculate_ndcg,
'diversity': self._calculate_diversity
}
def evaluate(
self,
predictions: List[str],
ground_truth: List[str]
) -> Dict[str, float]:
return {
name: metric(predictions, ground_truth)
for name, metric in self.metrics.items()
}
def _calculate_ndcg(
self,
predictions: List[str],
ground_truth: List[str]
) -> float:
relevance = [
1 if item in ground_truth else 0
for item in predictions
]
ideal = sorted(relevance, reverse=True)
return ndcg_score([relevance], [ideal])[0]
Usage Example
# Initialize recommender
config = {
'cf_config': {
'num_users': 10000,
'num_items': 50000,
'embedding_dim': 64
},
'cb_config': {
'feature_extractor': 'bert-base-uncased',
'embedding_dim': 768
},
'rt_config': {
'redis_host': 'localhost',
'redis_port': 6379,
'update_interval': 300,
'cache_ttl': 3600
}
}
recommender = HybridRecommender(config)
# Get recommendations
user_id = "user123"
context = {
"time": "2024-02-10T15:30:00",
"location": "New York",
"device": "mobile"
}
recommendations = await recommender.get_recommendations(user_id, context)
# Evaluate performance
metrics = RecommenderMetrics()
performance = metrics.evaluate(
[rec['id'] for rec in recommendations],
ground_truth_items
)