Code Examples
Selected code snippets and implementations from my quantitative research projects. These represent my approach to systematic trading, volatility analysis, and financial modeling.
EMA Crossover with Heikin-Ashi
Implementation of a systematic trading strategy using Heikin-Ashi candlesticks and EMA crossovers with gap pullback exits.
import pandas as pd
import numpy as np
def calculate_heikin_ashi(df):
    """Convert OHLC data to Heikin-Ashi candles.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain 'Open', 'High', 'Low', 'Close' columns.

    Returns
    -------
    pd.DataFrame
        Heikin-Ashi 'Open', 'High', 'Low', 'Close', indexed like *df*.
    """
    # HA close is the average of the bar's four raw prices.
    ha_close = (df['Open'] + df['High'] + df['Low'] + df['Close']) / 4
    # HA open is recursive: midpoint of the previous bar's HA open/close.
    # The original stored a scalar here and then called .iloc on it
    # (AttributeError) while discarding each loop iteration's result;
    # build the full series instead.
    ha_open_vals = np.empty(len(df))
    ha_open_vals[0] = (df['Open'].iloc[0] + ha_close.iloc[0]) / 2
    for i in range(1, len(df)):
        ha_open_vals[i] = (ha_open_vals[i - 1] + ha_close.iloc[i - 1]) / 2
    ha_open = pd.Series(ha_open_vals, index=df.index)
    # HA high/low bracket the raw extreme and both HA open and close.
    ha_high = pd.concat([df['High'], ha_open, ha_close], axis=1).max(axis=1)
    ha_low = pd.concat([df['Low'], ha_open, ha_close], axis=1).min(axis=1)
    return pd.DataFrame({
        'Open': ha_open,
        'High': ha_high,
        'Low': ha_low,
        'Close': ha_close
    })
def ema_crossover_signals(df, fast=12, slow=26):
    """Generate EMA crossover signals.

    Writes EMA_fast, EMA_slow, Signal and Position columns onto *df* in
    place and returns the same frame. Signal is +1 while the fast EMA is
    above the slow EMA, -1 while below, 0 when exactly equal; Position is
    the bar-over-bar change in Signal (nonzero only at crossovers).
    """
    close = df['Close']
    df['EMA_fast'] = close.ewm(span=fast, adjust=False).mean()
    df['EMA_slow'] = close.ewm(span=slow, adjust=False).mean()
    # Encode the three states in one nested-where expression.
    df['Signal'] = np.where(
        df['EMA_fast'] > df['EMA_slow'], 1,
        np.where(df['EMA_fast'] < df['EMA_slow'], -1, 0)
    )
    # First row is NaN (no previous signal to diff against).
    df['Position'] = df['Signal'].diff()
    return df
