分享

量化投资之高级策略合集(建议收藏)

 老鹰666 2024-06-15 发布于重庆

此部分内容对于传统投资学专业有障碍,对于金融科技专业学生应该可以把握。

1.巴菲特的alpha

      巴菲特的投资策略和其所实现的超额回报(即Alpha)一直是金融学界和实务界的研究焦点。Alpha是一个衡量投资性能的指标,代表了一个投资组合相对于其基准指数的超额回报。在沃伦·巴菲特的案例中,这意味着他管理的伯克希尔·哈撒韦公司的投资回报超过了一般市场的表现。

经济学家和金融分析师经常研究巴菲特的投资策略,试图量化他的Alpha并理解其能够持续战胜市场的原因。研究显示,巴菲特的Alpha部分来自于他选股的能力和行业配置的策略,而且他那看似简单的价值投资策略背后实际上是复杂和精细的财务分析及市场洞察。
巴菲特的成功和高Alpha也常被视为投资领域中的一个标杆,激励了无数投资者和基金经理去模仿他的策略和原则。不过,巴菲特本人常常强调,耐心、纪律和正确的心态是投资成功的关键。

在《巴菲特的alpha》文章里,后人把巴菲特的收益分成六个维度,分别是市场,估值,规模,动量,质量和波动率六个维度。我们今天就开始复现其中的原理,除去市场维度,我们从其他五个维度分别挑选因子,总共6个,组成6因子模型。  

对于因子的处理,由于因子来自不同维度,所以无需进行降维或者因子正交处理来解决它的相关性问题,所以简单进行了去极值和标准化处理

对于股票列表,已经进行剔除ST,上市未满60天的新股,停牌股和开盘涨停股

对于打分方式:针对升序因子乘以-1;针对降序因子乘以1,最后进行叠加

在择时方面,采用RSRS的方式对指数进行择时信号。

资金规模在10000000,20日调仓,回测时间从2010-01-01到2018-11-08

'''收益还可以,可以使用
'''
import pandas as pdimport numpy as npimport datetime as dtimport talib as tafrom datetime import date,timedelta import statsmodels.api as sm
#初始化账户 def init(context): set_params(context) set_variables(context) set_backtest() run_daily(stop_loss) #设置策参数def set_params(context):
g.tc = 20 #调仓频率 g.t=0 g.big_small = 'big' #big是降序,small为升序 context.stock = '000300.SH' g.long_pct = 0.05 g.stock='000300.SH' #择时选取的指数 g.total_positionprevious=0 #仓位 g.N = 18 #RSRS选取的回归长度 g.M = 1100 #RSRS均值窗口 def set_variables(context): context.X_length=11 context.flag=True g.buy = 0.7 #买入阀门 g.sell = -0.7 #卖出阀门 g.ans = [] g.ans_rightdev= [] def set_backtest(): set_benchmark('000300.SH') # 设置基准 set_slippage(PriceSlippage(0.002)) # 设置可变滑点

def before_trading(context): #需要先建立过去的数据集合,否则后面新数据没有历史数据作为窗口 if context.flag: initlast_date=context.now-timedelta(days=1) prices = get_price(g.stock, '2006-06-05', initlast_date, '1d', ['high', 'low']) #获取最高价和最低价 highs = prices.high lows = prices.low #建立一个初始的装有beta从过去到初始阶段的历史数据的列表g.ans g.ans = [] for i in range(len(highs))[g.N:]: data_high = highs.iloc[i-g.N+1:i+1] data_low = lows.iloc[i-g.N+1:i+1] X = sm.add_constant(data_low) model = sm.OLS(data_high,X) results = model.fit() g.ans.append(results.params[1]) # 装有rsquare从过去到初始阶段历史数据的列表 g.ans_rightdev.append(results.rsquared) context.flag=False

#个股止损def stop_loss(context,bar_dict): for stock in list(context.portfolio.positions): cumulative_return=bar_dict[stock].close/context.portfolio.positions[stock].cost_basis if cumulative_return<0.9: order_target_value(stock,0)


