“投资宇宙:在美国上市的公司。过滤条件:最低股票价格,日交易量。入场信号:今日收盘价 ≥ 历史最高收盘价。出场信号:10周期ATR(平均真实波幅)跟踪止损。等权重,每日再平衡,扣除0.5%的交易成本。”
资产类别:股票 | 区域:全球 | 频率:每日 | 市场:股票 | 关键词:趋势跟随
策略概述
投资宇宙包括在美国上市的公司。采用最低股票价格过滤器以避免投资低价股,并使用最低日交易量过滤器以避免流动性不足的股票。如果今日收盘价大于或等于该股票历史最高收盘价,则触发入场信号。使用10周期的平均真实波幅(ATR)跟踪止损作为出场信号。投资者持有满足入场标准且未被止损的所有股票。投资组合等权重分配,每日再平衡。每笔交易扣除0.5%的交易成本,包含估计的佣金和滑点。
策略合理性
行为偏差(如投资者的羊群效应、过度反应和不足反应等)会在金融市场中产生非正态的收益分布。趋势跟随系统通过削减长尾分布的左尾风险,使得相比多元化的买入并持有策略,趋势跟随系统具有更好的风险/回报特征。
论文来源
Does Trend Following Work on Stocks [点击浏览原文]
- Cole Wilcox,Blackstar Funds, LLC
- Eric Crittenden,Blackstar Funds, LLC
<摘要>
多年来,许多商品交易顾问、专有交易者和全球宏观对冲基金成功地应用各种趋势跟随方法在全球期货市场中盈利。然而,很少有研究发表关于将趋势跟随策略应用于股票市场的成果。假设趋势跟随策略适用于期货市场而不适用于股票市场,是否合理?我们决定通过运行一个仅做多的趋势跟随策略,并测试其在经过公司行为调整的美国股票数据库中的表现。我们包括退市公司数据以消除幸存者偏差,并应用现实的交易成本估计(滑点和佣金)。采用流动性过滤器限制假设交易仅限于当时流动性足够的股票。覆盖24,000多只证券,时间跨度为22年。实证结果强烈表明,股票市场中的趋势跟随策略确实提供了正向数学期望,这是构建有效投资或交易系统的基础要素。


回测表现
| 年化收益率 | 19.3% |
| 波动率 | 15.6% |
| Beta | 0.881 |
| 夏普比率 | 0.625 |
| 索提诺比率 | 0.638 |
| 最大回撤 | 34.1% |
| 胜率 | 59% |
完整python代码
import numpy as np
from AlgoLib import *
class TrendFollowingEffectinStocks(XXX):
def Initialize(self):
self.SetStartDate(2010, 1, 1)
self.SetCash(100000)
self.fundamental_count:int = 100
self.fundamental_sorting_key = lambda x: x.DollarVolume
self.long:List[Symbol] = []
self.max_close:Dict[Symbol, float] = {}
self.atr:Dict[Symbol, AverageTrueRange] = {}
self.atr_period:int = 10
self.sl_order:Dict[Symbol, OrderTicket] = {}
self.sl_price:Dict[Symbol, float] = {}
self.selection:List[Symbol] = []
self.period:int = 10*12*21
self.min_share_price:float = 5.
self.UniverseSettings.Resolution = Resolution.Daily
self.AddUniverse(self.FundamentalSelectionFunction)
self.Settings.MinimumOrderMarginPortfolioPercentage = 0.
def OnSecuritiesChanged(self, changes):
for security in changes.AddedSecurities:
security.SetFeeModel(CustomFeeModel())
symbol = security.Symbol
if symbol not in self.atr:
self.atr[symbol] = self.ATR(symbol, self.atr_period, Resolution.Daily)
if symbol not in self.max_close:
hist = self.History([self.Symbol(symbol)], self.period, Resolution.Daily)
if 'close' in hist.columns:
closes:pd.Series = hist['close']
self.max_close[symbol] = max(closes)
def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> None:
selected:List[Fundamental] = [x for x in fundamental if x.HasFundamentalData and x.AdjustedPrice >= self.min_share_price]
if len(selected) > self.fundamental_count:
selected = [x for x in sorted(selected, key=self.fundamental_sorting_key, reverse=True)[:self.fundamental_count]]
self.selection = list(map(lambda x: x.Symbol, selected))
return self.selection
def OnData(self, data: Slice) -> None:
if self.IsWarmingUp:
return
for symbol in self.selection:
if symbol in data.Bars:
price:float = data[symbol].Value
if symbol not in self.max_close: continue
if price >= self.max_close[symbol]:
self.max_close[symbol] = price
self.long.append(symbol)
stocks_invested:List[Symbol] = [x.Key for x in self.Portfolio if x.Value.Invested]
count:int = len(self.long) + len(stocks_invested)
if count == 0: return
# Update stoploss orders
for symbol in stocks_invested:
if not self.Securities[symbol].IsTradable:
self.Liquidate(symbol)
if self.atr[symbol].Current.Value == 0: continue
# Move SL
if symbol not in self.sl_price: continue
self.SetHoldings(symbol, 1 / count)
new_sl = self.Securities[symbol].Price - self.atr[symbol].Current.Value
if new_sl > self.sl_price[symbol]:
update_order_fields = UpdateOrderFields()
update_order_fields.StopPrice = new_sl # Update SL price
quantity:float = self.CalculateOrderQuantity(symbol, (1 / count))
update_order_fields.Quantity = quantity # Update SL quantity
self.sl_price[symbol] = new_sl
self.sl_order[symbol].Update(update_order_fields)
# Open new trades
for symbol in self.long:
if not self.Portfolio[symbol].Invested and self.atr[symbol].Current.Value != 0:
price:float = data[symbol].Value
if self.Securities[symbol].IsTradable:
unit_size:float = self.CalculateOrderQuantity(symbol, (1 / count))
self.MarketOrder(symbol, unit_size)
sl_price:float = price - self.atr[symbol].Current.Value
self.sl_price[symbol] = sl_price
if unit_size != 0:
self.sl_order[symbol] = self.StopMarketOrder(symbol, -unit_size, sl_price, 'SL')
self.long.clear()
# Custom fee model.
class CustomFeeModel(FeeModel):
def GetOrderFee(self, parameters):
fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
return OrderFee(CashAmount(fee, "USD"))
