分享

Backtrader胜率92%策略实盘七:自动化交易

 立志德美 2023-05-23 发布于上海

今天的都是干货。

我的每一篇文章丝丝相扣,不可孤篇而论。

1,修改个装饰器:

backtrader把股票名设为了data._name,本文不建议通过这种方式进行抽取。最好是在linseries.py里加入装饰器@property

图片

# in linseries.py
# @property
# def name(self):
# return self._name

这样我们就可以用self.name了

2,交易软件设置一下:先开通模拟交易,(每个券商的交易项是有微调的,我以模拟交易为例演示,各自需要微调。)

图片

这些个内容哥哥券商不一致。

买入卖出界面设置成空的,我们自己填入,不需要自动填入。

图片

3,自动化流程简述:

从历史数据中拿到自己想要的数据9点25开盘数据生成,开始选股
将持仓和选股内容合并,开始是监控
触发买入条件买入

4,引入依赖设置

import pandas as pd# import tushare as tsimport numpy as npimport requestsimport datetimeimport backtrader as btimport timeimport easyquotationimport csv# 进度条from tqdm import tqdmimport timefrom functools import wrapsimport osimport comdataimport richimport jqktraderfrom feed import SimpleLivingData
# 设置输出对齐pd.set_option('display.max_columns', 500)pd.set_option('display.width', 500)pd.set_option('display.max_colwidth', 500)pd.set_option('display.unicode.ambiguous_as_wide', True)pd.set_option('display.unicode.east_asian_width', True)

5,懂得都懂:

作为全段看见.dot就头疼,本想写个句柄教程的,以后再说吧,这个是我git上搜的,能用就行,理论简单,操作硬性,没什么难的。

最好的方案还是改easytrader,放心。

# 这是个交易端user = jqktrader.use()user.connect( exe_path=r'D:\同花顺软件\同花顺\xiadan.exe', tesseract_cmd=r'C:\Program Files\Tesseract-OCR\tesseract.exe')user.enable_type_keys_for_editor()# user.position

6,加入时间测试,毕竟自动化时间就是命根:

def timefn(fn):    '''计算性能的修饰器'''    @wraps(fn)    def measure_time(*args, **kwargs):        t1 = time.time()        result = fn(*args, **kwargs)        t2 = time.time()        print(f'@timefn: {fn.__name__} took {t2 - t1: .5f} s')        return result    return measure_time

7,每一次触发,写入csv文件中,作为记录:

# 记载每一次的交易def trader_save(sa): row=sa nowdate=datetime.datetime.now().strftime('%Y%m%d') # 文件统一命名TK+日期 dirname='TK'+str(nowdate)+'.csv' with open(dirname,'a',newline='' ,encoding='utf8') as cf: wf=csv.writer(cf) wf.writerow(row)

这个是今天下午的交易记录,时间上基本可用:

图片

8,读取历史数据,和用历史数据生产的交易参数,此数据属于过去式,可以作为全局变量,加载到交易系统中。

读取的文件,就是盘前选股的文章中生成的文件。

# 读取今日股票仓库df0=pd.read_csv('./code_wash.csv',  )df0.set_index('code', inplace=True,drop=False)date_list_old=comdata.get_nowdata_list()date_list_old=date_list_old[['今开'  , '昨收'  , '价格'  , '最高'  , '最低' ,   '成交量'   ,     '日期'    ,  '时间'   ,   '代码']]date_list_old['code']=date_list_old['代码']date_list_old.set_index('code', inplace=True,drop=True)out = pd.concat([date_list_old,df0], axis=1).sort_index()out  =out.dropna(axis=0, how='any')out['买入阈值TK']=out['今开']+out['atrTK']out['买入下阈值TK']=out['今开']-out['atrTK']print(out)

写一个函数,按需从历史数据中调取:

# 从wash文件中获取数据# 0.005m速度# @timefndef get_washdata(code,wap): global out
try: keydata=out.loc[code,wap] except: keydata=None print(code,wap,'不存在') # print(smban) return keydata

写一个函数,按需读取实时数据,

# 返回今日实时数据# @0.009m# @timefndef get_nowvalue(code,wap):    realtick=comdata.get_nowdata_all(code,wap)    return realtick

9,这里写个案例,可以利用8中的数据做一个冲高后回落的卖出条件:

# 冲高后,回落3%函数@timefndef get_grolue(code,lvwap): # print(code) try: tnowh=get_nowvalue(code,'最高') yclose=get_nowvalue(code,'昨收') ylow=get_washdata(code,'最低') except: return print(code,'数据有误') if tnowh>yclose: tnowhight=tnowh else: tnowhight=yclose # 择日修改,这里有负数没有处理 growlue=abs((tnowhight-ylow)/20)*lvwap if (tnowh-yclose)*100/yclose>12: growlue=abs((tnowhight-ylow)/20)*2
return float('%.3f' % growlue)

