# Machine Learning Trading System

A comprehensive implementation of an autonomous trading system using machine learning, real-time data processing, and advanced risk management.

## System Architecture
This project implements a production-grade autonomous trading system. It is organized into five core components — a data pipeline, feature engineering, an ML model, risk management, and an execution engine — backed by performance monitoring and a backtesting framework.
## Core Components

### 1. Data Pipeline
```python
import asyncio
from typing import Any, Dict, List

import pandas as pd


class MarketDataPipeline:
    def __init__(self, config: Dict[str, Any]):
        self.data_sources = {
            'primary': AlpacaMarketDataAPI(config['alpaca_key']),
            'secondary': YFinanceAPI(),
            'sentiment': NewsAPIClient(config['news_api_key'])
        }
        self.preprocessor = DataPreprocessor(
            feature_config=config['features'],
            normalization=config['normalization']
        )

    async def fetch_market_data(self, symbols: List[str]) -> pd.DataFrame:
        # Query all sources concurrently, then merge and preprocess.
        tasks = [source.fetch_data(symbols) for source in self.data_sources.values()]
        raw_data = await asyncio.gather(*tasks)
        return self.preprocessor.process(self.merge_data(raw_data))

    def merge_data(self, data_list: List[pd.DataFrame]) -> pd.DataFrame:
        # Custom logic to merge data from different sources
        pass
```
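The merge step above is left as a stub. A minimal sketch of one possible implementation, assuming each source returns a DataFrame sharing a (timestamp, symbol) index with per-source column names, might look like:

```python
from typing import List

import pandas as pd


def merge_data(data_list: List[pd.DataFrame]) -> pd.DataFrame:
    """Outer-join source frames on their shared index.

    Illustrative sketch only: assumes every frame uses the same
    (timestamp, symbol) index and disambiguated column names.
    """
    merged = data_list[0]
    for frame in data_list[1:]:
        merged = merged.join(frame, how='outer')
    # Forward-fill gaps left by slower sources (e.g. news sentiment).
    return merged.sort_index().ffill()
```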
### 2. Feature Engineering
```python
import pandas as pd
import ta  # technical-analysis library


class FeatureEngineer:
    def __init__(self):
        self.technical_indicators = {
            'momentum': self._calculate_momentum,
            'volatility': self._calculate_volatility,
            'trend': self._calculate_trend
        }
        self.market_features = MarketFeatureExtractor()
        self.sentiment_analyzer = SentimentAnalyzer()

    def generate_features(self, data: pd.DataFrame) -> pd.DataFrame:
        features = data.copy()
        # Technical indicators
        for indicator in self.technical_indicators.values():
            features = indicator(features)
        # Market microstructure features
        features = self.market_features.extract(features)
        # Sentiment features
        features = self.sentiment_analyzer.add_sentiment_features(features)
        return features

    def _calculate_momentum(self, data: pd.DataFrame) -> pd.DataFrame:
        # Each indicator returns the full DataFrame so the calls can chain.
        data['rsi_14'] = ta.momentum.rsi(data['close'], window=14)
        return data
```
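`_calculate_volatility` and `_calculate_trend` are not shown. Following the same return-the-DataFrame convention, a volatility indicator might look like this (the 20-bar window and daily-bar annualization factor are illustrative choices, not project values):

```python
import numpy as np
import pandas as pd


def calculate_volatility(data: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical sketch: rolling 20-bar standard deviation of log
    # returns, annualized assuming daily bars (252 trading days).
    log_ret = np.log(data['close']).diff()
    data['volatility_20'] = log_ret.rolling(window=20).std() * np.sqrt(252)
    return data
```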
### 3. Model Architecture
```python
import torch
import torch.nn as nn


class MLTradingModel(nn.Module):
    def __init__(self, input_dim: int, hidden_dim: int):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            dropout=0.2,
            batch_first=True
        )
        # 4-head self-attention over the LSTM outputs.
        self.attention = nn.MultiheadAttention(hidden_dim, num_heads=4, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim // 2, 3)  # Buy, Sell, Hold logits
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        lstm_out, _ = self.lstm(x)
        attention_out, _ = self.attention(lstm_out, lstm_out, lstm_out)
        # Classify from the last time step's attended representation.
        return self.fc(attention_out[:, -1, :])
```
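A quick smoke test of the forward pass. The dimensions here (batch of 32, 60 time steps, 128 input features, hidden size 64) are illustrative assumptions, not project values:

```python
import torch

model = MLTradingModel(input_dim=128, hidden_dim=64)
x = torch.randn(32, 60, 128)           # (batch, sequence length, features)
logits = model(x)
assert logits.shape == (32, 3)         # one Buy/Sell/Hold logit triple per sample
probs = torch.softmax(logits, dim=-1)  # convert logits to action probabilities
```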
### 4. Risk Management
```python
from typing import Any, Dict


class RiskManager:
    def __init__(self, config: Dict[str, Any]):
        self.position_limits = config['position_limits']
        self.var_calculator = ValueAtRiskCalculator(
            confidence_level=0.99,
            time_horizon='1d'
        )
        self.portfolio_manager = PortfolioManager(
            max_leverage=config['max_leverage'],
            margin_requirements=config['margin_reqs']
        )

    def validate_trade(self, trade: Trade) -> bool:
        checks = [
            self._check_position_limits(trade),
            self._check_var_limits(trade),
            self._check_portfolio_constraints(trade)
        ]
        return all(checks)

    def _check_var_limits(self, trade: Trade) -> bool:
        # Reject the trade if the portfolio's 1-day 99% VaR would
        # breach the configured limit after the trade is added.
        new_var = self.var_calculator.compute_var_with_trade(
            self.portfolio_manager.get_portfolio(), trade
        )
        return new_var <= self.position_limits['var_limit']
```
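`ValueAtRiskCalculator` is project-internal and not shown. As a reference point, a minimal historical-simulation VaR, assuming a Series of daily portfolio returns, can be computed as:

```python
import numpy as np
import pandas as pd


def historical_var(returns: pd.Series, confidence_level: float = 0.99) -> float:
    """1-day historical-simulation VaR as a positive loss fraction.

    Illustrative sketch only: takes the empirical loss quantile of
    past daily returns; the project's calculator may differ.
    """
    # At 99% confidence, the 1% quantile of returns is the loss
    # threshold exceeded on only 1% of historical days.
    return float(-np.quantile(returns.dropna(), 1 - confidence_level))
```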
### 5. Execution Engine
```python
import asyncio


class ExecutionEngine:
    def __init__(self, broker_api: BrokerAPI):
        self.broker = broker_api
        self.order_manager = OrderManager()
        self.execution_optimizer = ExecutionOptimizer(
            strategy='twap',  # Time-Weighted Average Price
            max_participation_rate=0.1
        )

    async def execute_trade(self, trade: Trade) -> OrderStatus:
        # Split the parent trade into child orders and submit them concurrently.
        optimized_orders = self.execution_optimizer.split_order(trade)
        execution_tasks = [self._execute_single_order(order) for order in optimized_orders]
        results = await asyncio.gather(*execution_tasks)
        return self.order_manager.aggregate_results(results)

    async def _execute_single_order(self, order: Order) -> OrderResult:
        try:
            return await self.broker.place_order(order)
        except BrokerAPIError as e:
            self.order_manager.handle_error(e)
            raise
```
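The TWAP split itself is hidden inside `ExecutionOptimizer`. A minimal sketch of time-weighted slicing, with a hypothetical `ChildOrder` type and `n_slices` parameter, could look like:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ChildOrder:
    symbol: str
    quantity: int
    slice_index: int  # which time bucket this child order trades in


def twap_split(symbol: str, quantity: int, n_slices: int = 10) -> List[ChildOrder]:
    """Split a parent order into equal slices across the execution window.

    Illustrative sketch: evenly divides the quantity, pushing the
    remainder into the final slice. A real TWAP engine would also
    schedule the slices in time and cap each at the participation rate.
    """
    base = quantity // n_slices
    orders = [ChildOrder(symbol, base, i) for i in range(n_slices)]
    orders[-1].quantity += quantity - base * n_slices  # remainder
    return [o for o in orders if o.quantity > 0]
```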
## Performance Monitoring
```python
from typing import Dict

import pandas as pd


class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'returns': self._calculate_returns,
            'sharpe': self._calculate_sharpe,
            'drawdown': self._calculate_drawdown,
            'win_rate': self._calculate_win_rate
        }
        self.risk_metrics = RiskMetricsCalculator()

    def generate_report(self, portfolio_history: pd.DataFrame) -> Dict[str, float]:
        report = {}
        for metric_name, metric_func in self.metrics.items():
            report[metric_name] = metric_func(portfolio_history)
        report.update(self.risk_metrics.calculate_all(portfolio_history))
        return report
```
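The individual metric functions are not shown. For example, `_calculate_sharpe` could be implemented along these lines, assuming `portfolio_history` has a `value` column of daily portfolio values (the column name is an assumption):

```python
import numpy as np
import pandas as pd


def calculate_sharpe(portfolio_history: pd.DataFrame,
                     risk_free_rate: float = 0.0) -> float:
    # Annualized Sharpe ratio from daily portfolio values: mean excess
    # daily return over its standard deviation, scaled by sqrt(252).
    daily_returns = portfolio_history['value'].pct_change().dropna()
    excess = daily_returns - risk_free_rate / 252  # daily excess return
    if excess.std() == 0:
        return 0.0
    return float(np.sqrt(252) * excess.mean() / excess.std())
```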
## Backtesting Framework
```python
from datetime import datetime


class Backtester:
    def __init__(self,
                 model: MLTradingModel,
                 data_pipeline: MarketDataPipeline,
                 risk_manager: RiskManager):
        self.model = model
        self.data_pipeline = data_pipeline
        self.risk_manager = risk_manager
        self.performance_monitor = PerformanceMonitor()

    def run_backtest(self,
                     start_date: datetime,
                     end_date: datetime,
                     initial_capital: float) -> BacktestResults:
        portfolio = Portfolio(initial_capital)
        trades = []
        for date in self._date_range(start_date, end_date):
            # Only data available up to `date` reaches the model,
            # to avoid lookahead bias.
            data = self.data_pipeline.get_historical_data(date)
            signals = self.model.predict(data)
            for signal in signals:
                if self.risk_manager.validate_trade(signal):
                    trade = self._execute_trade(signal, portfolio)
                    trades.append(trade)
            portfolio.update(date)
        return BacktestResults(
            trades=trades,
            portfolio_history=portfolio.history,
            metrics=self.performance_monitor.generate_report(portfolio.history)
        )
```
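Putting it together, a run might look like this (the dates, capital, and variable names are placeholders, not project defaults):

```python
from datetime import datetime

backtester = Backtester(model=model, data_pipeline=pipeline, risk_manager=risk_manager)
results = backtester.run_backtest(
    start_date=datetime(2020, 1, 1),
    end_date=datetime(2023, 12, 31),
    initial_capital=100_000.0
)
print(results.metrics['sharpe'], results.metrics['drawdown'])
```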
## Production Deployment

### Infrastructure Setup
- AWS infrastructure provisioned with Terraform
- Kubernetes cluster for scalability
- Monitoring with Prometheus/Grafana
- Log aggregation with the ELK stack

### CI/CD Pipeline
```yaml
name: ML Trading System CI/CD

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Run tests
        run: |
          python -m pytest tests/
          python -m pylint src/

  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes
        run: |
          kubectl apply -f k8s/
```
## System Requirements

### Hardware
- 32 GB RAM minimum
- GPU for model training
- Low-latency network connection
- SSD storage for market data

### Software
- Python 3.8+
- PyTorch, NumPy, Pandas
- Redis for caching
- PostgreSQL for persistent storage
## Getting Started

1. Clone the repository
2. Install dependencies:
   ```bash
   pip install -r requirements.txt
   ```
3. Configure API keys in `.env` (see the example below)
4. Run tests:
   ```bash
   pytest tests/
   ```
5. Start the system:
   ```bash
   python src/main.py
   ```
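An illustrative `.env` layout. The variable names below mirror the config keys used in `MarketDataPipeline` (`alpaca_key`, `news_api_key`), but the exact names the project expects are an assumption:

```bash
# .env — illustrative only; never commit real keys to version control
ALPACA_KEY=your_alpaca_api_key
NEWS_API_KEY=your_news_api_key
```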