def handle_bar(context,bar_dict): stock = g.stock beta=0 r2=0 prices = history(stock,['high', 'low'], g.N, '1d', False, 'pre', is_panel=1) highs = prices.high lows = prices.low X = sm.add_constant(lows) model = sm.OLS(highs, X) #得到beta beta = model.fit().params[1] #将新的beta添加到装有历史数据列表 g.ans.append(beta) #得到rsquare数据 r2=model.fit().rsquared #将新的rsquare添加到装有历史数据列表 g.ans_rightdev.append(r2) #为了标准化当下的beta数值,拿过去1100天的数据作为均值的窗口 section = g.ans[-g.M:] # 计算均值序列 mu = np.mean(section) # 计算标准化RSRS指标序列 sigma = np.std(section) zscore = (section[-1]-mu)/sigma #计算右偏RSRS标准分,就是将标准化后的beta数据乘以原始beta再乘以拟合度 zscore_rightdev= zscore*beta*r2 #根据交易信号买入卖出 if zscore_rightdev > g.buy: total_position=1 elif zscore_rightdev < g.sell: total_position=0 else: total_position=g.total_positionprevious if (g.total_positionprevious != total_position) or (g.t%g.tc==0): g.total_positionprevious=total_position last_date=get_last_datetime().strftime('%Y%m%d') stock_list=list(get_all_securities('stock',date=last_date).index) #对stock_list进行去除st,停牌等处理 stock_list=fun_unpaused(bar_dict, stock_list) stock_list=fun_st(bar_dict,stock_list) stock_list=fun_highlimit(bar_dict,stock_list) stock_list=fun_remove_new(stock_list, 60) #以下是各单因子 #规模因子 cap_df = market_cap(stock_list, 'valuation_market_cap',last_date) cap_df = cap_df * -1 #估值因子 PB_df = PB(stock_list, 'valuation_pb',last_date) PB_df = PB_df * -1 #动量因子 MTM20_df = MTM20(stock_list, 'MTM20') MTM20_df=MTM20_df* -1 #质量因子 #1.ROE(高利润) roe_df = roe(stock_list, 'profit_roe_ths',last_date) #2.净利润同比增长率(高成长) net_profit_growth_ratio_df=net_profit_growth_ratio(stock_list,'growth_net_profit_growth_ratio',last_date) #波动率因子 ATR20_df = ATR20(stock_list, 'ATR20') ATR20_df = ATR20_df * -1


#合并多因子 concat_obj = [cap_df, PB_df,MTM20_df,roe_df,net_profit_growth_ratio_df,ATR20_df] df = pd.concat(concat_obj, axis=1) df = df.dropna()# log.info(type(df)) sum = df.sum(axis=1) #log.info(sum)

#进行排序 if g.big_small == 'big': # 按照大排序 sum.sort_values(ascending = False,inplace=True) if g.big_small == 'small': # 按照小排序 sum.sort_values(ascending = True,inplace=True) # 根据比例取出排序后靠前部分 stock_list1 = sum[0:int(len(stock_list)*g.long_pct)].index #log.info(stock_list1) buy_list = [] for stock in stock_list1: buy_list.append(stock) #买卖操作 for stock in list(context.portfolio.positions): if stock not in buy_list: order_target(stock, 0) cash = context.portfolio.portfolio_value position=cash*g.total_positionprevious # position=cash*g.SAR_signal num=int(len(stock_list)*g.long_pct) ## 买入 for stock in buy_list: order_target_value(stock,position/num)
g.t=g.t+1
'''以下是单因子''' def market_cap(stocklist, factor,last_date): # 取数据 df = get_fundamentals(query(valuation.symbol, valuation.market_cap).filter(valuation.symbol.in_(stocklist)),date=last_date) #log.info(df)
df = df.set_index('valuation_symbol') # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore def PB(stocklist, factor,last_date): # 取数据 df = get_fundamentals(query(valuation.symbol, valuation.pb).filter(valuation.symbol.in_(stocklist)),date=last_date) df = df.set_index('valuation_symbol') # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore


def MTM20(stocklist, factor): # 取数据 for stock in stocklist: df1=history(stock,['close'],20,'1d') # log.info(df1) s = pd.DataFrame([(df1['close'][-1]-df1['close'][0])/df1['close'][0]], index=[stock]) # log.info(s) if 'df' in locals(): df = df.append(s) else: df = s #log.info(df) df.columns = ['MTM20'] df.index.name = 'valuation_symbol' # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore

def roe(stocklist, factor,last_date): # 取数据 df = get_fundamentals(query(valuation.symbol, profit.roe_ths).filter(valuation.symbol.in_(stocklist)),date=last_date)# log.info(df) df = df.set_index('valuation_symbol') # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore


