Advanced Sentiment Analysis Pipeline
Enterprise-grade sentiment analysis system implementing transformer models, aspect-based analysis, and multilingual support for natural language processing
System Architecture
A comprehensive sentiment analysis system that combines multiple analysis approaches with real-time processing and contextual understanding.
Core Components
1. Text Processing Pipeline
class SentimentProcessor:
def __init__(self, config: Dict[str, Any]):
self.tokenizer = AutoTokenizer.from_pretrained(
config['model_name']
)
self.preprocessor = TextPreprocessor(config['preprocess_config'])
self.language_detector = LanguageDetector(
config['language_config']
)
async def process_text(
self,
texts: List[str]
) -> ProcessedText:
# Detect languages
languages = await self.language_detector.detect_batch(texts)
# Group by language
language_groups = defaultdict(list)
for text, lang in zip(texts, languages):
language_groups[lang].append(text)
# Process each language group
processed_groups = await asyncio.gather(*[
self._process_language_group(lang, texts)
for lang, texts in language_groups.items()
])
# Combine results
return self._merge_processed_groups(processed_groups)
async def _process_language_group(
self,
language: str,
texts: List[str]
) -> ProcessedGroup:
# Preprocess texts
cleaned_texts = await self.preprocessor.preprocess_batch(
texts,
language
)
# Tokenize
encodings = self.tokenizer(
cleaned_texts,
padding=True,
truncation=True,
max_length=512,
return_tensors='pt'
)
return ProcessedGroup(
language=language,
input_ids=encodings['input_ids'],
attention_mask=encodings['attention_mask']
)
2. Sentiment Analysis Models
class SentimentAnalyzer:
def __init__(self, config: Dict[str, Any]):
self.models = {
lang: self._load_model(lang, config)
for lang in config['languages']
}
self.aspect_extractor = AspectExtractor(
config['aspect_config']
)
self.emotion_detector = EmotionDetector(
config['emotion_config']
)
async def analyze_sentiment(
self,
processed_text: ProcessedText
) -> SentimentResults:
# Analyze each language group
results = await asyncio.gather(*[
self._analyze_language_group(
group,
self.models[group.language]
)
for group in processed_text.groups
])
# Extract aspects
aspects = await self.aspect_extractor.extract(
processed_text
)
# Detect emotions
emotions = await self.emotion_detector.detect(
processed_text
)
return SentimentResults(
sentiments=self._merge_results(results),
aspects=aspects,
emotions=emotions
)
class AspectExtractor:
def __init__(self, config: Dict[str, Any]):
self.model = AspectExtractionModel(
config['model_config']
)
self.aspect_categories = config['categories']
async def extract(
self,
text: ProcessedText
) -> List[AspectSentiment]:
# Extract aspect mentions
aspects = await self.model.extract_aspects(text)
# Classify aspect sentiments
sentiments = await self.model.classify_aspects(
text,
aspects
)
return [
AspectSentiment(aspect, sentiment)
for aspect, sentiment in zip(aspects, sentiments)
]
3. Contextual Analysis
class ContextAnalyzer:
def __init__(self, config: Dict[str, Any]):
self.entity_recognizer = EntityRecognizer(
config['entity_config']
)
self.context_embedder = ContextEmbedder(
config['embedding_config']
)
self.sarcasm_detector = SarcasmDetector(
config['sarcasm_config']
)
async def analyze_context(
self,
text: ProcessedText,
sentiment_results: SentimentResults
) -> ContextualAnalysis:
# Extract entities
entities = await self.entity_recognizer.extract(text)
# Generate contextual embeddings
context_embeddings = await self.context_embedder.embed(
text,
entities
)
# Detect sarcasm
sarcasm_scores = await self.sarcasm_detector.detect(
text,
context_embeddings
)
# Adjust sentiment based on context
adjusted_sentiments = self._adjust_sentiments(
sentiment_results,
sarcasm_scores,
context_embeddings
)
return ContextualAnalysis(
adjusted_sentiments=adjusted_sentiments,
entities=entities,
context_embeddings=context_embeddings
)
4. Sentiment Aggregation
class SentimentAggregator:
def __init__(self, config: Dict[str, Any]):
self.time_aggregator = TimeBasedAggregator(
config['time_config']
)
self.topic_aggregator = TopicAggregator(
config['topic_config']
)
self.trend_analyzer = TrendAnalyzer(
config['trend_config']
)
async def aggregate_sentiments(
self,
results: List[SentimentResults],
metadata: Dict[str, Any]
) -> AggregatedSentiments:
# Aggregate by time
time_aggregation = await self.time_aggregator.aggregate(
results,
metadata['timestamps']
)
# Aggregate by topic
topic_aggregation = await self.topic_aggregator.aggregate(
results,
metadata['topics']
)
# Analyze trends
trends = await self.trend_analyzer.analyze(
time_aggregation,
topic_aggregation
)
return AggregatedSentiments(
time_series=time_aggregation,
topic_distribution=topic_aggregation,
trends=trends
)
Usage Example
# Initialize sentiment analysis system
config = {
'model_config': {
'model_name': 'xlm-roberta-large',
'languages': ['en', 'es', 'fr', 'de'],
'aspect_categories': [
'price', 'quality', 'service', 'location'
]
},
'analysis_config': {
'context_window': 5,
'min_aspect_confidence': 0.7,
'emotion_threshold': 0.6
}
}
sentiment_analyzer = SentimentAnalysis(config)
# Analyze texts
texts = [
"The product quality is excellent but the price is too high!",
"Le service client est vraiment exceptionnel.",
"Die Lieferung war leider sehr spät."
]
results = await sentiment_analyzer.analyze_texts(texts)
# Aggregate results
aggregated = await sentiment_analyzer.aggregate_results(
results,
metadata={
'timestamps': [...],
'topics': [...]
}
)
# Generate insights
insights = await sentiment_analyzer.generate_insights(aggregated)