
“该策略以FTSE100指数成分股为交易标的,每小时进行交易。通过识别价格偏差最小的前5对股票,当价格发生偏离时开仓交易,并在价格趋同时或交易周期结束时平仓,以实现系统化执行。”
资产类别:股票 | 地区:欧洲 | 频率:日内 | 市场:股票市场 | 关键词:配对
I. 策略概述
该策略以FTSE100指数成分股为目标,采用日内配对交易方法,每小时执行交易。一个完整的22小时周期包括两个阶段:
- 形成期(264小时):通过计算标准化价格的平方偏差之和,识别价格偏差最小的股票对。
- 交易期(132小时):选出前5对股票进行交易。
交易规则如下:
- 当标准化价格的偏离超过历史标准差的两倍时开仓:
- 平仓条件:
这种方法保证了系统化执行,并有效管理交易周期内的风险。
II. 策略合理性
配对交易利用了股票价格的历史协整关系,假设这些股票具有共同的基本面回报相关性。当短期冲击导致价格偏离协整关系时,产生了统计套利机会,因为价格可能回归其历史关系。该策略动态更新可交易配对,以排除不再同步的股票对,确保仅交易那些具有高度协整可能性的配对。通过专注于价格收敛性强的股票对,该策略降低了价格持续偏离的风险,同时保持其套利机会的有效性。
III. 论文来源
High Frequency Equity Pairs Trading: Transaction Costs, Speed of Execution and Patterns in Returns [点击浏览原文]
- 作者:Bowen, Hutchinson, O’Sullivan
- 机构:科克大学(University College Cork)
<摘要>
本文研究了2007年1月至12月期间FTSE100成分股的高频配对交易特性。研究表明,策略的超额收益对交易成本和执行速度极为敏感。尽管配对交易能够捕捉统计套利机会,但其表现受限于市场流动性、交易滑点和执行延迟的影响。研究进一步表明,在严格控制这些因素的情况下,基于协整关系的配对交易仍然能够实现风险调整后的显著回报,同时揭示了收益模式与交易频率之间的复杂关系。

