Quant Buffet放轻松,别过度思虑

利用期权信息在股票市场中获利

登录后收藏

学术论文

Exploiting Option Information in the Equity Market

作者作者:Baltussen; 机构:鹿特丹伊拉斯姆斯大学(Erasmus University Rotterdam)

机构
  • ?Van der Grient, De Groot, Zhou, Hennink
  • ?Northern Trust Asset Management,Robeco定量投资团队,Rabobank International
论文摘要

公共期权市场信息在流动性强的大盘股投资范围内包含可利用的信息,用于股票市场投资者的策略。基于多种期权指标的策略可以预测标的股票的回报和阿尔法。尽管这些策略的高周转率使交易成本成为重要因素,但通过简单的交易成本调整后,仍能获得显著的净阿尔法收益。研究结果显示,期权市场指标在股票市场中提供了宝贵的预测能力,特别是在利用大盘股的投资机会时。

策略概要

该策略针对纽约证券交易所(NYSE)、美国证券交易所(AMEX)和纳斯达克(NASDAQ)市值最大的100只股票,利用期权市场的指标预测股票表现。每周对一个月期权进行分类为平值(ATM)或虚值(OTM),并计算四项关键指标:

OTM波动率偏斜(反映对负面价格风险的担忧)

已实现波动率与隐含波动率的差值(捕捉波动率风险)

ATM波动率偏斜(表明信息交易和跳跃溢价)

ATM波动率偏斜的变化(预测信息交易变化)

将这些指标标准化为z分数后取平均值,并按此排名将股票分为五分位组。对排名前五分位(最优股票)进行多头交易,对排名后五分位(最差股票)进行空头交易。投资组合按等权重配置,每周重新平衡。

策略合理性

学术研究表明,交易者可能更倾向于在期权市场利用私人信息,因为期权市场的交易成本较低、财务杠杆更高且空头限制较少。如果股票投资者未能利用这些信息,期权市场与股票市场之间会产生一种“领先-滞后”关系。该策略通过期权市场信息捕捉这种滞后效应,为股票市场投资者提供超额回报的机会。

回测表现

年化收益7.61%
波动率6.69%
贝塔0.417
夏普比率0.54
索提诺比率-0.009
胜率59%

完整 Python 代码

from AlgorithmImports import *
from typing import Dict, List
class ExploitingOptionInformationintheEquityMarket(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2015, 1, 1)
self.SetCash(1000000)

self.min_expiry:int = 25
self.max_expiry:int = 35

self.daily_period:int = 21              # stock daily price period
self.vol_skew_change_period:int = 5     # weekly change in ATM vol skew period

self.min_share_price:int = 5
self.leverage:int = 20
self.quantile:int = 5
self.price_threshold:float = [0.8 , 0.95]
self.prices:Dict[Symbol, RollingWindow] = {}
self.symbols_by_ticker:Dict[str, Symbol] = {}
self.subscribed_contracts:Dict[Symbol, Contracts] = {}
self.ATM_volatility_skew_values:Dict[Symbol, RollingWindow] = {}

self.day:int = -1
self.fundamental_sorting_key = lambda x: x.DollarVolume
self.fundamental_count:int = 100
self.selection_flag:bool = True
self.rebalance_flag:bool = False
self.wait_one_day = False

self.Settings.MinimumOrderMarginPortfolioPercentage = 0.
self.UniverseSettings.Resolution = Resolution.Minute
self.AddUniverse(self.FundamentalSelectionFunction)
self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))
self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw

market = self.AddEquity('SPY', Resolution.Minute).Symbol
self.Schedule.On(self.DateRules.Every(DayOfWeek.Tuesday), self.TimeRules.BeforeMarketClose(market), self.Rebalance)

def Rebalance(self) -> None:
self.rebalance_flag = True

def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
for security in changes.AddedSecurities:
    security.SetFeeModel(CustomFeeModel())
    security.SetLeverage(self.leverage)

for security in changes.RemovedSecurities:
    symbol:Symbol = security.Symbol
    if symbol in self.ATM_volatility_skew_values:
        # delete ATM vol skew values
        del self.ATM_volatility_skew_values[symbol]

def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
# update daily prices of stocks in self.data dictionary
for stock in fundamental:
    symbol:Symbol = stock.Symbol
    
    if symbol in self.prices:
        self.prices[symbol].Add(stock.AdjustedPrice)

# rebalance, when contracts expiried
if not self.selection_flag:
    return Universe.Unchanged