def net_profit_growth_ratio(stocklist, factor,last_date): # 取数据 df = get_fundamentals(query(valuation.symbol, growth.net_profit_growth_ratio).filter(valuation.symbol.in_(stocklist)),date=last_date)# log.info(df) df = df.set_index('valuation_symbol') # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore
def ATR20(stocklist, new_factor): # 取数据 for stock in stocklist: Data_ATR = history(stock,['close','high','low'],20,'1d') close_ATR = np.array(Data_ATR['close']) high_ATR = np.array(Data_ATR['high']) low_ATR = np.array(Data_ATR['low']) ''' if np.isnan(close_ATR).any(): continue ''' ATR = ta.ATR(high_ATR, low_ATR, close_ATR, timeperiod=1) indices = ~np.isnan(ATR) result = np.average(ATR[indices]) s = pd.Series(result.astype(float), index=[stock]) if 'ATR_df' in locals(): ATR_df = ATR_df.append(s) else: ATR_df = s df = ATR_df.to_frame() df.index.name = 'valuation_symbol' df.columns = [new_factor] # 绝对中位数法取极值 after_MAD = MAD(new_factor, df) # z-score法标准化 after_zscore = zscore(new_factor, after_MAD) return after_zscore
'''以下是进行因子数据处理,对因子进行MAD去极值,以及标准化处理''' def MAD(factor, df): # 取得中位数 median = df[factor].median() # 取得数据与中位数差值 df1 = df-median # 取得差值绝对值 df1 = df1.abs() # 取得绝对中位数 MAD = df1[factor].median() # 得到数据上下边界 extreme_upper = median + 3 * 1.483 * MAD extreme_lower = median - 3 * 1.483 * MAD # 将数据上下边界外的数值归到边界上 df.ix[(df[factor]<extreme_lower), factor] = extreme_lower df.ix[(df[factor]>extreme_upper), factor] = extreme_upper return df


# z-score标准化def zscore(factor, df): # 取得均值 mean = df[factor].mean() # 取得标准差 std = df[factor].std() # 取得标准化后数据 df = (df - mean) / std return df
'''以下对股票列表进行去除ST,停牌,去新股,以及去除开盘涨停股''' #去除开盘涨停股票def fun_highlimit(bar_dict,stock_list): return [stock for stock in stock_list if bar_dict[stock].open!=bar_dict[stock].high_limit]
#去除st股票def fun_st(bar_dict,stock_list): return [stock for stock in stock_list if not bar_dict[stock].is_st]

def fun_unpaused(bar_dict, stock_list):
return [s for s in stock_list if not bar_dict[s].is_paused]

def fun_remove_new(_stock_list, days): deltaDate = get_datetime() - dt.timedelta(days) stock_list = [] for stock in _stock_list: if get_security_info(stock).listed_date < deltaDate: stock_list.append(stock) return stock_list

2.彼得林奇PEG价值选股策略

       彼得·林奇是一位非常成功的投资者和前麦哲伦基金的基金经理,他提出了一种被广泛采用的价值选股策略,即使用市盈率相对盈利增长率(PEG)作为衡量股票价值的主要指标。PEG比率是一种受到投资者喜爱的指标,因为它试图通过考虑公司的增长来提供比传统市盈率(PE)更全面的股票估值方法。

策略名称: 彼得林奇PEG价值选股策略

策略思路:

1.选择PEG < 0.5, 即稳定成长且价值被低估的股票

  其中PEG = PE / growth_rate

2.使用ES风险平价配权

3.根据组合的日内波动小于3%的条件, 与货币基金组合配资

4.最大持仓5只股票和1只货币基金, 优先买入市值小的, 15天调仓一次

5.剔除了周期性和项目类行业(该部分对改善回撤有明显的效果)

'''策略名称: 彼得林奇PEG价值选股策略作者: edward07t策略思路:1.选择PEG < 0.5, 即稳定成长且价值被低估的股票  其中PEG = PE / growth_rate2.使用ES风险平价配权3.根据组合的日内波动小于3%的条件, 与货币基金组合配资4.最大持仓5只股票和1只货币基金, 优先买入市值小的, 15天调仓一次5.剔除了周期性和项目类行业(该部分对改善回撤有明显的效果)'''
import pandas as pdimport numpy as npimport datetime as dt
def init(context): # 当前持仓数:0 context.num = 0 # 最大持仓股票数:5 context.stock_max_num = 5 # 每只股票的最大风险敞口 context.risk_limit = 0.03 / context.stock_max_num # 已持仓天数:0天 # 持仓周期:15天 context.hold_days, context.hold_cycle = 0, 15 # 计算ES的置信度 context.confidenceLevel = 0.05 # 计算ES的历史回测长度 context.lag = 180 # 每个交易日09:31运行 run_daily(func=fun_main, reference_security='000001.SZ')
###################### 主函数 def fun_main(context, bar_dict): flag = fun_needRebalance(context) if flag: last_date = get_last_datetime().strftime('%Y%m%d') universe = list(get_all_securities('stock', date = last_date).index) # stock_list = universe stock_list = fun_unpaused(universe,bar_dict) stock_list = fun_remove_new(stock_list, 60) stock_list = fun_remove_periodic_industry(stock_list, last_date)
line = '###############' + str(last_date) + ': ' + str(len(universe)) + ' / ' + str(len(stock_list)) print(line)
# 计算单季度增长率 df_growth = fun_cal_growth_rate(bar_dict, stock_list, last_date)
# 计算PEG df_all = fun_cal_PEG(bar_dict, df_growth.copy(), last_date)
# 选股 buy_list = fun_get_buy_list(context, bar_dict, df_all.copy(), context.stock_max_num, last_date)
# 计算股票权重 stock_weight = fun_cal_stock_weight(buy_list, context.lag, context.confidenceLevel, last_date, 'ES') print('stock_weight: ') print(stock_weight) bond_weight = {'511880.OF': 1.0} # 计算最终权重(考虑组合风险 + 债券) trade_ratio = fun_cal_position(context, bar_dict, stock_weight, bond_weight) print('trade_ratio: ') print(trade_ratio) # 下单 fun_do_trade(context, bar_dict, trade_ratio)
context.num = len(context.portfolio.stock_account.positions)

