Code Examples

Selected code snippets and implementations from my quantitative research projects. These represent my approach to systematic trading, volatility analysis, and financial modeling.

Trading Strategy — Python

EMA Crossover with Heikin-Ashi

Implementation of a systematic trading strategy using Heikin-Ashi candlesticks and EMA crossovers with gap pullback exits.

import pandas as pd
import numpy as np

def calculate_heikin_ashi(df):
    """Convert OHLC data to Heikin-Ashi candles.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain 'Open', 'High', 'Low', 'Close' columns.

    Returns
    -------
    pd.DataFrame
        New frame (same index) with Heikin-Ashi 'Open', 'High', 'Low', 'Close'.
    """
    ha_close = (df['Open'] + df['High'] + df['Low'] + df['Close']) / 4

    # Recursive HA open: seeded from the first real open and first HA close,
    # then each bar is the midpoint of the previous HA open and HA close.
    # (The original kept a scalar and called .iloc on it inside the loop,
    # raising AttributeError for any input with more than one row.)
    ha_open = np.empty(len(df))
    if len(df):
        ha_open[0] = (df['Open'].iloc[0] + ha_close.iloc[0]) / 2
        for i in range(1, len(df)):
            ha_open[i] = (ha_open[i - 1] + ha_close.iloc[i - 1]) / 2
    ha_open = pd.Series(ha_open, index=df.index)

    # HA high/low are the extremes over the real high/low and HA open/close.
    ha_high = pd.concat([df['High'], ha_open, ha_close], axis=1).max(axis=1)
    ha_low = pd.concat([df['Low'], ha_open, ha_close], axis=1).min(axis=1)

    return pd.DataFrame({
        'Open': ha_open,
        'High': ha_high,
        'Low': ha_low,
        'Close': ha_close
    })

def ema_crossover_signals(df, fast=12, slow=26):
    """Attach EMA columns and a crossover signal to *df* (mutated in place).

    Signal is +1 while the fast EMA is above the slow EMA, -1 while it is
    below, and 0 when they are exactly equal. Position records signal
    changes via a first difference. Returns the same DataFrame.
    """
    fast_ema = df['Close'].ewm(span=fast, adjust=False).mean()
    slow_ema = df['Close'].ewm(span=slow, adjust=False).mean()
    df['EMA_fast'] = fast_ema
    df['EMA_slow'] = slow_ema

    # Branchless three-way signal instead of masked assignments.
    df['Signal'] = np.where(fast_ema > slow_ema, 1,
                            np.where(fast_ema < slow_ema, -1, 0))

    df['Position'] = df['Signal'].diff()
    return df

def gap_pullback_exit(df, window=5, threshold=0.02):
    """Flag exits when price pulls back from its running peak by *threshold*.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain a 'Close' column; mutated in place.
    window : int
        Unused; kept only for backward compatibility.
        NOTE(review): the original never used it — possibly a rolling peak
        window was intended. TODO confirm and either wire it up or drop it.
    threshold : float
        Fractional pullback from the running peak that triggers an exit
        (default 2%, previously hard-coded).

    Returns
    -------
    pd.DataFrame
        The same frame with added 'Peak', 'Drawdown', 'Exit_Signal' columns.
    """
    df['Peak'] = df['Close'].cummax()
    df['Drawdown'] = (df['Peak'] - df['Close']) / df['Peak']

    df['Exit_Signal'] = df['Drawdown'] > threshold
    return df
Tags: pandas, numpy, trading, technical-analysis
Options — Python

IV Surface Data Collection

Framework for collecting and processing implied volatility surface data using the ivolatility API.

from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests

class IVSurfaceCollector:
    """Collects options data from the ivolatility API and builds IV surfaces."""

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.ivolatility.com"

    def get_options_data(self, ticker, expiration_range=(30, 90)):
        """Collect options data for the given expiration range (in days).

        Returns the parsed payload as a DataFrame (see _process_response).
        Raises requests.HTTPError on a non-2xx response.
        """
        params = {
            "api_key": self.api_key,
            "ticker": ticker,
            "min_expiration": expiration_range[0],
            "max_expiration": expiration_range[1],
            "include_underlying": True
        }

        response = requests.get(
            f"{self.base_url}/options/ivsurface",
            params=params,
            timeout=30,  # never hang indefinitely on a network call
        )
        # Fail fast on HTTP errors instead of trying to parse an error body.
        response.raise_for_status()

        return self._process_response(response.json())

    def _process_response(self, payload):
        """Normalize the JSON payload into a flat DataFrame.

        NOTE(review): the original called this method without defining it.
        This implementation assumes the API wraps rows in a top-level 'data'
        key (falling back to the payload itself) — confirm against the live
        response schema.
        """
        if isinstance(payload, dict):
            payload = payload.get('data', payload)
        return pd.DataFrame(payload)

    def calculate_iv_surface(self, options_data):
        """Convert raw options data to an IV surface keyed by expiration.

        *options_data* must have 'expiration', 'strike' and
        'implied_volatility' columns. Each expiration maps to its raw
        strikes/IVs plus a quadratic polynomial fit (smile approximation).
        """
        surfaces = {}

        for exp_date, group in options_data.groupby('expiration'):
            strikes = group['strike'].values
            ivs = group['implied_volatility'].values

            # Quadratic fit captures the basic volatility-smile curvature.
            coeffs = np.polyfit(strikes, ivs, 2)
            surfaces[exp_date] = {
                'strikes': strikes,
                'ivs': ivs,
                'polynomial': coeffs
            }

        return surfaces

    def identify_skew_patterns(self, surface):
        """Classify one expiration's surface as put_skew / call_skew / flat.

        Skew is the slope of a linear fit of IV against strike;
        |slope| <= 0.01 (IV points per strike unit) is treated as flat.
        """
        if len(surface['strikes']) < 3:
            return {'pattern': 'insufficient_data'}

        from scipy import stats
        slope, intercept, r_value, p_value, std_err = stats.linregress(
            surface['strikes'], surface['ivs']
        )

        if slope < -0.01:
            return {'pattern': 'put_skew', 'strength': abs(slope)}
        elif slope > 0.01:
            return {'pattern': 'call_skew', 'strength': abs(slope)}
        else:
            return {'pattern': 'flat', 'strength': 0}
Tags: api, options, volatility, data-engineering
Fixed Income — Python

Monte Carlo Yield Curve Simulation

Monte Carlo simulation framework for yield curve modeling and forward rate curve construction.

import numpy as np
import pandas as pd
from scipy.interpolate import CubicSpline

