“该策略使用15个预测因子和加权回归预测标普500股票溢价,并根据缩放预测和RMSE调整后的风险管理,在股票和国库券之间动态分配资金。”

I. 策略概要

该策略专注于标普500指数,使用15个预测变量(包括通货膨胀变化、工业生产、信用风险溢价等)来预测一个月期的股票风险溢价。预测因子使用500天的向后窗口进行标准化,并应用衰减因子为0.99的加权最小二乘(WLS)回归,强调近期数据。双向逐步过程基于赤池信息准则选择统计上显著的变量。该模型每月重新估计(自1990年6月起),并每天使用更新的预测因子来预测当月的股票溢价。

投资分配基于缩放的股票溢价预测:预测通过均方根误差(RMSE)的倒数进行调整,并乘以五。正的股票溢价预测导致按比例投资于标普500指数,其余投资于国库券。负的预测导致全部投资于国库券。该模型每月进行调整,重新校准参数以适应不断变化的市场条件,同时确保采用动态和风险调整后的股票分配方法。

II. 策略合理性

通过利用宏观经济变量作为未来投资机会的代理,回报可预测性可以优于买入并持有策略。尽管这些变量(包括商品价格和汇率)通常变动缓慢,但它们结合起来可以准确预测下个月的超额市场回报,反映当前的商业状况。变量选择限制了参数,创建了具有卓越样本外表现的简约模型。加权最小二乘(WLS)回归优先考虑近期数据而非较旧的观测值,以提高准确性。双向逐步选择结合了向前选择(添加最大化改进的变量)和向后消除(删除不显著的变量)。这种方法改进了模型,增强了它们的解释能力和预测可靠性,以实现持续的超额回报。

III. 来源论文

Return Predictability and Market-Timing: A One-Month Model [点击查看论文]

<摘要>

我们提出了一个由15个不同变量构建的为期一个月的市场择时模型。我们使用带有逐步变量选择的加权最小二乘法,为一个月后的市场超额回报构建预测模型。通过我们的统计模型,我们将预测转化为可投资头寸,以构建市场择时策略。从2003年到2017年,我们的策略实现了16.6%的年回报率,夏普比率为0.92,最大回撤为20.3%,而标普500指数的年回报率为10%,夏普比率为0.46,最大回撤为55.2%。当我们的一个月模型与Hull和Qiao(2017)的六个月模型结合使用时,组合策略的夏普比率超过了各个模型的夏普比率。组合模型实现了15%的年回报率,夏普比率为1.12,最大回撤为14%。我们在我们的每日报告中发布一个月模型的预测。

IV. 回测表现

年化回报16.6%
波动率16.6%
β值0
夏普比率0.76
索提诺比率0
最大回撤-20.3%
胜率0%

V. 完整的 Python 代码