def handle_bar(context,bar_dict): pass

#### 1.判断是否调仓函数 #########################################def fun_needRebalance(context): # 条件1: 持仓股数为0时,重新调仓 if context.num == 0: context.hold_days = 0 return True # 条件2:持仓到调仓周期时,重新调仓 elif context.hold_days == context.hold_cycle: context.hold_days = 0 return True else: context.hold_days += 1 return False

#### 2.剔除停牌股票函数 #########################################def fun_unpaused(_stock_list,bar_dict): return [stock for stock in _stock_list if not bar_dict[stock].is_paused]
#### 3.剔除上市不到60天的新股 ###################################def fun_remove_new(_stock_list, days): deltaDate = get_datetime() - dt.timedelta(days) stock_list = [] for stock in _stock_list: if get_security_info(stock).start_date < deltaDate: stock_list.append(stock) return stock_list
#### 4.剔除周期性行业 ###########################################def fun_remove_periodic_industry(stock_list, last_date): periodic_industry = [#'T0101', # 种植业与林业 #'T0102', # 养殖业 #'T0103', # 农产品加工 #'T0104', # 农业服务 'T0201', # 煤炭开采 'T0202', # 石油矿业开采 'T0203', # 采掘服务 'T0301', # 基础化学 'T0302', # 化学制品 'T0303', # 化工新材料 'T0304', # 化工合成材料 'T0401', # 钢铁 'T0501', # 有色冶炼加工 'T0502', # 新材料 'T0601', # 建筑材料 'T0602', # 建筑装饰 'T0701', # 通用设备 'T0702', # 专用设备 'T0703', # 仪器仪表 'T0704', # 电气设备 #'T0801', # 半导体及元件 #'T0802', # 光学光电子 #'T0803', # 其他电子 #'T0804', # 电子制造 'T0901', # 汽车整车 'T0902', # 汽车零部件 'T0903', # 非汽车交运 'T0904', # 交运设备服务 #'T1001', # 通信设备 #'T1002', # 计算机设备 'T1101', # 白色家电 'T1102', # 视听器材 #'T1201', # 饮料制造 #'T1202', # 食品加工制造 #'T1301', # 防止制造 #'T1302', # 服装家纺 #'T1401', # 造纸 #'T1402', # 包装印刷 #'T1403', # 家用轻工 #'T1501', # 化学制药 #'T1502', # 中药 #'T1503', # 生物制品 #'T1504', # 医药商业 #'T1505', # 医疗器械服务 'T1601', # 电力 'T1602', # 燃气水务 'T1603', # 环保工程 'T1701', # 港口航运 'T1702', # 公路铁路运输 #'T1703', # 公交 #'T1704', # 机场航运 #'T1705', # 物流 'T1801', # 房地产开发 'T1802', # 园区开发 'T1901', # 银行 'T1902', # 保险及其他 'T1903', # 证券 #'T2001', # 零售 'T2002', # 贸易 #'T2101', # 景点及旅游 #'T2102', # 酒店及餐饮 #'T2201', # 通信服务 #'T2202', # 计算机应用 #'T2203', # 传媒 #'T2301', # 综合 #'T2401' # 国防军工 ] for industry in periodic_industry: stocks = get_industry_stocks(industry, last_date) stock_list = list(set(stock_list).difference(set(stocks))) return stock_list

