
“该策略使用ETF或期货交易标普500的均值回归,每年通过步进前测优化六个参数,并为多头和空头交易设定独立阈值,以确保适应市场变化。”
资产类别: 差价合约(CFD)、交易所交易基金(ETF)、期货 | 地区: 美国 | 周期:
每日 | 市场: 股票 | 关键词: 自适应反转
I. 策略概要
该策略利用标普500指数的短期均值回归特性,通过ETF、期货或差价合约(CFD)进行交易。它采用六个优化参数——三用于多头交易,三用于空头交易,并通过步进前测每年评估。
对于多头交易:
- LongZScore 计算方法:收盘价与指数平滑移动平均线(EMA)之间的偏离程度(基于优化周期 X 计算),再除以 X 天内收盘价的标准差。
- Buy Level:设置 LongZScore 的初始买入阈值。
- BuySecond:设定二级 LongZScore 阈值,以便在市场进一步偏离均值时加仓。
空头交易采用类似的参数设定。所有参数每年优化更新,以确保策略能够实时适应市场变化。
II. 策略合理性
该学术论文未描述其所利用的均值回归效应的基本原因。然而,该系统的自适应特性在一定程度上保证了其能够对市场结构的潜在变化做出反应。
III. 来源论文
Developing Robust Trading Systems, with Implications for Position Sizing and System Health [点击查看论文]
- Howard B. Bandy, Ph.D.
<摘要>
目标:
作为交易者和投资者,尤其是作为积极的投资者,重要的是要对以下几点有高度的信心:
- 所使用的交易系统是稳健的。
- 交易系统是健康的。
- 交易的规模是正确的,能够在保持回撤在可接受百分比范围内的同时,实现最快的资本增长。
- 有应对回撤的计划。
本文描述了获得这种信心的独特且实用的技巧,并通过一个经过全面分析的交易系统进行说明。


