import HeaderLink from './HeaderLink.astro';

Advanced Recommendation Engine

Scalable recommendation system implementing collaborative filtering, content-based filtering, and deep learning approaches

System Architecture

A production-ready recommendation engine that combines multiple recommendation strategies with real-time processing capabilities.

Core Components

1. Collaborative Filtering Engine

class CollaborativeFilter:
    """Collaborative-filtering trainer: owns an NCF model and its Adam optimizer."""

    def __init__(self, config: Dict[str, Any]):
        # Model dimensions and learning rate come straight from the config mapping.
        self.model = NCF(
            num_users=config['num_users'],
            num_items=config['num_items'],
            embedding_dim=config['embedding_dim']
        )
        self.optimizer = torch.optim.Adam(
            self.model.parameters(),
            lr=config['learning_rate']
        )

    def train_epoch(
        self,
        dataloader: DataLoader
    ) -> float:
        """Run one full optimisation pass over *dataloader*.

        Each batch is a (user_ids, item_ids, ratings) triple; returns the
        mean binary-cross-entropy loss across batches.
        """
        self.model.train()
        running_loss = 0.0

        for users, items, targets in dataloader:
            # Standard step: reset grads, forward, loss, backward, update.
            self.optimizer.zero_grad()
            scores = self.model(users, items)
            batch_loss = F.binary_cross_entropy(scores, targets)
            batch_loss.backward()
            self.optimizer.step()
            running_loss += batch_loss.item()

        return running_loss / len(dataloader)