#### 5.计算单季度增长率函数 ####################################################def fun_cal_growth_rate(bar_dict, stock_list, last_date): # 计算单季度净收益函数 def fun_get_quarterly_net_profit(current_quarter, last_quarter, stock_list): q = query(income.symbol, income.net_profit ).filter(income.symbol.in_(stock_list))
# 当季度累计净利润 df1 = get_fundamentals(q, statDate = current_quarter) df1.columns = ['symbol', 'total_net_profit']
# 上季度累计净利润 df2 = get_fundamentals(q, statDate = last_quarter) df2.columns = ['symbol', 'last_total_net_profit']
df = pd.merge(df1, df2, on='symbol')
# 当前单季度净利润 df['quarterly_net_profit'] = df['total_net_profit'] - df['last_total_net_profit']
del df['total_net_profit'] del df['last_total_net_profit']
return df ########## 获取财报季度日期 q = query(income.symbol, income.stat_date ).filter(income.symbol.in_(stock_list))
df = get_fundamentals(q, date = last_date) df = df.sort_values(['income_stat_date'], ascending=False) last_statDate = df.iloc[0,1] # 剔除未按时公布报表的公司,避免未来函数 df[df['income_stat_date'] != last_statDate] = None df = df.dropna() stock_list = list(df['income_symbol'].values)
the_year = int(str(last_statDate)[0:4]) the_month = str(last_statDate)[5:7]
########## 获取财报单季度净利润增长率 ''' quarter_1: 当季度 quarter_2: 上季度 quarter_3: 同比上季度 quarter_4: 去年上季度 '''
if the_month == '03': # 因为一季度的报表是单季度表,所以需要单独处理 quarter_1 = str(the_year) + 'q1' quarter_3 = str(the_year - 1) + 'q1'
q = query(income.symbol, income.net_profit ).filter(income.symbol.in_(stock_list))
df1 = get_fundamentals(q, statDate = quarter_1) df1.columns = ['symbol', 'current_net_profit']
df2 = get_fundamentals(q, statDate = quarter_3) df2.columns = ['symbol', 'last_net_profit']
else: if the_month == '12': quarter_1 = str(the_year) + 'q4' quarter_2 = str(the_year) + 'q3' quarter_3 = str(the_year - 1) + 'q4' quarter_4 = str(the_year - 1) + 'q3'
elif the_month == '09': quarter_1 = str(the_year) + 'q3' quarter_2 = str(the_year) + 'q2' quarter_3 = str(the_year - 1) + 'q3' quarter_4 = str(the_year - 1) + 'q2'
elif the_month == '06': quarter_1 = str(the_year) + 'q2' quarter_2 = str(the_year) + 'q1' quarter_3 = str(the_year - 1) + 'q2' quarter_4 = str(the_year - 1) + 'q1'
else: print('There is something wrong with the stat_date.')
# 计算当期单季度净利润 df1 = fun_get_quarterly_net_profit(quarter_1, quarter_2, stock_list) df1.columns = ['symbol', 'current_net_profit']
# 计算同比上期单季度净利润 df2 = fun_get_quarterly_net_profit(quarter_3, quarter_4, stock_list) df2.columns = ['symbol', 'last_net_profit']

df_growth = pd.merge(df1, df2, on='symbol') # 增长率单位为% df_growth['growth_rate'] = (df_growth['current_net_profit'] / df_growth['last_net_profit'] - 1) * 100
return df_growth

#### 6.计算PEG函数 ############################################################def fun_cal_PEG(bar_dict, df_growth, last_date): stock_list = stock_list = list(df_growth['symbol'].values) q = query(valuation.symbol, valuation.pe_ttm, ).filter(valuation.symbol.in_(stock_list))
df_pe = get_fundamentals(q, date = last_date) df_pe.columns = ['symbol', 'pe_ttm'] # 剔除PE值为负的股票 df_pe[df_pe['pe_ttm'] < 0] = None df_pe = df_pe.dropna() # 使用中位数去极值法 df_pe = winsorize(df_pe, 'pe_ttm') df_pe = df_pe.dropna() # 彼得林奇的文章中提到:增长率>50的公司,高增长不可持续 df_growth[df_growth['growth_rate'] > 50] = None # 剔除增长率为负的公司 df_growth[df_growth['growth_rate'] <= 0] = None
df_growth = df_growth.dropna()
del df_growth['current_net_profit'] del df_growth['last_net_profit']
df_all = pd.merge(df_pe, df_growth, on='symbol')
df_all['PEG'] = df_all['pe_ttm'] / df_all['growth_rate'] return df_all

