此部分内容对于传统投资学专业有障碍,对于金融科技专业学生应该可以把握。 1.巴菲特的alpha 巴菲特的投资策略和其所实现的超额回报(即Alpha)一直是金融学界和实务界的研究焦点。Alpha是一个衡量投资性能的指标,代表了一个投资组合相对于其基准指数的超额回报。在沃伦·巴菲特的案例中,这意味着他管理的伯克希尔·哈撒韦公司的投资回报超过了一般市场的表现。 在《巴菲特的alpha》文章里,后人把巴菲特的收益分成六个维度,分别是市场,估值,规模,动量,质量和波动率六个维度。我们今天就开始复现其中的原理,除去市场维度,我们从其他五个维度分别挑选因子,总共6个,组成6因子模型。 对于因子的处理,由于因子来自不同维度,所以无需进行降维或者因子正交处理来解决它的相关性问题,所以简单进行了去极值和标准化处理 对于股票列表,已经进行剔除ST,上市未满60天的新股,停牌股和开盘涨停股 对于打分方式:针对升序因子乘以-1;针对降序因子乘以1,最后进行叠加 在择时方面,采用RSRS的方式对指数进行择时信号。 资金规模在10000000,20日调仓,回测时间从2010-01-01到2018-11-08 ''' 收益还可以,可以使用
'''
import pandas as pd import numpy as np import datetime as dt import talib as ta from datetime import date,timedelta import statsmodels.api as sm
#初始化账户 def init(context): set_params(context) set_variables(context) set_backtest() run_daily(stop_loss) #设置策参数 def set_params(context):
g.tc = 20 #调仓频率 g.t=0 g.big_small = 'big' #big是降序,small为升序 context.stock = '000300.SH' g.long_pct = 0.05 g.stock='000300.SH' #择时选取的指数 g.total_positionprevious=0 #仓位 g.N = 18 #RSRS选取的回归长度 g.M = 1100 #RSRS均值窗口 def set_variables(context): context.X_length=11 context.flag=True g.buy = 0.7 #买入阀门 g.sell = -0.7 #卖出阀门 g.ans = [] g.ans_rightdev= [] def set_backtest(): set_benchmark('000300.SH') # 设置基准 set_slippage(PriceSlippage(0.002)) # 设置可变滑点
def before_trading(context): #需要先建立过去的数据集合,否则后面新数据没有历史数据作为窗口 if context.flag: initlast_date=context.now-timedelta(days=1) prices = get_price(g.stock, '2006-06-05', initlast_date, '1d', ['high', 'low']) #获取最高价和最低价 highs = prices.high lows = prices.low #建立一个初始的装有beta从过去到初始阶段的历史数据的列表g.ans g.ans = [] for i in range(len(highs))[g.N:]: data_high = highs.iloc[i-g.N+1:i+1] data_low = lows.iloc[i-g.N+1:i+1] X = sm.add_constant(data_low) model = sm.OLS(data_high,X) results = model.fit() g.ans.append(results.params[1]) # 装有rsquare从过去到初始阶段历史数据的列表 g.ans_rightdev.append(results.rsquared) context.flag=False
#个股止损 def stop_loss(context,bar_dict): for stock in list(context.portfolio.positions): cumulative_return=bar_dict[stock].close/context.portfolio.positions[stock].cost_basis if cumulative_return<0.9: order_target_value(stock,0)
def handle_bar(context,bar_dict): stock = g.stock beta=0 r2=0 prices = history(stock,['high', 'low'], g.N, '1d', False, 'pre', is_panel=1) highs = prices.high lows = prices.low X = sm.add_constant(lows) model = sm.OLS(highs, X) #得到beta beta = model.fit().params[1] #将新的beta添加到装有历史数据列表 g.ans.append(beta) #得到rsquare数据 r2=model.fit().rsquared #将新的rsquare添加到装有历史数据列表 g.ans_rightdev.append(r2) #为了标准化当下的beta数值,拿过去1100天的数据作为均值的窗口 section = g.ans[-g.M:] # 计算均值序列 mu = np.mean(section) # 计算标准化RSRS指标序列 sigma = np.std(section) zscore = (section[-1]-mu)/sigma #计算右偏RSRS标准分,就是将标准化后的beta数据乘以原始beta再乘以拟合度 zscore_rightdev= zscore*beta*r2 #根据交易信号买入卖出 if zscore_rightdev > g.buy: total_position=1 elif zscore_rightdev < g.sell: total_position=0 else: total_position=g.total_positionprevious if (g.total_positionprevious != total_position) or (g.t%g.tc==0): g.total_positionprevious=total_position last_date=get_last_datetime().strftime('%Y%m%d') stock_list=list(get_all_securities('stock',date=last_date).index) #对stock_list进行去除st,停牌等处理 stock_list=fun_unpaused(bar_dict, stock_list) stock_list=fun_st(bar_dict,stock_list) stock_list=fun_highlimit(bar_dict,stock_list) stock_list=fun_remove_new(stock_list, 60) #以下是各单因子 #规模因子 cap_df = market_cap(stock_list, 'valuation_market_cap',last_date) cap_df = cap_df * -1 #估值因子 PB_df = PB(stock_list, 'valuation_pb',last_date) PB_df = PB_df * -1 #动量因子 MTM20_df = MTM20(stock_list, 'MTM20') MTM20_df=MTM20_df* -1 #质量因子 #1.ROE(高利润) roe_df = roe(stock_list, 'profit_roe_ths',last_date) #2.净利润同比增长率(高成长) net_profit_growth_ratio_df=net_profit_growth_ratio(stock_list,'growth_net_profit_growth_ratio',last_date) #波动率因子 ATR20_df = ATR20(stock_list, 'ATR20') ATR20_df = ATR20_df * -1
#合并多因子 concat_obj = [cap_df, PB_df,MTM20_df,roe_df,net_profit_growth_ratio_df,ATR20_df] df = pd.concat(concat_obj, axis=1) df = df.dropna() # log.info(type(df)) sum = df.sum(axis=1) #log.info(sum)
#进行排序 if g.big_small == 'big': # 按照大排序 sum.sort_values(ascending = False,inplace=True) if g.big_small == 'small': # 按照小排序 sum.sort_values(ascending = True,inplace=True) # 根据比例取出排序后靠前部分 stock_list1 = sum[0:int(len(stock_list)*g.long_pct)].index #log.info(stock_list1) buy_list = [] for stock in stock_list1: buy_list.append(stock) #买卖操作 for stock in list(context.portfolio.positions): if stock not in buy_list: order_target(stock, 0) cash = context.portfolio.portfolio_value position=cash*g.total_positionprevious # position=cash*g.SAR_signal num=int(len(stock_list)*g.long_pct) ## 买入 for stock in buy_list: order_target_value(stock,position/num)
g.t=g.t+1
''' 以下是单因子 ''' def market_cap(stocklist, factor,last_date): # 取数据 df = get_fundamentals(query(valuation.symbol, valuation.market_cap).filter(valuation.symbol.in_(stocklist)),date=last_date) #log.info(df)
df = df.set_index('valuation_symbol') # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore def PB(stocklist, factor,last_date): # 取数据 df = get_fundamentals(query(valuation.symbol, valuation.pb).filter(valuation.symbol.in_(stocklist)),date=last_date) df = df.set_index('valuation_symbol') # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore
def MTM20(stocklist, factor): # 取数据 for stock in stocklist: df1=history(stock,['close'],20,'1d') # log.info(df1) s = pd.DataFrame([(df1['close'][-1]-df1['close'][0])/df1['close'][0]], index=[stock]) # log.info(s) if 'df' in locals(): df = df.append(s) else: df = s #log.info(df) df.columns = ['MTM20'] df.index.name = 'valuation_symbol' # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore
def roe(stocklist, factor,last_date): # 取数据 df = get_fundamentals(query(valuation.symbol, profit.roe_ths).filter(valuation.symbol.in_(stocklist)),date=last_date) # log.info(df) df = df.set_index('valuation_symbol') # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore
def net_profit_growth_ratio(stocklist, factor,last_date): # 取数据 df = get_fundamentals(query(valuation.symbol, growth.net_profit_growth_ratio).filter(valuation.symbol.in_(stocklist)),date=last_date) # log.info(df) df = df.set_index('valuation_symbol') # 绝对中位数法取极值 after_MAD = MAD(factor, df) # z-score法标准化 after_zscore = zscore(factor, after_MAD) return after_zscore
def ATR20(stocklist, new_factor): # 取数据 for stock in stocklist: Data_ATR = history(stock,['close','high','low'],20,'1d') close_ATR = np.array(Data_ATR['close']) high_ATR = np.array(Data_ATR['high']) low_ATR = np.array(Data_ATR['low']) ''' if np.isnan(close_ATR).any(): continue ''' ATR = ta.ATR(high_ATR, low_ATR, close_ATR, timeperiod=1) indices = ~np.isnan(ATR) result = np.average(ATR[indices]) s = pd.Series(result.astype(float), index=[stock]) if 'ATR_df' in locals(): ATR_df = ATR_df.append(s) else: ATR_df = s df = ATR_df.to_frame() df.index.name = 'valuation_symbol' df.columns = [new_factor] # 绝对中位数法取极值 after_MAD = MAD(new_factor, df) # z-score法标准化 after_zscore = zscore(new_factor, after_MAD) return after_zscore
''' 以下是进行因子数据处理,对因子进行MAD去极值,以及标准化处理 ''' def MAD(factor, df): # 取得中位数 median = df[factor].median() # 取得数据与中位数差值 df1 = df-median # 取得差值绝对值 df1 = df1.abs() # 取得绝对中位数 MAD = df1[factor].median() # 得到数据上下边界 extreme_upper = median + 3 * 1.483 * MAD extreme_lower = median - 3 * 1.483 * MAD # 将数据上下边界外的数值归到边界上 df.ix[(df[factor]<extreme_lower), factor] = extreme_lower df.ix[(df[factor]>extreme_upper), factor] = extreme_upper return df
# z-score标准化 def zscore(factor, df): # 取得均值 mean = df[factor].mean() # 取得标准差 std = df[factor].std() # 取得标准化后数据 df = (df - mean) / std return df
''' 以下对股票列表进行去除ST,停牌,去新股,以及去除开盘涨停股 ''' #去除开盘涨停股票 def fun_highlimit(bar_dict,stock_list): return [stock for stock in stock_list if bar_dict[stock].open!=bar_dict[stock].high_limit]
#去除st股票 def fun_st(bar_dict,stock_list): return [stock for stock in stock_list if not bar_dict[stock].is_st]
def fun_unpaused(bar_dict, stock_list):
return [s for s in stock_list if not bar_dict[s].is_paused]
def fun_remove_new(_stock_list, days): deltaDate = get_datetime() - dt.timedelta(days) stock_list = [] for stock in _stock_list: if get_security_info(stock).listed_date < deltaDate: stock_list.append(stock) return stock_list
2.彼得林奇PEG价值选股策略 彼得·林奇是一位非常成功的投资者和前麦哲伦基金的基金经理,他提出了一种被广泛采用的价值选股策略,即使用市盈率相对盈利增长率(PEG)作为衡量股票价值的主要指标。PEG比率是一种受到投资者喜爱的指标,因为它试图通过考虑公司的增长来提供比传统市盈率(PE)更全面的股票估值方法。 策略名称: 彼得林奇PEG价值选股策略 策略思路: 1.选择PEG < 0.5, 即稳定成长且价值被低估的股票 其中PEG = PE / growth_rate 2.使用ES风险平价配权 3.根据组合的日内波动小于3%的条件, 与货币基金组合配资 4.最大持仓5只股票和1只货币基金, 优先买入市值小的, 15天调仓一次 5.剔除了周期性和项目类行业(该部分对改善回撤有明显的效果)
3.詹姆斯.奥肖内西价值投资法 詹姆斯·奥肖内西是一位著名的投资者,他的投资方法深受定量分析的影响,尤其是在价值投资领域中。他的方法通过使用历史数据和复杂的数学模型来识别被低估的股票,从而尝试系统地战胜市场。奥肖内西在他的书籍和研究中提出了几种策略,其中最著名的是“怎样击败市场”(How to Beat the Market)系列。通过这些策略,奥肖内西希望能找到市场中的误价股票,并通过长期持有这些股票获得超额回报。他的方法为价值投资提供了一种更科学、系统的方式,使其区别于更传统的、主观判断较多的价值投资策略。 奥肖内西的独特之处:
通过这些策略,奥肖内西希望能找到市场中的误价股票,并通过长期持有这些股票获得超额回报。他的方法为价值投资提供了一种更科学、系统的方式,使其区别于更传统的、主观判断较多的价值投资策略。 ''' 策略选股 A. 股票的市值大于市场的中位数 B. 股票的股本大于市场的中位数 C. 股票的市现率大于0,从小到大排列,取前400只股票 D. 股票的市销率大于0,从小到大排列,取前400只股票 E. 股票的股息率从大到小排列,取前400只股票 F. 取上述5个条件满足下的前30只股票 交易方式:按月调仓 止损方式 A. 当个股价格低于成本价的7%时,卖出该股票 B. 当5日内大盘下跌13%时,卖出所有股票
'''
from datetime import timedelta, date import pandas as pd
############################## 以下为主要函数 ################################ # 初始化函数 ##################################################################
def init(context): # 设置手续费为交易额的0.02%,最少5元 set_commission(PerShare(type='stock', cost=0.0003, min_trade_cost=5.0)) # 设置可变滑点,买入成交价 = 委托价 * (1 + 0.1%),卖出成交价 = 委托价 * (1 - 0.1%); set_slippage(PriceSlippage(0.002)) context.selected = 400 context.n = 30 # 持股数 context.trade_date = range(1,13,1) ## 按月调用程序 run_monthly(trade,date_rule=-1) # 月末调仓函数 ################################################################# def trade(context, bar_dict): date = get_datetime() months = get_datetime().month if months in context.trade_date: ##获得购买股票列表 market_cap_list = stocks_market_cap(context, bar_dict) PCF_list = stocks_PCF(context, bar_dict) PS_list = stocks_PS(context, bar_dict) capitalization_list = stocks_capitalization(context, bar_dict) DY_list = stocks_DY(context, bar_dict) ## 获得满足每种条件的股票池 stock_list = list(set(market_cap_list)&set(PCF_list)&set(PS_list)&set(capitalization_list)&set(DY_list)) log.info(len(stock_list)) ## 卖出 if len(context.portfolio.positions) > 0: for stock in list(context.portfolio.positions): if stock not in stock_list: order_target(stock, 0) ## 买入 if len(stock_list) > 0: for stock in stock_list: if stock not in list(context.portfolio.positions): if len(context.portfolio.positions) < context.n : number = context.n - len(context.portfolio.positions) order_value(stock,context.portfolio.available_cash/number) else: order_value(stock,context.portfolio.available_cash) else: pass
# 每日检查止损条件 ############################################################# def handle_bar(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') if len(context.portfolio.positions) > 0: # 止损:个股跌幅超过8%,卖出 securities = list(context.portfolio.positions) for stock in securities: price = history(stock, ['close'], 1, '1d', False,'pre') if context.portfolio.positions[stock].cost_basis/price['close'][0]-1 < -0.08: order_target(stock, 0) #log.info('%s 止损:%s' %(last_date,stock)) #止损:5天内大盘下跌13%,卖出 price_bench = history('000300.SH', ['close'], 5, '1d', False,'pre') if price_bench['close'][-5]/price_bench['close'][-1]-1 > 0.13: if len(list(context.portfolio.positions))>0: for stock in list(context.portfolio.positions): order_target(stock, 0)
################## 以下为功能函数, 在主要函数中调用 ##########################
# 1 根据市值来筛选股票列表 def stocks_market_cap(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') market_cap = get_fundamentals(query( valuation.symbol, valuation.market_cap ).order_by( valuation.market_cap.desc() ),date = last_date) length = len(market_cap) market_cap = market_cap[:int(length/2)] return list(market_cap['valuation_symbol'])
# 2. 根据股本来筛选股票列表 def stocks_capitalization(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') capitalization = get_fundamentals(query( valuation.symbol, valuation.capitalization ).order_by( valuation.capitalization.desc() ),date = last_date) length = len(capitalization) capitalization = capitalization[:int(length/2)] return list(capitalization['valuation_symbol'])
# 3. 根据市现率来筛选股票列表 def stocks_PCF(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') PCF = get_fundamentals(query( valuation.symbol, valuation.pcf ).filter( valuation.pcf > 0 ).order_by( valuation.pcf.asc() ).limit( context.selected ),date = last_date) return list(PCF['valuation_symbol'])
# 4. 根据市销率来筛选股票列表 def stocks_PS(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') PS = get_fundamentals(query( valuation.symbol, valuation.ps ).filter( valuation.ps>0 ).order_by( valuation.ps.asc() ),date = last_date) return list(PS['valuation_symbol'])
# 5. 根据股息率(每股收益/每股市价代替)来筛选股票列表
def stocks_DY(context, bar_dict): last_date = get_last_datetime().strftime('%Y%m%d') EPS = get_fundamentals(query( income.symbol, income.basic_eps ),date = last_date) stock_List = list(EPS['income_symbol']) close_price = history(stock_List,['close'],1,'1d',True,None) DY_stock = dict(zip(EPS['income_symbol'],EPS['income_basic_eps'])) log.info(len(DY_stock)-len(EPS['income_symbol'])) for stock in stock_List: try: DY_stock[stock] = DY_stock[stock]/close_price[stock]['close'][0] except: DY_stock[stock] = 0 DY_stock = sorted(DY_stock.items(),key=lambda t:t[1],reverse=True) return list(dict(DY_stock[:context.selected]).keys()) 注:部分数据来源于同花顺 后续会针对小白手把手创建最简单策略,以及对于API解读,马上要燃尽了,但行好事,莫问前程。 |
|