class YieldCurveSimulator:
    """Monte Carlo yield-curve simulator using Ho-Lee short-rate dynamics."""

    def __init__(self, initial_curve, num_simulations=10000):
        # initial_curve is a pair (maturities, yields); a cubic spline over it
        # supplies the starting short rate for the simulation.
        self.initial_curve = initial_curve
        self.num_simulations = num_simulations
        self.cs = CubicSpline(
            initial_curve[0],
            initial_curve[1]
        )

    def bootstrap_curve(self, bond_data):
        """Bootstrap a (maturity, yield) curve from bond prices.

        *bond_data* needs 'price', 'coupon', 'maturity', 'face_value' columns.
        NOTE(review): uses a single-cash-flow YTM approximation, not a true
        iterative bootstrap — acceptable as a rough seed curve only.
        """
        sorted_bonds = bond_data.sort_values('maturity')
        yields = []

        for _, bond in sorted_bonds.iterrows():
            price = bond['price']
            coupon = bond['coupon']
            maturity = bond['maturity']
            face_value = bond['face_value']

            # Approximate YTM treating all cash as a single payment at maturity.
            ytm = ((face_value + coupon) / price) ** (1 / maturity) - 1
            yields.append(ytm)

        return np.array([sorted_bonds['maturity'].values, yields])

    def simulate_forward_rates(self, years=30, dt=0.25, theta=0.01, sigma=0.01):
        """Simulate short-rate paths using the Ho-Lee model.

        Parameters
        ----------
        years : float
            Simulation horizon.
        dt : float
            Time step in years.
        theta, sigma : float
            Ho-Lee drift and volatility (previously hard-coded at 0.01).

        Returns
        -------
        np.ndarray of shape (num_simulations, years / dt).
        """
        num_steps = int(years / dt)
        dt_sqrt = np.sqrt(dt)

        rates = np.zeros((self.num_simulations, num_steps))
        rates[:, 0] = self.cs(dt)  # seed from the fitted initial curve

        # Pre-draw all Brownian increments at once.
        dW = np.random.normal(0, dt_sqrt,
                              (self.num_simulations, num_steps - 1))

        for t in range(1, num_steps):
            # dr = theta dt + sigma dW (Ho-Lee short-rate dynamics)
            dr = theta * dt + sigma * dW[:, t - 1]
            rates[:, t] = rates[:, t - 1] + dr
            rates[:, t] = np.maximum(rates[:, t], 0.001)  # floor at 0.1%

        return rates

    def price_bond(self, rate_path, coupon, maturity, face_value, dt=0.25):
        """Price a bond along simulated short-rate paths.

        *coupon* is the annual coupon amount, paid pro-rata each step.
        *dt* must match the step used to simulate *rate_path* (previously
        hard-coded at 0.25).

        Returns an array of one price per simulated path.
        """
        num_payments = int(maturity / dt)

        # Pathwise discount factor to t_k is exp(-∫ r ds) ≈ exp(-Σ r_j dt).
        # (The original used exp(-r_k * t_k), treating each instantaneous
        # short rate as a zero rate for its whole horizon — only correct
        # for a constant path.)
        integrated_rate = np.cumsum(rate_path[:, :num_payments], axis=1) * dt
        discount_factors = np.exp(-integrated_rate)

        coupon_pv = coupon * dt * discount_factors.sum(axis=1)
        principal_pv = face_value * discount_factors[:, -1]

        return coupon_pv + principal_pv
Tags: monte-carlo, fixed-income, quantitative-finance
Stochastic Processes — Python

Markov Chain Regime Detection

Implementation of Markov chain models for detecting market regimes using hidden Markov models.

import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize

class MarkovRegimeModel:
    """Gaussian hidden Markov model for market-regime detection.

    Fit by Baum-Welch (EM) with a per-step-scaled forward-backward pass,
    so long return series do not underflow.
    """

    def __init__(self, n_regimes=2):
        self.n_regimes = n_regimes
        self.transition_matrix = None  # (K, K) row-stochastic, set by fit()
        self.regime_params = None      # dict with 'means', 'stds', 'weights'

    def fit(self, returns, n_iter=100):
        """Fit the model to a 1-D array of returns via EM; returns self.

        Initial means/stds are spread around the sample moments so any
        n_regimes works (the original hard-coded exactly two regimes).
        """
        returns = np.asarray(returns, dtype=float)
        k = self.n_regimes

        self.transition_matrix = np.full((k, k), 1.0 / k)

        spread = np.linspace(0.5, 1.5, k)  # [0.5, 1.5] for k=2, as before
        self.regime_params = {
            'means': returns.mean() * spread,
            'stds': returns.std() * spread,
            'weights': np.full(k, 1.0 / k),
        }

        for _ in range(n_iter):
            gamma, xi = self._e_step(returns)   # E: posteriors
            self._m_step(returns, gamma, xi)    # M: re-estimate params

        return self

    def _e_step(self, returns):
        """Scaled forward-backward pass.

        Returns
        -------
        gamma : (n, K) smoothed state posteriors P(state_t | all data).
        xi    : (n-1, K, K) pairwise posteriors P(state_t, state_{t+1} | data).
        """
        n = len(returns)
        k = self.n_regimes
        trans = self.transition_matrix

        # Gaussian emission likelihoods per state; floored to avoid exact
        # zeros poisoning the scaling factors.
        emission = np.zeros((n, k))
        for j in range(k):
            emission[:, j] = norm.pdf(
                returns,
                self.regime_params['means'][j],
                self.regime_params['stds'][j]
            )
        emission = np.maximum(emission, 1e-300)

        # Forward pass with per-step normalization (the original normalized
        # only once at the end, so alpha underflowed on long series).
        alpha = np.zeros((n, k))
        scale = np.zeros(n)
        alpha[0] = self.regime_params['weights'] * emission[0]
        scale[0] = alpha[0].sum()
        alpha[0] /= scale[0]
        for t in range(1, n):
            alpha[t] = emission[t] * (alpha[t - 1] @ trans)
            scale[t] = alpha[t].sum()
            alpha[t] /= scale[t]

        # Backward pass, scaled by the same factors.
        beta = np.ones((n, k))
        for t in range(n - 2, -1, -1):
            beta[t] = (trans @ (emission[t + 1] * beta[t + 1])) / scale[t + 1]

        # Smoothed posteriors: gamma ∝ alpha * beta (the original used the
        # filtered alpha alone, which is not the Baum-Welch posterior).
        gamma = alpha * beta
        gamma /= gamma.sum(axis=1, keepdims=True)

        xi = np.zeros((n - 1, k, k))
        for t in range(n - 1):
            xi[t] = (alpha[t][:, None] * trans *
                     (emission[t + 1] * beta[t + 1])[None, :] / scale[t + 1])
            xi[t] /= xi[t].sum()

        return gamma, xi

    def _m_step(self, returns, gamma, xi):
        """Re-estimate weights, per-state Gaussians, and transitions."""
        # Initial-state distribution from the first smoothed posterior.
        self.regime_params['weights'] = gamma[0] / gamma[0].sum()

        for j in range(self.n_regimes):
            w = gamma[:, j]
            w_sum = w.sum()
            mean_j = (returns * w).sum() / w_sum
            self.regime_params['means'][j] = mean_j
            var_j = ((returns - mean_j) ** 2 * w).sum() / w_sum
            # Floor the std so a regime collapsing onto one point cannot
            # produce a degenerate (zero-width) Gaussian next iteration.
            self.regime_params['stds'][j] = max(np.sqrt(var_j), 1e-8)

        # Row-normalized expected transition counts.
        occupancy = gamma[:-1].sum(axis=0)
        self.transition_matrix = xi.sum(axis=0) / occupancy[:, None]

    def predict_regime(self, returns):
        """Fit to *returns* and return the most likely regime per step.

        NOTE(review): refits from scratch on every call (original behavior).
        """
        returns = np.asarray(returns, dtype=float)
        self.fit(returns)
        gamma, _ = self._e_step(returns)
        return np.argmax(gamma, axis=1)
Tags: markov, regime-detection, stochastic-processes, machine-learning
Factor Investing — Python

Small-Cap Factor Model

Factor-based stock selection model for Chinese A-share market with focus on small-cap premium.

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler

class SmallCapFactorModel:
    """Factor-based long/short stock selection with a small-cap tilt."""

    def __init__(self, factors=['size', 'value', 'momentum', 'volatility']):
        # NOTE(review): mutable default kept for interface compatibility;
        # the list is never mutated here, so the shared default is harmless.
        self.factors = factors
        self.factor_returns = None
        self.factor_loadings = None
        # Retained for external users; build_portfolio now z-scores inline
        # with the same (x - mean) / population-std transform.
        self.scaler = StandardScaler()

    def calculate_factors(self, data):
        """Calculate factor values for each stock.

        *data* needs 'market_cap', 'pe_ratio', 'close', 'volume' columns.
        Rows lost to the 252-day lookbacks are dropped.
        """
        factors_df = pd.DataFrame(index=data.index)

        # Size factor: negated log market cap, so small caps score high.
        factors_df['size'] = -np.log(data['market_cap'])

        # Value factor: earnings yield (inverse P/E), higher = cheaper.
        factors_df['value'] = 1 / data['pe_ratio']

        # Momentum: trailing 12-month (252 trading days) return.
        factors_df['momentum'] = data['close'].pct_change(252)

        # Volatility: rolling 252-day std of daily returns.
        # NOTE(review): not annualized despite the original comment — the
        # ranking is unaffected, but confirm before reporting the level.
        factors_df['volatility'] = data['close'].pct_change().rolling(252).std()

        # Liquidity: 21-day average volume.
        factors_df['liquidity'] = data['volume'].rolling(21).mean()

        return factors_df.dropna()

    def build_portfolio(self, factor_data, returns, n_stocks=50,
                        weight_by_factor=True):
        """Build a long-short portfolio from factor signals.

        Higher composite score = more attractive (size enters negated, value
        as earnings yield), so the portfolio goes long the top scores and
        short the bottom. The original had nsmallest/nlargest inverted
        relative to its own "best factor scores" comment.
        """
        # Z-score each factor column: (x - mean) / population std, the same
        # transform StandardScaler applies; constant columns are left as-is
        # instead of dividing by zero.
        col_std = factor_data.std(ddof=0).replace(0, 1.0)
        factor_zscores = (factor_data - factor_data.mean()) / col_std

        if weight_by_factor:
            # Equal-weight combination of all factor z-scores.
            factor_score = factor_zscores.mean(axis=1)
        else:
            # Custom weighting emphasizing the small-cap tilt.
            weights = {'size': 0.4, 'value': 0.2,
                       'momentum': 0.2, 'volatility': 0.2}
            factor_score = sum(
                factor_zscores[col] * weights.get(col, 0.2)
                for col in factor_zscores.columns
            )

        # Long the best composite scores, short the worst.
        long_stocks = factor_score.nlargest(n_stocks)
        short_stocks = factor_score.nsmallest(n_stocks)

        long_returns = returns.loc[long_stocks.index].mean()
        short_returns = returns.loc[short_stocks.index].mean()

        portfolio_returns = long_returns - short_returns

        return {
            'long_stocks': long_stocks.index.tolist(),
            'short_stocks': short_stocks.index.tolist(),
            'returns': portfolio_returns,
            'factor_score': factor_score
        }

    def backtest(self, portfolio_func, data, returns, window=252):
        """Roll *portfolio_func* over trailing windows of *data*/*returns*.

        NOTE(review): the recorded 'returns' come from the same window the
        portfolio was formed on (in-sample); a forward-return evaluation
        would need the next period's returns — confirm intent.
        """
        portfolio_history = []

        for i in range(window, len(data)):
            train_data = data.iloc[i - window:i]
            train_returns = returns.iloc[i - window:i]

            portfolio = portfolio_func(train_data, train_returns)
            portfolio_history.append({
                'date': data.index[i],
                **portfolio
            })

        return pd.DataFrame(portfolio_history).set_index('date')
Tags: factor-models, chinese-markets, portfolio-construction, quant

More Code on GitHub

These snippets represent a selection of my quantitative work. Check out my GitHub for the complete implementation of these projects and more.

View GitHub Profile

Languages & Tools

Core Languages

  • Python (Primary)
  • R (Statistics)
  • SQL (Data)
  • C++ (Performance)

Libraries

  • Pandas/NumPy
  • SciPy/scikit-learn
  • TensorFlow/PyTorch
  • Matplotlib/Seaborn

Data & APIs

  • Bloomberg/Reuters
  • ivolatility API
  • Yahoo Finance
  • Wind/Tushare

Development

  • Git/GitHub
  • Jupyter Lab
  • Docker
  • Linux/Shell