量化缠论——维克多交易策略(一) 2016-10-24 程序化交易者 程序化交易者
程序化交易者网(www.programtrader.net),国内最专业的程序化交易门户,提供专业的程序化交易,高频交易,量化投资,算法交易,交易策略等资讯、培训、技术支持等服务。 如果涉及复杂的运算,推荐使用提取Dict形式的数据回测,速度会有质的飞跃。 收益风险 源码: import pandas as pd from datetime import timedelta, date def initialize(context): g.security = ['600307.XSHG'] set_universe(g.security) set_benchmark('600307.XSHG') set_commission(PerTrade(buy_cost=0.0003, sell_cost=0.0013, min_cost=5)) set_slippage(PriceRelatedSlippage()) g.n = 5 #获取几分钟k线 ## 获取前几日的趋势 ''' temp_data 包含处理后最后一个k线的数据 zoushi 包含关系处理中关于走势的记录 after_baohan 合并后的k线''' g.temp_data, g.zoushi, g.after_baohan = k_initialization(security=g.security[0],num = 10*48,n=g.n) ## 每日运行一次'根据跌幅止损'、'根据大盘跌幅止损' run_daily(dapan_stoploss) #根据大盘止损,如不想加入大盘止损,注释此句即可 # run_daily(sell) # 根据跌幅卖出 # run_daily(sell2) # 按天亏损率止损 & 固定止盈 def handle_data(context, data): security = g.security Cash = context.portfolio.cash hour = context.current_dt.hour minute = context.current_dt.minute n = g.n if (hour==9 and minute ==30) or (hour==13 and minute ==00): pass else: if minute%n==0: print 'time: %s:%s' %(hour,minute) # 获得前n分钟的k线 x = '%sm'%n temp_hist = attribute_history(security[0], 1, str(x),['high', 'low'], df=False) # 包含关系处理 Input_k = {'high':[],'low':[]} Input_k['high'].append(g.temp_data['high']) Input_k['low'].append(g.temp_data['low']) Input_k['high'].append(temp_hist['high'][0]) Input_k['low'].append(temp_hist['low'][0]) g.temp_data, g.zoushi, g.after_baohan = recognition_baohan(Input_k, g.zoushi, g.after_baohan) # 分型 fenxing_type, fenxing_time, fenxing_plot, fenxing_data = recognition_fenxing(g.after_baohan) ''' fenxing_type 记录分型点的类型,1为顶分型,-1为底分型 fenxing_time 记录分型点的时间 fenxing_plot 记录点的数值,为顶分型去high值,为底分型去low值 fenxing_data 分型点的DataFrame值 ''' # print fenxing_type, fenxing_time, fenxing_plot, fenxing_data # 判断趋势是否反转,并买进 if len(fenxing_type)>7: if fenxing_type[0] == -1: location_1 = [i for i,a in enumerate(fenxing_type) if a==1] # 找出1在列表中的所有位置 location_2 = [i for i,a in enumerate(fenxing_type) if a==-1] # 找出-1在列表中的所有位置 # 线段破坏 case1 = fenxing_data['low'][location_2[0]] > fenxing_data['low'][location_2[1]] # 线段形成 case2 = fenxing_data['high'][location_1[1]] < fenxing_data['high'][location_1[2]]=""><> case3 = fenxing_data['low'][location_2[1]] < fenxing_data['low'][location_2[2]]=""><> # 第i笔中底比第i+2笔顶高(扶助判断条件,根据实测加入之后条件太苛刻,很难有买入机会) case4 = fenxing_data['low'][location_2[1]] > fenxing_data['high'][location_1[3]] if case1 and case2 and case3 : # 买入 order_value(security[0],Cash)
# 每分钟亏损率止损 & 固定止盈 if len(context.portfolio.positions) > 0: for stock in list(context.portfolio.positions.keys()): price = data[stock].pre_close avg_cost = context.portfolio.positions[stock].avg_cost # if (price-avg_cost)/avg_cost >= 0.2 : # order_target(stock, 0) if (price-avg_cost)/avg_cost <= -0.1="">=> order_target(stock, 0) elif (price-avg_cost)/avg_cost >= 0.18 : order_target(stock, 0)
'''下面为各类函数''' ################################################################ def sell2(context): ## 亏损率止损 & 固定止盈 if len(context.portfolio.positions) > 0: for stock in list(context.portfolio.positions.keys()): hist = attribute_history(stock, 1, '1d', 'close',df=False) price = hist['close'][0] avg_cost = context.portfolio.positions[stock].avg_cost if (price-avg_cost)/avg_cost <= -0.1="">=> order_target(stock, 0) elif (price-avg_cost)/avg_cost >= 0.2 : order_target(stock, 0) def dapan_stoploss(context): ## 根据局大盘止损,具体用法详见dp_stoploss函数说明 stoploss = dp_stoploss(kernel=2, n=10, zs=0.05) if stoploss: if len(context.portfolio.positions)>0: for stock in list(context.portfolio.positions.keys()): order_target(stock, 0) # return
def sell(context): # 根据跌幅卖出 if len(context.portfolio.positions)>0: for stock in list(context.portfolio.positions.keys()): hist = attribute_history(stock, 3, '1d', 'close',df=False) if ((1-float(hist['close'][-1]/hist['close'][0])) >= 0.15): order_target(stock,0) def recognition_fenxing(after_baohan): ''' 从后往前找 返回值: fenxing_type 记录分型点的类型,1为顶分型,-1为底分型 fenxing_time 记录分型点的时间 fenxing_plot 记录点的数值,为顶分型去high值,为底分型去low值 fenxing_data 分型点的DataFrame值 ''' ## 找出顶和底 temp_num = 0 #上一个顶或底的位置 temp_high = 0 #上一个顶的high值 temp_low = 0 #上一个底的low值 temp_type = 0 # 上一个记录位置的类型 end = len(after_baohan['high']) i = end-2 fenxing_type = [] # 记录分型点的类型,1为顶分型,-1为底分型 fenxing_time = [] # 记录分型点的时间 fenxing_plot = [] # 记录点的数值,为顶分型去high值,为底分型去low值 fenxing_data = {'high':[],'low':[]} # 分型点的DataFrame值 while (i >= 1): if len(fenxing_type)>8: break else: case1 = after_baohan['high'][i-1] case2 = after_baohan['low'][i-1]>after_baohan['low'][i] and after_baohan['low'][i] if case1: if temp_type == 1: # 如果上一个分型为顶分型,则进行比较,选取高点更高的分型 if after_baohan['high'][i] <=>=> i -= 1 else: temp_high = after_baohan['high'][i] temp_num = i temp_type = 1 i -= 4 elif temp_type == 2: # 如果上一个分型为底分型,则记录上一个分型,用当前分型与后面的分型比较,选取同向更极端的分型 if temp_low >= after_baohan['high'][i]: # 如果上一个底分型的底比当前顶分型的顶高,则跳过当前顶分型。 i -= 1 else: fenxing_type.append(-1) # fenxing_time.append(after_baohan.index[temp_num].strftime('%Y-%m-%d %H:%M:%S')) fenxing_data['high'].append(after_baohan['high'][temp_num]) fenxing_data['low'].append(after_baohan['low'][temp_num]) fenxing_plot.append(after_baohan['high'][i]) temp_high = after_baohan['high'][i] temp_num = i temp_type = 1 i -= 4 else: if (after_baohan['low'][i-2]>after_baohan['low'][i-1] and after_baohan['low'][i-1]<>< p=''><> temp_low = after_baohan['low'][i] temp_num = i-1 temp_type = 2 i -= 4 else: temp_high = after_baohan['high'][i] temp_num = i temp_type = 1 i -= 4
elif case2: if temp_type == 2: # 如果上一个分型为底分型,则进行比较,选取低点更低的分型 if after_baohan['low'][i] >= temp_low: i -= 1 else: temp_low = after_baohan['low'][i] temp_num = i temp_type = 2 i -= 4 elif temp_type == 1: # 如果上一个分型为顶分型,则记录上一个分型,用当前分型与后面的分型比较,选取同向更极端的分型 if temp_high <= after_baohan['low'][i]:="" #="">=> i -= 1 else: fenxing_type.append(1) # fenxing_time.append(after_baohan.index[temp_num].strftime('%Y-%m-%d %H:%M:%S')) fenxing_data['high'].append(after_baohan['high'][temp_num]) fenxing_data['low'].append(after_baohan['low'][temp_num]) fenxing_plot.append(after_baohan['low'][i]) temp_low = after_baohan['low'][i] temp_num = i temp_type = 2 i -= 4 else: if (after_baohan['high'][i-2] temp_high = after_baohan['high'][i] temp_num = i-1 temp_type = 1 i -= 4 else: temp_low = after_baohan['low'][i] temp_num = i temp_type = 2 i -= 4 else: i -= 1 return fenxing_type, fenxing_time, fenxing_plot, fenxing_data def recognition_baohan(Input_k, zoushi, after_baohan): ''' =''> 阅读 |
|