IV. 回测表现
| 年化回报 | 19.9% |
| 波动率 | N/A |
| β值 | 0.147 |
| 夏普比率 | N/A |
| 索提诺比率 | -0.045 |
| 最大回撤 | -18% |
| 胜率 | 58% |
V. 完整的 Python 代码
from AlgorithmImports import *
from dateutil.relativedelta import relativedelta
from pandas.core.frame import dataframe
from typing import List
import sys
# endregion
class ShorttermAdaptiveReversalinSP500Index(QCAlgorithm):
def Initialize(self):
self.SetStartDate(1999, 1, 1)
self.SetCash(100000)
# market subscription and consolidator
self.market:Symbol = self.AddEquity("SPY", Resolution.Minute).Symbol
self.consolidator = TradeBarConsolidator(timedelta(days=1))
self.consolidator.DataConsolidated += self.consolidation_handler
self.SubscriptionManager.AddConsolidator(self.market, self.consolidator)
# history warmup
min_day_period:int = 200
history:dataframe = self.History(self.market, start=self.Time.date() - relativedelta(years=1), end=self.Time.date(), resolution=Resolution.Daily)
if len(history) >= min_day_period and 'close' in history.columns:
self.history = history['close'].unstack(level=0).dropna()
else:
message:str = f'Need at least {min_day_period} days of market warmup data -> SPY data is available from 1998'
self.Debug(message)
self.Quit(message)
# optimization setting
self.optimize_long:bool = True
self.optimize_short:bool = False
self.opt_flags:List[bool] = [True, False] if (self.optimize_long and self.optimize_short) else [True] if (self.optimize_long and not self.optimize_short) else [False] if (not self.optimize_long and self.optimize_short) else []
self.opt_buy_ema:float = None
self.opt_sell_ema:float = None
self.opt_buy_level:float = None
self.opt_sell_level:float = None
# optimization parameters
self.optimize_flag:bool = False
self.ema_param_range:List[int] = list(range(2, 21, 1))
self.buy_level_param_range:List[float] = list(np.arange(-5, 0, 0.1))
self.sell_level_param_range:List[float] = list(np.arange(0, 5, 0.1))
self.Schedule.On(self.DateRules.MonthStart(self.market), self.TimeRules.AfterMarketOpen(self.market), self.Optimize)
def consolidation_handler(self, sender, consolidated) -> None:
# store daily data
self.history.loc[consolidated.EndTime, consolidated.Symbol] = consolidated.Close
self.history = self.history.iloc[1:]
def Optimize(self) -> None:
if self.Time.month == 1:
self.optimize_flag = True
def OnData(self, data: Slice) -> None:
# one minute before close
if self.Time.hour == 15 and self.Time.minute == 59:
if self.market in data and data[self.market]:
market_price_df:dataframe = self.history
signal:bool = False
for buy_flag in self.opt_flags:
opt_ema = self.opt_buy_ema if buy_flag else self.opt_sell_ema
opt_level = self.opt_buy_level if buy_flag else self.opt_sell_level
if opt_ema is not None and opt_level is not None:
# signal calculation
ema:float = market_price_df.ewm(span=opt_ema, adjust=False).mean().iloc[-1].values[0]
std:float = market_price_df.rolling(opt_ema).std().iloc[-1].values[0]
z_score:float = (-ema + data[self.market].Value) / std
signal = opt_level > z_score if buy_flag else opt_level < z_score
# trade execution
if signal:
self.SetHoldings(self.market, 1 if buy_flag else -1)
else:
self.Liquidate(self.market)
if not self.optimize_flag:
return
self.optimize_flag = False
# replace history
self.history = self.History(self.market, start=self.Time.date() - relativedelta(years=1), end=self.Time.date(), resolution=Resolution.Daily)['close'].unstack(level=0).dropna()
market_price_df:dataframe = self.history
market_perf_df:dataframe = market_price_df.pct_change()
# construct EMA and STD dataframes
ema_df:dataframe = pd.dataframe()
std_df:dataframe = pd.dataframe()
for ema_period in self.ema_param_range:
ema_df[f'EMA_{ema_period}'] = market_price_df.ewm(span=ema_period, adjust=False).mean()
std_df[f'STD_{ema_period}'] = market_price_df.rolling(ema_period).std()
# long_filter:dataframe = market_history_df >= ema_df['EMA_1'] # unnecessary
z_score:np.ndarray = ((-ema_df.values).T + market_price_df[self.market].values) / std_df.values.T
# optimization for long and short
for buy_flag in self.opt_flags:
opt_param_sharpe = sys.float_info.min
opt_ema:int = 0
opt_level:int = 0
opt_found:bool = False
for level in self.buy_level_param_range if buy_flag else self.sell_level_param_range:
# calculate performance for each portfolio with particular buy level used
signal:np.ndarray = (level > z_score).astype(float) if buy_flag else (level < z_score).astype(float)
portfolio_perf:np.ndarray = market_perf_df[self.market].values[1:] * np.array([x[1:] for x in signal])
portfolio_cumulative_perf:np.ndarray = (portfolio_perf + 1).cumprod(axis=1)
# sharpe ratio
portfolio_return:np.ndarray = np.diff(np.log(portfolio_cumulative_perf))
mean_return:np.ndarray = np.mean(portfolio_return, axis=1)
std_dev:np.ndarray = np.std(portfolio_return, axis=1)
sharpe_ratio:np.ndarray = mean_return / std_dev
if not all(np.isnan(x) for x in sharpe_ratio):
index_of_max_sharpe:int = np.nanargmax(sharpe_ratio)
if sharpe_ratio[index_of_max_sharpe] > opt_param_sharpe:
opt_param_sharpe:float = sharpe_ratio[index_of_max_sharpe]
opt_ema:int = self.ema_param_range[index_of_max_sharpe]
opt_level:int = level
opt_found = True
# store optimal parameters for following year
if opt_found:
if buy_flag:
self.opt_buy_ema = opt_ema
self.opt_buy_level = opt_level
else:
self.opt_sell_ema = opt_ema
self.opt_sell_level = opt_level