# Enterprise Feature Store System
Production-grade feature store implementing online/offline feature serving, versioning, and automated feature computation for machine learning
## System Architecture
A scalable feature store system that provides unified feature management, serving, and computation for machine learning applications.
## Core Components

### 1. Feature Registry
```python
from __future__ import annotations  # allow forward references in annotations

from datetime import timedelta
from typing import Any, Dict, List, Optional


class FeatureRegistry:
    def __init__(self, config: Dict[str, Any]):
        self.metadata_store = MetadataStore(config['metadata_config'])
        self.validator = FeatureValidator(config['validation_rules'])
        self.versioner = FeatureVersioner()

    async def register_feature(
        self,
        feature_def: FeatureDefinition
    ) -> str:
        # Validate the feature definition
        await self.validator.validate(feature_def)

        # Generate a feature version
        version = self.versioner.generate_version(feature_def)

        # Store the metadata
        feature_id = await self.metadata_store.store_feature(
            feature_def,
            version
        )
        return feature_id

    async def get_feature_info(
        self,
        feature_id: str,
        version: Optional[str] = None
    ) -> FeatureInfo:
        return await self.metadata_store.get_feature(
            feature_id,
            version
        )


class FeatureDefinition:
    def __init__(
        self,
        name: str,
        entity: str,
        value_type: str,
        transformation: Transform,
        dependencies: List[str],
        ttl: Optional[timedelta] = None
    ):
        self.name = name
        self.entity = entity
        self.value_type = value_type
        self.transformation = transformation
        self.dependencies = dependencies
        self.ttl = ttl
```
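The registry relies on helper components (`MetadataStore`, `FeatureValidator`, `FeatureVersioner`) that are not shown here. As a rough sketch of how `FeatureVersioner.generate_version` could produce deterministic versions, assuming a content hash over the definition fields (the hashing scheme and field selection are assumptions, not part of the original design):

```python
import hashlib
import json


class FeatureVersioner:
    """Hypothetical versioner that derives a content-addressed version string."""

    def generate_version(self, feature_def: "FeatureDefinition") -> str:
        # Hash only the fields that affect computed values, so that any
        # change to the transformation or dependencies yields a new version.
        payload = json.dumps(
            {
                'name': feature_def.name,
                'entity': feature_def.entity,
                'value_type': feature_def.value_type,
                'transformation': repr(feature_def.transformation),
                'dependencies': sorted(feature_def.dependencies),
            },
            sort_keys=True,
        )
        digest = hashlib.sha256(payload.encode('utf-8')).hexdigest()
        return f"v_{digest[:12]}"
```

A content-addressed version keeps registration idempotent: re-registering an unchanged definition resolves to the same version, while any change to its computation produces a new one.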
### 2. Feature Computation Engine
```python
import pandas as pd


class FeatureComputer:
    def __init__(self, config: Dict[str, Any]):
        self.executor = SparkExecutor(config['spark_config'])
        self.cache = FeatureCache(config['cache_config'])
        self.scheduler = ComputationScheduler(
            config['schedule_config']
        )

    async def compute_features(
        self,
        feature_ids: List[str],
        entity_keys: List[str]
    ) -> pd.DataFrame:
        # Resolve the dependency graph for the requested features
        graph = await self._build_computation_graph(feature_ids)

        # Serve whatever is already cached
        cached_features = await self.cache.get_features(
            feature_ids,
            entity_keys
        )

        # Compute only the features missing from the cache
        missing_features = await self._compute_missing_features(
            graph,
            cached_features,
            entity_keys
        )

        # Merge cached and freshly computed results
        return pd.concat([cached_features, missing_features])

    async def _compute_missing_features(
        self,
        graph: ComputationGraph,
        cached: pd.DataFrame,
        keys: List[str]
    ) -> pd.DataFrame:
        # Generate a Spark job from the computation graph
        job = self.executor.create_job(graph)

        # Execute the computation for the requested entity keys
        result = await job.execute(keys)

        # Cache results for subsequent requests
        await self.cache.store_features(result)
        return result
```
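`FeatureCache` is referenced but not defined above. A minimal in-memory sketch that matches the `get_features`/`store_features` calls made by `FeatureComputer`, assuming cached values live in a wide DataFrame indexed by entity key (a production cache would more likely be backed by Redis or the online store itself):

```python
import pandas as pd


class FeatureCache:
    """Hypothetical in-memory cache keyed by entity, one column per feature."""

    def __init__(self, config: dict):
        self._frame = pd.DataFrame()

    async def get_features(
        self,
        feature_ids: list,
        entity_keys: list
    ) -> pd.DataFrame:
        # Return only the rows and columns that are actually cached
        rows = self._frame.index.intersection(entity_keys)
        cols = self._frame.columns.intersection(feature_ids)
        return self._frame.loc[rows, cols]

    async def store_features(self, result: pd.DataFrame) -> None:
        # Upsert freshly computed values into the cached frame
        self._frame = result.combine_first(self._frame)
```

Because `get_features` returns only what is cached, the computation engine can treat everything absent from that frame as missing and push just that subset to Spark.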
### 3. Feature Serving System
```python
import aioredis
import pandas as pd


class FeatureServer:
    def __init__(self, config: Dict[str, Any]):
        self.online_store = RedisFeatureStore(
            config['online_store_config']
        )
        self.offline_store = S3FeatureStore(
            config['offline_store_config']
        )
        self.router = FeatureRouter(config['routing_config'])

    async def get_online_features(
        self,
        feature_ids: List[str],
        entity_key: str
    ) -> Dict[str, Any]:
        # Route the request to the online or offline store
        store = self.router.route_request(feature_ids)

        if store == 'online':
            return await self.online_store.get_features(
                feature_ids,
                entity_key
            )
        else:
            return await self.offline_store.get_features(
                feature_ids,
                entity_key
            )

    async def get_batch_features(
        self,
        feature_ids: List[str],
        entity_keys: List[str]
    ) -> pd.DataFrame:
        return await self.offline_store.get_batch_features(
            feature_ids,
            entity_keys
        )


class RedisFeatureStore:
    def __init__(self, config: Dict[str, Any]):
        self.redis = aioredis.Redis(
            host=config['host'],
            port=config['port']
        )
        self.serializer = FeatureSerializer()

    async def get_features(
        self,
        feature_ids: List[str],
        entity_key: str
    ) -> Dict[str, Any]:
        # Build one Redis key per (entity, feature) pair
        keys = [
            f"{entity_key}:{feature_id}"
            for feature_id in feature_ids
        ]
        values = await self.redis.mget(keys)

        # Skip features with no stored value (e.g. never computed or expired)
        return {
            feature_id: self.serializer.deserialize(value)
            for feature_id, value in zip(feature_ids, values)
            if value is not None
        }
```
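The Redis store delegates value encoding to a `FeatureSerializer`, which is not shown. A minimal sketch assuming JSON-serializable feature values (a real deployment might prefer a compact binary format such as Protocol Buffers):

```python
import json
from typing import Any


class FeatureSerializer:
    """Hypothetical JSON-based serializer for feature values."""

    def serialize(self, value: Any) -> bytes:
        # Wrap the value so scalars and structured types round-trip identically
        return json.dumps({'v': value}).encode('utf-8')

    def deserialize(self, raw: bytes) -> Any:
        return json.loads(raw.decode('utf-8'))['v']
```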
### 4. Feature Pipeline Manager
```python
class FeaturePipelineManager:
    def __init__(self, config: Dict[str, Any]):
        self.dag_generator = DAGGenerator()
        self.airflow_client = AirflowClient(
            config['airflow_config']
        )
        self.monitor = PipelineMonitor(
            config['monitoring_config']
        )

    async def create_pipeline(
        self,
        feature_ids: List[str]
    ) -> str:
        # Generate the computation DAG
        dag = await self.dag_generator.generate_dag(feature_ids)

        # Deploy the DAG to Airflow
        pipeline_id = await self.airflow_client.deploy_dag(dag)

        # Set up monitoring for the new pipeline
        await self.monitor.setup_pipeline_monitoring(
            pipeline_id,
            feature_ids
        )
        return pipeline_id

    async def get_pipeline_status(
        self,
        pipeline_id: str
    ) -> PipelineStatus:
        # Combine the Airflow DAG state with pipeline metrics
        status = await self.airflow_client.get_dag_status(
            pipeline_id
        )
        metrics = await self.monitor.get_pipeline_metrics(
            pipeline_id
        )
        return PipelineStatus(status, metrics)
```
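`get_pipeline_status` returns a `PipelineStatus` object that is not defined above. One plausible shape, assuming it simply bundles the Airflow DAG state with the monitor's metrics (the field names and the health rule are assumptions):

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class PipelineStatus:
    """Hypothetical container for Airflow state plus pipeline metrics."""

    dag_state: str  # e.g. 'running', 'success', 'failed'
    metrics: Dict[str, Any] = field(default_factory=dict)

    @property
    def is_healthy(self) -> bool:
        # Treat anything other than a failed DAG run as healthy
        return self.dag_state != 'failed'
```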
## Usage Example
```python
# Initialize the feature store
config = {
    'metadata_config': {
        'database_url': 'postgresql://localhost:5432/feature_store'
    },
    'online_store_config': {
        'host': 'localhost',
        'port': 6379
    },
    'offline_store_config': {
        'bucket': 'feature-store',
        'region': 'us-west-2'
    },
    'compute_config': {
        'spark_master': 'spark://localhost:7077',
        'executor_memory': '4g'
    }
}
feature_store = FeatureStore(config)

# Register a new feature
feature_def = FeatureDefinition(
    name='user_purchase_30d',
    entity='user',
    value_type='float',
    transformation=WindowedAggregation(
        agg_func='sum',
        window='30d'
    ),
    dependencies=['purchase_amount']
)
feature_id = await feature_store.register_feature(feature_def)

# Create a feature pipeline
pipeline_id = await feature_store.create_pipeline([feature_id])

# Get online features for a single entity
features = await feature_store.get_online_features(
    [feature_id],
    'user_123'
)

# Get batch features for multiple entities
batch_features = await feature_store.get_batch_features(
    [feature_id],
    ['user_123', 'user_456']
)
```
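The example above instantiates a `FeatureStore` class that is not defined in the component listings. A minimal facade sketch, assuming it wires the four core components to a shared config and forwards the calls used in the example (the constructor wiring and config handling are assumptions):

```python
from typing import Any, Dict, List

import pandas as pd


class FeatureStore:
    """Hypothetical facade composing the registry, computer, server, and pipelines."""

    def __init__(self, config: Dict[str, Any]):
        # Assumes each component reads only the config keys it needs
        self.registry = FeatureRegistry(config)
        self.computer = FeatureComputer(config)
        self.server = FeatureServer(config)
        self.pipelines = FeaturePipelineManager(config)

    async def register_feature(self, feature_def: FeatureDefinition) -> str:
        return await self.registry.register_feature(feature_def)

    async def create_pipeline(self, feature_ids: List[str]) -> str:
        return await self.pipelines.create_pipeline(feature_ids)

    async def get_online_features(
        self, feature_ids: List[str], entity_key: str
    ) -> Dict[str, Any]:
        return await self.server.get_online_features(feature_ids, entity_key)

    async def get_batch_features(
        self, feature_ids: List[str], entity_keys: List[str]
    ) -> pd.DataFrame:
        return await self.server.get_batch_features(feature_ids, entity_keys)
```

In practice each component would more likely receive only its own sub-config and validate the keys it requires at startup.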