“该策略针对NYSE、AMEX和NASDAQ上市的中大盘股,剔除外资股和存托凭证。首先,对每只股票的对数价格进行0.9阶分数差分,混合动量与短期反转信号。计算预期对数回报后,按历史数据和回溯期预测回报。最后,做多预期回报最高的五分之一,做空最低的五分之一,策略每日调整,每周持仓。”
资产类别:股票 | 区域:美国 | 频率:每日 | 市场:股票 | 关键词:动量,分数差分
策略概述
投资领域包括NYSE、AMEX和NASDAQ上市且股票代码为10和11的股票。外资股、存托凭证等非普通股票被排除。此外,投资领域仅包含市值排名前90%的中大盘股,市值数据来自CRSP。第一步,针对每只股票,使用其完整历史的对数价格向量进行分数差分处理。分数差分基于时间序列分析中的滞后算子,具体细节见论文第2.1节。分数差分算子的阶数设为0.9,这将产生动量与短期反转信号的混合效应(当d=0时为短期反转,d=1时为经典动量)。首先获取分数差分后的对数价格序列,反转后可计算预期对数回报。然后,基于历史数据和回溯期,使用加权方法预测回报(见公式11和12)。最后,将股票按预期回报排序,做多回报最高的五分之一,做空回报最低的五分之一。策略每日调整,每周持仓周期。
策略合理性
该策略的功能性主要来源于使用时间序列方法和分数差分滤波器处理价格序列。相比简单动量或反转仅使用回溯期开始和结束两个数据点,该方法可以利用整个回溯期内的所有数据。尽管过程复杂,但它本质上可以理解为动量和反转信号的结合。该策略有效性源于两种广泛研究的投资风格的结合。作者还通过大量的稳健性测试评估了交易成本的影响。
论文来源
Momentum Without Crashes [点击浏览原文]
- Soros Chitsiripanich, Marc S. Paolella, Pawel Polak, Patrick S. Walker,苏黎世大学金融系,瑞士金融研究所,石溪大学应用数学与统计系,高级计算科学研究所,OLZ AG
<摘要>
我们构建了一个动量因子,通过一个加权方案识别横截面的赢家和输家,该方案结合了整个回溯期内的所有价格数据,而不仅仅是窗口期的起始和结束价格。该加权方案来源于分数差分滤波器,这是一种统计变换,能够在数据中保持记忆效应,并经济性地结合回报中的反转和动量模式。我们的大量样本外分析表明,新的分数动量策略不仅实现了显著更高的(风险调整后)回报,还减少了经典动量和短期反转策略中臭名昭著的回撤。绩效结果在交易成本和其他现实摩擦下表现出稳健性;超额回报无法通过其他资产定价因子解释,并在不同资产领域和国外市场中广泛存在。


