今天的都是干货。 我的每一篇文章丝丝相扣,不可孤篇而论。 1,修改个装饰器: backtrader把股票名设为了data._name,本文不建议通过这种方式进行抽取。最好是在linseries.py里加入装饰器@property # in linseries.py 这样我们就可以用self.name了 2,交易软件设置一下:先开通模拟交易,(每个券商的交易项是有微调的,我以模拟交易为例演示,各自需要微调。) 这些个内容哥哥券商不一致。 买入卖出界面设置成空的,我们自己填入,不需要自动填入。 3,自动化流程简述:
4,引入依赖设置 import pandas as pd # import tushare as ts import numpy as np import requests import datetime import backtrader as bt import time import easyquotation import csv # 进度条 from tqdm import tqdm import time from functools import wraps import os import comdata import rich import jqktrader from feed import SimpleLivingData
5,懂得都懂: 作为全段看见.dot就头疼,本想写个句柄教程的,以后再说吧,这个是我git上搜的,能用就行,理论简单,操作硬性,没什么难的。 最好的方案还是改easytrader,放心。 # 这是个交易端 user = jqktrader.use() user.connect( exe_path=r'D:\同花顺软件\同花顺\xiadan.exe', tesseract_cmd=r'C:\Program Files\Tesseract-OCR\tesseract.exe' ) user.enable_type_keys_for_editor() # user.position 6,加入时间测试,毕竟自动化时间就是命根:
7,每一次触发,写入csv文件中,作为记录: # 记载每一次的交易 def trader_save(sa): row=sa nowdate=datetime.datetime.now().strftime('%Y%m%d') # 文件统一命名TK+日期 dirname='TK'+str(nowdate)+'.csv' with open(dirname,'a',newline='' ,encoding='utf8') as cf: wf=csv.writer(cf) wf.writerow(row) 这个是今天下午的交易记录,时间上基本可用: 8,读取历史数据,和用历史数据生产的交易参数,此数据属于过去式,可以作为全局变量,加载到交易系统中。 读取的文件,就是盘前选股的文章中生成的文件。
写一个函数,按需从历史数据中调取: # 从wash文件中获取数据 # 0.005m速度 # @timefn def get_washdata(code,wap): global out
try: keydata=out.loc[code,wap] except: keydata=None print(code,wap,'不存在') # print(smban) return keydata 写一个函数,按需读取实时数据,
9,这里写个案例,可以利用8中的数据做一个冲高后回落的卖出条件: # 冲高后,回落3%函数 @timefn def get_grolue(code,lvwap): # print(code) try: tnowh=get_nowvalue(code,'最高') yclose=get_nowvalue(code,'昨收') ylow=get_washdata(code,'最低') except: return print(code,'数据有误') if tnowh>yclose: tnowhight=tnowh else: tnowhight=yclose # 择日修改,这里有负数没有处理 growlue=abs((tnowhight-ylow)/20)*lvwap if (tnowh-yclose)*100/yclose>12: growlue=abs((tnowhight-ylow)/20)*2
return float('%.3f' % growlue) 10,获取股票全局代码名称的函数,待修改,不完美
11,tk策略代码,将就看吧,我出一期专门的介绍。 class TK(bt.Strategy): params = dict( exitbars = 5, min_period = 21, ) #日志格式 def log(self, txt, dt=None,doprint=True): if doprint: dt = self.datas[0].datetime.datetime(0) print('%s, %s' % (dt.isoformat(), txt))
def __init__(self): global df0
self.inds=dict() self.volnow5=dict() self.priceright=dict() self.buy_cgvol_change=dict() self.buy_bgvol_change=dict() self.buy_apvol_change=dict() self.buy_dpvol_change=dict() self.selehigh=dict() self.selelow=dict() self.selevolume=dict() self.selemidban=dict() self.selemaxhigh=dict() self.seleminlow=dict() self.seleovlban=dict() self.selesmvo5=dict() self.opt_cyq_70=dict() self.buy_oh_signal=dict() self.buy_numvol=dict() self.buy_svol_change=dict() self.buy_kucun=dict() self.buy_bigvol=dict() self.buy_sobigvol=dict() self.buy_minvol=dict() self.buy_om_signal=dict() self.buy_mh_signal=dict() self.buy_cyq10_signal=dict() self.buy_cyq30_signal=dict() self.buy_cl_signal=dict() self.today_na=dict() self.today_mo_v=dict() self.buy_setchange=dict() self.timecheng=dict() self.buy_sellvol=dict() self.time_use=dict() self.time_set=dict() self.time_sum=dict() self.time_cut=dict() self.last_dt = datetime.datetime.now() self.sele_flv=dict() self.buy_ov_signal=dict() # 有可用量,才可卖出 self.sell_tf=dict() self.sell_mil=dict() self.sell_grolue=dict() self.sell_selfgrolue=dict() #加入成交量排名指数 vol_jzll=None vol_jz=[] self.vol_num=[] self.trader_list=dict() self.trader_list_oldd=dict() vol_jzp=[] vol_jzpee=[] vol_jzcross=[] vol_zqmc=[] vol_kez=[] self.trader_name_old=[] vol_jzpower=0 vol_cross=False self.today_open=dict() self.today_mot=dict() self.today_mo=dict() self.today_mo_t=dict() self.today_mb=dict() self.seleopen=dict() self.seleclose=dict() self.seleuse=dict() self.seleuptk=dict() self.tod_low=dict() self.buy_ol_signal=dict() self.buy_ol_t_signal=dict() self.trader_vol=dict()
nowdate=datetime.datetime.now().strftime('%Y%m%d') # 文件统一命名TD+日期 dirname='TK'+str(nowdate)+'.csv' try: f =open(dirname,'r') df2=pd.read_csv(dirname,header = 0,encoding='utf8') # print(df.head(2)) # 读取最后取值 grouped = df2.groupby('证券代码') f =open(dirname,'a') for name,group in grouped: print(name) self.trader_name_old.append(name) # print(group) trader_list_oldd = grouped.apply(lambda group:group.tail(1) ) self.trader_list_oldd=pd.DataFrame(trader_list_oldd,columns=['证券代码','证券名称','成熟度','时间','上穿高线','数量','可用','冻量','成交价','涨跌','振幅','成交量','调阈','交易信号']) self.trader_list_oldd.set_index('证券代码',drop=False,inplace=True) print(self.trader_list_oldd,'已经读取') print(dirname,'存在,已经读取') except IOError: df2=pd.DataFrame(None,index=None,columns=['证券代码','证券名称','成熟度','时间','上穿高线','数量','可用','冻量','成交价','涨跌','振幅','成交量','调阈','交易信号']) with open(dirname, 'a',newline='',encoding='utf8') as f: wf=csv.writer(f) wf.writerow(df2) f.close() print(dirname,'不存在;22已经创建') for i,d in enumerate(self.datas): opentod=get_nowvalue(d.name,'名称') vol_jz.append(d.name) vol_zqmc.append(opentod) vol_jzp.append(vol_jzll) vol_jzcross.append(vol_cross) vol_jzpee.append(vol_jzpower) self.trader_list['证券代码']=vol_jz self.trader_list['app']=vol_jz self.trader_list['证券名称']=vol_zqmc # self.trader_list['成熟度']=vol_jzpee self.trader_list['时间']=vol_jzp self.trader_list['数量']=vol_jzpee self.trader_list['可用']=vol_jzpee self.trader_list['冻量']=vol_jzpee self.trader_list['成交价']=vol_jzp self.trader_list['涨跌']=vol_jzp self.trader_list['振幅']=vol_jzp self.trader_list['成交量']=vol_jzp self.trader_list['交易信号']=vol_jzp # try: self.selehigh[d]=get_washdata(d.name,'最高')
self.selelow[d]=get_washdata(d.name,'最低') self.selevolume[d]=get_washdata(d.name,'成交量') self.today_open[d]=get_nowvalue(d.name,'今开') self.today_na[d]=get_nowvalue(d.name,'名称')[-4:] self.selehigh[d]=get_washdata(d.name,'最高') self.seleopen[d]=get_washdata(d.name,'open') self.seleclose[d]=get_washdata(d.name,'close') self.seleuptk[d]=get_washdata(d.name,'买入阈值TK') print(self.seleuptk[d],'---------成交量') # 上穿最高线 self.buy_oh_signal[d]=bt.ind.CrossUp(d.close,self.seleuptk[d])
self.trader_list=pd.DataFrame(self.trader_list) self.trader_list = self.trader_list.copy() self.trader_list.set_index('证券代码', inplace=True) print(self.trader_list,'读取数据后的') # 读取持仓 try: for code in self.trader_name_old: self.trader_list.loc[code,'app'] =self.trader_list_oldd.loc[code,'证券代码'] self.trader_list.loc[code,'证券名称'] =self.trader_list_oldd.loc[code,'证券名称'] self.trader_list.loc[code,'时间'] =self.trader_list_oldd.loc[code,'时间'] print(self.trader_list,'第一次赋值') except: pass try: position = user.position pos=pd.DataFrame(position) pos=pos.copy() print(pos) kcstock=[] for code in pos['证券代码']: kcstock.append(code) pos.set_index('证券代码', inplace=True) # 设置已有股票 for code in kcstock: codecc= 'sz'+code # 不同的券商需要写特定的列表 try: self.trader_list.loc[codecc,'数量'] =pos.loc[code,'股份余额'] self.trader_list.loc[codecc,'可用'] =pos.loc[code,'可用股份'] self.trader_list.loc[codecc,'冻量'] =pos.loc[code,'冻结数量'] self.trader_list.loc[codecc,'证券名称'] =pos.loc[code,'证券名称'] except: # 应该是中信的 self.trader_list.loc[codecc,'数量'] =pos.loc[code,'股票余额'] self.trader_list.loc[codecc,'可用'] =pos.loc[code,'可用余额'] self.trader_list.loc[codecc,'冻量'] =pos.loc[code,'冻结数量'] self.trader_list.loc[codecc,'证券名称'] =pos.loc[code,'证券名称'] except: pass
self.order = None self.buyprice = None self.buycomm = None self.size=0 self.count=0 print(len(self.trader_list)) print('开始交易!!!') def notify_order(self, order): #订单状态处理 if order.status in [order.Submitted, order.Accepted]: return # 买入卖出订单处理 if order.status in [order.Completed]: if order.isbuy(): self.log('买入单价: %.2f, 买入总金额: %.2f, 手续费 %.2f,数量%.2f'% (order.executed.price, order.executed.value *-1, order.executed.comm, order.executed.size))
elif order.issell(): self.log('卖出单价: %.2f, 总资产: %.2f, 手续费 %.2f,数量:%.2f' % (order.executed.price, self.broker.get_cash(), order.executed.comm, order.executed.size)) self.bar_executed = len(self) elif order.status in [order.Canceled, order.Margin, order.Rejected]: self.log('订单取消/保证金不足/拒绝') # Write down: no pending order self.order = None def notify_trade(self, trade): if not trade.isclosed: return
self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' % (trade.pnl, trade.pnlcomm)) # @timefn def next(self): for i,d in enumerate(self.datas): # 股票处于上涨状态 self.priceright[d]=(d.open[0]-d.open[-1])>0 # 卖出 self.sell_tf[d]=self.trader_list.loc[d.name,'可用'] > 0 self.time_set[d]=self.last_dt # FMT = '%H:%M:%S.%f' self.time_use[d]=datetime.datetime.now() self.time_sum[d]=(self.time_use[d]-self.time_set[d]).seconds self.last_dt=self.time_use[d] cname=d.name self.time_cut[d]=self.time_sum[d]>12 if self.time_cut[d]==1: print('无效数据') if self.time_sum[d]>6: print('[{cname:<{len}}\t读取成交量间隔:{dt}:\t交易时间:{dnow}'.format(cname=d.name+']',len=8-len(cname.encode('GBK'))+len(cname),dt=self.time_sum[d],dnow= self.time_use[d])) # 时间分段 self.trader_sell_time=datetime.datetime.now().time()>datetime.time(9, 29) and datetime.datetime.now().time()<datetime.time(14,55) self.trader_buy_time=datetime.datetime.now().time()>datetime.time(9, 45) and datetime.datetime.now().time()<datetime.time(14, 45) self.trader_time=datetime.datetime.now().time()>datetime.time(9, 30) and datetime.datetime.now().time()<datetime.time(14,55) self.trader_no_time=datetime.datetime.now().time()>datetime.time(9, 24) and datetime.datetime.now().time()<datetime.time(9,47) if self.priceright[d] ==1 and self.trader_time==1: self.trader_list.loc[d.name,'涨跌']='上涨' if self.priceright[d] !=1 and self.trader_time==1: self.trader_list.loc[d.name,'涨跌']='下跌' self.trader_list.loc[d.name,'时间']= datetime.datetime.now().time()
self.buy_kucun[d]=self.trader_list.loc[d.name,'数量']==0 self.trader_list.loc[d.name,'成交价']=d.close[0] # 格式化成交量100的倍数 def downcast(amount, lot): return abs(amount//lot*lot) try: # 买入金额分组以61.8万为单仓金额 mon27=20000 if self.buy_oh_signal[d]==1: balance = (user.balance['可用金额']) if balance >0: pass else: balance=0 print(d.name,'不满足预留金额') # 给自己的交易模式起个名字 self.trader_list.loc[d.name,'交易信号'] ='穿云箭' trader_save(self.trader_list.loc[d.name]) # 满足一个仓位交易 if balance >mon27: stake= downcast(mon27/self.datas[i].close, 100) if stake!=0: buy_no=user.buy(d._name[2:8], price=self.today_mb[d], amount = stake) self.trader_list.loc[d.name,'数量'] = stake+self.trader_list.loc[d.name,'数量'] print(buy_no) self.order = self.buy(data = d,size=stake) self.log(f'买入{d.name}, 买入价格:{d.close[0]:.2f}, 买入数量: {stake}') self.trader_list.loc[d.name,'交易信号'] ='穿云箭' trader_save(self.trader_list.loc[d.name]) else: pass
except Exception as e: print('错误类型是',e.__class__.__name__) print('错误明细是',e) def stop(self): pass 注意这里: 是买入条件,单纯的演示用 利用买入条件,买入股票: 12,bt的主程序
13,从交易软件读取股票信息: # 读取证券中的数据 @timefn def get_ck(): position = user.position pos=pd.DataFrame(position) pos=pos.copy() pos.set_index('证券代码') kcstock=[] for code in pos['证券代码']: codecc= 'sz'+code kcstock.append(codecc) print('\033[0;31m%s\033[0m' % kcstock) # print(kcstock) return kcstock 14,查看可用资金
15,选股程序,以及主程序 if __name__ == '__main__': global out out['代码']=out['code'] print (out) # 读取股票池 selec_dict=out[(out['今开'] >=out['stand_price'])&(out['买入阈值TK'] <out['zt'])][['代码']] print('选中股票:' ,len(selec_dict),'支') selec_csv=selec_dict stockpool=selec_csv['代码'] print(stockpool,) # 尝试读取持仓股票代码 try: position = user.position pos=pd.DataFrame(position) pos=pos.copy() pos.set_index('证券代码') kcstocks=[] print(pos) for code in pos['证券代码']: if '60' in code: codecc= 'sh'+code else: codecc= 'sz'+code print(codecc) if codecc in selec_csv['code'].values.tolist(): pass else: kcstocks.append(codecc) stockpool=np.append(stockpool,kcstocks) except: pass # 查看一下代码,股票池+持仓 print(stockpool) runstart() bingo,跑起来了 预计周末我有时间,把演示程序整理出来。 看一下今天的结果: 多少不尽人意,还是要用周易 2023年5月24日观瞻 |
|