# select top n stocks by dollar volume
selected:List[Fundamental] = [
    x for x in fundamental if x.HasFundamentalData and x.Market == 'usa' and x.Price > self.min_share_price
]
if len(selected) > self.fundamental_count:
    selected = [x for x in sorted(selected, key=self.fundamental_sorting_key, reverse=True)[:self.fundamental_count]]
selected_symbols:List[Symbol] = []
        
for stock in selected:
    symbol:Symbol = stock.Symbol
    ticker:str = symbol.Value 
    
    selected_symbols.append(symbol)
    self.symbols_by_ticker[ticker] = symbol
    
    if symbol in self.prices:
        continue
    
    self.prices[symbol] = RollingWindow[float](self.daily_period)
    history:DataFrame = self.History(symbol, self.daily_period, Resolution.Daily)
    if history.empty:
        continue
    closes:Series = history.loc[symbol].close
    for time, close in closes.items():
        self.prices[symbol].Add(close)
# return newly selected symbols
return selected_symbols
def OnData(self, data: Slice) -> None:
# execute once a day
if self.day == self.Time.day:
    return
if not (self.Time.hour == 9 and self.Time.minute == 31):
    return
self.day = self.Time.day

# check if any of the subscribed contracts expired
for _, symbol in self.symbols_by_ticker.items():
    if symbol in self.subscribed_contracts and self.subscribed_contracts[symbol].expiry_date <= self.Time.date():
        # remove expired contracts
        for contract in [self.subscribed_contracts[symbol].atm_call, self.subscribed_contracts[symbol].atm_put, self.subscribed_contracts[symbol].otm_put]:
            if self.Securities[contract].IsTradable:
                # self.RemoveSecurity(contract)
                self.Liquidate(contract)
            
        # remove Contracts object for current symbol
        del self.subscribed_contracts[symbol]

# perform next selection, when there are no active contracts
if len(self.subscribed_contracts) == 0 and not self.selection_flag:
    # liquidate leftovers
    if self.Portfolio.Invested:
        self.Liquidate()
        
    self.symbols_by_ticker.clear()
    self.selection_flag = True
    return
# subscribe to new contracts after selection
elif len(self.subscribed_contracts) == 0 and self.selection_flag:
    self.selection_flag = False
    
    for _, symbol in self.symbols_by_ticker.items():
        if symbol in self.prices:
            if self.prices[symbol].IsReady:
                # get all contracts for current stock symbol
                contracts:List[Symbol] = self.OptionChainProvider.GetOptionContractList(symbol, self.Time)
                # get current price for etf
                underlying_price:float = self.prices[symbol][0]
                
                # get strikes from commodity future contracts
                strikes:List[float] = [i.ID.StrikePrice for i in contracts]
                
                # can't filter contracts, if there isn't any strike price
                if len(strikes) <= 0 or underlying_price == 0:
                    continue
                
                # filter calls and puts contracts with one month expiry
                atm_calls, atm_puts = self.FilterContracts(strikes, contracts, underlying_price, OptionType.ATM)
                _, otm_puts = self.FilterContracts(strikes, contracts, underlying_price, OptionType.OTM)
                
                # make sure, there is at least one call and put contract
                if len(atm_calls) > 0 and len(atm_puts) > 0 and len(otm_puts) > 0:
                    # sort by expiry
                    atm_call:Symbol = sorted(atm_calls, key = lambda x: x.ID.Date, reverse=True)[0]
                    atm_put:Symbol = sorted(atm_puts, key = lambda x: x.ID.Date, reverse=True)[0]
                    otm_put:Symbol = sorted(otm_puts, key = lambda x: x.ID.Date, reverse=True)[0]
                    
                    subscriptions = self.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(atm_call.Underlying)
                    if subscriptions:
                        # add contracts
                        self.AddContract(atm_call)
                        self.AddContract(atm_put)
                        self.AddContract(otm_put)
                        
                        # retrieve expiry date for contracts
                        expiry_date:datetime.date = atm_call.ID.Date.date() if atm_call.ID.Date.date() < atm_put.ID.Date.date() else atm_put.ID.Date.date()
                        expiry_date:datetime.date = expiry_date if expiry_date < otm_put.ID.Date.date() else otm_put.ID.Date.date()
                        
                        # store contracts with expiry date under stock's symbol
                        self.subscribed_contracts[symbol] = Contracts(expiry_date, underlying_price, atm_call, atm_put, otm_put)
                    