def gap_pullback_exit(df, window=5):
"""Exit when price pulls back from peak by gap percentage."""
df['Peak'] = df['Close'].cummax()
df['Drawdown'] = (df['Peak'] - df['Close']) / df['Peak']
df['Exit_Signal'] = df['Drawdown'] > 0.02 # 2% pullback
return dfIV Surface Data Collection
Framework for collecting and processing implied volatility surface data using the ivolatility API.
import requests
import pandas as pd
from datetime import datetime, timedelta
class IVSurfaceCollector:
def __init__(self, api_key):
self.api_key = api_key
self.base_url = "https://api.ivolatility.com"
def get_options_data(self, ticker, expiration_range=(30, 90)):
"""Collect options data for specified expiration range."""
params = {
"api_key": self.api_key,
"ticker": ticker,
"min_expiration": expiration_range[0],
"max_expiration": expiration_range[1],
"include_underlying": True
}
response = requests.get(
f"{self.base_url}/options/ivsurface",
params=params
)
return self._process_response(response.json())
def calculate_iv_surface(self, options_data):
"""Convert raw options data to IV surface structure."""
# Group by expiration and calculate IV for each strike
surfaces = {}
for exp_date, group in options_data.groupby('expiration'):
strikes = group['strike'].values
ivs = group['implied_volatility'].values
# Fit polynomial to IV curve
coeffs = np.polyfit(strikes, ivs, 2)
surfaces[exp_date] = {
'strikes': strikes,
'ivs': ivs,
'polynomial': coeffs
}
return surfaces
def identify_skew_patterns(self, surface):
"""Analyze IV surface for skew patterns."""
# Calculate skew as IV slope across strikes
if len(surface['strikes']) < 3:
return {'pattern': 'insufficient_data'}
# Fit linear regression to IV curve
from scipy import stats
slope, intercept, r_value, p_value, std_err = stats.linregress(surface['strikes'], surface['ivs'])
if slope < -0.01:
return {'pattern': 'put_skew', 'strength': abs(slope)}
elif slope > 0.01:
return {'pattern': 'call_skew', 'strength': abs(slope)}
else:
return {'pattern': 'flat', 'strength': 0}Monte Carlo Yield Curve Simulation
Monte Carlo simulation framework for yield curve modeling and forward rate curve construction.
import numpy as np
import pandas as pd
from scipy.interpolate import CubicSpline
class YieldCurveSimulator:
def __init__(self, initial_curve, num_simulations=10000):
self.initial_curve = initial_curve # (maturities, yields)
self.num_simulations = num_simulations
self.cs = CubicSpline(
initial_curve[0],
initial_curve[1]
)
def bootstrap_curve(self, bond_data):
"""Bootstrap yield curve from bond prices."""
sorted_bonds = bond_data.sort_values('maturity')
yields = []
for _, bond in sorted_bonds.iterrows():
# Simplified bootstrap logic
price = bond['price']
coupon = bond['coupon']
maturity = bond['maturity']
face_value = bond['face_value']
# Calculate yield to maturity
# (simplified - actual implementation more complex)
ytm = ((face_value + coupon) / price) ** (1/maturity) - 1
yields.append(ytm)
return np.array([sorted_bonds['maturity'].values, yields])
def simulate_forward_rates(self, years=30, dt=0.25):
"""Simulate forward rate paths using Ho-Lee model."""
num_steps = int(years / dt)
dt_sqrt = np.sqrt(dt)
# Initialize rates matrix
rates = np.zeros((self.num_simulations, num_steps))
rates[:, 0] = self.cs(dt)
# Random shocks
dW = np.random.normal(0, dt_sqrt,
(self.num_simulations, num_steps - 1))
# Mean reversion parameter
theta = 0.01
sigma = 0.01
for t in range(1, num_steps):
# Ho-Lee short rate dynamics
dr = theta * dt + sigma * dW[:, t-1]
rates[:, t] = rates[:, t-1] + dr
rates[:, t] = np.maximum(rates[:, t], 0.001) # Floor at 0.1%
return rates
def price_bond(self, rate_path, coupon, maturity, face_value):
"""Price bond using simulated rate paths."""
dt = 0.25
num_payments = int(maturity / dt)
discount_factors = np.exp(-rate_path[:, :num_payments] *
np.arange(1, num_payments+1) * dt)
coupon_pv = coupon * dt * discount_factors.sum(axis=1)
principal_pv = face_value * discount_factors[:, -1]
return coupon_pv + principal_pvMarkov Chain Regime Detection
Implementation of Markov chain models for detecting market regimes using hidden Markov models.
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize
class MarkovRegimeModel:
def __init__(self, n_regimes=2):
self.n_regimes = n_regimes
self.transition_matrix = None
self.regime_params = None
def fit(self, returns, n_iter=100):
"""Fit Markov model using Baum-Welch algorithm."""
n = len(returns)
# Initialize parameters
self.transition_matrix = np.full(
(self.n_regimes, self.n_regimes),
1.0 / self.n_regimes
)
self.regime_params = {
'means': np.array([returns.mean() * 0.5, returns.mean() * 1.5]),
'stds': np.array([returns.std() * 0.5, returns.std() * 1.5]),
'weights': np.array([0.5, 0.5])
}
# EM iteration
for _ in range(n_iter):
# E-step: compute posterior probabilities
gamma, xi = self._e_step(returns)
# M-step: update parameters
self._m_step(returns, gamma, xi)
return self
def _e_step(self, returns):
"""Forward-backward algorithm."""
n = len(returns)
gamma = np.zeros((n, self.n_regimes))
xi = np.zeros((n-1, self.n_regimes, self.n_regimes))
# Forward variables
alpha = np.zeros((n, self.n_regimes))
# Initial emission probabilities
emission = np.zeros((n, self.n_regimes))
for k in range(self.n_regimes):
emission[:, k] = norm.pdf(
returns,
self.regime_params['means'][k],
self.regime_params['stds'][k]
)
# Forward recursion
alpha[0] = self.regime_params['weights'] * emission[0]
for t in range(1, n):
for k in range(self.n_regimes):
alpha[t, k] = emission[t, k] * (
alpha[t-1] @ self.transition_matrix[:, k]
)
# Normalize
alpha_sum = alpha.sum(axis=1, keepdims=True)
alpha /= alpha_sum
gamma = alpha.copy()
# Backward probabilities for xi
beta = np.ones((n, self.n_regimes))
for t in range(n-2, -1, -1):
for k in range(self.n_regimes):
beta[t, k] = (
self.transition_matrix[k] *
emission[t+1] *
beta[t+1]
).sum()
# Compute xi
for t in range(n-1):
for i in range(self.n_regimes):
for j in range(self.n_regimes):
xi[t, i, j] = (
alpha[t, i] *
self.transition_matrix[i, j] *
emission[t+1, j] *
beta[t+1, j]
)
xi[t] /= xi[t].sum()
return gamma, xi
def _m_step(self, returns, gamma, xi):
"""Update parameters from sufficient statistics."""
n = len(returns)
# Update weights
self.regime_params['weights'] = gamma[0] / gamma[0].sum()
# Update means and stds
for k in range(self.n_regimes):
weight_sum = gamma[:, k].sum()
self.regime_params['means'][k] = (
(returns * gamma[:, k]).sum() / weight_sum
)
self.regime_params['stds'][k] = np.sqrt(
((returns - self.regime_params['means'][k])**2 *
gamma[:, k]).sum() / weight_sum
)
# Update transition matrix
for i in range(self.n_regimes):
for j in range(self.n_regimes):
self.transition_matrix[i, j] = (
xi[:, i, j].sum() / gamma[:-1, i].sum()
)
def predict_regime(self, returns):
"""Predict most likely regime sequence."""
self.fit(returns)
gamma, _ = self._e_step(returns)
return np.argmax(gamma, axis=1)Small-Cap Factor Model
Factor-based stock selection model for Chinese A-share market with focus on small-cap premium.
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
class SmallCapFactorModel:
def __init__(self, factors=['size', 'value', 'momentum', 'volatility']):
self.factors = factors
self.factor_returns = None
self.factor_loadings = None
self.scaler = StandardScaler()
def calculate_factors(self, data):
"""Calculate factor values for each stock."""
factors_df = pd.DataFrame(index=data.index)
# Size factor (negative = small cap)
factors_df['size'] = -np.log(data['market_cap'])
# Value factor (P/E ratio)
factors_df['value'] = 1 / data['pe_ratio']
# Momentum (12-month return)
factors_df['momentum'] = data['close'].pct_change(252)
# Volatility (annualized)
factors_df['volatility'] = data['close'].pct_change().rolling(252).std()
# Liquidity
factors_df['liquidity'] = data['volume'].rolling(21).mean()
return factors_df.dropna()
def build_portfolio(self, factor_data, returns, n_stocks=50,
weight_by_factor=True):
"""Build long-short portfolio based on factor signals."""
# Scale factors
scaled_factors = self.scaler.fit_transform(factor_data)
# Calculate factor z-scores
factor_zscores = pd.DataFrame(
scaled_factors,
index=factor_data.index,
columns=factor_data.columns
)
# Combine factors (equal weight)
if weight_by_factor:
factor_score = factor_zscores.mean(axis=1)
else:
# Custom weighting based on small-cap tilt
weights = {'size': 0.4, 'value': 0.2,
'momentum': 0.2, 'volatility': 0.2}
factor_score = sum(
factor_zscores[col] * weights.get(col, 0.2)
for col in factor_zscores.columns
)
# Select small-cap stocks with best factor scores
long_stocks = factor_score.nsmallest(n_stocks)
short_stocks = factor_score.nlargest(n_stocks)
# Calculate returns
long_returns = returns.loc[long_stocks.index].mean()
short_returns = returns.loc[short_stocks.index].mean()
portfolio_returns = long_returns - short_returns
return {
'long_stocks': long_stocks.index.tolist(),
'short_stocks': short_stocks.index.tolist(),
'returns': portfolio_returns,
'factor_score': factor_score
}
def backtest(self, portfolio_func, data, returns, window=252):
"""Backtest factor strategy."""
portfolio_history = []
for i in range(window, len(data)):
train_data = data.iloc[i-window:i]
train_returns = returns.iloc[i-window:i]
portfolio = portfolio_func(train_data, train_returns)
portfolio_history.append({
'date': data.index[i],
**portfolio
})
return pd.DataFrame(portfolio_history).set_index('date')More Code on GitHub
These snippets represent a selection of my quantitative work. Check out my GitHub for the complete implementation of these projects and more.
View GitHub Profile

Languages & Tools
Core Languages
- • Python (Primary)
- • R (Statistics)
- • SQL (Data)
- • C++ (Performance)
Libraries
- • Pandas/NumPy
- • SciPy/scikit-learn
- • TensorFlow/PyTorch
- • Matplotlib/Seaborn
Data & APIs
- • Bloomberg/Reuters
- • ivolatility API
- • Yahoo Finance
- • Wind/Tushare
Development
- • Git/GitHub
- • Jupyter Lab
- • Docker
- • Linux/Shell