10,获取股票全局代码名称的函数,待修改,不完美

# 获得股票基本数据(证券代码)# @timefndef get_tickes(wap):    stocks = pd.read_csv('ticketslib.csv')    stocks = stocks[~stocks['名称'].str.contains('ST|\*|退')]    if wap == 'sz':        cyb = stocks[stocks['代码'].str.contains('^sz300|^sz301')]    if wap == 'sh':        cyb = stocks[stocks['代码'].str.contains('^sh60|^sz000')]    return cyb

11,tk策略代码,将就看吧,我出一期专门的介绍。

class TK(bt.Strategy): params = dict( exitbars = 5, min_period = 21, ) #日志格式 def log(self, txt, dt=None,doprint=True): if doprint: dt = self.datas[0].datetime.datetime(0) print('%s, %s' % (dt.isoformat(), txt))
def __init__(self): global df0
self.inds=dict() self.volnow5=dict() self.priceright=dict() self.buy_cgvol_change=dict() self.buy_bgvol_change=dict() self.buy_apvol_change=dict() self.buy_dpvol_change=dict() self.selehigh=dict() self.selelow=dict() self.selevolume=dict() self.selemidban=dict() self.selemaxhigh=dict() self.seleminlow=dict() self.seleovlban=dict() self.selesmvo5=dict() self.opt_cyq_70=dict() self.buy_oh_signal=dict() self.buy_numvol=dict() self.buy_svol_change=dict() self.buy_kucun=dict() self.buy_bigvol=dict() self.buy_sobigvol=dict() self.buy_minvol=dict() self.buy_om_signal=dict() self.buy_mh_signal=dict() self.buy_cyq10_signal=dict() self.buy_cyq30_signal=dict() self.buy_cl_signal=dict() self.today_na=dict() self.today_mo_v=dict() self.buy_setchange=dict() self.timecheng=dict() self.buy_sellvol=dict() self.time_use=dict() self.time_set=dict() self.time_sum=dict() self.time_cut=dict() self.last_dt = datetime.datetime.now() self.sele_flv=dict() self.buy_ov_signal=dict() # 有可用量,才可卖出 self.sell_tf=dict() self.sell_mil=dict() self.sell_grolue=dict() self.sell_selfgrolue=dict() #加入成交量排名指数 vol_jzll=None vol_jz=[] self.vol_num=[] self.trader_list=dict() self.trader_list_oldd=dict() vol_jzp=[] vol_jzpee=[] vol_jzcross=[] vol_zqmc=[] vol_kez=[] self.trader_name_old=[] vol_jzpower=0 vol_cross=False self.today_open=dict() self.today_mot=dict() self.today_mo=dict() self.today_mo_t=dict() self.today_mb=dict() self.seleopen=dict() self.seleclose=dict() self.seleuse=dict() self.seleuptk=dict() self.tod_low=dict() self.buy_ol_signal=dict() self.buy_ol_t_signal=dict() self.trader_vol=dict()
nowdate=datetime.datetime.now().strftime('%Y%m%d') # 文件统一命名TD+日期 dirname='TK'+str(nowdate)+'.csv' try: f =open(dirname,'r') df2=pd.read_csv(dirname,header = 0,encoding='utf8') # print(df.head(2)) # 读取最后取值 grouped = df2.groupby('证券代码') f =open(dirname,'a') for name,group in grouped: print(name) self.trader_name_old.append(name) # print(group) trader_list_oldd = grouped.apply(lambda group:group.tail(1) ) self.trader_list_oldd=pd.DataFrame(trader_list_oldd,columns=['证券代码','证券名称','成熟度','时间','上穿高线','数量','可用','冻量','成交价','涨跌','振幅','成交量','调阈','交易信号']) self.trader_list_oldd.set_index('证券代码',drop=False,inplace=True) print(self.trader_list_oldd,'已经读取') print(dirname,'存在,已经读取') except IOError: df2=pd.DataFrame(None,index=None,columns=['证券代码','证券名称','成熟度','时间','上穿高线','数量','可用','冻量','成交价','涨跌','振幅','成交量','调阈','交易信号']) with open(dirname, 'a',newline='',encoding='utf8') as f: wf=csv.writer(f) wf.writerow(df2) f.close() print(dirname,'不存在;22已经创建') for i,d in enumerate(self.datas): opentod=get_nowvalue(d.name,'名称') vol_jz.append(d.name) vol_zqmc.append(opentod) vol_jzp.append(vol_jzll) vol_jzcross.append(vol_cross) vol_jzpee.append(vol_jzpower) self.trader_list['证券代码']=vol_jz self.trader_list['app']=vol_jz self.trader_list['证券名称']=vol_zqmc # self.trader_list['成熟度']=vol_jzpee self.trader_list['时间']=vol_jzp self.trader_list['数量']=vol_jzpee self.trader_list['可用']=vol_jzpee self.trader_list['冻量']=vol_jzpee self.trader_list['成交价']=vol_jzp self.trader_list['涨跌']=vol_jzp self.trader_list['振幅']=vol_jzp self.trader_list['成交量']=vol_jzp self.trader_list['交易信号']=vol_jzp # try: self.selehigh[d]=get_washdata(d.name,'最高')
self.selelow[d]=get_washdata(d.name,'最低') self.selevolume[d]=get_washdata(d.name,'成交量') self.today_open[d]=get_nowvalue(d.name,'今开') self.today_na[d]=get_nowvalue(d.name,'名称')[-4:] self.selehigh[d]=get_washdata(d.name,'最高') self.seleopen[d]=get_washdata(d.name,'open') self.seleclose[d]=get_washdata(d.name,'close') self.seleuptk[d]=get_washdata(d.name,'买入阈值TK') print(self.seleuptk[d],'---------成交量') # 上穿最高线 self.buy_oh_signal[d]=bt.ind.CrossUp(d.close,self.seleuptk[d])
self.trader_list=pd.DataFrame(self.trader_list) self.trader_list = self.trader_list.copy() self.trader_list.set_index('证券代码', inplace=True) print(self.trader_list,'读取数据后的') # 读取持仓 try: for code in self.trader_name_old: self.trader_list.loc[code,'app'] =self.trader_list_oldd.loc[code,'证券代码'] self.trader_list.loc[code,'证券名称'] =self.trader_list_oldd.loc[code,'证券名称'] self.trader_list.loc[code,'时间'] =self.trader_list_oldd.loc[code,'时间'] print(self.trader_list,'第一次赋值') except: pass try: position = user.position pos=pd.DataFrame(position) pos=pos.copy() print(pos) kcstock=[] for code in pos['证券代码']: kcstock.append(code) pos.set_index('证券代码', inplace=True) # 设置已有股票 for code in kcstock: codecc= 'sz'+code # 不同的券商需要写特定的列表 try: self.trader_list.loc[codecc,'数量'] =pos.loc[code,'股份余额'] self.trader_list.loc[codecc,'可用'] =pos.loc[code,'可用股份'] self.trader_list.loc[codecc,'冻量'] =pos.loc[code,'冻结数量'] self.trader_list.loc[codecc,'证券名称'] =pos.loc[code,'证券名称'] except: # 应该是中信的 self.trader_list.loc[codecc,'数量'] =pos.loc[code,'股票余额'] self.trader_list.loc[codecc,'可用'] =pos.loc[code,'可用余额'] self.trader_list.loc[codecc,'冻量'] =pos.loc[code,'冻结数量'] self.trader_list.loc[codecc,'证券名称'] =pos.loc[code,'证券名称'] except: pass
self.order = None self.buyprice = None self.buycomm = None self.size=0 self.count=0 print(len(self.trader_list)) print('开始交易!!!') def notify_order(self, order): #订单状态处理 if order.status in [order.Submitted, order.Accepted]: return # 买入卖出订单处理 if order.status in [order.Completed]: if order.isbuy(): self.log('买入单价: %.2f, 买入总金额: %.2f, 手续费 %.2f,数量%.2f'% (order.executed.price, order.executed.value *-1, order.executed.comm, order.executed.size))
elif order.issell(): self.log('卖出单价: %.2f, 总资产: %.2f, 手续费 %.2f,数量:%.2f' % (order.executed.price, self.broker.get_cash(), order.executed.comm, order.executed.size)) self.bar_executed = len(self) elif order.status in [order.Canceled, order.Margin, order.Rejected]: self.log('订单取消/保证金不足/拒绝') # Write down: no pending order self.order = None def notify_trade(self, trade): if not trade.isclosed: return
self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' % (trade.pnl, trade.pnlcomm)) # @timefn def next(self): for i,d in enumerate(self.datas): # 股票处于上涨状态 self.priceright[d]=(d.open[0]-d.open[-1])>0 # 卖出 self.sell_tf[d]=self.trader_list.loc[d.name,'可用'] > 0 self.time_set[d]=self.last_dt # FMT = '%H:%M:%S.%f' self.time_use[d]=datetime.datetime.now() self.time_sum[d]=(self.time_use[d]-self.time_set[d]).seconds self.last_dt=self.time_use[d] cname=d.name self.time_cut[d]=self.time_sum[d]>12 if self.time_cut[d]==1: print('无效数据') if self.time_sum[d]>6: print('[{cname:<{len}}\t读取成交量间隔:{dt}:\t交易时间:{dnow}'.format(cname=d.name+']',len=8-len(cname.encode('GBK'))+len(cname),dt=self.time_sum[d],dnow= self.time_use[d]))# 时间分段 self.trader_sell_time=datetime.datetime.now().time()>datetime.time(9, 29) and datetime.datetime.now().time()<datetime.time(14,55) self.trader_buy_time=datetime.datetime.now().time()>datetime.time(9, 45) and datetime.datetime.now().time()<datetime.time(14, 45) self.trader_time=datetime.datetime.now().time()>datetime.time(9, 30) and datetime.datetime.now().time()<datetime.time(14,55) self.trader_no_time=datetime.datetime.now().time()>datetime.time(9, 24) and datetime.datetime.now().time()<datetime.time(9,47) if self.priceright[d] ==1 and self.trader_time==1: self.trader_list.loc[d.name,'涨跌']='上涨' if self.priceright[d] !=1 and self.trader_time==1: self.trader_list.loc[d.name,'涨跌']='下跌' self.trader_list.loc[d.name,'时间']= datetime.datetime.now().time()
self.buy_kucun[d]=self.trader_list.loc[d.name,'数量']==0 self.trader_list.loc[d.name,'成交价']=d.close[0] # 格式化成交量100的倍数 def downcast(amount, lot): return abs(amount//lot*lot) try: # 买入金额分组以61.8万为单仓金额 mon27=20000 if self.buy_oh_signal[d]==1: balance = (user.balance['可用金额']) if balance >0: pass else: balance=0 print(d.name,'不满足预留金额') # 给自己的交易模式起个名字 self.trader_list.loc[d.name,'交易信号'] ='穿云箭' trader_save(self.trader_list.loc[d.name]) # 满足一个仓位交易 if balance >mon27: stake= downcast(mon27/self.datas[i].close, 100) if stake!=0: buy_no=user.buy(d._name[2:8], price=self.today_mb[d], amount = stake) self.trader_list.loc[d.name,'数量'] = stake+self.trader_list.loc[d.name,'数量'] print(buy_no) self.order = self.buy(data = d,size=stake) self.log(f'买入{d.name}, 买入价格:{d.close[0]:.2f}, 买入数量: {stake}') self.trader_list.loc[d.name,'交易信号'] ='穿云箭' trader_save(self.trader_list.loc[d.name]) else: pass
except Exception as e: print('错误类型是',e.__class__.__name__) print('错误明细是',e) def stop(self): pass