# calculate measures
elif len(self.subscribed_contracts) != 0 and data.OptionChains.Count != 0:
    if self.rebalance_flag:
        OTM_volatility_skew:Dict[Symbol, float] = {}
        volatility_spread:Dict[Symbol, float] = {}
        ATM_volatility_skew:Dict[Symbol, float] = {}
        ATM_volatility_skew_change:Dict[Symbol, float] = {}
        
    for kvp in data.OptionChains:
        chain:OptionChain = kvp.Value
        ticker:str = chain.Underlying.Symbol.Value
        if ticker in self.symbols_by_ticker:
            # get stock's symbol
            symbol:Symbol = self.symbols_by_ticker[ticker]
            # get contracts
            contracts:List[OptionConctract] = [x for x in chain]
            # check if there are enough contracts for option and daily prices are ready
            if len(contracts) < 3 or not self.prices[symbol].IsReady:
                continue
            
            # get call and put implied volatility
            atm_call_iv, atm_put_iv, otm_put_iv = self.GetImpliedVolatilities(contracts)
            if atm_call_iv is not None and atm_put_iv is not None and otm_put_iv is not None:
                ATM_vol_skew:float = atm_put_iv - atm_call_iv
                # store daily vol skew value
                if symbol not in self.ATM_volatility_skew_values:
                    self.ATM_volatility_skew_values[symbol] = RollingWindow[float](self.vol_skew_change_period)
                self.ATM_volatility_skew_values[symbol].Add(ATM_vol_skew)
                
                if self.rebalance_flag:
                    if self.ATM_volatility_skew_values[symbol].IsReady:
                        if self.ATM_volatility_skew_values[symbol][4] != 0 and self.ATM_volatility_skew_values[symbol][0] != 0:
                            # 1.measure
                            OTM_volatility_skew[symbol] = otm_put_iv - atm_call_iv
                            
                            # 2.measure
                            hv:float = self.GetHistoricalVolatility(self.prices[symbol])
                            atm_iv:float = (atm_call_iv + atm_put_iv) / 2 
                            volatility_spread[symbol] = hv - atm_iv
                            
                            # 3.measure
                            ATM_volatility_skew[symbol] = ATM_vol_skew
                            
                            # 4.measure
                            # weekly change in ATM vol skew
                            ATM_volatility_skew_change[symbol] = self.ATM_volatility_skew_values[symbol][0] / self.ATM_volatility_skew_values[symbol][4] - 1
    
    if self.rebalance_flag:
        self.rebalance_flag = False
        
        # can't perform selection
        if len(OTM_volatility_skew) < self.quantile:
            self.Liquidate()
            return
        
        # z-score normalization
        OTM_volatility_skew_score:Dict[Symbol, float] = self.StandardizedZscore(OTM_volatility_skew)
        volatility_spread_score:Dict[Symbol, float] = self.StandardizedZscore(volatility_spread)
        ATM_volatility_skew_score:Dict[Symbol, float] = self.StandardizedZscore(ATM_volatility_skew)
        ATM_volatility_skew_change_score:Dict[Symbol, float] = self.StandardizedZscore(ATM_volatility_skew_change)
        
        # score calculation
        combined_zscore:Dict[Symbol, float] = { x[0] : np.mean([x[1], volatility_spread_score[x[0]], ATM_volatility_skew_score[x[0]], ATM_volatility_skew_change_score[x[0]]]) for x in OTM_volatility_skew_score.items() }
        
        # perform selection
        quantile:int = int(len(combined_zscore) / self.quantile)
        sorted_by_zscore:List[Symbol] = [x[0] for x in sorted(combined_zscore.items(), key=lambda item: item[1], reverse=True)]
        
        # long top
        long:List[Symbol] = sorted_by_zscore[-quantile:]
        # short bottom
        short:List[Symbol] = sorted_by_zscore[:quantile]
        
        # trade execution
        invested:List[Symbol] = [x.Key for x in self.Portfolio if x.Value.Invested]
        for symbol in invested:
            if symbol not in long + short:
                # self.Liquidate(symbol)
                self.MarketOnCloseOrder(symbol, -self.Portfolio[symbol].Quantity)
        
        long_n:int = len(long)
        short_n:int = len(short)
        
        for symbol in long:
            if symbol in data and data[symbol]:
                q = int((self.Portfolio.TotalPortfolioValue / long_n) / self.prices[symbol][0])
                self.MarketOnCloseOrder(symbol, q)
            
        for symbol in short:
            if symbol in data and data[symbol]:
                q = int((self.Portfolio.TotalPortfolioValue / short_n) / self.prices[symbol][0])
                self.MarketOnCloseOrder(symbol, -q)
            
