# Enterprise A/B Testing Framework

A production-grade A/B testing system implementing sequential analysis, multi-armed bandits, and automated experimentation pipelines.

## System Architecture

A comprehensive A/B testing framework supporting advanced experimentation techniques, statistical analysis, and automated decision-making. The framework is organized around four core components: an experiment manager, a statistical analysis engine, a multi-armed bandit optimizer, and an automated decision maker.

## Core Components

### 1. Experiment Manager
```python
# Shared imports for the snippets in this document
from datetime import timedelta
from typing import Any, Dict, List

import numpy as np
import pandas as pd


class ExperimentManager:
    def __init__(self, config: Dict[str, Any]):
        self.store = ExperimentStore(config['store_config'])
        self.validator = ExperimentValidator(config['validation_rules'])
        self.sampler = TrafficSampler(config['sampling_config'])
        self.monitor = ExperimentMonitor(config['monitoring_config'])

    async def create_experiment(
        self,
        experiment_config: ExperimentConfig,
    ) -> str:
        # Validate the experiment configuration before anything is persisted
        await self.validator.validate(experiment_config)

        # Generate an experiment ID and store the metadata
        experiment_id = await self.store.create_experiment(experiment_config)

        # Initialize monitoring for the new experiment
        await self.monitor.initialize_experiment(experiment_id, experiment_config)

        return experiment_id

    async def get_variant(
        self,
        experiment_id: str,
        user_id: str,
        context: Dict[str, Any],
    ) -> str:
        experiment = await self.store.get_experiment(experiment_id)

        # Inactive experiments always serve the default variant
        if not experiment.is_active:
            return experiment.default_variant

        return await self.sampler.get_variant(experiment, user_id, context)


class ExperimentConfig:
    def __init__(
        self,
        name: str,
        variants: List[str],
        metrics: List[Metric],
        population: PopulationSpec,
        hypothesis: Hypothesis,
        min_sample_size: int,
        max_duration: timedelta,
    ):
        self.name = name
        self.variants = variants
        self.metrics = metrics
        self.population = population
        self.hypothesis = hypothesis
        self.min_sample_size = min_sample_size
        self.max_duration = max_duration
```
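The `TrafficSampler` itself is not shown above. A minimal sketch of the `deterministic` strategy from the usage example below might look like the following; it assumes equal traffic weights across variants and hashes the (seed, experiment, user) triple so the same user always maps to the same variant:

```python
import hashlib


class TrafficSampler:
    """Minimal deterministic sampler (hypothetical implementation):
    a given user always sees the same variant for a given experiment."""

    def __init__(self, config: Dict[str, Any]):
        self.seed = config.get('seed', 0)

    async def get_variant(
        self,
        experiment,
        user_id: str,
        context: Dict[str, Any],
    ) -> str:
        # Hash the (seed, experiment, user) triple into a bucket in [0, 1]
        key = f"{self.seed}:{experiment.name}:{user_id}".encode()
        bucket = int(hashlib.sha256(key).hexdigest()[:8], 16) / 0xFFFFFFFF

        # Walk the cumulative traffic split; equal weights assumed here
        weight = 1.0 / len(experiment.variants)
        index = min(int(bucket / weight), len(experiment.variants) - 1)
        return experiment.variants[index]
```

Hashing rather than random draws keeps assignment stateless and reproducible, so no per-user assignment table is needed.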
### 2. Statistical Analysis Engine
```python
class StatisticalAnalyzer:
    def __init__(self, config: Dict[str, Any]):
        # Store access is needed to look up experiment metadata below
        self.store = ExperimentStore(config['store_config'])
        self.analyzers = {
            'frequentist': FrequentistAnalyzer(),
            'bayesian': BayesianAnalyzer(),
            'sequential': SequentialAnalyzer(),
        }
        self.effect_size_calculator = EffectSizeCalculator()

    async def analyze_experiment(
        self,
        experiment_id: str,
        data: pd.DataFrame,
    ) -> AnalysisResults:
        experiment = await self.store.get_experiment(experiment_id)

        # Run every analysis method configured for this experiment
        results = {}
        for analyzer_name, analyzer in self.analyzers.items():
            if analyzer_name in experiment.analysis_methods:
                results[analyzer_name] = await analyzer.analyze(
                    data,
                    experiment.hypothesis,
                )

        # Calculate effect sizes for each tracked metric
        effect_sizes = await self.effect_size_calculator.calculate(
            data,
            experiment.metrics,
        )

        return AnalysisResults(
            statistical_results=results,
            effect_sizes=effect_sizes,
        )
```
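`FrequentistAnalyzer` is likewise not shown. A minimal sketch for a binary metric, assuming the input frame has a `variant` column plus one 0/1 column per metric, that the variants are named `control` and `treatment`, and that results come back as a plain dict rather than a dedicated results class:

```python
from scipy import stats


class FrequentistAnalyzer:
    """Hypothetical two-proportion z-test comparing treatment to control."""

    async def analyze(self, data: pd.DataFrame, hypothesis: Hypothesis):
        metric = hypothesis.metric
        control = data[data['variant'] == 'control'][metric]
        treatment = data[data['variant'] == 'treatment'][metric]

        # Pooled proportion under the null hypothesis of no difference
        n_c, n_t = len(control), len(treatment)
        p_c, p_t = control.mean(), treatment.mean()
        p_pool = (control.sum() + treatment.sum()) / (n_c + n_t)

        # Standard two-proportion z statistic and two-sided p-value
        se = (p_pool * (1 - p_pool) * (1 / n_c + 1 / n_t)) ** 0.5
        z = (p_t - p_c) / se
        p_value = 2 * (1 - stats.norm.cdf(abs(z)))

        return {'z': z, 'p_value': p_value, 'lift': p_t - p_c}
```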
```python
class BayesianAnalyzer:
    def __init__(self):
        self.prior_generator = PriorGenerator()
        self.mcmc_sampler = MCMCSampler()

    async def analyze(
        self,
        data: pd.DataFrame,
        hypothesis: Hypothesis,
    ) -> BayesianResults:
        # Generate priors for each variant's metric
        priors = self.prior_generator.generate_priors(data)

        # Draw posterior samples via MCMC
        posterior_samples = await self.mcmc_sampler.sample(
            data,
            priors,
            num_samples=10000,
        )

        # Probability that the treatment beats the control
        prob_superior = self._calculate_superiority_probability(posterior_samples)

        return BayesianResults(
            posterior_samples=posterior_samples,
            prob_superior=prob_superior,
        )
```
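The superiority calculation reduces to a Monte Carlo estimate once posterior draws are in hand. A sketch of `_calculate_superiority_probability`, assuming `posterior_samples` maps variant names to equal-length NumPy arrays of posterior draws:

```python
# Method of BayesianAnalyzer (sketch; assumes a dict of posterior draws)
def _calculate_superiority_probability(
    self,
    posterior_samples: Dict[str, np.ndarray],
) -> float:
    # Fraction of paired posterior draws where treatment beats control
    treatment = posterior_samples['treatment']
    control = posterior_samples['control']
    return float(np.mean(treatment > control))
```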
### 3. Multi-Armed Bandit System
```python
class BanditOptimizer:
    def __init__(self, config: Dict[str, Any]):
        # Store access is needed by get_variant and update_statistics below
        self.store = ExperimentStore(config['store_config'])
        self.algorithms = {
            'thompson': ThompsonSampling(),
            'ucb': UpperConfidenceBound(),
            'eps_greedy': EpsilonGreedy(config['epsilon']),
        }
        self.reward_calculator = RewardCalculator()

    async def get_variant(
        self,
        experiment_id: str,
        context: Dict[str, Any],
    ) -> str:
        experiment = await self.store.get_experiment(experiment_id)
        algorithm = self.algorithms[experiment.bandit_algorithm]

        # Get current per-variant statistics
        stats = await self.store.get_experiment_stats(experiment_id)

        # Select a variant using the configured bandit algorithm
        return algorithm.select_arm(stats, context)

    async def update_statistics(
        self,
        experiment_id: str,
        variant: str,
        outcome: float,
    ) -> None:
        await self.store.update_variant_stats(experiment_id, variant, outcome)
```
```python
class ThompsonSampling:
    def select_arm(
        self,
        stats: Dict[str, VariantStats],
        context: Dict[str, Any],
    ) -> str:
        # Draw one sample from each arm's Beta posterior; the +1 terms
        # correspond to a uniform Beta(1, 1) prior
        samples = {
            variant: np.random.beta(stat.successes + 1, stat.failures + 1)
            for variant, stat in stats.items()
        }

        # Play the arm with the highest sampled conversion rate
        return max(samples.items(), key=lambda x: x[1])[0]
```
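`UpperConfidenceBound` is referenced but not defined. A minimal UCB1 sketch over the same `VariantStats` shape used by `ThompsonSampling`, assuming the pull count can be recovered as `successes + failures`:

```python
import math


class UpperConfidenceBound:
    """Hypothetical UCB1 implementation over VariantStats."""

    def select_arm(
        self,
        stats: Dict[str, VariantStats],
        context: Dict[str, Any],
    ) -> str:
        total_pulls = sum(s.successes + s.failures for s in stats.values())

        def ucb_score(stat: VariantStats) -> float:
            pulls = stat.successes + stat.failures
            if pulls == 0:
                return float('inf')  # force exploration of untried arms
            mean_reward = stat.successes / pulls
            # UCB1 exploration bonus: sqrt(2 ln N / n_i)
            return mean_reward + math.sqrt(2 * math.log(total_pulls) / pulls)

        return max(stats, key=lambda v: ucb_score(stats[v]))
```

Where Thompson sampling explores by sampling from the posterior, UCB1 explores deterministically through an optimism bonus that shrinks as an arm accumulates pulls.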
### 4. Automated Decision Making
```python
class DecisionMaker:
    def __init__(self, config: Dict[str, Any]):
        # Store access is needed to update experiment status below
        self.store = ExperimentStore(config['store_config'])
        self.decision_rules = DecisionRules(config['rules'])
        self.notifier = DecisionNotifier(config['notification'])

    async def make_decision(
        self,
        experiment_id: str,
        analysis_results: AnalysisResults,
    ) -> ExperimentDecision:
        # Apply the configured decision rules
        decision = await self.decision_rules.evaluate(analysis_results)

        if decision.is_conclusive:
            # Notify stakeholders of the outcome
            await self.notifier.notify_decision(experiment_id, decision)

            # Update the experiment status (e.g., stopped, shipped)
            await self.store.update_experiment_status(
                experiment_id,
                decision.status,
            )

        return decision
```
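`DecisionRules` and `ExperimentDecision` are not shown. One plausible rule set, sketched under the assumption that `ExperimentDecision` takes `is_conclusive` and `status` arguments and that decisions key off the Bayesian superiority probability computed above; the threshold names and defaults are illustrative:

```python
class DecisionRules:
    """Hypothetical rules: ship when the superiority probability clears
    a threshold, stop for futility when it falls below another."""

    def __init__(self, rules: Dict[str, float]):
        self.ship_threshold = rules.get('ship_threshold', 0.95)
        self.futility_threshold = rules.get('futility_threshold', 0.05)

    async def evaluate(self, analysis_results: AnalysisResults) -> ExperimentDecision:
        bayesian = analysis_results.statistical_results.get('bayesian')
        if bayesian is None:
            return ExperimentDecision(is_conclusive=False, status='running')

        if bayesian.prob_superior >= self.ship_threshold:
            return ExperimentDecision(is_conclusive=True, status='ship_treatment')
        if bayesian.prob_superior <= self.futility_threshold:
            return ExperimentDecision(is_conclusive=True, status='stop_futility')
        return ExperimentDecision(is_conclusive=False, status='running')
```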
## Usage Example
```python
from datetime import timedelta

# Initialize the A/B testing framework
config = {
    'store_config': {
        'database_url': 'postgresql://localhost:5432/ab_testing'
    },
    'validation_rules': {},  # required by ExperimentValidator; empty uses defaults
    'sampling_config': {
        'strategy': 'deterministic',
        'seed': 42
    },
    'monitoring_config': {
        'metrics_store': 'prometheus',
        'alert_threshold': 0.1
    }
}
ab_testing = ABTestingFramework(config)

# Create a new experiment
experiment_config = ExperimentConfig(
    name='new_recommendation_algorithm',
    variants=['control', 'treatment'],
    metrics=[
        Metric('click_through_rate', 'ratio'),
        Metric('conversion_rate', 'ratio'),
    ],
    population=PopulationSpec(
        target_size=10000,
        segments=['mobile_users', 'desktop_users'],
    ),
    hypothesis=Hypothesis(
        metric='click_through_rate',
        min_detectable_effect=0.1,
        confidence_level=0.95,
    ),
    min_sample_size=5000,
    max_duration=timedelta(days=14),
)
experiment_id = await ab_testing.create_experiment(experiment_config)

# Assign a variant to a user
variant = await ab_testing.get_variant(
    experiment_id,
    user_id='user_123',
    context={'platform': 'mobile'},
)

# Record the observed outcome for that user
await ab_testing.record_outcome(
    experiment_id,
    user_id='user_123',
    variant=variant,
    metrics={
        'click_through_rate': 1.0,
        'conversion_rate': 0.5,
    },
)

# Analyze accumulated results
results = await ab_testing.analyze_experiment(experiment_id)
```
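Closing the loop, the analysis results can be handed to the decision maker. A `make_decision` facade on `ABTestingFramework` is an assumption here (only `DecisionMaker.make_decision` is shown above):

```python
# Hypothetical facade over DecisionMaker.make_decision
decision = await ab_testing.make_decision(experiment_id, results)

if decision.is_conclusive:
    print(f"Experiment {experiment_id} resolved: {decision.status}")
else:
    bayesian = results.statistical_results.get('bayesian')
    if bayesian is not None:
        print(f"P(treatment > control) so far: {bayesian.prob_superior:.3f}")
```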