class NCF(nn.Module):
    """Neural Collaborative Filtering: scores each (user, item) pair in [0, 1].

    User and item ids are embedded, concatenated, and passed through a small
    MLP that ends in a sigmoid, so the output is usable as a match probability.
    """

    def __init__(
        self,
        num_users: int,
        num_items: int,
        embedding_dim: int
    ):
        super().__init__()
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)

        self.fc_layers = nn.Sequential(
            nn.Linear(embedding_dim * 2, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def forward(
        self,
        user_ids: torch.Tensor,
        item_ids: torch.Tensor
    ) -> torch.Tensor:
        """Return a 1-D tensor of scores, one per (user, item) pair."""
        user_embeds = self.user_embedding(user_ids)
        item_embeds = self.item_embedding(item_ids)

        concat = torch.cat([user_embeds, item_embeds], dim=1)
        # squeeze(-1) drops only the trailing singleton feature dim. A bare
        # squeeze() also collapses a batch of size 1 into a 0-d tensor,
        # which breaks losses that expect predictions shaped like targets.
        return self.fc_layers(concat).squeeze(-1)

2. Content-Based Filtering

class ContentBasedRecommender:
    """Content-based recommender: nearest-neighbour search over item embeddings.

    Items are embedded once by a feature extractor and stored in a FAISS L2
    index; recommendations are the k items closest to the user's preference
    embedding.
    """

    def __init__(self, config: Dict[str, Any]):
        self.feature_extractor = FeatureExtractor(
            model_name=config['feature_extractor']
        )
        self.index = faiss.IndexFlatL2(config['embedding_dim'])
        # item id -> embedding, kept for inspection/debugging.
        self.item_embeddings = {}
        # item_ids[i] is the id of the vector stored at FAISS position i;
        # an explicit list avoids rebuilding a dict key list on every query
        # and keeps id lookup O(1) per result.
        self.item_ids = []

    def build_item_embeddings(
        self,
        items: List[Dict[str, Any]]
    ) -> None:
        """Embed *items* (dicts with 'id' and 'content') and add them to the index."""
        if not items:
            # Nothing to add; avoids handing FAISS an empty/ill-shaped array.
            return
        embeddings = []
        for item in items:
            embedding = self.feature_extractor(item['content'])
            self.item_embeddings[item['id']] = embedding
            self.item_ids.append(item['id'])
            embeddings.append(embedding)

        # FAISS only accepts contiguous float32 matrices.
        self.index.add(np.ascontiguousarray(embeddings, dtype='float32'))

    def get_recommendations(
        self,
        user_profile: Dict[str, Any],
        k: int = 10
    ) -> List[str]:
        """Return ids of up to *k* items nearest the user's preference embedding."""
        query_vector = self.feature_extractor(user_profile['preferences'])
        query = np.ascontiguousarray(query_vector, dtype='float32').reshape(1, -1)
        distances, indices = self.index.search(query, k)
        # FAISS pads results with -1 when fewer than k vectors exist; a raw
        # list lookup would silently map -1 to the LAST item, so filter.
        return [self.item_ids[idx] for idx in indices[0] if idx >= 0]

3. Real-time Processing Engine

class RealtimeRecommender:
    """Serves per-user recommendations behind a Redis read-through cache."""

    def __init__(self, config: Dict[str, Any]):
        # The plain redis.Redis client is synchronous — awaiting its get()/
        # setex() (as get_recommendations does) raises TypeError. Use the
        # awaitable asyncio client instead.
        # NOTE(review): redis.asyncio requires redis-py >= 4.2 — confirm the
        # pinned dependency version.
        self.redis_client = redis.asyncio.Redis(
            host=config['redis_host'],
            port=config['redis_port']
        )
        self.update_interval = config['update_interval']
        self.cache_ttl = config['cache_ttl']

    async def get_recommendations(
        self,
        user_id: str,
        context: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Return recommendations for *user_id*, serving from cache when possible.

        On a cache miss the fresh result is stored with a TTL so stale
        entries expire on their own.
        """
        cache_key = f"recommendations:{user_id}"

        # Try cache first; the payload is a JSON-encoded list.
        cached = await self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)

        # Cache miss: compute fresh recommendations.
        # NOTE(review): _generate_recommendations is not defined in this
        # class — presumably supplied by a subclass or mixin; confirm.
        recommendations = await self._generate_recommendations(
            user_id,
            context
        )

        # Cache results under the configured TTL.
        await self.redis_client.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(recommendations)
        )

        return recommendations

4. Hybrid Recommender

class HybridRecommender:
    """Blends collaborative, content-based and real-time recommenders.

    Each source produces a ranked list; the lists are fused with weighted
    reciprocal rank and returned as [{'id': ..., 'score': ...}] dicts,
    best first.
    """

    def __init__(self, config: Dict[str, Any]):
        self.collaborative = CollaborativeFilter(config['cf_config'])
        self.content_based = ContentBasedRecommender(config['cb_config'])
        self.realtime = RealtimeRecommender(config['rt_config'])

        # Relative contribution of each source to the fused score.
        self.weights = {
            'collaborative': 0.4,
            'content_based': 0.3,
            'realtime': 0.3
        }

    async def get_recommendations(
        self,
        user_id: str,
        context: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Fetch recommendations from all sources concurrently and fuse them."""
        # NOTE(review): ContentBasedRecommender.get_recommendations is
        # synchronous and expects a user-profile dict, and CollaborativeFilter
        # defines no get_recommendations at all — these call sites look
        # stale; confirm against the intended interfaces.
        results = await asyncio.gather(
            self.collaborative.get_recommendations(user_id),
            self.content_based.get_recommendations(user_id),
            self.realtime.get_recommendations(user_id, context)
        )

        # Merge and rank recommendations from all sources.
        return self._merge_recommendations(results)

    def _merge_recommendations(
        self,
        results: List[List[Dict[str, Any]]]
    ) -> List[Dict[str, Any]]:
        """Fuse per-source ranked lists into one list of {'id', 'score'} dicts.

        Sources are matched to weights by explicit name (the previous
        zip against dict.values() silently depended on insertion order),
        and the result is a list of dicts as the signature declares —
        callers index rec['id'], which the old tuple return broke.
        """
        source_order = ('collaborative', 'content_based', 'realtime')
        merged = defaultdict(float)

        for recs, source in zip(results, source_order):
            weight = self.weights[source]
            for rank, item in enumerate(recs):
                # Reciprocal-rank score: first place earns the full weight.
                merged[item['id']] += weight / (rank + 1)

        ranked = sorted(merged.items(), key=lambda kv: kv[1], reverse=True)
        return [{'id': item_id, 'score': score} for item_id, score in ranked]

Performance Monitoring

class RecommenderMetrics:
    """Offline evaluation metrics for a ranked recommendation list.

    Every metric takes (predictions, ground_truth) — predictions is the
    ranked list of recommended item ids, ground_truth the relevant item
    ids — and returns a float in [0, 1]. The previous version registered
    _calculate_precision/_calculate_recall/_calculate_diversity without
    defining them (AttributeError on evaluate), and indexed the scalar
    returned by sklearn's ndcg_score; all four are now implemented here.
    """

    def __init__(self):
        # Metric name -> bound method; evaluate() walks this registry.
        self.metrics = {
            'precision': self._calculate_precision,
            'recall': self._calculate_recall,
            'ndcg': self._calculate_ndcg,
            'diversity': self._calculate_diversity
        }

    def evaluate(
        self,
        predictions: List[str],
        ground_truth: List[str]
    ) -> Dict[str, float]:
        """Compute every registered metric for one prediction list."""
        return {
            name: metric(predictions, ground_truth)
            for name, metric in self.metrics.items()
        }

    def _calculate_precision(
        self,
        predictions: List[str],
        ground_truth: List[str]
    ) -> float:
        """Fraction of recommended items that are relevant (0.0 for no predictions)."""
        if not predictions:
            return 0.0
        relevant = set(ground_truth)
        return sum(1 for item in predictions if item in relevant) / len(predictions)

    def _calculate_recall(
        self,
        predictions: List[str],
        ground_truth: List[str]
    ) -> float:
        """Fraction of relevant items that were recommended (0.0 for empty truth)."""
        if not ground_truth:
            return 0.0
        relevant = set(ground_truth)
        return len(relevant.intersection(predictions)) / len(relevant)

    def _calculate_ndcg(
        self,
        predictions: List[str],
        ground_truth: List[str]
    ) -> float:
        """Binary-relevance NDCG of the ranked predictions (0.0 when no hits)."""
        import math  # local import keeps this snippet self-contained

        relevant = set(ground_truth)
        relevance = [
            1 if item in relevant else 0
            for item in predictions
        ]
        # Rank r (0-based) is discounted by log2(r + 2); ideal ordering puts
        # every hit first.
        dcg = sum(rel / math.log2(rank + 2) for rank, rel in enumerate(relevance))
        ideal = sorted(relevance, reverse=True)
        idcg = sum(rel / math.log2(rank + 2) for rank, rel in enumerate(ideal))
        return dcg / idcg if idcg > 0 else 0.0

    def _calculate_diversity(
        self,
        predictions: List[str],
        ground_truth: List[str]
    ) -> float:
        """Share of distinct items in the list (1.0 == no duplicates).

        ground_truth is unused; the parameter is kept so every metric
        shares the same call signature.
        """
        if not predictions:
            return 0.0
        return len(set(predictions)) / len(predictions)

Usage Example

# Initialize recommender
# NOTE(review): assumes HybridRecommender and RecommenderMetrics are imported
# from the package — confirm the actual import path when copying this example.
config = {
    'cf_config': {
        'num_users': 10000,
        'num_items': 50000,
        'embedding_dim': 64
    },
    'cb_config': {
        'feature_extractor': 'bert-base-uncased',
        'embedding_dim': 768
    },
    'rt_config': {
        'redis_host': 'localhost',
        'redis_port': 6379,
        'update_interval': 300,  # presumably seconds — TODO confirm units
        'cache_ttl': 3600  # cached recommendations expire after one hour
    }
}

recommender = HybridRecommender(config)

# Get recommendations
user_id = "user123"
context = {
    "time": "2024-02-10T15:30:00",
    "location": "New York",
    "device": "mobile"
}

# `await` is only valid inside an async function (or a REPL with top-level
# await support); from synchronous code, wrap the call in asyncio.run(...).
recommendations = await recommender.get_recommendations(user_id, context)

# Evaluate performance
# NOTE(review): ground_truth_items is not defined in this snippet — it must
# be supplied by the caller as the held-out relevant item ids for this user.
metrics = RecommenderMetrics()
performance = metrics.evaluate(
    [rec['id'] for rec in recommendations],
    ground_truth_items
)

View Source Code | Documentation | Contributing Guidelines