from AlgorithmImports import *
import statsmodels.api as sm
import data_tools
# endregion
class ClimatePolicyUncertaintyAndTheCrossSectionOfStockReturns(QCAlgorithm):
    """Long-short equity strategy sorted on climate-policy-uncertainty (CPU) beta.

    Once a month, each candidate stock's monthly returns are regressed on the
    factor series accumulated in ``data_tools.RegressionData``. Stocks are then
    ranked by the resulting CPU beta; the lowest-beta decile is held long and
    the highest-beta decile is held short, each leg weighted by market cap.
    """

    def Initialize(self):
        """Configure dates, cash, parameters, data subscriptions and the universe."""
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)

        # Strategy parameters.
        self.leverage: int = 5
        self.period: int = 21
        self.regression_period: int = 60      # number of monthly returns per regression
        self.max_missing_months: int = 2      # passed to RegressionData.data_still_coming
        self.quantile: int = 10               # number of buckets; 10 -> deciles

        # Target portfolio weights produced by fine selection, consumed by OnData.
        self.weights: dict[Symbol, float] = {}
        # Rolling window of monthly adjusted prices per symbol (newest first).
        self.prices: dict[Symbol, RollingWindow] = {}

        self.exchanges: list[str] = ['NYS', 'NAS', 'ASE']

        # Custom data feeds and the market proxy.
        self.cpu_index: Symbol = self.AddData(
            data_tools.CPUIndex, 'CPU_INDEX', Resolution.Daily
        ).Symbol
        self.fama_french: Symbol = self.AddData(
            data_tools.QuantpediaFamaFrench, 'fama_french_3_factor_monthly', Resolution.Daily
        ).Symbol
        self.market_symbol: Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.prev_market_price: float | None = None

        # NOTE(review): RegressionData lives in data_tools; its exact storage
        # layout is not visible from this file.
        self.regression_data: data_tools.RegressionData = data_tools.RegressionData(
            self.regression_period
        )

        self.coarse_count: int = 1000
        self.recent_month: int = -1
        self.selection_flag: bool = False

        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)

    def OnSecuritiesChanged(self, changes):
        """Apply the custom fee model and leverage to every newly added security."""
        for security in changes.AddedSecurities:
            security.SetFeeModel(data_tools.CustomFeeModel())
            security.SetLeverage(self.leverage)

    def CoarseSelectionFunction(self, coarse):
        """Pick liquid candidates and record one price per tracked symbol.

        Runs its body only when ``selection_flag`` is set (i.e. once per
        rebalance), so each price appended to a rolling window is one monthly
        observation. Returns the selected symbols whose window is full.
        """
        if not self.selection_flag:
            return Universe.Unchanged

        candidates = [
            x for x in coarse
            if x.HasFundamentalData and x.Price >= 2 and x.Price <= 1000
        ]
        if self.coarse_count < 3000:
            # Keep only the most liquid names by dollar volume.
            candidates = sorted(
                candidates, key=lambda x: x.DollarVolume, reverse=True
            )[:self.coarse_count]

        selected_symbols: list[Symbol] = []
        for stock in candidates:
            symbol: Symbol = stock.Symbol
            if symbol not in self.prices:
                # Start tracking a newly selected stock; +1 price yields
                # regression_period returns.
                self.prices[symbol] = RollingWindow[float](self.regression_period + 1)
            selected_symbols.append(symbol)

        # Update every tracked symbol, not only today's selection, so windows
        # keep advancing for stocks that drop out of the liquidity cut.
        for equity in coarse:
            symbol: Symbol = equity.Symbol
            if symbol in self.prices:
                self.prices[symbol].Add(equity.AdjustedPrice)

        return [x for x in selected_symbols if self.prices[x].IsReady]

    def FineSelectionFunction(self, fine):
        """Estimate CPU betas, form decile legs and store target weights.

        Returns the symbols that received a weight, or ``Universe.Unchanged``
        when the regression data are not usable or too few betas were
        estimated.
        """
        # Drop any targets left over from a selection pass that was never
        # traded, so they cannot leak into this rebalance.
        self.weights.clear()

        # Make sure regression data are ready.
        if not self.regression_data.is_ready():
            return Universe.Unchanged
        # Make sure regression data are up to date, otherwise reset them.
        if not self.regression_data.data_still_coming(self.max_missing_months, self.Time.date()):
            self.regression_data.reset()
            return Universe.Unchanged

        filtered = [
            x for x in fine
            if x.MarketCap >= 5000000 and x.SecurityReference.ExchangeId in self.exchanges
        ]
        if len(filtered) > self.coarse_count:
            filtered = sorted(
                filtered, key=lambda x: x.MarketCap, reverse=True
            )[:self.coarse_count]

        cpu_beta: dict = {}
        regression_x: list = self.regression_data.regression_x()

        for stock in filtered:
            # The rolling window iterates newest first, so dividing each price
            # by the one after it gives the simple return over that step.
            prices = np.array([x for x in self.prices[stock.Symbol]])
            regression_y = (prices[:-1] / prices[1:]) - 1

            # A zero or missing price produces inf/NaN returns; skip the stock
            # instead of feeding them to the regression.
            if not np.all(np.isfinite(regression_y)):
                continue

            regression_model = self.MultipleLinearRegression(regression_x, regression_y)
            # params[0] is the constant added in MultipleLinearRegression;
            # params[1] belongs to the first series returned by regression_x()
            # (presumably the CPU index -- verify against data_tools).
            cpu_beta_value: float = regression_model.params[1]

            # A NaN beta would make the ranking below ill-defined.
            if not np.isfinite(cpu_beta_value):
                continue
            cpu_beta[stock] = cpu_beta_value

        # Make sure there are enough stocks for selection.
        if len(cpu_beta) < self.quantile:
            return Universe.Unchanged

        bucket_size: int = int(len(cpu_beta) / self.quantile)
        # Highest beta first, so the tail of the list holds the lowest betas.
        sorted_by_beta: list = [
            x[0] for x in sorted(cpu_beta.items(), key=lambda item: item[1], reverse=True)
        ]

        # Zero-investment portfolio: long the low-beta bucket, short the
        # high-beta bucket.
        long_part: list = sorted_by_beta[-bucket_size:]
        short_part: list = sorted_by_beta[:bucket_size]

        # Market-cap weighting within each leg; caps are >= 5,000,000 after the
        # filter above, so the totals are never zero.
        total_long_cap: float = sum(stock.MarketCap for stock in long_part)
        for stock in long_part:
            self.weights[stock.Symbol] = stock.MarketCap / total_long_cap

        total_short_cap: float = sum(stock.MarketCap for stock in short_part)
        for stock in short_part:
            self.weights[stock.Symbol] = -stock.MarketCap / total_short_cap

        return list(self.weights.keys())

    def OnData(self, data):
        """Feed the regression data set and execute the monthly rebalance.

        When both the Fama-French and CPU data points arrive, the latest
        market return and factor values are pushed into ``regression_data``
        and, on the first such arrival in a calendar month, selection is
        requested for the next universe pass. On a later call, once selection
        has filled ``self.weights``, the portfolio is rebalanced.
        """
        rebalance_flag = False

        # Update regression data when Fama-French and CPU data come and market
        # prices are ready.
        if (self.fama_french in data and data[self.fama_french]
                and self.cpu_index in data and data[self.cpu_index]):
            if self.recent_month != self.Time.month:
                rebalance_flag = True
                self.recent_month = self.Time.month

            if (self.Securities.ContainsKey(self.market_symbol)
                    and self.Securities[self.market_symbol].Price != 0):
                # Use the last known market price - the market may not be open
                # on the CPU and FF arrival date.
                market_price: float = self.Securities[self.market_symbol].Price

                if self.prev_market_price is not None:
                    market_return: float = (
                        (market_price - self.prev_market_price) / self.prev_market_price
                    )
                    self.regression_data.update(
                        data[self.cpu_index].Value,
                        market_return,
                        data[self.fama_french].Market,
                        data[self.fama_french].Size,
                        data[self.fama_french].Value,
                        self.Time.date()
                    )

                self.prev_market_price = market_price

        # Rebalance monthly: request selection now, trade on a later call.
        if rebalance_flag:
            self.selection_flag = True
            return

        if not self.selection_flag:
            return
        self.selection_flag = False

        # Trade execution: close positions that are no longer targeted.
        invested = [x.Key for x in self.Portfolio if x.Value.Invested]
        for symbol in invested:
            if symbol not in self.weights:
                self.Liquidate(symbol)

        # Only trade symbols that have data in the current slice.
        for symbol, w in self.weights.items():
            if symbol in data and data[symbol]:
                self.SetHoldings(symbol, w)

        self.weights.clear()

    def MultipleLinearRegression(self, x: list, y: list):
        """Fit an OLS regression of ``y`` on the series in ``x`` plus a constant.

        ``x`` is transposed before fitting, so it is expected as one sequence
        per regressor. Returns the fitted statsmodels results object.
        """
        exog = np.array(x).T
        exog = sm.add_constant(exog)
        return sm.OLS(endog=y, exog=exog).fit()