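This strategy looks for long opportunities in double bottom patterns across the ETFs of 24 countries. It holds at most 4 positions at a time, with a maximum leverage of 2:1. New patterns are searched for monthly, while trades are executed daily. Key parameters are a rolling 30-day period, a maximum gap of 1000 days between local lows, and a maximum vertical distance of 2%. A 3% trailing stop-loss is used, with a profit target of 6%.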

Strategy Overview

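The investment universe consists of ETFs for 24 countries (mainly the iShares MSCI series, plus the iShares China Large-Cap ETF [FXI]; the United States is represented by the SPDR S&P 500 ETF [SPY]).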

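As noted above, the strategy looks for double bottom patterns in country indices whose prices rise over the long term. (Given this premise, shorting the index ETFs is not considered.)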

<Trade Execution>

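At most 4 positions can be open at once, and the maximum portfolio leverage is 2:1, so the maximum weight of a single ETF in the portfolio is 50%. We look for new double top and double bottom patterns monthly, but trading decisions (entering and exiting positions) are executed daily.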
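The 50% figure follows from dividing the leverage budget by the number of position slots. A minimal sketch of that arithmetic, assuming the budget is split equally across slots; the helper name `max_weight_per_position` is hypothetical and does not appear in the strategy code:

```python
def max_weight_per_position(max_leverage: float, max_positions: int) -> float:
    """Largest portfolio weight one position can take when the leverage
    budget is split equally across all position slots (an assumption)."""
    if max_positions <= 0:
        raise ValueError("max_positions must be positive")
    return max_leverage / max_positions


weight = max_weight_per_position(max_leverage=2.0, max_positions=4)
print(f"{weight:.0%}")  # prints "50%"
```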

Find local lows. By default, a rolling 30-day period is used to look both backward and forward.
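The local-low search can be sketched with pandas. This is a simplified illustration and not the strategy's exact implementation: the function name `find_local_lows` and the toy price series are made up for the example, and it assumes a symmetric window of `period` bars on each side of every bar.

```python
import pandas as pd


def find_local_lows(prices: pd.Series, period: int = 30) -> pd.Series:
    """Return the prices that equal the minimum of a window spanning
    `period` bars before and `period` bars after each bar."""
    window = 2 * period + 1
    rolling_min = prices.rolling(window=window, center=True).min()
    # Bars closer than `period` to either end have no full window (NaN),
    # so the comparison is False there and they are skipped.
    return prices[prices == rolling_min]


# Toy series with two troughs, at positions 2 and 6.
prices = pd.Series([5.0, 4.0, 3.0, 4.0, 5.0, 4.0, 2.0, 4.0, 5.0])
lows = find_local_lows(prices, period=2)
print(lows)
```

Because the window looks forward as well as backward, a low can only be confirmed `period` bars after it occurs.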

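Among these local lows, look for potential double bottom patterns. The maximum distance between two local highs/lows is arbitrarily set to 1000 trading days, and the minimum distance to 30 trading days. (The maximum percentage is 2%, i.e. the vertical distance between two local lows must be at most 2% for them to qualify as a potential double bottom.)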
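The pairing rule can be expressed as a small predicate. The sketch below is illustrative only: the function name `is_double_bottom_candidate` and the sample values are hypothetical, and it measures the gap in calendar days via `timedelta.days`. It also leaves out any check on the lows that fall between the two candidates.

```python
from datetime import date
from typing import Tuple

Low = Tuple[date, float]  # (date of the local low, price at the low)


def is_double_bottom_candidate(
    first: Low,
    second: Low,
    min_days: int = 30,
    max_days: int = 1000,
    max_pct: float = 0.02,
) -> bool:
    """Check the horizontal (time) and vertical (price) distance rules."""
    gap_days = (second[0] - first[0]).days
    if gap_days < min_days or gap_days > max_days:
        return False
    # Relative price difference between the two lows.
    pct_difference = abs(second[1] / first[1] - 1)
    return pct_difference <= max_pct


first_low = (date(2020, 1, 2), 100.0)
print(is_double_bottom_candidate(first_low, (date(2020, 6, 1), 101.5)))   # True
print(is_double_bottom_candidate(first_low, (date(2020, 6, 1), 103.0)))   # False: 3% apart
print(is_double_bottom_candidate(first_low, (date(2020, 1, 20), 100.5)))  # False: only 18 days apart
```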

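Place a limit order for the ETF at the double bottom level with a 3% trailing stop-loss (SL). The profit target uses a risk-reward ratio of 1:2, so it is set to 6%. If the position is in profit, the trailing stop is moved every day.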
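The stop and target levels, and the daily trailing step, reduce to simple arithmetic. The following sketch covers a long position only; the helper names `bracket_levels` and `trail_stop` are hypothetical, and prices are rounded to cents as an assumption.

```python
from typing import Tuple


def bracket_levels(
    entry_price: float, stop_loss_pct: float = 0.03, reward_to_risk: float = 2.0
) -> Tuple[float, float]:
    """Return (stop price, target price) for a long entry."""
    stop = round(entry_price * (1 - stop_loss_pct), 2)
    target = round(entry_price * (1 + stop_loss_pct * reward_to_risk), 2)
    return stop, target


def trail_stop(
    current_stop: float, last_price: float, entry_price: float, stop_loss_pct: float = 0.03
) -> float:
    """Raise the stop to 3% below the latest price, but only while the
    position is in profit, and never lower an existing stop."""
    if last_price <= entry_price:
        return current_stop
    candidate = round(last_price * (1 - stop_loss_pct), 2)
    return max(current_stop, candidate)


stop, target = bracket_levels(100.0)
print(stop, target)                      # 97.0 106.0
stop = trail_stop(stop, 104.0, 100.0)
print(stop)                              # 100.88
stop = trail_stop(stop, 102.0, 100.0)    # price fell back: stop stays put
print(stop)                              # 100.88
```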

Strategy Rationale

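Although technical analysis can look subjective and may not be taken very seriously in academia, many articles have tried to quantify it and subject it to rigorous mathematical and statistical analysis. Market technicians hold that the current price contains all information about any asset. This is undoubtedly an oversimplification, since markets are far more complex than that. But if you rely too heavily on fundamental analysis, you assume that you have all the fundamental information about the market and that everyone else has complete information as well, which is clearly far from reality. If you trade on charts alone, you also observe the psychological swings of market participants, which, when prices spike, can often be explained by the news. Markets are driven not only by fundamentals but also by psychology and price. It is therefore unwise to ignore technical analysis and dismiss it as "nonsense".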

Source Paper

Double Bottom Country Trading Strategy

<Abstract>

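This Quantpedia article explores technical analysis methods, focusing on double bottom and double top trading strategies. Although technical analysis is often met with skepticism, and has even been likened to "astrology for men", the article stresses that markets are driven by both fundamentals and psychology, and that ignoring technical analysis entirely is unwise. The history of technical analysis goes back to Sokyu Honma in 18th-century Japan and, in modern times, to Dow Theory. The article discusses technical analysis terms such as support, resistance, and moving averages in depth, with emphasis on double top and double bottom patterns as chart reversal indicators. These patterns can indicate strong resistance or support on a price chart. The study's approach draws on feedback and questions received over the years, which shows that technical analysis remains a popular topic.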

Backtest Performance

Annualized return: 7.79%
Volatility: 16.57%
Beta: 0.103
Sharpe ratio: 0.47
Sortino ratio: 0.181
Maximum drawdown: -30.38%
Win rate: 53%
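As a quick consistency check, the reported Sharpe ratio matches the annualized return divided by the volatility if the risk-free rate is taken as zero. That rate is an assumption here; the source does not state how the ratio was computed.

```python
annual_return = 0.0779   # annualized return reported above
volatility = 0.1657      # annualized volatility reported above
risk_free_rate = 0.0     # assumption: not stated in the source

sharpe = (annual_return - risk_free_rate) / volatility
print(round(sharpe, 2))  # prints 0.47
```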

Full Python code

# region imports
from AlgorithmImports import *
import pandas as pd
from pandas.core.frame import DataFrame
from datetime import datetime
from typing import Dict, List, Tuple
# endregion
class DoubleBottomCountryTradingStrategy(QCAlgorithm):
    def Initialize(self) -> None:
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)
        self.country_etfs:List[str] = [
            "EWA",  # iShares MSCI Australia Index ETF
            "EWO",  # iShares MSCI Austria Investable Mkt Index ETF
            "EWK",  # iShares MSCI Belgium Investable Market Index ETF
            "EWZ",  # iShares MSCI Brazil Index ETF
            "EWC",  # iShares MSCI Canada Index ETF
            "FXI",  # iShares China Large-Cap ETF
            "EWQ",  # iShares MSCI France Index ETF
            "EWG",  # iShares MSCI Germany ETF 
            "EWH",  # iShares MSCI Hong Kong Index ETF
            "EWI",  # iShares MSCI Italy Index ETF
            "EWJ",  # iShares MSCI Japan Index ETF
            "EWM",  # iShares MSCI Malaysia Index ETF
            "EWW",  # iShares MSCI Mexico Inv. Mt. Idx
            "EWN",  # iShares MSCI Netherlands Index ETF
            "EWS",  # iShares MSCI Singapore Index ETF
            "EZA",  # iShares MSCI South Africa Index ETF
            "EWY",  # iShares MSCI South Korea ETF
            "EWP",  # iShares MSCI Spain Index ETF
            "EWD",  # iShares MSCI Sweden Index ETF
            "EWL",  # iShares MSCI Switzerland Index ETF
            "EWT",  # iShares MSCI Taiwan Index ETF
            "THD",  # iShares MSCI Thailand Index ETF
            "EWU",  # iShares MSCI United Kingdom Index ETF
            "SPY",  # SPDR S&P 500 ETF
        ]
        self.rolling_period:int = 30
        # two local highs/lows max and min distance
        self.min_distance:int = self.rolling_period
        self.max_distance:int = 1000
        # max vertical distance between two local highs/lows
        self.max_percentage:float = .02
        self.stop_loss_pct:float = .03
        self.take_profit_pct:float = .06
        self.trade_long = True
        self.trade_short = False
        self.max_positions_opened:int = 4
        self.SetWarmup(self.max_distance, Resolution.Daily)
        self.price_data:Dict[str, List[Tuple[datetime, float]]] = {}
        # subscribe to daily data for each country ETF
        for etf in self.country_etfs:
            data:Equity = self.AddEquity(etf, Resolution.Daily)
            data.SetLeverage(10)
            self.price_data[etf] = []
        
        self.recent_month:int = -1
    def OnData(self, data: Slice) -> None:
        # trail long SL for each open position
        active_open_buy_orders:List = list(self.Transactions.GetOrderTickets(lambda ticket: 'LIMIT_BUY' in ticket.Tag and ticket.Status == OrderStatus.Filled))
        for ticket in active_open_buy_orders:
            if self.Portfolio[ticket.Symbol].IsLong:
                active_sl_orders:List = list(self.Transactions.GetOrderTickets(lambda sl_ticket: 'SL' in sl_ticket.Tag \
                                            and sl_ticket.Symbol == ticket.Symbol \
                                            and sl_ticket.Status not in [OrderStatus.Filled, OrderStatus.Invalid, OrderStatus.Canceled]))
                if len(active_sl_orders) != 0:
                    active_sl_order = active_sl_orders[0]
                    # price moved into profit
                    if self.Securities[ticket.Symbol].Price > ticket.Get(OrderField.LimitPrice):
                        new_sl_price:float = round(self.Securities[ticket.Symbol].Price - (self.Securities[ticket.Symbol].Price * self.stop_loss_pct), 2)
                        if new_sl_price > active_sl_order.Get(OrderField.StopPrice):
                            active_sl_order.UpdateStopPrice(new_sl_price)
        # trail short SL for each open position
        active_open_sell_orders:List = list(self.Transactions.GetOrderTickets(lambda ticket: 'LIMIT_SELL' in ticket.Tag and ticket.Status == OrderStatus.Filled))
        for ticket in active_open_sell_orders:
            if self.Portfolio[ticket.Symbol].IsShort:
                active_sl_orders:List = list(self.Transactions.GetOrderTickets(lambda sl_ticket: 'SL' in sl_ticket.Tag \
                                            and sl_ticket.Symbol == ticket.Symbol \
                                            and sl_ticket.Status not in [OrderStatus.Filled, OrderStatus.Invalid, OrderStatus.Canceled]))
                if len(active_sl_orders) != 0:
                    active_sl_order = active_sl_orders[0]
                    # price moved into profit
                    if self.Securities[ticket.Symbol].Price < ticket.Get(OrderField.LimitPrice):
                        new_sl_price:float = round(self.Securities[ticket.Symbol].Price + (self.Securities[ticket.Symbol].Price * self.stop_loss_pct), 2)
                        if new_sl_price < active_sl_order.Get(OrderField.StopPrice):
                            active_sl_order.UpdateStopPrice(new_sl_price)
        # look for new double top and double bottom once a month
        rebalance_flag = False
        if self.recent_month != self.Time.month:
            rebalance_flag = True
            self.recent_month = self.Time.month
        # store daily prices
        for etf in self.country_etfs:
            if etf in data and data[etf]:
                self.price_data[etf].append( (self.Time, data[etf].Value) )
            
            if not rebalance_flag:
                continue
            if len(self.price_data[etf]) >= self.max_distance:
                if not self.Portfolio[etf].Invested:
                    etf_hist:DataFrame = pd.DataFrame(self.price_data[etf], columns=['Date', etf])
                    etf_hist.set_index('Date', inplace=True)
                    current_datetime:datetime = etf_hist.index[-1]
                    etf_hist = self.find_support_and_resistance(etf, etf_hist, self.rolling_period, self.min_distance)
                    etf_hist = self.find_double_tops_and_bottoms(etf, etf_hist, self.min_distance, self.max_distance, self.max_percentage)                    
                    if self.trade_long:
                        # find not yet crossed bottom
                        last_bottom_datetime:DataFrame = etf_hist[~etf_hist['Double_Bottom'].isnull()]
                        if not last_bottom_datetime.empty:
                            last_bottom_datetime:datetime = last_bottom_datetime.index[-1]
                            if all(x[1][etf] >= etf_hist.loc[last_bottom_datetime][etf] for x in etf_hist.loc[last_bottom_datetime:current_datetime].iterrows()):
                                limit_price:float = round(etf_hist.loc[last_bottom_datetime][etf], 2)
                                active_limit_orders:List = list(self.Transactions.GetOrderTickets(lambda order_ticket: order_ticket.Status not in [OrderStatus.Filled, OrderStatus.Invalid, OrderStatus.Canceled] and order_ticket.Symbol == etf and 'LIMIT_BUY' in order_ticket.Tag))
                                # submit orders
                                if len(active_limit_orders) != 0:
                                    active_order = active_limit_orders[0]
                                    # there is active order for different price
                                    if active_order.Get(OrderField.LimitPrice) != limit_price:
                                        new_quantity:int = self.Portfolio.TotalPortfolioValue // limit_price
                                        active_order.UpdateLimitPrice(limit_price)
                                        active_order.UpdateQuantity(new_quantity)
                                else:
                                    # submit new limit order
                                    if len([x for x in self.Portfolio if x.Value.Invested]) < self.max_positions_opened:
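                                        # NOTE: each order is sized at roughly 100% of portfolio value, above the 50% maximum weight described in the text
                                        quantity:int = self.Portfolio.TotalPortfolioValue // limit_price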
                                        if abs(quantity) > 1:
                                            self.LimitOrder(etf, quantity, limit_price, tag='LIMIT_BUY')
                    if self.trade_short:
                        # find not yet crossed top
                        last_top_datetime = etf_hist[~etf_hist['Double_Top'].isnull()]
                        if not last_top_datetime.empty:
                            last_top_datetime = last_top_datetime.index[-1]
                            if all(x[1][etf] <= etf_hist.loc[last_top_datetime][etf] for x in etf_hist.loc[last_top_datetime:current_datetime].iterrows()):
                                limit_price:float = round(etf_hist.loc[last_top_datetime][etf], 2)
                                active_limit_orders:List = list(self.Transactions.GetOrderTickets(lambda order_ticket: order_ticket.Status not in [OrderStatus.Filled, OrderStatus.Invalid, OrderStatus.Canceled] and order_ticket.Symbol == etf and 'LIMIT_SELL' in order_ticket.Tag))
                                # submit orders
                                if len(active_limit_orders) != 0:
                                    active_order = active_limit_orders[0]
                                    # there is active order for different price
                                    if active_order.Get(OrderField.LimitPrice) != limit_price:
                                        new_quantity:int = self.Portfolio.TotalPortfolioValue // limit_price
                                        active_order.UpdateLimitPrice(limit_price)
                                        active_order.UpdateQuantity(-new_quantity)
                                else:
                                    # submit new limit order
                                    if len([x for x in self.Portfolio if x.Value.Invested]) < self.max_positions_opened:
                                        quantity:int = self.Portfolio.TotalPortfolioValue // limit_price
                                        if abs(quantity) > 1:
                                            self.LimitOrder(etf, -quantity, limit_price, tag='LIMIT_SELL')
    def cancel_all_limit_orders(self) -> None:
        # cancel every pending entry limit order, across all symbols
        active_limit_orders:List = list(self.Transactions.GetOrderTickets(lambda ticket: ('LIMIT_SELL' in ticket.Tag or 'LIMIT_BUY' in ticket.Tag) and ticket.Status not in [OrderStatus.Filled, OrderStatus.Invalid, OrderStatus.Canceled]))
        for ticket in active_limit_orders:
            ticket.Cancel()
    def cancel_all_orders(self, symbol:Symbol) -> None:
        # cancel all submitted orders for the given symbol
        symbol_orders:List = list(self.Transactions.GetOrderTickets(lambda ticket: ticket.Symbol == symbol))
        for ticket in symbol_orders:
            ticket.Cancel()
    def OnOrderEvent(self, orderEvent: OrderEvent) -> None:
        # react only to fills: attach or clean up TP/SL orders around entries
        if orderEvent.Status == OrderStatus.Filled:
            order_ticket = self.Transactions.GetOrderTicket(orderEvent.OrderId)
            
            # if TP or SL was hit
            if '_TP' in order_ticket.Tag or '_SL' in order_ticket.Tag:
                # cancel all submitted orders for the symbol
                self.cancel_all_orders(order_ticket.Symbol)
            # set TP and SL if limit was hit
            if orderEvent.Quantity > 0:
                if 'LIMIT_BUY' in order_ticket.Tag:
                    if len([x for x in self.Portfolio if x.Value.Invested]) == self.max_positions_opened:
                        self.cancel_all_limit_orders()
                    tp_price:float = round(orderEvent.FillPrice + (orderEvent.FillPrice * self.take_profit_pct), 2)
                    stop_price:float = round(orderEvent.FillPrice - (orderEvent.FillPrice * self.stop_loss_pct), 2)
                    self.LimitOrder(orderEvent.Symbol, -orderEvent.Quantity, tp_price, tag='BUY_TP')
                    self.StopMarketOrder(orderEvent.Symbol, -orderEvent.Quantity, stop_price, tag='BUY_SL')
            
            if orderEvent.Quantity < 0:
                if 'LIMIT_SELL' in order_ticket.Tag:
                    if len([x for x in self.Portfolio if x.Value.Invested]) == self.max_positions_opened:
                        self.cancel_all_limit_orders()
                    tp_price:float = round(orderEvent.FillPrice - (orderEvent.FillPrice * self.take_profit_pct), 2)
                    stop_price:float = round(orderEvent.FillPrice + (orderEvent.FillPrice * self.stop_loss_pct), 2)
                    
                    self.LimitOrder(orderEvent.Symbol, -orderEvent.Quantity, tp_price, tag='SELL_TP')
                    self.StopMarketOrder(orderEvent.Symbol, -orderEvent.Quantity, stop_price, tag='SELL_SL')
    def find_support_and_resistance(self, symbol:str, stock_data:DataFrame, rolling_period:int, lookback_period:int) -> DataFrame:
        for level_str, obj_function in [('Support', 'min'), ('Resistance', 'max')]:
            level_prices = stock_data[symbol]
            # The centered window of 2*rolling_period+1 bars is shifted back by rolling_period,
            # so pivot_level at bar t is the min/max over bars t .. t + 2*rolling_period.
            rolling_window = level_prices.rolling(window=rolling_period*2+1, center=True)
            pivot_level = getattr(rolling_window, obj_function)().shift(-rolling_period)
            stock_data[level_str] = None
            level_points = pivot_level.where(pivot_level == level_prices, None).dropna().index
            valid_indices = []
            for i, level_point in enumerate(level_points):
                if i == 0:
                    valid_indices.append(level_point)
                else:
                    timedelta = level_point - level_points[i - 1]
                    if timedelta.days > lookback_period:
                        valid_indices.append(level_point)
            stock_data.loc[valid_indices, level_str] = pivot_level.loc[valid_indices]
        return stock_data
    def find_double_tops_and_bottoms(self, symbol:str, stock_data:DataFrame, min_distance:int, max_distance:int, max_percentage:float) -> DataFrame:
        stock_data['Double_Top'] = None
        stock_data['Double_Bottom'] = None
        support_points = stock_data.dropna(subset=['Support']).index
        resistance_points = stock_data.dropna(subset=['Resistance']).index
        for i in range(len(resistance_points)):
            for j in range(i + 1, len(resistance_points)):
                timedelta = resistance_points[j] - resistance_points[i]
                if timedelta.days < min_distance or timedelta.days > max_distance:
                    continue
                price_pct_difference = abs(stock_data.loc[resistance_points[j], 'Resistance'] / stock_data.loc[resistance_points[i], 'Resistance'] - 1)
                if price_pct_difference <= max_percentage:
                    lower_top_index = resistance_points[j] if stock_data.loc[resistance_points[j], 'Resistance'] < stock_data.loc[resistance_points[i], 'Resistance'] else resistance_points[i]
                    if all(stock_data.loc[resistance_points[x], symbol] <= stock_data.loc[lower_top_index, 'Resistance'] for x in range(i+1,j)):
                        stock_data.loc[lower_top_index, 'Double_Top'] = stock_data.loc[lower_top_index, 'Resistance']
        for i in range(len(support_points)):
            for j in range(i + 1, len(support_points)):
                timedelta = support_points[j] - support_points[i]
                if timedelta.days < min_distance or timedelta.days > max_distance:
                    continue
                price_pct_difference = abs(stock_data.loc[support_points[j], 'Support'] / stock_data.loc[support_points[i], 'Support'] - 1)
                if price_pct_difference <= max_percentage:
                    higher_bottom_index = support_points[j] if stock_data.loc[support_points[j], 'Support'] > stock_data.loc[support_points[i], 'Support'] else support_points[i]
                    if all(stock_data.loc[support_points[x], symbol] >= stock_data.loc[higher_bottom_index, 'Support'] for x in range(i+1,j)):
                        stock_data.loc[higher_bottom_index, 'Double_Bottom'] = stock_data.loc[higher_bottom_index, 'Support']
        return stock_data
