
"The strategy forecasts the S&P 500 equity premium using 15 predictors and weighted regression, dynamically allocating between equities and Treasury bills based on the scaled forecast and RMSE-adjusted risk management."
Asset classes: CFDs, ETFs, Funds, Futures | Region: United States | Frequency: Monthly | Market: Equities | Keywords: S&P 500
I. Strategy Overview
The strategy focuses on the S&P 500 index and forecasts the one-month equity risk premium using 15 predictor variables, including changes in inflation, industrial production, and the credit risk premium. Predictors are standardized over a 500-day backward-looking window, and a weighted least squares (WLS) regression with a decay factor of 0.99 is applied to emphasize recent data. A bidirectional stepwise procedure selects statistically significant variables based on the Akaike information criterion. The model is re-estimated monthly (starting June 1990), and updated predictor values are used daily to forecast the equity premium for the current month.
Capital allocation is based on the scaled equity premium forecast: the forecast is adjusted by the inverse of the root mean squared error (RMSE) and multiplied by five. A positive equity premium forecast leads to a proportional investment in the S&P 500 index, with the remainder held in Treasury bills; a negative forecast leads to a full allocation to Treasury bills. The model is rebalanced monthly, recalibrating its parameters to adapt to changing market conditions while maintaining a dynamic, risk-adjusted approach to equity allocation.
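The position-sizing rule reduces to a small function. This is a sketch only; the 1.5 cap mirrors the maximum allocation used in the implementation below, but the numeric inputs are illustrative:

```python
def target_allocation(predicted_premium: float, rmse: float,
                      max_alloc: float = 1.5) -> tuple:
    """Scale the forecast by 1/RMSE and a factor of five; negative forecasts
    go entirely to T-bills, positive ones are capped at max_alloc.
    Returns (equity weight, T-bill weight)."""
    if predicted_premium < 0:
        return 0.0, 1.0
    scaled = predicted_premium * (1.0 / rmse) * 5.0
    equity = min(scaled, max_alloc)
    return equity, max_alloc - equity

print(target_allocation(0.02, 0.04))    # strong forecast: capped at (1.5, 0.0)
print(target_allocation(-0.01, 0.04))   # negative forecast: (0.0, 1.0)
```

Note that the equity and T-bill weights sum to the cap when the forecast is positive, so the strategy can run modestly levered when the scaled forecast is strong.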
II. Strategy Rationale
By using macroeconomic variables as proxies for future investment opportunities, return predictability can be exploited to outperform a buy-and-hold strategy. Although these variables (including commodity prices and exchange rates) typically move slowly, in combination they can forecast next month's excess market return, reflecting current business conditions. Variable selection limits the number of parameters, producing parsimonious models with superior out-of-sample performance. Weighted least squares (WLS) regression prioritizes recent data over older observations to improve accuracy. Bidirectional stepwise selection combines forward selection (adding the variable that most improves the model) with backward elimination (removing insignificant variables). This approach refines the models, enhancing their explanatory power and predictive reliability in pursuit of consistent excess returns.
III. Source Paper
Return Predictability and Market-Timing: A One-Month Model [view paper]
- Blair Hull, Xiao Qiao, and Petra Bakosova. HTAA, LLC. City University of Hong Kong. Hull Tactical.
Abstract:
We present a one-month market-timing model built from 15 diverse variables. Using weighted least squares with stepwise variable selection, we construct a predictive model for the one-month-ahead market excess return, and transform the forecasts into investable positions to form a market-timing strategy. From 2003 to 2017, our strategy realized an annualized return of 16.6%, a Sharpe ratio of 0.92, and a maximum drawdown of 20.3%, compared with a 10% annualized return, a Sharpe ratio of 0.46, and a maximum drawdown of 55.2% for the S&P 500. When our one-month model is combined with the six-month model of Hull and Qiao (2017), the combined strategy achieves a higher Sharpe ratio than either model alone: a 15% annualized return, a Sharpe ratio of 1.12, and a maximum drawdown of 14%. We publish the one-month model's forecasts in our daily report.


IV. Backtest Performance
| Metric | Value |
| --- | --- |
| Annualized Return | 16.6% |
| Volatility | 16.6% |
| Beta | 0 |
| Sharpe Ratio | 0.76 |
| Sortino Ratio | 0 |
| Maximum Drawdown | -20.3% |
| Win Rate | 0% |
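For reference, the headline statistics above can be reproduced from a monthly return series along these lines. The return series here is toy data, and the annualization conventions are the standard ones, not necessarily those used in the paper:

```python
import numpy as np

def performance_stats(monthly_returns: np.ndarray, rf_monthly: float = 0.0) -> tuple:
    """Annualized return, annualized volatility, Sharpe ratio, and maximum
    drawdown computed from a series of monthly returns."""
    ann_return = (1 + monthly_returns).prod() ** (12 / len(monthly_returns)) - 1
    ann_vol = monthly_returns.std(ddof=1) * np.sqrt(12)
    sharpe = (monthly_returns.mean() - rf_monthly) / monthly_returns.std(ddof=1) * np.sqrt(12)
    equity = (1 + monthly_returns).cumprod()           # cumulative wealth curve
    max_dd = (equity / np.maximum.accumulate(equity) - 1).min()
    return ann_return, ann_vol, sharpe, max_dd

r = np.array([0.02, -0.01, 0.03, 0.0] * 6)             # two years of toy monthly returns
stats = performance_stats(r)
print(stats)
```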
V. Full Python Code
# region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
from pandas import DataFrame
from datetime import date
from dateutil.relativedelta import relativedelta
import statsmodels.api as sm
from statsmodels.tools.eval_measures import rmse
import data_tools
# endregion
class TimingSP500UsingaLargeSetofForecastingVariables(QCAlgorithm):

    def Initialize(self) -> None:
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)

        self.period:int = 24
        self.quarter_period:int = 3             # delinquencies data is quarterly
        self.month_period:int = 21
        self.max_alocation:float = 1.5
        self.decay_factor:float = 0.99
        self.leverage:int = 3
        self.max_missing_days:int = 31
        self.independent_variables_num:int = 12

        self.market:Symbol = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.bil:Symbol = self.AddEquity('BIL', Resolution.Daily).Symbol
        self.commodity_index:Symbol = self.AddEquity('GSG', Resolution.Daily).Symbol
        self.dollar_index:Symbol = self.AddEquity('UUP', Resolution.Daily).Symbol
        self.baltic_index:Symbol = self.AddData(data_tools.BalticDryIndex, 'BADI', Resolution.Daily).Symbol

        for symbol in [self.market, self.bil]:
            self.Securities[symbol].SetLeverage(self.leverage)

        self.consumer_index_usa:Symbol = self.AddData(data_tools.QuandlValue, 'RATEINF/CPI_USA', Resolution.Daily).Symbol

        quarterly_custom_data:List[str] = ['DRALACBN_YOY']
        monthly_custom_data:List[str] = ['INDPRO_YOY', 'BAA_AAA', 'NEWORDER', 'HOUST', 'UNEMPLOYMENT_RATE']
        daily_custom_data:List[str] = ['T10Y3M']

        # expanding history of normalized predictor values and market returns
        self.data:Dict[Symbol, List[float]] = {}

        self.quarterly_custom_data:List[Symbol] = [self.AddData(data_tools.QuarterlyCustomData, x, Resolution.Daily).Symbol for x in quarterly_custom_data]
        self.monthly_custom_data:List[Symbol] = [self.AddData(data_tools.MonthlyCustomData, x, Resolution.Daily).Symbol for x in monthly_custom_data]
        self.daily_custom_data:List[Symbol] = [self.AddData(data_tools.DailyCustomData, x, Resolution.Daily).Symbol for x in daily_custom_data] + [self.baltic_index]
        self.qc_data:List[Symbol] = [self.consumer_index_usa]

        self.recent_month:int = -1
    def OnData(self, data: Slice) -> None:
        # monthly rebalance
        if self.recent_month == self.Time.month:
            return
        self.recent_month = self.Time.month

        quarterly_custom_data_last_update_date:Dict[Symbol, date] = data_tools.QuarterlyCustomData._last_update_date
        monthly_custom_data_last_update_date:Dict[Symbol, date] = data_tools.MonthlyCustomData._last_update_date
        daily_custom_data_last_update_date:Dict[Symbol, date] = data_tools.DailyCustomData._last_update_date

        # liquidate if any data source has stopped arriving
        if all(self.Securities[x].GetLastData() for x in self.quarterly_custom_data + self.monthly_custom_data + self.daily_custom_data + self.qc_data) \
            and any(self.Time.date() >= monthly_custom_data_last_update_date[x] for x in monthly_custom_data_last_update_date) \
            and any(self.Time.date() >= quarterly_custom_data_last_update_date[x] for x in quarterly_custom_data_last_update_date) \
            and any(self.Time.date() >= daily_custom_data_last_update_date[x] for x in daily_custom_data_last_update_date) \
            and any((self.Time.date() - self.Securities[x].GetLastData().Time.date()).days >= self.max_missing_days for x in self.qc_data):
            self.Liquidate()
            return

        # request history for all variables
        history:DataFrame = self.History(self.monthly_custom_data + self.daily_custom_data + self.qc_data + self.quarterly_custom_data, start=self.Time.date() - relativedelta(months=self.period + self.quarter_period), end=self.Time.date())['value'].unstack(level=0)
        history_equities:DataFrame = self.History([self.market, self.commodity_index, self.dollar_index], start=self.Time.date() - relativedelta(months=self.period), end=self.Time.date())['close'].unstack(level=0)

        history = history.groupby(pd.Grouper(freq='MS')).last()
        history_equities = history_equities.groupby(pd.Grouper(freq='MS')).last()
        history['DRALACBN_YOY'] = history['DRALACBN_YOY'].ffill()   # forward fill the quarterly series
        history = history[self.quarter_period:]
        history = pd.concat([history, history_equities], axis=1)
        history = history[1:]

        if history.iloc[0].isna().any():
            return
        if len(history.columns) < self.independent_variables_num:
            return

        history['normalized_market'] = history[self.market]
        history = history.ffill()
        history[['INDPRO_YOY', self.commodity_index, self.dollar_index, self.baltic_index, self.consumer_index_usa, 'normalized_market']] = history[['INDPRO_YOY', self.commodity_index, self.dollar_index, self.baltic_index, self.consumer_index_usa, 'normalized_market']].pct_change()
        history[['HOUST', 'UNEMPLOYMENT_RATE', 'DRALACBN_YOY']] = history[['HOUST', 'UNEMPLOYMENT_RATE', 'DRALACBN_YOY']].diff()

        # normalize the latest observation by its two-year standard deviation
        history_normalized = history.loc[:, history.columns != self.market].iloc[-1] / history.loc[:, history.columns != self.market][:-1].std()

        # append the latest values to the expanding series
        for symbol, value in history_normalized.items():
            if symbol not in self.data:
                self.data[symbol] = []
            self.data[symbol].append(value)
        if self.market not in self.data:
            self.data[self.market] = []
        self.data[self.market].append(history[self.market].pct_change().iloc[-1])

        # run factor analysis once enough observations have accumulated
        if len(self.data[next(iter(self.data))]) >= self.period:
            df:DataFrame = pd.DataFrame(self.data)
            result_factors:Dict[Symbol, float] = self.factor_analysis(self.market, df, len(history.loc[:, history.columns != self.market].columns))

            # fit the selected model and predict next month's market return
            if result_factors:
                y:np.ndarray = df[self.market][1:].values
                x:np.ndarray = df[list(result_factors.keys())][:-1].values
                model = self.multiple_linear_regression(x, y)
                predicted_y:float = model.predict(sm.add_constant(df[list(result_factors.keys())][-1:].values, has_constant='add'))[0]
                rmse_value:float = rmse(y, model.fittedvalues)      # in-sample RMSE of the fitted model
                alocation:float = predicted_y * (1 / rmse_value) * 5

                # trade execution
                if predicted_y < 0:
                    self.Liquidate()
                    self.SetHoldings(self.bil, 1)
                    return

                traded_alocation:Tuple[float, float] = (min(alocation, self.max_alocation), max(self.max_alocation - alocation, 0))

                if self.market in data and data[self.market] and self.bil in data and data[self.bil]:
                    self.SetHoldings(self.market, traded_alocation[0])
                    self.SetHoldings(self.bil, traded_alocation[1])
    def factor_analysis(self, asset_id: str, factor_df: DataFrame, n:int = 0, ignore_negative_loading:bool = False) -> Dict:
        """
        Performs bidirectional stepwise variable selection by AIC.
        :param asset_id: Id of the asset to fit.
        :param factor_df: Performance dataframe with all the factors and the asset with asset_id.
        :param n: Maximum number of variables in the model.
        :param ignore_negative_loading: If True, only models with non-negative betas are considered.
        :return: Dictionary mapping the selected variables to their betas.
        """
        order:List[str] = []
        y:np.ndarray = factor_df[asset_id].values
        reduced_df:DataFrame = factor_df.drop([asset_id], axis=1)

        for i, _ in enumerate(reduced_df.items()):
            aic_reduced:List[float] = []        # AICs of the candidate models
            col_names_reduced:List[str] = []

            for j, (j_asset_name, _) in enumerate(reduced_df.items()):
                if j_asset_name in order:
                    continue
                if i > len(order) - 1:
                    order.append(j_asset_name)
                else:
                    order[i] = j_asset_name

                order_reduced:List[str] = [x for x in order if x != '']
                x:np.ndarray = reduced_df[order_reduced].values
                model = self.multiple_linear_regression(x, y)
                aic:float = model.aic

                if ignore_negative_loading:
                    # keep only models whose betas are all non-negative
                    betas_:List[float] = model.params[1:]
                    if not any(b < 0. for b in betas_):
                        aic_reduced.append(aic)
                        col_names_reduced.append(j_asset_name)
                else:
                    aic_reduced.append(aic)
                    col_names_reduced.append(j_asset_name)

            if len(aic_reduced) == 0:
                order[i] = ''
            else:
                min_aic_index:int = min(range(len(aic_reduced)), key=aic_reduced.__getitem__)   # index of lowest AIC
                order[i] = col_names_reduced[min_aic_index]

            if i > 0:
                order_reduced:List[str] = [x for x in order if x != '']
                previous_order:List[str] = [x for x in order_reduced[:-1]]
                if len(order_reduced) != 0 and len(previous_order) != 0:
                    # compare the current model against the previous one
                    x:np.ndarray = reduced_df[order_reduced].values
                    model = self.multiple_linear_regression(x, y)
                    aic:float = model.aic

                    x = reduced_df[previous_order].values
                    model = self.multiple_linear_regression(x, y)
                    prev_aic:float = model.aic
                    prev_betas:List[float] = model.params[1:]

                    # stop when adding a variable no longer improves AIC
                    if (aic >= prev_aic) or (n != 0 and len(order_reduced) > n):
                        result:Dict[str, float] = {order_id: beta for order_id, beta in zip(previous_order, prev_betas.tolist())}
                        return result

        order_reduced:List[str] = [x for x in order if x != '']
        x:np.ndarray = reduced_df[order_reduced].values
        model = self.multiple_linear_regression(x, y)
        betas:List[float] = model.params[1:]
        result:Dict[str, float] = {order_id: beta for order_id, beta in zip(order_reduced, betas.tolist())}
        return result
    def multiple_linear_regression(self, x:np.ndarray, y:np.ndarray):
        """Weighted least squares with exponentially decaying observation weights."""
        x = sm.add_constant(x, prepend=True)
        w:np.ndarray = self.decay_factor ** np.arange(len(y), 0, -1)    # most recent data weighted highest
        return sm.WLS(endog=y, exog=x, weights=w).fit()