#### 7.根据PEG选股函数 ###########################################################def fun_get_buy_list(context, bar_dict, df_all, n, last_date): # 获取股票市值信息函数 def fun_get_market_cap(df_selected, last_date): stock_list = list(df_selected['symbol'].values) # 获取股票市值 q = query(valuation.symbol, valuation.market_cap ).filter(valuation.symbol.in_(stock_list))
df_cap = get_fundamentals(q, date = last_date) df_cap.columns = ['symbol', 'market_cap']
df_selected = pd.merge(df_selected, df_cap, on = 'symbol') df_selected = df_selected.sort_values(['market_cap'], ascending = True) df_selected = df_selected.reset_index() del df_selected['index'] return df_selected # PEG 需小于 0.5 df_selected = df_all[df_all['PEG'] < 0.5].copy() # 增添股票的市值信息到df_selected df_selected = fun_get_market_cap(df_selected.copy(), last_date) # 获得备选股票列表 if len(df_selected) >= n: buy_list = list(df_selected['symbol'][:n].values) for i in range(n): print(str(df_selected['symbol'][i]) + ', PEG = ' + str(df_selected['PEG'][i])) else: print('新股仅: ' + str(len(df_selected))) buy_list = list(df_selected['symbol'].values) for i in range(len(df_selected)): print(str(df_selected['symbol'][i]) + ', PEG = ' + str(df_selected['PEG'][i])) old_stock_list = list(context.portfolio.stock_account.positions.keys()) if '511880.OF' in old_stock_list: old_stock_list.remove('511880.OF') if len(old_stock_list) > 0: df_growth_old = fun_cal_growth_rate(bar_dict, old_stock_list, last_date)
df_all_old = fun_cal_PEG(bar_dict, df_growth_old.copy(), last_date)
df_selected_old = df_all_old[df_all_old['PEG'] < 1.0].copy()
df_selected_old = fun_get_market_cap(df_selected_old.copy(), last_date)
old_stock_list = list(df_selected_old['symbol'].values)
i = len(buy_list) for stock in old_stock_list: if i < n: buy_list.append(stock) print(str(stock) + ', PEG = ' + str(df_selected_old.loc[df_selected_old['symbol'] == stock, 'PEG'].values)) i += 1 else: break return buy_list

#### 8.计算股票仓位 ##############################################################def fun_cal_stock_weight(stock_list, lag, alpha, last_date, flag=None): # 计算个股ES风险 def fun_cal_stockES(stock, lag, alpha, last_date): if lag * alpha < 3: print('The size of lag is too small for the given confidence level.') prices = get_price(stock, start_date=None, end_date=last_date, fre_step='1d', fields=['close'], skip_paused=False, fq='pre', bar_count=lag, is_panel=0)
dailyReturns = prices.pct_change().dropna() dailyReturns_sort = dailyReturns.sort_values(['close'], ascending=True)
num = round((lag-1) * alpha) ES = dailyReturns_sort['close'][:num].sum() / num
return ES # ES风险平价配股 if flag == 'ES': stock_position = {} total_position = 0 for stock in stock_list: risk = fun_cal_stockES(stock, lag, alpha, last_date) stock_position[stock] = 1.0 / risk total_position += stock_position[stock]
stock_real_position = {} for stock in stock_list: stock_real_position[stock] = stock_position[stock] / total_position # 等权重配股 else: stock_real_position = {} for stock in stock_list: stock_real_position[stock] = 1.0 / len(stock_list)
return stock_real_position