注意这里:

图片

是买入条件,单纯的演示用

利用买入条件,买入股票:

图片

12,bt的主程序

@timefndef runstart():    # cpus=6,防止死机    cerebro = bt.Cerebro(oldtrades=False, stdstats=False,maxcpus=6)

for code in stockpool: codename=code feed = SimpleLivingData(codename, stockpool,timeframe=bt.TimeFrame.Ticks, compression=1) cerebro.adddata(feed,name=codename) cerebro.addstrategy(TK)

#回测-资金规则(总资产和手续费) balance = (user.balance)['可用金额'] print('可用金额',balance)    # 这没用,broker间接等于自定义 cerebro.broker.setcash(1000000) cerebro.broker.setcommission(0.01) print('开始自动化交易 Running')
result = cerebro.run(runonce=False)
print('结束运行 Finish')

13,从交易软件读取股票信息:

# 读取证券中的数据@timefndef get_ck(): position = user.position pos=pd.DataFrame(position) pos=pos.copy() pos.set_index('证券代码') kcstock=[] for code in pos['证券代码']: codecc= 'sz'+code kcstock.append(codecc) print('\033[0;31m%s\033[0m' % kcstock) # print(kcstock) return kcstock

14,查看可用资金

def get_position():    position = user.position    print(position)

15,选股程序,以及主程序

if __name__ == '__main__': global out out['代码']=out['code'] print (out)# 读取股票池 selec_dict=out[(out['今开'] >=out['stand_price'])&(out['买入阈值TK'] <out['zt'])][['代码']] print('选中股票:' ,len(selec_dict),'支') selec_csv=selec_dict stockpool=selec_csv['代码'] print(stockpool,) # 尝试读取持仓股票代码 try: position = user.position pos=pd.DataFrame(position) pos=pos.copy() pos.set_index('证券代码') kcstocks=[] print(pos) for code in pos['证券代码']: if '60' in code: codecc= 'sh'+code else: codecc= 'sz'+code print(codecc) if codecc in selec_csv['code'].values.tolist(): pass else: kcstocks.append(codecc) stockpool=np.append(stockpool,kcstocks) except: pass # 查看一下代码,股票池+持仓 print(stockpool) runstart()

bingo,跑起来了

图片

预计周末我有时间,把演示程序整理出来。

看一下今天的结果:

多少不尽人意,还是要用周易

图片

2023年5月24日观瞻

图片

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约