from AlgorithmImports import *
import data_tools
from pandas.core.frame import dataframe
from dateutil.relativedelta import relativedelta
import statsmodels.api as sm
from statsmodels.tools.eval_measures import rmse
# endregion
class TimingSP500UsingaLargeSetofForecastingVariables(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)
        self.period:int = 24
        self.quarter_period:int = 3 # due to Deliquencies quarterly data
        self.month_period:int = 21
        self.max_alocation:float = 1.5
        self.decay_factor:float = 0.99
        self.leverage:int = 3
        self.max_missing_days:int = 31
        self.independent_variables_num:int = 12
        self.market:Symbol = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.bil:Symbol = self.AddEquity('BIL', Resolution.Daily).Symbol
        self.commodity_index:Symbol = self.AddEquity('GSG', Resolution.Daily).Symbol
        self.dollar_index:Symbol = self.AddEquity('UUP', Resolution.Daily).Symbol
        self.baltic_index:Symbol = self.AddData(data_tools.BalticDryIndex, 'BADI', Resolution.Daily).Symbol
        for symbol in [self.market, self.bil]:
            self.Securities[symbol].SetLeverage(self.leverage)
        quandl_tickers:List[str] = ['RATEINF/CPI_USA']
        self.consumer_index_usa:Symbol = self.AddData(data_tools.QuandlValue, 'RATEINF/CPI_USA', Resolution.Daily).Symbol
        quarterly_custom_data:List[str] = ['DRALACBN_YOY']
        monthly_custom_data:List[str] = ['INDPRO_YOY', 'BAA_AAA', 'NEWORDER', 'HOUST', 'UNEMPLOYMENT_RATE']
        daily_custom_data:List[str] = ['T10Y3M']
        self.data:Dict[Symbol, float] = {}
        self.quarterly_custom_data:List[Symbol] = [self.AddData(data_tools.QuarterlyCustomData, x, Resolution.Daily).Symbol for x in quarterly_custom_data]
        self.monthly_custom_data:List[Symbol] = [self.AddData(data_tools.MonthlyCustomData, x, Resolution.Daily).Symbol for x in monthly_custom_data]
        self.daily_custom_data:List[Symbol] = [self.AddData(data_tools.DailyCustomData, x, Resolution.Daily).Symbol for x in daily_custom_data] + [self.baltic_index]
        self.qc_data:List[Symbol] = [self.consumer_index_usa]
        self.recent_month:int = -1
    def OnData(self, data: Slice):
        # monthly rebalance
        if self.recent_month == self.Time.month:
            return
        self.recent_month = self.Time.month
        quarterly_custom_data_last_update_date:Dict[Symbol, datetime.date] = data_tools.QuarterlyCustomData._last_update_date
        monthly_custom_data_last_update_date:Dict[Symbol, datetime.date] = data_tools.MonthlyCustomData._last_update_date
        daily_custom_data_last_update_date:Dict[Symbol, datetime.date] = data_tools.DailyCustomData._last_update_date
        # check if data is still arriving
        if all([self.Securities[x].GetLastData() for x in self.quarterly_custom_data + self.monthly_custom_data + self.daily_custom_data + self.qc_data]) \
            and any([self.Time.date() >= monthly_custom_data_last_update_date[x] for x in monthly_custom_data_last_update_date]) \
            and any([self.Time.date() >= quarterly_custom_data_last_update_date[x] for x in quarterly_custom_data_last_update_date]) \
            and any([self.Time.date() >= daily_custom_data_last_update_date[x] for x in daily_custom_data_last_update_date]) \
            and any((self.Time.date() - self.Securities[x].GetLastData().Time.date()).days >= self.max_missing_days for x in self.qc_data):
            self.Liquidate()
            return Universe.Unchanged
        # call history on all variables
        history:dataframe = self.History(self.monthly_custom_data + self.daily_custom_data + self.qc_data + self.quarterly_custom_data, start=self.Time.date() - relativedelta(months=self.period + self.quarter_period), end=self.Time.date())['value'].unstack(level=0)
        history_equities:dataframe = self.History([self.market, self.commodity_index, self.dollar_index], start=self.Time.date() - relativedelta(months=self.period), end=self.Time.date())['close'].unstack(level=0)
        history = history.groupby(pd.Grouper(freq='MS')).last()
        history_equities = history_equities.groupby(pd.Grouper(freq='MS')).last()
        history['DRALACBN_YOY'] = history['DRALACBN_YOY'].fillna(method='ffill') # forward fill due quarterly data
        history = history[self.quarter_period:]
        history = pd.concat([history, history_equities], axis=1)
        history = history[1:]
        if history.iloc[0].isna().any():
            return
        
        if len(history.columns) < self.independent_variables_num:
            return
        history['normalized_market'] = history[self.market]
        history = history.fillna(method='ffill')
        history[['INDPRO_YOY', self.commodity_index, self.dollar_index, self.baltic_index, self.consumer_index_usa, 'normalized_market']] = history[['INDPRO_YOY', self.commodity_index, self.dollar_index, self.baltic_index, self.consumer_index_usa, 'normalized_market']].pct_change()
        history[['HOUST', 'UNEMPLOYMENT_RATE', 'DRALACBN_YOY']] = history[['HOUST', 'UNEMPLOYMENT_RATE', 'DRALACBN_YOY']].diff()
        # normalize variables with their 2 year standard deviation
        history_normalized = history.loc[:, history.columns != self.market].iloc[-1] / history.loc[:, history.columns != self.market][:-1].std()
        
        # save values to expanding list
        for symbol, value in history_normalized.items():
            if symbol not in self.data:
                self.data[symbol] = []
            self.data[symbol].append(value)
        
        if self.market not in self.data:
            self.data[self.market] = []
        self.data[self.market].append(history[self.market].pct_change().iloc[-1])
            
        # run factor analysis
        if len(self.data[next(iter(self.data))]) >= self.period:
            df:dataframe = pd.dataframe(self.data)
            result_factors:Dict[Symbol, float] = self.factor_analysis(self.market, df, len(history.loc[:, history.columns != self.market].columns))
            # run regression on best model and predict return of market 
            if result_factors:
                y:np.ndarray = df[self.market][1:].values
                x:np.ndarray = df[list(result_factors.keys())][:-1].values
                model = self.multiple_linear_regression(x, y)
                predicted_y:np.ndarray = model.predict(sm.add_constant(df[list(result_factors.keys())][-1:].values, has_constant='add'))
                rmse_value:float = rmse(y, predicted_y)
                alocation:float = predicted_y * (1 / rmse_value) * 5
                # trade execution
                if predicted_y < 0:
                    self.Liquidate()
                    self.SetHoldings(self.bil, 1)
                    return
                traded_alocation:Tuple[float, float] = (alocation if alocation < self.max_alocation else self.max_alocation, self.max_alocation - alocation if alocation < self.max_alocation else 0)
                if self.market in data and data[self.market] and self.bil in data and data[self.bil]:
                    self.SetHoldings(self.market, traded_alocation[0])
                    self.SetHoldings(self.bil, traded_alocation[1])
    def factor_analysis(self, asset_id: str, factor_df: pd.dataframe, n:int=0, ignore_negative_loading:bool = False) -> Dict:
        """ 
        Performs factor analysis.
        :param asset_id: Id of asset that is going to be fit.
        :param factor_df: Performance dataframe with all the factors and asset with asset_id.
        :param n: Max number of assets in the model.
        :return: Fitted model dictionary.
        """
        order:List[str] = []
        y:np.ndarray = factor_df[asset_id].values 
        reduced_df:dataframe = factor_df.drop([asset_id], axis=1)
        # reduced_df.drop(['Date'], axis=1, inplace=True)
        
        for i, _ in enumerate(reduced_df.items()):
            # not negative beta models properties
            aic_reduced:List[float] = []
            col_names_reduced:List[str] = []
                
            for j, (j_asset_name, _) in enumerate(reduced_df.items()):
                if j_asset_name in order:
                    continue
                
                if i > len(order) - 1:
                    order.append(j_asset_name)
                else:
                    order[i] = j_asset_name
                
                order_reduced:List[str] = [x for x in order if x != '']
                x:np.ndarray = reduced_df[order_reduced].values
                model = self.multiple_linear_regression(x, y)
                aic:float = model.aic
                
                if ignore_negative_loading:
                    # store only not negative beta models
                    if not any(x < 0. for x in betas_):
                        aic_reduced.append(aic)
                        col_names_reduced.append(j_asset_name)
                else:
                    aic_reduced.append(aic)
                    col_names_reduced.append(j_asset_name)
            
            if len(aic_reduced) == 0:
                order[i] = ''
            else:
                min_aic_index:int = min(range(len(aic_reduced)), key=aic_reduced.__getitem__)   # index of lowest aic
                order[i] = col_names_reduced[min_aic_index]
                
            if i > 0:
                order_reduced:List[str] = [x for x in order if x != '']
                previous_order:List[str] = [x for x in order_reduced[:-1]]
                
                if len(order_reduced) != 0 and len(previous_order) != 0:
                    # build previous and actual regression models
                    x:np.ndarray = reduced_df[order_reduced].values
                    model = self.multiple_linear_regression(x, y)
                    aic:float = model.aic
                    betas:List[float] = model.params[1:]
                    # betas, _, lm_aic = self.multiple_linear_regression(x, y)
                    x = reduced_df[previous_order].values
                    model = self.multiple_linear_regression(x, y)
                    prev_aic:float = model.aic
                    prev_betas:List[float] = model.params[1:]
                    # prev_betas, _, previous_aic = self.multiple_linear_regression(y, x)
                    
                    if (aic >= prev_aic) or (n != 0 and len(order_reduced) > n):
                        result:Dict[str, float] = { order_id : beta for order_id, beta in zip(previous_order, betas.tolist()) }
                        return result
        
        order_reduced:List[str] = [x for x in order if x != '']
        x:np.ndarray = reduced_df[order_reduced].values
        model = self.multiple_linear_regression(x, y)
        betas:List[float] = model.params[1:]
        # betas, _, lm_aic = self.multiple_linear_regression(y, x)
        result:Dict[str, float] = { order_id : betas  for order_id, betas in zip(order_reduced, betas.tolist()) }
        return result
    def multiple_linear_regression(self, x:np.ndarray, y:np.ndarray):
        # x:np.ndarray = np.array(x).T
        x = sm.add_constant(x, prepend=True)
        w:List[float] = self.decay_factor ** np.arange(len(y), 0, -1)
        result = sm.WLS(endog=y, exog=x, weights=w).fit()
        return result

发表评论

了解 Quant Buffet 的更多信息

立即订阅以继续阅读并访问完整档案。

继续阅读