回测表现
| 年化收益率 | 20.3% |
| 波动率 | 22.1% |
| Beta | -0.214 |
| 夏普比率 | 0.92 |
| 索提诺比率 | 0.089 |
| 最大回撤 | N/A |
| 胜率 | 51% |
完整python代码
from AlgorithmImports import *
import data_tools
import numpy as np
from dateutil.relativedelta import relativedelta
from typing import List, Dict, Tuple
from math import factorial, prod
from decimal import Decimal
# endregion
class MomentumBasedonFractionalDifferenceFilter(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2000, 1, 1)
self.SetCash(100000)
self.market:Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
self.data:Dict[Symbol, float] = {}
self.stocks_to_liquidate:List[data_tools.HoldingItem] = []
self.traded_portfolio_portion:Dict[Symbol, float] = {}
self.t:int = 252
self.period:int = self.t * 5
self.quantile:int = 5
self.leverage:int = 25
self.holding_period:int = 5
self.fundamental_count:int = 50
self.d:Decimal = Decimal(0.9)
# pi and weights calculations
self.pi_list:List[Decimal] = [ self.pi(i, self.d) for i in range(self.period) ]
self.weights:List[Decimal] = [ self.w(np.array(list(range(self.period))), self.pi_list[i], i) for i in range(self.period) ]
self.rebalance_flag:bool = False
self.Settings.MinimumOrderMarginPortfolioPercentage = 0
self.UniverseSettings.Resolution = Resolution.Daily
self.AddUniverse(self.FundamentalSelectionFunction)
self.Schedule.On(self.DateRules.EveryDay(self.market), self.TimeRules.AfterMarketOpen(self.market), self.Selection)
def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
for security in changes.AddedSecurities:
security.SetFeeModel(data_tools.CustomFeeModel())
security.SetLeverage(self.leverage)
def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
# store daily stock prices
for stock in fundamental:
symbol:Symbol = stock.Symbol
if symbol in self.data:
self.data[symbol].update_daily_return(stock.AdjustedPrice)
selected:List[Symbol] = [x.Symbol
for x in sorted([x for x in fundamental if x.HasFundamentalData and x.Market == 'usa' and x.MarketCap != 0 and \
(x.SecurityReference.ExchangeId == 'NYS') or (x.SecurityReference.ExchangeId == 'NAS') or (x.SecurityReference.ExchangeId == 'ASE')],
key = lambda x: x.DollarVolume, reverse = True)[:self.fundamental_count]]
# price warmup and prediction
predict_by_symbol:Dict[Symbol, float] = {}
for symbol in selected:
if symbol not in self.data:
self.data[symbol] = data_tools.SymbolData(self.period)
# price warmup
history:DataFrame = self.History(symbol, self.period, Resolution.Daily)
if history.empty:
self.Log(f"Not enough data for {symbol} yet.")
continue
closes:pd.Series = history.loc[symbol].close
for time, close in closes.items():
self.data[symbol].update_daily_return(close)
if self.data[symbol].is_ready():
# prediction
prices:List[float] = np.log(self.data[symbol].get_daily_prices())
predict:float = - (self.weights[len(self.weights) - self.t] / self.t - 1) * Decimal(sum([ prices[i] - prices[len(self.weights) - self.t] for i in range(len(self.weights) - self.t + 1, len(self.weights) - 1) ])) \
+ Decimal(sum([ (self.weights[i] + (self.weights[len(self.weights) - self.t] / (self.t - 1))) * Decimal(prices[i] - prices[len(self.weights) - 1]) for i in range(len(self.weights) - self.t + 1, len(self.weights) - 1) ]))
predict_by_symbol[symbol] = predict
# sort and divide into quantiles
if len(predict_by_symbol) >= self.quantile:
sorted_predicts:List[Symbol] = sorted(predict_by_symbol, key=predict_by_symbol.get, reverse=True)
quantile:int = len(sorted_predicts) // self.quantile
long = sorted_predicts[:quantile]
short = sorted_predicts[-quantile:]
for i, portfolio in enumerate([long, short]):
for symbol in portfolio:
self.traded_portfolio_portion[symbol] = ((-1) ** i) / len(portfolio) * (self.Portfolio.TotalPortfolioValue / self.holding_period)
self.rebalance_flag = True
return selected
def OnData(self, data: Slice) -> None:
if not self.rebalance_flag:
return
self.rebalance_flag = False
items_to_remove:List[data_tools.HoldingItem] = []
# execute order and hold for holding period
for item in self.stocks_to_liquidate:
item._holding_period += 1
if item._holding_period >= self.holding_period:
self.MarketOrder(item._symbol, -item._quantity)
items_to_remove.append(item)
# remove from collection
for item in items_to_remove:
self.stocks_to_liquidate.remove(item)
# execute order
for price_symbol, portfolio_portion in self.traded_portfolio_portion.items():
if price_symbol in data and data[price_symbol]:
final_quantity:int = portfolio_portion // data[price_symbol].Price
if portfolio_portion != 0:
self.MarketOrder(price_symbol, final_quantity)
self.stocks_to_liquidate.append(data_tools.HoldingItem(price_symbol, final_quantity))
self.traded_portfolio_portion.clear()
def Selection(self) -> None:
self.selection_flag = True
def w(self, price_dataset:np.ndarray, pi:Decimal, u:int) -> Decimal:
result:Decimal = (np.sum((np.arange((len(price_dataset) - u - self.t + 1), len(price_dataset) - u)) * pi) / self.t) - self.pi((len(price_dataset) - u + 1), self.d) if u < len(price_dataset) - self.t \
else ((np.sum((np.arange(len(price_dataset) - u)) * pi) / self.t) - self.pi((len(price_dataset) - u + 1), self.d) if u in range((len(price_dataset) - self.t + 1) , len(price_dataset) - 1) \
else (self.pi(0, self.d) / self.t) - self.pi(1, self.d) - 1)
return result
def pi(self, s:int, d:Decimal) -> Decimal:
s_factorial:Decimal = Decimal(factorial(s))
result:Decimal = Decimal(((-1) ** s) * (prod([ Decimal(d - i) / s_factorial for i in range(s-1) ])))
return result
