The strategy trades FTSE100 pairs on hourly data: it selects the five pairs whose normalized prices deviate least from each other, opens a trade when a pair diverges, and closes it on convergence or at the end of the trading period.

I. STRATEGY IN A NUTSHELL

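FTSE100 stocks are traded on hourly data using pairs trading. Every 22 hours, the top 5 co-integrated pairs are selected over a 264-hour formation period and then traded for the following 132 hours. A trade opens when the normalized prices of a pair diverge by more than 2σ: the strategy goes long the stock with the lower normalized price and shorts the one with the higher normalized price. Positions close on convergence or at the end of the trading period.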
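
The selection step can be illustrated with a minimal sketch. It assumes that pairs are ranked by the sum of squared deviations between normalized prices, which is the measure the code in section V uses, and that each stock may appear in only one pair. The function names, tickers and prices below are hypothetical and serve only as an illustration.

```python
import itertools

import numpy as np


def normalize(prices):
    """Scale a chronological price series so that its first value equals 1."""
    prices = np.asarray(prices, dtype=float)
    return prices / prices[0]


def select_pairs(price_history, max_pairs=5):
    """Rank all pairs by sum of squared deviations and greedily keep the
    closest ones, using each ticker at most once."""
    distances = {}
    for a, b in itertools.combinations(sorted(price_history), 2):
        diff = normalize(price_history[a]) - normalize(price_history[b])
        distances[(a, b)] = float(np.sum(diff ** 2))

    selected, used = [], set()
    for (a, b), _ in sorted(distances.items(), key=lambda kv: kv[1]):
        if a in used or b in used:
            continue  # one of the tickers already belongs to a selected pair
        selected.append((a, b))
        used.update((a, b))
        if len(selected) == max_pairs:
            break
    return selected


# Hypothetical hourly closes (oldest first) for four made-up tickers.
history = {
    "AAA": [10.0, 10.1, 10.2, 10.3],
    "BBB": [20.0, 20.2, 20.4, 20.6],  # moves in lockstep with AAA
    "CCC": [5.0, 5.5, 4.8, 5.9],
    "DDD": [8.0, 7.6, 8.3, 7.9],
}
pairs = select_pairs(history, max_pairs=2)
print(pairs)
```

The greedy pass keeps the pairs mutually exclusive, so a single stock cannot dominate the portfolio through several pairs.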

II. ECONOMIC RATIONALE

Pairs trading exploits stocks whose prices are co-integrated. Shocks cause temporary deviations that create arbitrage opportunities, because prices tend to revert to their historical relationship. Re-selecting the tradable pairs at regular intervals keeps the focus on strongly co-integrated pairs with a high probability of convergence.
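
To make the divergence rule concrete, here is a small sketch of the entry signal. It mirrors the logic of the code in section V under simplifying assumptions: prices are in chronological order, each series is normalized by its first value, and the mean and standard deviation are computed over the same window that contains the latest spread. The function name and the sample prices are illustrative only.

```python
import numpy as np


def divergence_signal(prices_a, prices_b, n_sigma=2.0):
    """Return +1 (short A, long B), -1 (long A, short B) or 0 (no entry)."""
    a = np.asarray(prices_a, dtype=float)
    b = np.asarray(prices_b, dtype=float)

    # Spread between the two series, each normalized by its oldest price.
    spread = a / a[0] - b / b[0]
    mean, std = spread.mean(), spread.std()
    latest = spread[-1]

    # No entry while the latest spread stays inside the n_sigma band.
    if abs(latest - mean) <= n_sigma * std:
        return 0
    # Outside the band: short the stock with the higher normalized price.
    return 1 if latest > 0 else -1


# Hypothetical data: A jumps in the last hour while B stays flat.
flat_b = [50.0] * 10
print(divergence_signal([100.0] * 9 + [105.0], flat_b))
print(divergence_signal([100.0] * 9 + [95.0], flat_b))
print(divergence_signal([100.0] * 10, flat_b))
```

Because the latest observation is part of the window used for the standard deviation, a short window can never produce a signal; with a single outlier the window needs more than five observations before a 2σ breach is possible.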

III. SOURCE PAPER

High Frequency Equity Pairs Trading: Transaction Costs, Speed of Execution and Patterns in Returns

Bowen, Hutchinson, O’Sullivan (University College Cork)

<Abstract>

In this paper we examine the characteristics of high frequency pairs trading using a sample of FTSE100 constituent stocks for the period January to December 2007. We show that the excess returns of the strategy are extremely sensitive both to transaction costs and speed of execution. When we specify a moderate level of transaction costs (15 basis points) the excess returns of the strategy are reduced by more than 50%. Likewise, when we implement a wait one period restriction on execution the returns of the strategy are eliminated. When we further examine the time series properties of pairs trading returns we see that the majority of returns occur in the first hour of trading. Finally, we find that the excess returns bear little exposure to traditional risk factors but are weakly related to market and reversal risk factors.
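
The cost sensitivity can be illustrated with back-of-the-envelope arithmetic. The sketch below assumes that costs are proportional to traded notional and that one pair round trip consists of four orders (two to open, two to close). Whether the 15 basis points in the abstract apply per order or per round trip is not stated there, so the per-order reading is an assumption. The 0.00005 rate is the one used by CustomFeeModel in the code of section V; the notional and the function name are made up for illustration.

```python
def pair_round_trip_cost(notional_per_leg: float, rate: float, orders: int = 4) -> float:
    """Proportional cost of opening and closing both legs of one pair."""
    return orders * notional_per_leg * rate


notional = 10_000.0   # hypothetical notional per leg
code_rate = 0.00005   # 0.5 bp per order, as in CustomFeeModel (section V)
paper_rate = 0.0015   # 15 bp, assumed here to apply per order

for label, rate in (("code fee model", code_rate), ("15 bp scenario", paper_rate)):
    cost = pair_round_trip_cost(notional, rate)
    # Express the cost as a share of one leg's notional.
    print(f"{label}: {cost:.2f} per round trip ({cost / notional:.2%} of leg notional)")
```

Under these assumptions the 15 bp scenario costs 30 times as much per round trip as the fee model used in the code, which matters for a strategy whose per-trade edge is small.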

IV. BACKTEST PERFORMANCE

Annualised Return: 10.23%
Volatility: 4.98%
Beta: 0.003
Sharpe Ratio: 1.25
Sortino Ratio: -0.34
Maximum Drawdown: N/A
Win Rate: 56%

V. FULL PYTHON CODE

import itertools as it
from AlgorithmImports import *
from typing import List, Dict, Tuple
import numpy as np
import pandas as pd


class PairsTradingIntradayBasis(QCAlgorithm):
    
    def Initialize(self) -> None:
        self.SetStartDate(2010, 1, 1)
        self.SetCash(100000)
       
        self.symbol:Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        # Hourly close prices per symbol; RollingWindow index 0 holds the most recent value.
        self.history_price:Dict[Symbol, RollingWindow] = {}
        self.formation_period:int = 264
        self.trading_period:int = 132
        self.period:int = 22
        self.leverage:int = 5
        self.min_share_price:int = 5
        
        self.overlapping_portfolios_count:int = 6  # trading_period / period = 132 / 22 = 6
        self.overlapping_portfolios:List[TradedPortfolio] = []
        
        # Maximum number of equally weighted pairs traded per portfolio.
        self.max_traded_pairs:int = 5
        
        # Universe: the 100 most liquid US stocks by dollar volume (see FundamentalSelectionFunction).
        self.fundamental_sorting_key = lambda x: x.DollarVolume
        self.fundamental_count:int = 100
        self.selection_hours:int = 1
        self.selection_flag:bool = True
        self.Settings.MinimumOrderMarginPortfolioPercentage = 0.
        self.settings.daily_precise_end_time = False
        self.UniverseSettings.Resolution = Resolution.Hour
        self.AddUniverse(self.FundamentalSelectionFunction)

    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())
            security.SetLeverage(self.leverage)
            
        for security in changes.RemovedSecurities:
            symbol:Symbol = security.Symbol
            if symbol in self.history_price:
                del self.history_price[symbol]
            
        symbols:List[Symbol] = [x for x in self.history_price.keys() if x != self.symbol]
        self.symbol_pairs:List[Tuple[Symbol]] = list(it.combinations(symbols, 2))
        
        # minimize the sum of squared deviations
        distances:Dict[Tuple[Symbol], float] = {}
        for pair in self.symbol_pairs:
            if self.history_price[pair[0]].IsReady and self.history_price[pair[1]].IsReady:
                distances[pair] = self.Distance(self.history_price[pair[0]], self.history_price[pair[1]])
        
        new_portfolio = TradedPortfolio(self)
        
        # Select up to max_traded_pairs closest pairs, using each asset at most once.
        if len(distances) != 0:
            ranked:List[Tuple[Tuple[Symbol], float]] = sorted(distances.items(), key = lambda x: x[1])
            sorted_pairs:List[Tuple[Symbol]] = [x[0] for x in ranked]
            
            # The closest pair is always selected.
            new_portfolio.add_pair(sorted_pairs[0])
            selected_assets:List[Symbol] = [sorted_pairs[0][0], sorted_pairs[0][1]]
            
            for pair in sorted_pairs:
                symbol1:Symbol = pair[0]
                symbol2:Symbol = pair[1]
                
                # Skip pairs that contain an already selected symbol.
                if symbol1 not in selected_assets and symbol2 not in selected_assets:
                    new_portfolio.add_pair(pair)
                    selected_assets.extend([symbol1, symbol2])
                    
                    if len(new_portfolio.sorted_pairs) == self.max_traded_pairs:
                        break
        
        # Liquidate the oldest portfolio once the maximum number of overlapping portfolios is reached.
        if len(self.overlapping_portfolios) == self.overlapping_portfolios_count:
            item_to_remove:TradedPortfolio = self.overlapping_portfolios[0]
            result = item_to_remove.liquidate()
            if not result:
                self.Log('Nothing to liquidate or liquidation error')
            self.overlapping_portfolios.remove(item_to_remove)
        # append new portfolio
        self.overlapping_portfolios.append(new_portfolio)
        
    def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
        if not self.selection_flag:
            return Universe.Unchanged
        self.selection_flag = False
        
        selected:List[Fundamental] = [
            x for x in fundamental if x.HasFundamentalData and x.Price > self.min_share_price and x.Market == 'usa'
        ]
            
        if len(selected) > self.fundamental_count:
            selected = [x for x in sorted(selected, key=self.fundamental_sorting_key, reverse=True)[:self.fundamental_count]]
        
        # Warmup price rolling windows.
        for stock in selected:
            symbol:Symbol = stock.Symbol
            
            if symbol in self.history_price:
                continue
            
            self.history_price[symbol] = RollingWindow[float](self.formation_period)
            history:pd.DataFrame = self.History(symbol, self.formation_period, Resolution.Hour)
            if history.empty:
                self.Log(f"Not enough data for {symbol} yet")
                continue
            closes:pd.Series = history.loc[symbol].close
            # History is chronological, so the latest close ends up at index 0 of the window.
            for time, close in closes.items():
                self.history_price[symbol].Add(close)
                
        return [x.Symbol for x in selected if self.history_price[x.Symbol].IsReady]
    
    def OnData(self, data: Slice) -> None:
        # Append the latest hourly price of every tracked symbol (keys are already Symbol objects).
        for symbol in self.history_price:
            if symbol in data and data[symbol]:
                price:float = data[symbol].Value
                self.history_price[symbol].Add(price)
                        
        if self.selection_hours == self.period:
            self.selection_flag = True
            self.selection_hours = 1
        else:
            self.selection_hours += 1
        
        # manage overlapping portfolios
        for portfolio in self.overlapping_portfolios:
            pairs_to_remove:List[Tuple[Symbol]] = []
            
            for pair in portfolio.sorted_pairs:
                if pair[0] not in self.history_price or pair[1] not in self.history_price:
                    continue
                
                if not self.history_price[pair[0]].IsReady or not self.history_price[pair[1]].IsReady:
                    continue
                
                # Calculate the spread of the two normalized price series.
                # Lists are ordered newest first: index 0 is the latest price, index -1 the oldest.
                price_a:List[float] = [x for x in self.history_price[pair[0]]]
                price_b:List[float] = [x for x in self.history_price[pair[1]]]
                
                # Normalize each series by its oldest price in the formation window.
                norm_a:np.ndarray = np.array(price_a) / price_a[-1]
                norm_b:np.ndarray = np.array(price_b) / price_b[-1]
                
                spread:np.ndarray = norm_a - norm_b
                mean:float = np.mean(spread)
                std:float = np.std(spread)
                actual_spread:float = spread[0]  # latest spread value
                
                # Long-short position is opened when pair prices have diverged by two standard deviations.
                traded_portfolio_value = self.Portfolio.TotalPortfolioValue / self.overlapping_portfolios_count / self.max_traded_pairs
                if actual_spread > mean + 2*std or actual_spread < mean - 2*std:
                    if pair not in portfolio.traded_pairs:
                        # open new position for pair, if there's place for it.
                        if len(portfolio.traded_pairs) < self.max_traded_pairs:
                            symbol_a:Symbol = pair[0]
                            symbol_b:Symbol = pair[1]
                            a_price_norm:float = norm_a[0]
                            b_price_norm:float = norm_b[0]
                            a_price:float = price_a[0]
                            b_price:float = price_b[0]
                            
                            if symbol_a in data and data[symbol_a] and symbol_b in data and data[symbol_b]: 
                                # a stock's price > b stock's price
                                if a_price_norm > b_price_norm:
                                    long_q:float = traded_portfolio_value / b_price    # long b stock
                                    short_q:float = -traded_portfolio_value / a_price  # short a stock
                                    if self.Securities.ContainsKey(symbol_a) and self.Securities.ContainsKey(symbol_b) and \
                                        self.Securities[symbol_a].Price != 0 and self.Securities[symbol_a].IsTradable and \
                                        self.Securities[symbol_b].Price != 0 and self.Securities[symbol_b].IsTradable:
                                        self.MarketOrder(symbol_a, short_q)
                                        self.MarketOrder(symbol_b, long_q)
                                        
                                        portfolio.traded_quantity[pair] = (short_q, long_q)
                                        portfolio.traded_pairs.append(pair)
                                # b stock's price > a stock's price
                                else:
                                    long_q = traded_portfolio_value / a_price
                                    short_q = -traded_portfolio_value / b_price
                                    if self.Securities.ContainsKey(symbol_a) and self.Securities.ContainsKey(symbol_b) and \
                                        self.Securities[symbol_a].Price != 0 and self.Securities[symbol_a].IsTradable and \
                                        self.Securities[symbol_b].Price != 0 and self.Securities[symbol_b].IsTradable:
                                        self.MarketOrder(symbol_a, long_q)
                                        self.MarketOrder(symbol_b, short_q)
        
                                        portfolio.traded_quantity[pair] = (long_q, short_q)
                                        portfolio.traded_pairs.append(pair)
                # The position is closed once the spread is back inside the two-standard-deviation band.
                else:
                    portfolio.liquidate_pair(pair)

    def Distance(self, price_a:RollingWindow, price_b:RollingWindow) -> float:
        # Sum of squared deviations between two price series, each normalized by its oldest value.
        price_a:List[float] = [x for x in price_a]
        price_b:List[float] = [x for x in price_b]
        
        norm_a:np.ndarray = np.array(price_a) / price_a[-1]
        norm_b:np.ndarray = np.array(price_b) / price_b[-1]
        return float(np.sum((norm_a - norm_b)**2))
        
# Custom fee model: charges 0.00005 (0.5 basis points) of the traded notional per order.
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))


class TradedPortfolio():
    def __init__(self, algorithm):
        self.traded_pairs = []
        self.traded_quantity = {}
        self.sorted_pairs = []
        
        self.algorithm = algorithm
    
    def add_pair(self, pair) -> None:
        self.sorted_pairs.append(pair)
    
    def liquidate(self) -> bool:
        result = False
        for pair in self.traded_pairs:
            if pair in self.traded_quantity:
                self.algorithm.MarketOrder(pair[0], -self.traded_quantity[pair][0])
                self.algorithm.MarketOrder(pair[1], -self.traded_quantity[pair][1])
                result = True
        return result

    def liquidate_pair(self, pair) -> None:
        if pair in self.traded_pairs and pair in self.traded_quantity:
            self.algorithm.MarketOrder(pair[0], -self.traded_quantity[pair][0])
            self.algorithm.MarketOrder(pair[1], -self.traded_quantity[pair][1])
            self.traded_pairs.remove(pair)
            del self.traded_quantity[pair]
