分享

量化技术篇:吊顶线反包策略回测结果及代码

 立志德美 2023-09-30 发布于上海

  经常做股票的同学应该对各种技术选股的方法都听说过比如均线,MACD金叉,反包等等,各种老师都会拿着对应方法的经典图形来讲解,但自己按照这些指标来选股操作时往往不尽如人意,这是因为单单从技术上来说,他们列举往往是一些技术最好或者较好的案例,殊不知这可能陷入了幸存者偏差的统计陷阱,从数据概率论角度上来说对应的技术本身能有多少的盈亏比和胜率,可能他们自己也都不清楚,更不好要说还要结合考虑市场因素,个人情绪因素等等。

    今天我也不能免俗,给大家先从最好的案例来讲解一个经典多的吊顶线反包策略的最好案例,但后面会给出这个策略,最近两年的回测代码,和结果,供大家学习使用,切勿迷信各种技术图形,首先需要建立在大数据分析下的才更有依据。

'''吊顶线反包策略,最经典的案例是铭普光磁,23-5-30的交易:
1. 前一天涨停,第二天最高价涨停,收盘价未涨停加入股票池
2. 第三天任意时间价格高于第二天最高价,买入
3. 跌破第二天开盘价止损,持有3天或者20%25%)盈利退出
'''

图片

   上面是这个交易最近两年最好的一个执行结果,单次交易盈利60%,不过如果把所有交易数据都贴出来,就知道这个纯技术方法的好坏和风险,

图片可以看出对最近2年的所有股票应用该策略选股,止盈止损的胜率并不高,连40%都不到,当然盈亏比足够大达到了2,所以最终还是会获利的,因此这个吊顶线反包的策略会有一些机会发现并跟上大牛股,但是如果对于满足条件的所有标的进行操作的话也会陷入不少亏损,止损的谜团。

    下面是回测代码,供有兴趣的朋友自己去调整各种参数进行调优:

import csvimport backtrader as btfrom SelfTrade import SelfTradeimport datetimefrom datetime import datetime
'''吊顶线反包1. 前一天涨停,第二天最高价涨停,收盘价未涨停加入股票池2. 第三天任意时间价格高于第二天最高价,买入3. 跌破第二天开盘价止损,持有3天或者20%(25%)盈利退出'''class InverseDiaoDingXian(bt.Strategy): def log(self, Type, Code, PriceType, Price, Date): ''' 策略的日志函数''' with open(self.logFile, 'a', newline='') as file: writer = csv.writer(file) writer.writerow([Type, Code, PriceType, Price, Date]) print('%s, %s, %s, %s, %s' % (Type, Code, PriceType, Price, Date))
params = dict( observeDays=1, # 反包观察天数 hostDays=3, # 持有最大天数 UPLimit = 0.095, # 涨停标准 DOWNLimit= -0.095, # 跌停标准 stopProfit=0.25, # 止盈标准 )
def __init__(self): self.start_dates = {} self.start_dates_flag = {} self.diff_start_dates_days = {} self.selfTrader = SelfTrade(commission_rate=0.0007, cash=10000000) self.candidates1 = {} # 进入第一波候选观察股池 self.positionsMy = {} # 持仓股票及其买入价格 self.days_since_in_host= {} # 记录每只股票持有后 self.days_since_in_pool= {} self.inds = dict() self.first_day_traded = {}
# 构建文件名 filename = f'吊顶线{self.p.observeDays}天持有{self.p.hostDays}天止盈{self.p.stopProfit}前开盘价止损卖出策略_tdx2021_全部回测当日交易.csv' self.logFile = '../out/'+ filename
with open(self.logFile, 'w', newline='') as file: writer = csv.writer(file) writer.writerow(['Type', 'Code', 'PriceType', 'Price', 'Date'])
self.profit_trades = 0 # 交易盈利次数 self.stop_loss_trades = 0 # 割肉次数 self.profit_pct = 0.0 # 盈利时的平均股价涨幅 self.stop_loss_pct = 0.0 # 割肉时的平均股价跌幅 self.profit_trades_open = 0 # 交易盈利次数 self.stop_loss_trades_open = 0 # 割肉次数 self.profit_pct_open = 0.0 # 盈利时的平均股价涨幅 self.stop_loss_pct_open = 0.0 # 割肉时的平均股价跌幅 self.count = 0.0 # 统计资金不足次数
for i, d in enumerate(self.datas): self.inds[d] = dict() self.inds[d]['sma5'] = bt.ind.SMA(d.volume, period=5) # 短期5均量线 self.inds[d]['sma10'] = bt.ind.SMA(d.volume, period=10) # 10日成交量均线

def prenext(self): self.next()
def notify_trade(self, trade): if trade.isclosed: if trade.pnlcomm > 0: self.profit_trades_open += 1 self.profit_pct_open += trade.pnlcomm else: self.stop_loss_trades_open += 1 self.stop_loss_pct_open += trade.pnlcomm
if self.selfTrader.isclose(symbol= trade.data._name): if self.selfTrader.positions[trade.data._name]['pnl'] > 0: self.profit_trades += 1 self.profit_pct += self.selfTrader.positions[trade.data._name]['pnl'] self.log('盈利', trade.data._name, '收益', self.selfTrader.positions[trade.data._name]['pnl'], trade.data.datetime.date(-1)) else: if self.selfTrader.positions[trade.data._name]['pnl'] > -2500: self.stop_loss_trades += 1 self.stop_loss_pct += self.selfTrader.positions[trade.data._name]['pnl'] self.log('割肉', trade.data._name, '损失', self.selfTrader.positions[trade.data._name]['pnl'], trade.data.datetime.date(-1)) else: self.log('可能有除权', trade.data._name, '问题数据', self.selfTrader.positions[trade.data._name]['pnl'], trade.data.datetime.date(-1))

def stop(self): if self.profit_trades > 0: self.profit_pct /= self.profit_trades if self.stop_loss_trades > 0: self.stop_loss_pct /= self.stop_loss_trades
if self.profit_trades_open > 0: self.profit_pct_open /= self.profit_trades_open if self.stop_loss_trades_open > 0: self.stop_loss_pct_open /= self.stop_loss_trades_open # 打印交易统计信息 print('交易盈利次数:', self.profit_trades) print('割肉次数:', self.stop_loss_trades) print('资金不足次数:', self.count) print('盈利时的平均盈利金额: %.2f%%' %self.profit_pct) print('割肉时的平均亏损金额: %.2f%%' %self.stop_loss_pct) self.log(self.profit_trades, self.stop_loss_trades, self.profit_pct, self.stop_loss_pct,'CloseTrade') print('Final Portfolio Value by Close trade: %.2f' % self.selfTrader.get_total_assets())
current_date = datetime.now().strftime('%Y-%m-%d') with open(f'../in/吊顶线{current_date}.txt', mode='w', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow(['代码', 'TargetPrice']) for stock in self.candidates1: if stock not in self.positionsMy: writer.writerow([stock._name[:-4], self.candidates1[stock][0]])
def upLimit(self, currentPrice, lastClose, upLimit): if currentPrice - lastClose > upLimit * lastClose: return True else: return False
def downLimit(self, currentPrice, lastClose, downLimit): if currentPrice - lastClose < downLimit * lastClose: return True else: return False
def next(self): for data in self.datas: if data not in self.first_day_traded: self.first_day_traded[data] = True self.diff_start_dates_days[data] = 0 continue # 跳过第一天交易判断
if data not in self.start_dates: self.start_dates[data] = data.datetime.date(0) self.start_dates_flag[data] = False continue if (data.datetime.date(0) - self.start_dates[data]).days != 0 and not self.start_dates_flag[data]: self.start_dates_flag[data] = True self.start_dates[data] = data.datetime.date(0) continue # 当股票当前日大于起始日60天以上才进行计算,保证新股上市90天后才计算, if self.start_dates_flag[data] and (data.datetime.date(0) - self.start_dates[data]).days > 60 \ and (data.datetime.date(0) - self.start_dates[data]).days > self.diff_start_dates_days[data]: self.diff_start_dates_days[data] = (data.datetime.date(0) - self.start_dates[data]).days #设置前一日价格 dividend_flag = False prev_close = data.close[-1] prev_prev_close = data.close[-2] prev_open = data.open[-1] if data.open[0] - prev_close < -0.11 * prev_close and data._name.startswith(('sh60', 'sz00')) or \ data.open[0] - prev_close < -0.21 * prev_close and data._name.startswith(('sh68', 'sz30')): dividend_adjustment = prev_close / data.open[0] prev_close /= dividend_adjustment prev_prev_close /= dividend_adjustment prev_open /= dividend_adjustment dividend_flag = True
# 更新交易日数 if data in self.days_since_in_pool: self.days_since_in_pool[data] += 1
# 判断是否需要卖出持仓股票 if data in self.positionsMy: self.days_since_in_host[data] += 1 #除权发生调整成本 if dividend_flag: self.selfTrader.positions[data._name]['price'] /= dividend_adjustment self.selfTrader.positions[data._name]['quantity'] *= dividend_adjustment self.log('除权发生', data._name, '调整成本价', self.selfTrader.positions[data._name]['price'], data.datetime.date(0)) if self.days_since_in_host[data] >= self.params.hostDays or (data.low[0] - self.candidates1[data][1]) < 0 or \ self.upLimit(data.high[0], self.selfTrader.positions[data._name]['price'], self.p.stopProfit): # 卖出持仓股票 if data.high[0] == data.close[0] and self.upLimit(data.close[0], prev_close, 0.045): self.log('涨停不卖', data._name, '涨停价格', data.close[0], data.datetime.date(0)) continue elif data.low[0] == data.high[0] and self.downLimit(data.close[0], prev_close, -0.045): self.log('无量跌停不卖', data._name, '跌停价格', data.close[0], data.datetime.date(0)) continue elif self.days_since_in_host[data] >= self.params.hostDays: sell_price = data.close[0] self.log('卖出股票超期', data._name, '卖出价格', sell_price, data.datetime.date(0)) elif (data.low[0] - self.candidates1[data][1]) < 0: sell_price = self.candidates1[data][1] self.log('卖出股票低于前开盘', data._name, '卖出价格', sell_price, data.datetime.date(0)) else: sell_price = (1+self.p.stopProfit) * self.selfTrader.positions[data._name]['price'] self.log('卖出股票止盈', data._name, '卖出价格', sell_price, data.datetime.date(0)) self.close(data=data, price = sell_price) self.selfTrader.close(symbol=data._name, price = sell_price) self.positionsMy.pop(data, None) self.candidates1.pop(data, None) self.days_since_in_pool.pop(data, None)                        self.log('卖出股票', data._name, '卖出价格', sell_price, data.datetime.date(0)) continue
# 判断是否在候选观察股池中,买入该股票根据规则 if data in self.candidates1: # 反包 if data.high[0] >= self.candidates1[data][0] and data.low[0] < data.high[0]: # 买入该股票 cash = self.broker.getcash() if cash < 10000: self.log('资金不足', data._name, '持有股数量', len(self.positionsMy), data.datetime.date(0)) self.count = self.count + 1 continue elif data in self.positionsMy: continue
try: buyPrice = max(data.open[0],self.candidates1[data][0] + 0.01) sizeOpen = 10000 / buyPrice self.buy(data=data, size=sizeOpen) self.positionsMy[data] = buyPrice self.log('买入股票', data._name, '买入价格', buyPrice, data.datetime.date(0)) self.days_since_in_host[data] = 1 sizeClose = 10000 / buyPrice self.selfTrader.buy(symbol=data._name, price=buyPrice, quantity=sizeClose) except IndexError: print('data.open[1] 不存在所以跳过') continue continue
# 判断是否需要将股票加入候选池 if self.upLimit(prev_close, prev_prev_close, self.p.UPLimit) and self.upLimit(data.high[0], prev_close, self.p.UPLimit) \ and not self.upLimit(data.close[0], prev_close, self.p.UPLimit): self.candidates1[data] = (data.high[0], data.open[0]) self.days_since_in_pool[data] = 1 # self.log('添加股票池1', data._name, '添加价格', data.high[0] ,data.datetime.date(0)) continue
# 判断是否需要移除候选观察股池中的股票,在股票池超过observeDays个交易日 if data in self.candidates1: if self.days_since_in_pool[data] > self.p.observeDays and data not in self.positionsMy: self.candidates1.pop(data, None) self.days_since_in_pool.pop(data, None) # self.log('移除股票', data._name, '移除价格', data.close[0], data.datetime.date(0))
now = datetime.datetime.now()one_day = datetime.timedelta(days=1)next_day = now + one_dayclass MyData(bt.feeds.GenericCSVData): params = ( ('dtformat', '%Y-%m-%d'), ('datetime', 0), ('open', 1), ('high', 2), ('low', 3), ('close', 4), ('volume', 6), ('openinterest', -1),        ('fromdate', datetime.datetime(2022121)), ('todate', next_day), )
if __name__ == '__main__': time_start = time.time() cerebro = bt.Cerebro(stdstats=False, cheat_on_open=False)
    # 添加数据源    dataFolder = r'/Users/Downloads/tdx/baostock_qfq' allStockList = r'../in/all_stocks.csv' df_all = pd.read_csv(allStockList, header=0, na_values='NA')
stock_lists = os.listdir(dataFolder)
for stock in stock_lists:        name = stock.replace('.csv''') new_df = df_all[df_all['代码'] == name] if len(new_df) > 0: first_row = new_df.iloc[0] if 'S' not in first_row['名称'] and '*' not in first_row['名称'] and '退' not in first_row['名称'] and 'X' not in first_row['名称']\                    and 'D' not in first_row['名称'and 'C' not in first_row['名称'and 'U' not in first_row['名称'] : filepath = os.path.join(dataFolder, f'{stock}') size = os.path.getsize(filepath) if size > 10 * 200: if stock.startswith(('sh60', 'sh68', 'sz00', 'sz30')): # if stock.startswith(('sh68', 'sz30')): # if stock.startswith(('sh60', 'sz00')): df = pd.read_csv(filepath) df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d') last_row = df.tail(1) if last_row['date'].values[0] > np.datetime64('2023-05-18'): cerebro.adddata(MyData(dataname=filepath), name = stock)
# 添加交易记录 cerebro.broker.setcash(10000000.0) cerebro.broker.setcommission(commission=0.0007)
    print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue()) cerebro.addstrategy(InverseDiaoDingXian)
cerebro.run()    print('Final Portfolio Value by Open trade: %.2f' % cerebro.broker.getvalue()) time_end = time.time()
print('程序所耗时间:', time_end - time_start)

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多