def StandardizedZscore(self, collection:dict) -> dict:
collection_values = list(collection.values())
# median
collection_values_median:float = np.median(collection_values)
# avg
collection_values_avg:float = np.mean(collection_values)
# median absolute deviation
collection_med:float = np.median(np.array([abs(x-collection_values_median) for x in collection_values]))

max_cap:int = 3
result:dict = { x[0]: min(max_cap, max((x[1] - collection_values_avg) / collection_med, -max_cap)) for x in collection.items() }

return result

def FilterContracts(self, strikes, contracts, underlying_price, option_type) -> tuple():
''' filter call and put contracts from contracts parameter '''
''' returns call and put contracts tuple '''

strike:float = None
if option_type == OptionType.ATM:
    # at the money strike
    strikes:List[float] = [x for x in strikes if x > self.price_threshold[1] *underlying_price]
    if len(strikes) != 0:
        strike:float = min(strikes, key=lambda x: abs(x-underlying_price))
elif option_type == OptionType.OTM:
    # out the money 
    strikes:List[float] = [x for x in strikes if x <= self.price_threshold[1] *underlying_price and x > self.price_threshold[0] *underlying_price]
    if len(strikes) != 0:
        strike:float = min(strikes, key=lambda x: abs(x-(underlying_price* self.price_threshold[0])))

calls:List[Symbol] = [] # storing call contracts
puts:List[Symbol] = [] # storing put contracts
if strike is not None:
    for contract in contracts:
        # check if contract has one month expiry
        if self.min_expiry < (contract.ID.Date - self.Time).days < self.max_expiry:
            # check if contract is call
            if contract.ID.OptionRight == OptionRight.Call and contract.ID.StrikePrice == strike:
                calls.append(contract)
            # check if contract is put
            elif contract.ID.OptionRight == OptionRight.Put and contract.ID.StrikePrice == strike:
                puts.append(contract)

# return filtered calls and puts with one month expiry
return calls, puts

def AddContract(self, contract):
''' subscribe option contract, set price model and normalization mode '''
option:OptionContract = self.AddOptionContract(contract, Resolution.Minute)
option.PriceModel = OptionPriceModels.CrankNicolsonFD()

def GetImpliedVolatilities(self, contracts):
''' retrieve implied volatility of contracts from contracts parameteres '''
''' returns call and put implied volatility '''
atm_call_iv:Union[None, float] = None
atm_put_iv:Union[None, float] = None
otm_put_iv:Union[None, float] = None

underlying_symbol:Symbol = contracts[0].Symbol.Underlying

if underlying_symbol in self.subscribed_contracts:
    # go through option contracts
    for c in contracts:
        # atm call IV
        if c.Symbol.Value == self.subscribed_contracts[underlying_symbol].atm_call.Value:
            atm_call_iv = c.ImpliedVolatility
        # atm put IV
        elif c.Symbol.Value == self.subscribed_contracts[underlying_symbol].atm_put.Value:
            atm_put_iv = c.ImpliedVolatility
        # otm put IV
        elif c.Symbol.Value == self.subscribed_contracts[underlying_symbol].otm_put.Value:
            otm_put_iv = c.ImpliedVolatility
    
return atm_call_iv, atm_put_iv, otm_put_iv

def GetHistoricalVolatility(self, rolling_window_prices) -> float:
''' calculate historical volatility based on daily prices in rolling_window_prices parameter '''
prices:np.ndarray = np.array([x for x in rolling_window_prices])
returns:np.ndarray = (prices[:-1] - prices[1:]) / prices[1:]
return np.std(returns)
from enum import Enum
class OptionType(Enum):
ATM:str = 'atm'
OTM:str = 'otm'
ITM:str = 'itm'
class Contracts():
def __init__(self, expiry_date:datetime.date, underlying_price:float, atm_call:Symbol, atm_put:Symbol, otm_put:Symbol) -> None:
self.expiry_date:datetime.date = expiry_date
self.underlying_price:float = underlying_price
# self.contracts = contracts  # = [atm_call, atm_put, otm_put]
self.atm_call:Symbol = atm_call
self.atm_put:Symbol = atm_put
self.otm_put:Symbol = otm_put

# Custom fee model
class CustomFeeModel(FeeModel):
def GetOrderFee(self, parameters):
fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
return OrderFee(CashAmount(fee, "USD"))