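The strategy trades FTSE 100 constituents on an hourly basis. It identifies the five stock pairs with the smallest price distance, opens a position when a pair's prices diverge, and closes it when the prices converge or the trading period ends, so that execution is fully systematic.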

I. Strategy Overview

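The strategy targets FTSE 100 constituents and applies an intraday pairs trading approach, trading every hour. Pairs are re-selected every 22 hours, and each selection cycle consists of two phases: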

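  1. Formation period (264 hours): identify the pairs with the smallest price distance, measured as the sum of squared deviations between normalized prices.
  2. Trading period (132 hours): trade the top 5 pairs.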
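The formation step can be sketched in a few lines of standalone Python. This is a minimal illustration rather than the listing in section V: the helper names `ssd` and `select_pairs` and the toy tickers are hypothetical, and prices are assumed to be ordered oldest first and normalized by the first observation.

```python
import itertools
import numpy as np

def ssd(prices_a, prices_b):
    """Sum of squared deviations between two price series,
    each normalized by its first (oldest) observation."""
    a = np.asarray(prices_a, dtype=float)
    b = np.asarray(prices_b, dtype=float)
    return float(np.sum((a / a[0] - b / b[0]) ** 2))

def select_pairs(prices, max_pairs=5):
    """Rank all pairs by SSD and keep the closest ones,
    using each stock in at most one pair."""
    ranked = sorted(
        itertools.combinations(sorted(prices), 2),
        key=lambda pair: ssd(prices[pair[0]], prices[pair[1]]),
    )
    selected, used = [], set()
    for a, b in ranked:
        if a in used or b in used:
            continue  # one of the stocks already belongs to a closer pair
        selected.append((a, b))
        used.update((a, b))
        if len(selected) == max_pairs:
            break
    return selected

# Hypothetical toy data, oldest price first.
prices = {
    "AAA": [10.0, 10.5, 11.0, 10.8],
    "BBB": [20.0, 21.0, 22.0, 21.6],  # moves in lockstep with AAA
    "CCC": [5.0, 5.5, 4.5, 6.0],
    "DDD": [8.0, 8.1, 8.0, 8.2],
}
pairs = select_pairs(prices, max_pairs=2)
```

Because every stock may appear in only one pair, the second pair chosen is not necessarily the second closest overall; it is the closest pair among the stocks that are still unused.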

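The trading rules are as follows:

  1. Open a long-short position in a pair when its prices diverge (the code in section V uses a threshold of two standard deviations of the normalized price spread).
  2. Close the position when the prices converge again or when the trading period ends.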

This approach ensures systematic execution and keeps risk managed within each trading cycle.
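The entry and exit logic can be illustrated with a small standalone function. It is a sketch under stated assumptions: the two-standard-deviation band and the direction rule follow the code in section V, while the function name `pair_signal`, the return labels, and the oldest-first ordering of the inputs are choices made here for illustration.

```python
import numpy as np

def pair_signal(prices_a, prices_b, n_std=2.0):
    """Classify the latest bar of a pair.

    Inputs are ordered oldest first and normalized by their first value.
    Returns 'short_a_long_b', 'long_a_short_b', or 'flat'.
    """
    a = np.asarray(prices_a, dtype=float)
    b = np.asarray(prices_b, dtype=float)
    norm_a, norm_b = a / a[0], b / b[0]
    spread = norm_a - norm_b
    mean, std = spread.mean(), spread.std()
    latest = spread[-1]

    outside_band = latest > mean + n_std * std or latest < mean - n_std * std
    if not outside_band:
        return "flat"  # no new position; an open position would be closed
    # Short the relatively expensive stock, buy the relatively cheap one.
    if norm_a[-1] > norm_b[-1]:
        return "short_a_long_b"
    return "long_a_short_b"

# Hypothetical example: stock a jumps 10% on the last bar while b is unchanged.
signal = pair_signal([10.0] * 9 + [11.0], [20.0] * 10)
```

Note that the latest observation is itself part of the window used for the mean and standard deviation, so a single outlier in a short window can never exceed the band; with a population standard deviation the window needs more than five bars for a lone spike to register.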

II. Strategy Rationale

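Pairs trading exploits the historical cointegration relationship between stock prices, on the assumption that the paired stocks share correlated fundamental returns. When a short-term shock pushes prices away from that relationship, a statistical arbitrage opportunity arises because prices may revert to it. The strategy refreshes the set of tradable pairs dynamically, dropping pairs that no longer move together so that only pairs with a high likelihood of cointegration are traded. By concentrating on pairs that converge strongly, it reduces the risk of persistent divergence while preserving the arbitrage opportunity.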
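The dynamic refresh of pairs implies that several portfolios are active at once. The sketch below works through that arithmetic using the period lengths stated in this article; the variable names and the use of a `deque` are illustrative assumptions, not the data structures of the listing in section V.

```python
from collections import deque

TRADING_HOURS = 132   # each selected portfolio is traded for 132 hours
RESELECT_EVERY = 22   # a new portfolio of pairs is formed every 22 hours
MAX_PAIRS = 5         # pairs per portfolio

# Number of portfolios that are live at the same time: 132 / 22 = 6.
n_overlapping = TRADING_HOURS // RESELECT_EVERY

def capital_per_leg(total_equity):
    """Equity split evenly across overlapping portfolios and pairs;
    each leg of a pair (long and short) is sized to this value."""
    return total_equity / n_overlapping / MAX_PAIRS

# Simulate eight consecutive re-selections, identified by their start hour.
active = deque()
closed = []
for hour in range(0, RESELECT_EVERY * 8, RESELECT_EVERY):
    if len(active) == n_overlapping:
        closed.append(active.popleft())  # the oldest portfolio is liquidated
    active.append(hour)
```

With 100,000 of equity, each leg would be sized at roughly 3,333, and once six portfolios are live every new selection retires the oldest one.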

III. Source Paper

High Frequency Equity Pairs Trading: Transaction Costs, Speed of Execution and Patterns in Returns

<Abstract>

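This paper examines the characteristics of high-frequency pairs trading in FTSE 100 constituents from January to December 2007. It shows that the strategy's excess returns are extremely sensitive to transaction costs and execution speed. Although pairs trading can capture statistical arbitrage opportunities, its performance is constrained by market liquidity, slippage, and execution delay. The study further shows that, when these factors are tightly controlled, cointegration-based pairs trading can still deliver significant risk-adjusted returns, and it reveals a complex relationship between return patterns and trading frequency.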
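To make the cost sensitivity concrete, the following sketch reproduces the fee assumption used by the code in section V (0.00005 of traded value per order) and applies it to one pair round trip. The function names are hypothetical, and the round-trip figure assumes prices are unchanged between entry and exit, which is only an approximation; slippage and execution delay are not modeled.

```python
FEE_RATE = 0.00005  # 0.005% of traded value per order, as in the section V fee model

def order_fee(price, quantity):
    """Fee for a single order, proportional to its traded value."""
    return price * abs(quantity) * FEE_RATE

def pair_round_trip_fee(leg_value):
    """Approximate fee for opening and closing both legs of a pair:
    four orders, each of roughly `leg_value` in traded value."""
    return 4 * leg_value * FEE_RATE

# Hypothetical numbers: shorting 100 shares at 50.0, and a pair with 10,000 per leg.
single = order_fee(50.0, -100)
round_trip = pair_round_trip_fee(10_000.0)
```

Under these assumptions a round trip costs about 2 per 10,000 of leg value, so a pair that is opened and closed many times within the 132-hour trading period accumulates costs in proportion to its turnover.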

IV. Backtest Performance

Annualized return: 10.23%
Volatility: 4.98%
Beta: 0.003
Sharpe ratio: 1.25
Sortino ratio: -0.34
Maximum drawdown: N/A
Win rate: 56%

V. Full Python Code

import itertools as it
from AlgorithmImports import *
from typing import List, Dict, Tuple
import numpy as np
class PairsTradingIntradayBasis(QCAlgorithm):
    
    def Initialize(self) -> None:
        self.SetStartDate(2010, 1, 1)
        self.SetCash(100000)
       
        self.symbol:Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        # Hourly price data: one rolling window of formation_period bars per symbol.
        self.history_price:Dict[Symbol, RollingWindow] = {}
        self.formation_period:int = 264
        self.trading_period:int = 132
        self.period:int = 22
        self.leverage:int = 5
        self.min_share_price:int = 5
        
        self.overlapping_portfolios_count:int = 6  # trading_period / period = 132 / 22 = 6
        self.overlapping_portfolios:List[TradedPortfolio] = []
        
        # Equally weighted brackets.
        self.max_traded_pairs:int = 5
        
        self.fundamental_sorting_key = lambda x: x.DollarVolume
        self.fundamental_count:int = 100
        self.selection_hours:int = 1
        self.selection_flag:bool = True
        self.Settings.MinimumOrderMarginPortfolioPercentage = 0.
        self.settings.daily_precise_end_time = False
        self.UniverseSettings.Resolution = Resolution.Hour
        self.AddUniverse(self.FundamentalSelectionFunction)

    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())
            security.SetLeverage(self.leverage)
            
        for security in changes.RemovedSecurities:
            symbol:Symbol = security.Symbol
            if symbol in self.history_price:
                del self.history_price[symbol]
            
        symbols:List[Symbol] = [x for x in self.history_price.keys() if x != self.symbol]
        self.symbol_pairs:List[Tuple[Symbol]] = list(it.combinations(symbols, 2))
        
        # minimize the sum of squared deviations
        distances:Dict[Tuple[Symbol], float] = {}
        for pair in self.symbol_pairs:
            if self.history_price[pair[0]].IsReady and self.history_price[pair[1]].IsReady:
                distances[pair] = self.Distance(self.history_price[pair[0]], self.history_price[pair[1]])
        
        new_portfolio = TradedPortfolio(self)
        
        # Select up to max_traded_pairs pairs, using each stock at most once.
        if len(distances) != 0:
            sorted_pairs:List[Tuple[Tuple[Symbol], float]] = sorted(distances.items(), key = lambda x: x[1])
            sorted_pairs:List[Tuple[Symbol]] = [x[0] for x in sorted_pairs]
            
            # The closest pair is always selected.
            new_portfolio.add_pair(sorted_pairs[0])
            
            selected_assets:List[Symbol] = [sorted_pairs[0][0], sorted_pairs[0][1]]
            
            for pair in sorted_pairs:
                symbol1:Symbol = pair[0]
                symbol2:Symbol = pair[1]
                
                # Only take pairs whose stocks have not been selected yet.
                if symbol1 not in selected_assets and symbol2 not in selected_assets:
                    new_portfolio.add_pair(pair)
                    selected_assets.extend([symbol1, symbol2])
                    
                    if len(new_portfolio.sorted_pairs) == self.max_traded_pairs:
                        break
        
        # Liquidate the oldest portfolio once the maximum number of overlapping portfolios is reached.
        if len(self.overlapping_portfolios) == self.overlapping_portfolios_count:
            item_to_remove:TradedPortfolio = self.overlapping_portfolios[0]
            result = item_to_remove.liquidate()
            if not result:
                self.Log('Nothing to liquidate or liquidation error')
            self.overlapping_portfolios.remove(item_to_remove)
        # append new portfolio
        self.overlapping_portfolios.append(new_portfolio)
        
    def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
        if not self.selection_flag:
            return Universe.Unchanged
        self.selection_flag = False
        
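        # Note: this implementation selects liquid US stocks (Market == 'usa'),
        # not the FTSE 100 constituents described in the text above.
        selected:List[Fundamental] = [
            x for x in fundamental if x.HasFundamentalData and x.Price > self.min_share_price and x.Market == 'usa'
        ]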
            
        if len(selected) > self.fundamental_count:
            selected = [x for x in sorted(selected, key=self.fundamental_sorting_key, reverse=True)[:self.fundamental_count]]
        
        # Warmup price rolling windows.
        for stock in selected:
            symbol:Symbol = stock.Symbol
            
            if symbol in self.history_price:
                continue
            
            self.history_price[symbol] = RollingWindow[float](self.formation_period)
            history:pd.DataFrame = self.History(symbol, self.formation_period, Resolution.Hour)
            if history.empty:
                self.Log(f"Not enough data for {symbol} yet")
                continue
            closes:pd.Series = history.loc[symbol].close
            # History is ordered oldest first, so the last close added ends up at window index 0.
            for time, close in closes.items():
                self.history_price[symbol].Add(close)
                
        return [x.Symbol for x in selected if self.history_price[x.Symbol].IsReady]
    
    def OnData(self, data: Slice) -> None:
        for symbol in self.history_price:
            symbol_obj:Symbol = self.Symbol(symbol)
            if symbol_obj in data and data[symbol_obj]:
                price:float = data[symbol_obj].Value
                self.history_price[symbol].Add(price)
                        
        if self.selection_hours == self.period:
            self.selection_flag = True
            self.selection_hours = 1
        else:
            self.selection_hours += 1
        
        # manage overlapping portfolios
        for portfolio in self.overlapping_portfolios:
            pairs_to_remove:List[Tuple[Symbol]] = []
            
            for pair in portfolio.sorted_pairs:
                if pair[0] not in self.history_price or pair[1] not in self.history_price:
                    continue
                
                if not self.history_price[pair[0]].IsReady or not self.history_price[pair[1]].IsReady:
                    continue
                
                # Normalized price spread. RollingWindow index 0 holds the most recent bar,
                # so [-1] is the oldest price in the window and [0] is the latest.
                price_a:List[float] = [x for x in self.history_price[pair[0]]]
                price_b:List[float] = [x for x in self.history_price[pair[1]]]
                
                norm_a:np.ndarray = np.array(price_a) / price_a[-1]
                norm_b:np.ndarray = np.array(price_b) / price_b[-1]
                
                spread:np.ndarray = norm_a - norm_b
                mean:float = np.mean(spread)
                std:float = np.std(spread)
                actual_spread:float = spread[0]  # latest spread value
                
                # Capital per leg: equity split evenly across overlapping portfolios and pairs.
                traded_portfolio_value = self.Portfolio.TotalPortfolioValue / self.overlapping_portfolios_count / self.max_traded_pairs
                # A long-short position is opened when the spread is more than two standard deviations from its mean.
                if actual_spread > mean + 2*std or actual_spread < mean - 2*std:
                    if pair not in portfolio.traded_pairs:
                        # Open a new position if the portfolio still has room for another pair.
                        if len(portfolio.traded_pairs) < self.max_traded_pairs:
                            symbol_a:Symbol = pair[0]
                            symbol_b:Symbol = pair[1]
                            a_price_norm:float = norm_a[0]
                            b_price_norm:float = norm_b[0]
                            a_price:float = price_a[0]
                            b_price:float = price_b[0]
                            
                            if symbol_a in data and data[symbol_a] and symbol_b in data and data[symbol_b]: 
                                # Normalized price of a is above b: short a, long b.
                                if a_price_norm > b_price_norm:
                                    long_q:float = traded_portfolio_value / b_price    # long b stock
                                    short_q:float = -traded_portfolio_value / a_price  # short a stock
                                    if self.Securities.ContainsKey(symbol_a) and self.Securities.ContainsKey(symbol_b) and \
                                        self.Securities[symbol_a].Price != 0 and self.Securities[symbol_a].IsTradable and \
                                        self.Securities[symbol_b].Price != 0 and self.Securities[symbol_b].IsTradable:
                                        self.MarketOrder(symbol_a, short_q)
                                        self.MarketOrder(symbol_b, long_q)
                                        
                                        portfolio.traded_quantity[pair] = (short_q, long_q)
                                        portfolio.traded_pairs.append(pair)
                                # Normalized price of b is at or above a: long a, short b.
                                else:
                                    long_q = traded_portfolio_value / a_price
                                    short_q = -traded_portfolio_value / b_price
                                    if self.Securities.ContainsKey(symbol_a) and self.Securities.ContainsKey(symbol_b) and \
                                        self.Securities[symbol_a].Price != 0 and self.Securities[symbol_a].IsTradable and \
                                        self.Securities[symbol_b].Price != 0 and self.Securities[symbol_b].IsTradable:
                                        self.MarketOrder(symbol_a, long_q)
                                        self.MarketOrder(symbol_b, short_q)
        
                                        portfolio.traded_quantity[pair] = (long_q, short_q)
                                        portfolio.traded_pairs.append(pair)
                # The position is closed once the spread is back within the two-standard-deviation band.
                else:
                    portfolio.liquidate_pair(pair)

    def Distance(self, price_a:RollingWindow, price_b:RollingWindow) -> float:
        # Sum of squared deviations between two price series, each normalized by its oldest price.
        price_a:List[float] = [x for x in price_a]
        price_b:List[float] = [x for x in price_b]
        
        norm_a:np.ndarray = np.array(price_a) / price_a[-1]
        norm_b:np.ndarray = np.array(price_b) / price_b[-1]
        return float(np.sum((norm_a - norm_b)**2))
        
# Custom fee model: 0.005% of the traded value per order.
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))


class TradedPortfolio():
    def __init__(self, algorithm):
        self.traded_pairs = []
        self.traded_quantity = {}
        self.sorted_pairs = []
        
        self.algorithm = algorithm
    
    def add_pair(self, pair) -> None:
        self.sorted_pairs.append(pair)
    
    def liquidate(self) -> bool:
        # Close every open pair by reversing the recorded quantities.
        # Returns True if at least one pair was closed.
        result = False
        for pair in self.traded_pairs:
            if pair in self.traded_quantity:
                self.algorithm.MarketOrder(pair[0], -self.traded_quantity[pair][0])
                self.algorithm.MarketOrder(pair[1], -self.traded_quantity[pair][1])
                result = True
        return result

    def liquidate_pair(self, pair) -> None:
        if pair in self.traded_pairs and pair in self.traded_quantity:
            self.algorithm.MarketOrder(pair[0], -self.traded_quantity[pair][0])
            self.algorithm.MarketOrder(pair[1], -self.traded_quantity[pair][1])
            self.traded_pairs.remove(pair)
            del self.traded_quantity[pair]