#### 9.计算加入货币基金后的资产配置 ##################################################def fun_cal_position(context, bar_dict, stock_weight, bond_weight, position_ratio = 1.0): # 计算组合收益 def fun_get_portfolio_daily_return(bar_dict, stock_weight, lag=180): last_date = get_last_datetime().strftime('%Y%m%d') stock_list = list(stock_weight.keys()) df = pd.DataFrame() for stock in stock_list: prices = get_price(stock, start_date=None, end_date=last_date, fre_step='1d', fields=['close'], skip_paused=False, fq='pre', bar_count=lag, is_panel=0) df[stock] = prices['close'] df = df.pct_change().dropna() df['portfolio_returns'] = 0 for stock in stock_list: df['portfolio_returns'] += df[stock] * stock_weight[stock] del df[stock] df = df.sort_values(['portfolio_returns'], ascending = True) return df # 计算组合ES值 def fun_get_portfolio_ES(dailyReturns, alpha): lag = len(dailyReturns) num = round(lag * alpha) ES = - (dailyReturns['portfolio_returns'][:num].sum() / num) if ES < 0: ES = 0 print('ES: ' + str(ES)) return ES # 计算组合VaR值 def fun_get_portfolio_VaR(dailyReturns, alpha): z_score = {0.05: 1.65, 0.04: 1.75, 0.01: 2.33, 0.0001: 3.7} VaR = - (dailyReturns['portfolio_returns'].mean() - z_score[alpha] * dailyReturns['portfolio_returns'].std()) if VaR < 0: VaR = 0 print('VaR: ' + str(VaR)) return VaR # 计算股票组合在给定风险损失的情况下最大持仓金额 def fun_get_equity_value(bar_dict, stock_weight, risk_money, max_risk_money, alpha, position_ratio): # 计算组合每日收益 df_daily_returns = fun_get_portfolio_daily_return(bar_dict, stock_weight) # 计算组合ES值 portfolio_ES = fun_get_portfolio_ES(df_daily_returns.copy(), alpha) # 计算组合VaR值 portfolio_VaR = fun_get_portfolio_VaR(df_daily_returns.copy(), alpha) # 组合ES和VaR风险均为0 if (portfolio_ES) == 0 and (portfolio_VaR) == 0: equity_value = context.positions_value * position_ratio print('组合风险评估为0, 请检查数据。') return equity_value if portfolio_ES == 0: print('ES = 0') equity_value = risk_money / portfolio_VaR elif portfolio_VaR == 0: print('VaR = 0') equity_value = max_risk_money / portfolio_ES else: equity_value = min(risk_money / portfolio_VaR, max_risk_money / portfolio_ES) return equity_value stock_num = len(stock_weight) risk_money = context.portfolio.portfolio_value * stock_num * context.risk_limit * position_ratio max_risk_money = risk_money * 1.5 # 股票组合在给定风险损失的情况下,最大持仓金额 stock_value = 0 if stock_weight: stock_value = fun_get_equity_value(bar_dict, stock_weight, risk_money, max_risk_money, context.confidenceLevel, position_ratio) stock_ratio = 0 # 股票持仓比例 bond_ratio = 0 # 债券持仓比例 total_value = context.portfolio.portfolio_value * position_ratio # 最大持仓金额(包括股票和债券) if stock_value > total_value: bond_ratio = 0 stock_ratio = 1.0 * position_ratio else: stock_ratio = (stock_value / total_value) * position_ratio bond_ratio = (1 - (stock_value / total_value)) * position_ratio print('stock_value: ' + str(stock_value)) print('total_value: ' + str(total_value)) trade_ratio = {} for stock in stock_weight: if stock in trade_ratio: trade_ratio[stock] += round((stock_weight[stock] * stock_ratio), 3) else: trade_ratio[stock] = round((stock_weight[stock] * stock_ratio), 3) for stock in bond_weight: if stock in trade_ratio: trade_ratio[stock] += round((bond_weight[stock] * bond_ratio), 3) else: trade_ratio[stock] = round((bond_weight[stock] * bond_ratio), 3) return trade_ratio
#### 10.根据指定的仓位下单 #############################################################def fun_do_trade(context, bar_dict, stock_position): for stock in list(context.portfolio.stock_account.positions.keys()): if stock not in stock_position: order_target_percent(stock, 0) else: order_target_percent(stock, stock_position[stock]) for stock in stock_position: if stock not in list(context.portfolio.stock_account.positions.keys()): order_target_percent(stock, stock_position[stock]) print('当前持仓: ') print(list(context.portfolio.stock_account.positions.keys())) #### 11.中位数去极值函数 ################################################################def winsorize(df, factor, n=20): ''' df为bar_dictFrame数据 factor为需要去极值的列名称 n 为判断极值上下边界的常数 ''' # 提取该列的数据 ls_raw = np.array(df[factor].values) # 排序 ls_raw.sort(axis = 0) # 获取中位数 D_M = np.median(ls_raw) # 计算离差值 ls_deviation = abs(ls_raw - D_M) # 排序 ls_deviation.sort(axis = 0) # 获取离差中位数 D_MAD = np.median(ls_deviation) # 将大于中位数n倍离差中位数的值赋为NaN df.loc[df[factor] >= D_M + n * D_MAD, factor] = None # 将小于中位数n倍离差中位数的值赋为NaN df.loc[df[factor] <= D_M - n * D_MAD, factor] = None return df

3.詹姆斯.奥肖内西价值投资法

      詹姆斯·奥肖内西是一位著名的投资者,他的投资方法深受定量分析的影响,尤其是在价值投资领域中。他的方法通过使用历史数据和复杂的数学模型来识别被低估的股票,从而尝试系统地战胜市场。奥肖内西在他的书籍和研究中提出了几种策略,其中最著名的是“怎样击败市场”(How to Beat the Market)系列。通过这些策略,奥肖内西希望能找到市场中的误价股票,并通过长期持有这些股票获得超额回报。他的方法为价值投资提供了一种更科学、系统的方式,使其区别于更传统的、主观判断较多的价值投资策略。

奥肖内西的独特之处:

  • 定量方法:他强调使用客观、可重复的方法来选择股票,减少人为情感和偏见的影响。

  • 多因子模型:不只依赖单一财务指标,而是综合考虑多个因素来评估股票的价值。

  • 历史数据验证:他的所有策略都基于历史数据的广泛测试,以证明其有效性。

通过这些策略,奥肖内西希望能找到市场中的误价股票,并通过长期持有这些股票获得超额回报。他的方法为价值投资提供了一种更科学、系统的方式,使其区别于更传统的、主观判断较多的价值投资策略。

