Advanced Text Classification Pipeline
An enterprise-grade text classification system implementing transformer models, multi-task learning, and automated model training for natural language processing.
System Architecture
The system combines pretrained transformer encoders with automated training and inference pipelines: text is cleaned and tokenized, passed through a shared encoder with one classification head per task, and served through a cached inference layer.
Core Components
1. Text Processing Pipeline
from typing import Any, Dict, List

from transformers import AutoTokenizer


class TextProcessor:
    def __init__(self, config: Dict[str, Any]):
        self.tokenizer = AutoTokenizer.from_pretrained(config['model_name'])
        self.cleaner = TextCleaner(config['cleaning_config'])
        self.augmenter = TextAugmenter(config['augmentation_config'])

    async def process_text(
        self,
        texts: List[str],
        augment: bool = False
    ) -> ProcessedText:
        # Clean texts
        cleaned_texts = await self.cleaner.clean_batch(texts)

        # Augment if requested (note: this grows the batch, so
        # original_texts below only covers the un-augmented prefix)
        if augment:
            augmented_texts = await self.augmenter.augment_batch(cleaned_texts)
            cleaned_texts.extend(augmented_texts)

        # Tokenize
        encodings = self.tokenizer(
            cleaned_texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors='pt'
        )
        return ProcessedText(
            input_ids=encodings['input_ids'],
            attention_mask=encodings['attention_mask'],
            original_texts=texts
        )
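ProcessedText is returned above but never defined. A minimal sketch of the container, assuming it only needs to carry the tokenized batch alongside the raw inputs:

from dataclasses import dataclass

import torch


@dataclass
class ProcessedText:
    # Tokenized batch ready for the model
    input_ids: torch.Tensor
    attention_mask: torch.Tensor
    # Raw inputs, kept for caching and debugging
    original_texts: List[str]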
class TextCleaner:
    def __init__(self, config: Dict[str, Any]):
        self.processors = [
            URLProcessor(),
            EmailProcessor(),
            HTMLProcessor(),
            SpecialCharsProcessor(),
            WhitespaceProcessor()
        ]
        self.language_detector = LanguageDetector()

    async def clean_batch(self, texts: List[str]) -> List[str]:
        cleaned_texts = []
        for text in texts:
            # Detect language so processors can apply language-specific rules
            lang = self.language_detector.detect(text)

            # Apply processors sequentially
            cleaned_text = text
            for processor in self.processors:
                cleaned_text = await processor.process(cleaned_text, lang)
            cleaned_texts.append(cleaned_text)
        return cleaned_texts
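The individual processors are not shown. A minimal sketch of one, assuming each exposes the async process(text, lang) interface that clean_batch relies on; the URL_RE pattern and '[URL]' placeholder are illustrative choices, not the project's actual behavior:

import re


class URLProcessor:
    """Replaces URLs with a placeholder token (illustrative sketch)."""

    URL_RE = re.compile(r'https?://\S+|www\.\S+')

    async def process(self, text: str, lang: str) -> str:
        # lang is unused here but kept to honor the shared interface
        return self.URL_RE.sub('[URL]', text)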
2. Model Architecture
import torch
import torch.nn as nn
from transformers import AutoModel


class TextClassifier(nn.Module):
    def __init__(self, config: Dict[str, Any]):
        super().__init__()
        self.encoder = AutoModel.from_pretrained(config['model_name'])
        self.dropout = nn.Dropout(config['dropout_rate'])
        # One classification head per task, all sharing the encoder
        self.classifiers = nn.ModuleDict({
            task: self._build_classifier(config['hidden_size'], num_classes)
            for task, num_classes in config['tasks'].items()
        })

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor,
        task: str
    ) -> torch.Tensor:
        # Get encoder outputs
        outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        # Pool and classify with the head for the requested task
        pooled = self._pool_outputs(outputs, attention_mask)
        pooled = self.dropout(pooled)
        return self.classifiers[task](pooled)

    def _build_classifier(self, hidden_size: int, num_classes: int) -> nn.Module:
        return nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size // 2, num_classes)
        )
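forward calls _pool_outputs, which is not shown. A minimal sketch, assuming mean pooling over non-padding tokens (CLS-token pooling would be an equally valid choice):

class TextClassifier(nn.Module):  # continued from above
    def _pool_outputs(self, outputs, attention_mask: torch.Tensor) -> torch.Tensor:
        # Zero out padding positions, then average the token embeddings
        hidden = outputs.last_hidden_state           # (batch, seq, hidden)
        mask = attention_mask.unsqueeze(-1).float()  # (batch, seq, 1)
        summed = (hidden * mask).sum(dim=1)
        counts = mask.sum(dim=1).clamp(min=1e-9)
        return summed / counts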
3. Training Pipeline
from torch.utils.data import DataLoader, Dataset


class TrainingPipeline:
    def __init__(self, config: Dict[str, Any]):
        self.model = TextClassifier(config['model_config'])
        self.optimizer = self._setup_optimizer(config['optimizer_config'])
        self.scheduler = self._setup_scheduler(config['scheduler_config'])
        self.trainer = Trainer(config['trainer_config'])

    async def train(
        self,
        train_data: Dataset,
        val_data: Dataset
    ) -> TrainingResults:
        # Set up training and validation loaders
        train_loader = self._create_dataloader(train_data, shuffle=True)
        val_loader = self._create_dataloader(val_data, shuffle=False)

        # Train model
        return await self.trainer.train(
            model=self.model,
            train_loader=train_loader,
            val_loader=val_loader,
            optimizer=self.optimizer,
            scheduler=self.scheduler
        )
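Neither _setup_optimizer nor _setup_scheduler is shown. Because Trainer calls scheduler.step(val_loss), the scheduler is presumably plateau-based; a minimal sketch of both methods under that assumption, with AdamW as an assumed default optimizer:

from torch.optim import AdamW, Optimizer
from torch.optim.lr_scheduler import ReduceLROnPlateau


class TrainingPipeline:  # continued from above
    def _setup_optimizer(self, config: Dict[str, Any]) -> Optimizer:
        # Weight-decayed Adam is the usual default for transformer fine-tuning
        return AdamW(
            self.model.parameters(),
            lr=config.get('learning_rate', 2e-5),
            weight_decay=config.get('weight_decay', 0.01)
        )

    def _setup_scheduler(self, config: Dict[str, Any]) -> ReduceLROnPlateau:
        # Cuts the learning rate when validation loss plateaus, matching
        # the scheduler.step(val_loss) call in Trainer below
        return ReduceLROnPlateau(
            self.optimizer,
            mode='min',
            factor=config.get('factor', 0.5),
            patience=config.get('patience', 2)
        )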
import copy

from torch.optim import Optimizer
from torch.optim.lr_scheduler import LRScheduler


class Trainer:
    def __init__(self, config: Dict[str, Any]):
        self.device = torch.device(config['device'])
        self.num_epochs = config['num_epochs']
        self.metrics = MetricsCalculator(config['metrics'])
        self.early_stopping = EarlyStopping(config['patience'])

    async def train(
        self,
        model: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader,
        optimizer: Optimizer,
        scheduler: LRScheduler
    ) -> TrainingResults:
        model = model.to(self.device)
        best_model = None
        best_metrics = None
        best_score = float('-inf')

        for epoch in range(self.num_epochs):
            # Training phase
            train_metrics = await self._train_epoch(model, train_loader, optimizer)

            # Validation phase
            val_metrics = await self._validate_epoch(model, val_loader)

            # Update scheduler (assumes a plateau-style scheduler that
            # accepts the monitored metric)
            scheduler.step(val_metrics['val_loss'])

            # Keep a copy of the best model together with its metrics
            if val_metrics['val_score'] > best_score:
                best_score = val_metrics['val_score']
                best_model = copy.deepcopy(model)
                best_metrics = val_metrics

            # Early stopping check
            if self.early_stopping.should_stop(val_metrics['val_loss']):
                break

        return TrainingResults(model=best_model, metrics=best_metrics)
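EarlyStopping is used above but not defined. A minimal sketch, assuming it halts training after patience consecutive epochs without a new best validation loss:

class EarlyStopping:
    def __init__(self, patience: int):
        self.patience = patience
        self.best_loss = float('inf')
        self.epochs_without_improvement = 0

    def should_stop(self, val_loss: float) -> bool:
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.epochs_without_improvement = 0
        else:
            self.epochs_without_improvement += 1
        return self.epochs_without_improvement >= self.patience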
4. Inference Pipeline
class InferencePipeline:
    def __init__(self, config: Dict[str, Any]):
        self.processor = TextProcessor(config['processor_config'])
        self.model = self._load_model(config['model_path'])
        self.cache = PredictionCache(config['cache_config'])

    async def classify(
        self,
        texts: List[str],
        tasks: List[str]
    ) -> Dict[str, List[Prediction]]:
        # Check cache first; only run the model on cache misses
        cache_hits, texts_to_process = await self.cache.get_predictions(texts)

        # Initialize here so the merge below works even on a full cache hit
        predictions: Dict[str, List[Prediction]] = {}
        if texts_to_process:
            # Process new texts
            processed = await self.processor.process_text(texts_to_process)

            # Get predictions for each requested task
            for task in tasks:
                predictions[task] = await self._get_predictions(processed, task)

            # Update cache
            await self.cache.store_predictions(texts_to_process, predictions)

        # Merge with cache hits
        return self._merge_predictions(cache_hits, predictions)
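classify delegates to _get_predictions, which is not shown. A minimal sketch, assuming single-batch inference with softmax-normalized confidences; the Prediction(label, confidence) fields are illustrative:

class InferencePipeline:  # continued from above
    async def _get_predictions(
        self,
        processed: ProcessedText,
        task: str
    ) -> List[Prediction]:
        self.model.eval()
        with torch.no_grad():
            logits = self.model(
                input_ids=processed.input_ids,
                attention_mask=processed.attention_mask,
                task=task
            )
            probs = torch.softmax(logits, dim=-1)
        return [
            Prediction(label=int(p.argmax()), confidence=float(p.max()))
            for p in probs
        ]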
Usage Example
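The example below constructs a TextClassificationSystem that none of the sections above define. A minimal sketch, assuming it is a thin facade over the two pipelines; the config routing is illustrative:

class TextClassificationSystem:
    def __init__(self, config: Dict[str, Any]):
        # Assumes the config carries the sub-configs each pipeline expects
        self.training_pipeline = TrainingPipeline(config)
        self.inference_pipeline = InferencePipeline(config)

    async def train(self, train_data: Dataset, val_data: Dataset) -> TrainingResults:
        return await self.training_pipeline.train(train_data, val_data)

    async def classify(
        self,
        texts: List[str],
        tasks: List[str]
    ) -> Dict[str, List[Prediction]]:
        return await self.inference_pipeline.classify(texts=texts, tasks=tasks)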
# Initialize classification system (run inside an async context,
# since train() and classify() are coroutines)
config = {
    'model_config': {
        'model_name': 'bert-base-uncased',
        'hidden_size': 768,    # hidden size of bert-base-uncased
        'dropout_rate': 0.1,   # required by TextClassifier above
        'tasks': {
            'sentiment': 3,
            'topic': 10,
            'intent': 5
        }
    },
    'training_config': {
        'batch_size': 32,
        'learning_rate': 2e-5,
        'num_epochs': 10
    }
}
classifier = TextClassificationSystem(config)

# Train model
training_results = await classifier.train(train_data, val_data)

# Make predictions
texts = [
    "The product quality is excellent!",
    "How do I reset my password?",
    "Looking for technical documentation"
]
predictions = await classifier.classify(
    texts=texts,
    tasks=['sentiment', 'intent']
)
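classify returns a mapping from task name to one Prediction per input text (assuming _merge_predictions restores the original input order), so predictions['intent'][1] would hold the intent prediction for the password-reset question.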