IV. 回测表现
| 年化收益率 | 10.23% |
| 波动率 | 4.98% |
| Beta | 0.003 |
| 夏普比率 | 1.25 |
| 索提诺比率 | -0.34 |
| 最大回撤 | N/A |
| 胜率 | 56% |
V. 完整python代码
import itertools as it
from AlgorithmImports import *
from typing import List, Dict
import numpy as np
class PairsTradingIntradayBasis(QCAlgorithm):
def Initialize(self) -> None:
self.SetStartDate(2010, 1, 1)
self.SetCash(100000)
self.symbol:Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
# Daily price data.
self.history_price:Dict[Symbol, RollingWindow] = {}
self.formation_period:int = 264
self.trading_period:int = 132
self.period:int = 22
self.leverage:int = 5
self.min_share_price:int = 5
self.overlapping_portfolios_count:int = 6 # 132 / 22 = 6
self.overlapping_portfolios:List[Symbol] = []
# Equally weighted brackets.
self.max_traded_pairs:int = 5
self.fundamental_sorting_key = lambda x: x.DollarVolume
self.fundamental_count:int = 100
self.selection_hours:int = 1
self.selection_flag:bool = True
self.Settings.MinimumOrderMarginPortfolioPercentage = 0.
self.settings.daily_precise_end_time = False
self.UniverseSettings.Resolution = Resolution.Hour
self.AddUniverse(self.FundamentalSelectionFunction)
def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
for security in changes.AddedSecurities:
security.SetFeeModel(CustomFeeModel())
security.SetLeverage(self.leverage)
for security in changes.RemovedSecurities:
symbol:Symbol = security.Symbol
if symbol in self.history_price:
del self.history_price[symbol]
symbols:List[Symbol] = [x for x in self.history_price.keys() if x != self.symbol]
self.symbol_pairs:List[Tuple[Symbol]] = list(it.combinations(symbols, 2))
# minimize the sum of squared deviations
distances:Dict[Tuple[Symbol], float] = {}
for pair in self.symbol_pairs:
if self.history_price[pair[0]].IsReady and self.history_price[pair[1]].IsReady:
distances[pair] = self.Distance(self.history_price[pair[0]], self.history_price[pair[1]])
new_portfolio = TradedPortfolio(self)
# sorting 5 pairs with no assets repetition
if len(distances) != 0:
sorted_pairs:List[Tuple[Tuple[Symbol], float]] = sorted(distances.items(), key = lambda x: x[1])
sorted_pairs:List[Tuple[Symbol]] = [x[0] for x in sorted_pairs]
# self.sorted_pairs.append(sorted_pairs[0])
new_portfolio.add_pair(sorted_pairs[0])
selected_assets:List[List[Symbol]] = [sorted_pairs[0][0], sorted_pairs[0][1]]
for pair in sorted_pairs:
symbol1:Symbol = pair[0]
symbol2:Symbol = pair[1]
# symbols which has not been selected yet
if symbol1 not in selected_assets and \
symbol2 not in selected_assets:
# self.sorted_pairs.append(pair)
new_portfolio.add_pair(pair)
selected_assets.extend([symbol1, symbol2])
if len(new_portfolio.sorted_pairs) == self.max_traded_pairs:
break
# liquidate firstly added portfolio
if len(self.overlapping_portfolios) == self.overlapping_portfolios_count:
item_to_remove:Symbol = self.overlapping_portfolios[0]
result = item_to_remove.liquidate()
if not result:
self.Log('Nothing to liquidate or liquidation error')
self.overlapping_portfolios.remove(item_to_remove)
# append new portfolio
self.overlapping_portfolios.append(new_portfolio)
def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
if not self.selection_flag:
return Universe.Unchanged
self.selection_flag = False
selected:List[Fundamental] = [
x for x in fundamental if x.HasFundamentalData and x.Price > self.min_share_price and x.Market == 'usa'
]
if len(selected) > self.fundamental_count:
selected = [x for x in sorted(selected, key=self.fundamental_sorting_key, reverse=True)[:self.fundamental_count]]
# Warmup price rolling windows.
for stock in selected:
symbol:Symbol = stock.Symbol
if symbol in self.history_price:
continue
self.history_price[symbol] = RollingWindow[float](self.formation_period)
history:dataframe = self.History(symbol, self.formation_period, Resolution.Hour)
if history.empty:
self.Log(f"Not enough data for {symbol} yet")
continue
closes:Series = history.loc[symbol].close
for time, close in closes.items():
self.history_price[symbol].Add(close)
return [x.Symbol for x in selected if self.history_price[x.Symbol].IsReady]
def OnData(self, data: Slice) -> None:
for symbol in self.history_price:
symbol_obj:Symbol = self.Symbol(symbol)
if symbol_obj in data and data[symbol_obj]:
price:float = data[symbol_obj].Value
self.history_price[symbol].Add(price)
if self.selection_hours == self.period:
self.selection_flag = True
self.selection_hours = 1
else:
self.selection_hours += 1
# manage overlapping portfolios
for portfolio in self.overlapping_portfolios:
pairs_to_remove:List[Tuple[Symbol]] = []
for pair in portfolio.sorted_pairs:
if pair[0] not in self.history_price or pair[1] not in self.history_price:
continue
if not self.history_price[pair[0]].IsReady or not self.history_price[pair[1]].IsReady:
continue
# Calculate the spread of two price series.
price_a:List[float] = [x for x in self.history_price[pair[0]]]
price_b:List[float] = [x for x in self.history_price[pair[1]]]
norm_a:np.ndarray = np.array(price_a) / price_a[-1]
norm_b:np.ndarray = np.array(price_b) / price_b[-1]
spread:np.ndarray = norm_a - norm_b
mean:float = np.mean(spread)
std:float = np.std(spread)
actual_spread:float = spread[0]
# Long-short position is opened when pair prices have diverged by two standard deviations.
traded_portfolio_value = self.Portfolio.TotalPortfolioValue / self.overlapping_portfolios_count / self.max_traded_pairs
if actual_spread > mean + 2*std or actual_spread < mean - 2*std:
if pair not in portfolio.traded_pairs:
# open new position for pair, if there's place for it.
if len(portfolio.traded_pairs) < self.max_traded_pairs:
symbol_a:Symbol = pair[0]
symbol_b:Symbol = pair[1]
a_price_norm:float = norm_a[0]
b_price_norm:float = norm_b[0]
a_price:float = price_a[0]
b_price:float = price_b[0]
if symbol_a in data and data[symbol_a] and symbol_b in data and data[symbol_b]:
# a stock's price > b stock's price
if a_price_norm > b_price_norm:
long_q:float = traded_portfolio_value / b_price # long b stock
short_q:float = -traded_portfolio_value / a_price # short a stock
if self.Securities.ContainsKey(symbol_a) and self.Securities.ContainsKey(symbol_b) and \
self.Securities[symbol_a].Price != 0 and self.Securities[symbol_a].IsTradable and \
self.Securities[symbol_b].Price != 0 and self.Securities[symbol_b].IsTradable:
self.MarketOrder(symbol_a, short_q)
self.MarketOrder(symbol_b, long_q)
portfolio.traded_quantity[pair] = (short_q, long_q)
portfolio.traded_pairs.append(pair)
# b stock's price > a stock's price
else:
long_q = traded_portfolio_value / a_price
short_q = -traded_portfolio_value / b_price
if self.Securities.ContainsKey(symbol_a) and self.Securities.ContainsKey(symbol_b) and \
self.Securities[symbol_a].Price != 0 and self.Securities[symbol_a].IsTradable and \
self.Securities[symbol_b].Price != 0 and self.Securities[symbol_b].IsTradable:
self.MarketOrder(symbol_a, long_q)
self.MarketOrder(symbol_b, short_q)
portfolio.traded_quantity[pair] = (long_q, short_q)
portfolio.traded_pairs.append(pair)
# The position is closed when prices revert back.
else:
portfolio.liquidate_pair(pair)
def Distance(self, price_a:List[float], price_b:List[float]):
# Calculate the sum of squared deviations between two normalized price series.
price_a:List[float] = [x for x in price_a]
price_b:List[float] = [x for x in price_b]
norm_a:np.ndarray = np.array(price_a) / price_a[-1]
norm_b:np.ndarray = np.array(price_b) / price_b[-1]
return sum((norm_a - norm_b)**2)
# Custom fee model.
class CustomFeeModel(FeeModel):
def GetOrderFee(self, parameters):
fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
return OrderFee(CashAmount(fee, "USD"))
class TradedPortfolio():
def __init__(self, algorithm):
self.traded_pairs = []
self.traded_quantity = {}
self.sorted_pairs = []
self.algorithm = algorithm
def add_pair(self, pair) -> None:
self.sorted_pairs.append(pair)
def liquidate(self) -> bool:
result = False
for pair in self.traded_pairs:
if pair in self.traded_quantity:
self.algorithm.MarketOrder(pair[0], -self.traded_quantity[pair][0])
self.algorithm.MarketOrder(pair[1], -self.traded_quantity[pair][1])
result = True
return result
def liquidate_pair(self, pair) -> None:
if pair in self.traded_pairs and pair in self.traded_quantity:
self.algorithm.MarketOrder(pair[0], -self.traded_quantity[pair][0])
self.algorithm.MarketOrder(pair[1], -self.traded_quantity[pair][1])
self.traded_pairs.remove(pair)
del self.traded_quantity[pair]