'''策略选股A. 股票的市值大于市场的中位数B. 股票的股本大于市场的中位数C. 股票的市现率大于0,从小到大排列,取前400只股票D. 股票的市销率大于0,从小到大排列,取前400只股票E. 股票的股息率从大到小排列,取前400只股票F. 取上述5个条件满足下的前30只股票交易方式:按月调仓止损方式A. 当个股价格低于成本价的7%时,卖出该股票B. 当5日内大盘下跌13%时,卖出所有股票
'''
from datetime import timedelta, dateimport pandas as pd
############################## 以下为主要函数 ################################# 初始化函数 ##################################################################
def init(context): # 设置手续费为交易额的0.02%,最少5元 set_commission(PerShare(type='stock', cost=0.0003, min_trade_cost=5.0)) # 设置可变滑点,买入成交价 = 委托价 * (1 + 0.1%),卖出成交价 = 委托价 * (1 - 0.1%); set_slippage(PriceSlippage(0.002)) context.selected = 400 context.n = 30 # 持股数 context.trade_date = range(1,13,1) ## 按月调用程序 run_monthly(trade,date_rule=-1) # 月末调仓函数 #################################################################def trade(context, bar_dict): date = get_datetime() months = get_datetime().month if months in context.trade_date: ##获得购买股票列表 market_cap_list = stocks_market_cap(context, bar_dict) PCF_list = stocks_PCF(context, bar_dict) PS_list = stocks_PS(context, bar_dict) capitalization_list = stocks_capitalization(context, bar_dict) DY_list = stocks_DY(context, bar_dict) ## 获得满足每种条件的股票池 stock_list = list(set(market_cap_list)&set(PCF_list)&set(PS_list)&set(capitalization_list)&set(DY_list)) log.info(len(stock_list)) ## 卖出 if len(context.portfolio.positions) > 0: for stock in list(context.portfolio.positions): if stock not in stock_list: order_target(stock, 0) ## 买入 if len(stock_list) > 0: for stock in stock_list: if stock not in list(context.portfolio.positions): if len(context.portfolio.positions) < context.n : number = context.n - len(context.portfolio.positions) order_value(stock,context.portfolio.available_cash/number) else: order_value(stock,context.portfolio.available_cash) else: pass
# 每日检查止损条件 ############################################################# def handle_bar(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') if len(context.portfolio.positions) > 0: # 止损:个股跌幅超过8%,卖出 securities = list(context.portfolio.positions) for stock in securities: price = history(stock, ['close'], 1, '1d', False,'pre') if context.portfolio.positions[stock].cost_basis/price['close'][0]-1 < -0.08: order_target(stock, 0) #log.info('%s 止损:%s' %(last_date,stock)) #止损:5天内大盘下跌13%,卖出 price_bench = history('000300.SH', ['close'], 5, '1d', False,'pre') if price_bench['close'][-5]/price_bench['close'][-1]-1 > 0.13: if len(list(context.portfolio.positions))>0: for stock in list(context.portfolio.positions): order_target(stock, 0)
################## 以下为功能函数, 在主要函数中调用 ##########################
# 1 根据市值来筛选股票列表def stocks_market_cap(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') market_cap = get_fundamentals(query( valuation.symbol, valuation.market_cap ).order_by( valuation.market_cap.desc() ),date = last_date) length = len(market_cap) market_cap = market_cap[:int(length/2)] return list(market_cap['valuation_symbol'])
# 2. 根据股本来筛选股票列表def stocks_capitalization(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') capitalization = get_fundamentals(query( valuation.symbol, valuation.capitalization ).order_by( valuation.capitalization.desc() ),date = last_date) length = len(capitalization) capitalization = capitalization[:int(length/2)] return list(capitalization['valuation_symbol'])
# 3. 根据市现率来筛选股票列表def stocks_PCF(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') PCF = get_fundamentals(query( valuation.symbol, valuation.pcf ).filter( valuation.pcf > 0 ).order_by( valuation.pcf.asc() ).limit( context.selected ),date = last_date) return list(PCF['valuation_symbol'])
# 4. 根据市销率来筛选股票列表def stocks_PS(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') PS = get_fundamentals(query( valuation.symbol, valuation.ps ).filter( valuation.ps>0 ).order_by( valuation.ps.asc() ),date = last_date) return list(PS['valuation_symbol'])
# 5. 根据股息率(每股收益/每股市价代替)来筛选股票列表
def stocks_DY(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') EPS = get_fundamentals(query( income.symbol, income.basic_eps ),date = last_date) stock_List = list(EPS['income_symbol']) close_price = history(stock_List,['close'],1,'1d',True,None) DY_stock = dict(zip(EPS['income_symbol'],EPS['income_basic_eps'])) log.info(len(DY_stock)-len(EPS['income_symbol'])) for stock in stock_List: try: DY_stock[stock] = DY_stock[stock]/close_price[stock]['close'][0] except: DY_stock[stock] = 0 DY_stock = sorted(DY_stock.items(),key=lambda t:t[1],reverse=True) return list(dict(DY_stock[:context.selected]).keys())

注:部分数据来源于同花顺


后续会针对小白手把手创建最简单策略,以及对于API解读,马上要燃尽了,但行好事,莫问前程。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约