本文涉及到的个股市场数据来自开源量化框架QUANTAXIS,涉及到的质量(mass)数据来自开源数据引擎原版tushare和湘财证券版tushare。需安装好QUANTAXIS,tushare,xcsc_tushare等模块,如果你不需要湘财证券版tushare的数据,也可以只安装前面两个模块。
源代码粘贴如下(回测效果图在源代码后面):
# -*- coding: utf-8 -*-"""@author: yinxiuqu@license: GNU General Public License v3.0@contact: yinxiuqu@qq.com@file: physics.py@time: 2020/7/7 下午9:48@Software: PyCharm本文件定义简单物理模型进行金融建模共有三种动能计算方法,分别为:1、日内(intraday):今收/今开 - 12、隔夜(overnight):今开/昨收 - 13、正常(normal):今收/昨收 - 1"""# 导入相应模块from datetime import datetime, timedeltaimport pandas as pdimport numpy as npimport copyimport QUANTAXIS as QAimport tushare as tsimport xcsc_tushare as xcts# 预处理数据接口和常量ts.set_token('bf1ec1f2e80391feae35a46db063501ad03accf588990dxxxxxxxxxxxxxx')# 此处请用你自己的tokenxcts.set_token('96e5167f90449e742407e3dc09de7795fd13edfee80xxxxxxxxxxxxxxx')# 此处请用你自己的tokenpro = ts.pro_api()xcpro = xcts.pro_api(env='prd')now = datetime.now().strftime('%Y-%m-%d')# 定义从QUANTAXIS获取个股日线数据的函数def get_stock_day_QA(start, end=None, code=None):""":param code:str type,stock code:param start:str type,start date:param end:str type,end date:return:multi-index sources-frame"""# 处理默认结束日期if end is None or end > now or end is 'now':end = now# 如果结束日为今天且今天是交易日且没收盘if end == now and QA.QA_util_if_trade(end) and '09:30' < datetime.now().strftime('%H:%M') < '15:00':# 盘中在线获取个股列表if code is None:code = QA.QAFetch.QATdx.QA_fetch_get_stock_list('stock').index.get_level_values(0).to_list()# 如果起止日期不是同一个交易日if start != end:df1 = QA.QA_fetch_stock_day_adv(code=code, start=start, end=QA.QA_util_get_last_day(end, 1)).datadf2 = QA.QAFetch.QATdx.QA_fetch_get_stock_latest(code=code)if df2 is not None:df2 = df2.loc[now].drop_duplicates()df2.rename({'vol': 'volume'}, axis='columns', inplace=True)df2.set_index([df2.index, 'code'], drop=True, inplace=True)df2 = df2[list(df1.columns)]df = df1.append(df2)# 如果起止日期是同一个交易日,只获取盘中数据(因为还没收盘)else:df = QA.QAFetch.QATdx.QA_fetch_get_stock_latest(code=code)df = df.loc[now].drop_duplicates()df.rename({'vol': 'volume'}, axis='columns', inplace=True)df.set_index([df.index, 'code'], drop=True, inplace=True)df = df[['open', 'high', 'low', 'close', 'volume', 'amount']]# 在非交易日或者交易日的非交易时间段内else:# 从数据库获取个股列表if code is None:code = QA.QA_fetch_stock_list_adv().index.to_list()df = QA.QA_fetch_stock_day_adv(code=code, start=start, end=end).datareturn df# 将多维列表打平为一维列表的函数(因为python3已经不支持列表flatten了)def flatten_list(input_list):"""将多维列表打平为一维列表的函数,因为python3已经不支持列表flatten。:param input_list: 输入的多维列表:return: 返回打通后的一维列表"""output_list = []while True:if input_list == []:breakfor index, value in enumerate(input_list):# index :索引序列 value:索引序列对应的值# enumerate() 函数用于将一个可遍历的数据对象(如列表、元组或字符串)组合为一个索引序列,# 同时列出数据和数据下标,一般用在 for 循环当中。if type(value) == list:input_list = value + input_list[index+1:]break # 这里跳出for循环后,从While循环进入的时候index是更新后的input_list新开始算的。else:output_list.append(value)input_list.pop(index)breakreturn output_list# 定义股票代码风格变换函数,本函数仅改变单个代码,即字符串代码def change_codestyle(code, style):""":param code: str:param style: 'QA'/'ts'/'bs'/'jq'分别为QUANTAXIS、tushare、baostock和joinquant风格:return: 返回改变风格后的code"""# 特殊符号做code保持不变,例如'',' '等if len(code) <= 3:code = codeelse:if style == 'QA':if code.startswith('sh') or code.startswith('sz'):code = code[3:]elif code.endswith('SH') or code.endswith('SZ'):code = code[:-3]elif code[-1].isdigit() and code[0].isdigit():code = codeelif code.endswith('XSHG') or code.endswith('XSHE'):code = code[:-5]elif style == 'ts':if code.startswith('sh') or code.startswith('sz'):code = code[3:] + '.' + code[:2].upper()elif code.endswith('SH') or code.endswith('SZ'):code = codeelif code[-1].isdigit() and code[0].isdigit():if code[0] == '6':code = code + '.SH'else:code = code + '.SZ'elif code.endswith('XSHG') or code.endswith('XSHE'):if code.endswith('XSHG'):code = code[:-5] + '.SH'else:code = code[:-5] + '.SZ'elif style == 'bs':if code.startswith('sh') or code.startswith('sz'):code = codeelif code.endswith('SH') or code.endswith('SZ'):code = code[-2:].lower() + '.' + code[:6]elif code[-1].isdigit() and code[0].isdigit():if code[0] == '6':code = 'sh.' + codeelse:code = 'sz.' + codeelif code.endswith('XSHG') or code.endswith('XSHE'):if code.endswith('XSHG'):code = 'sh.' + code[:-5]else:code = 'sz.' + code[:-5]elif style == 'jq':if code.startswith('sh') or code.startswith('sz'):if code.startswith('sh'):code = code[3:] + '.XSHG'else:code = code[3:] + '.XSHE'elif code.endswith('SH') or code.endswith('SZ'):if code.endswith('SH'):code = code[:-3] + '.XSHG'else:code = code[:-3] + '.XSHE'elif code[-1].isdigit() and code[0].isdigit():if code[0] == '6':code = code + '.XSHG'else:code = code + '.XSHE'elif code.endswith('XSHG') or code.endswith('XSHE'):code = codereturn code# 定义code风格改变函数,code可以是字符串或列表。def change_codeformat(code, style):""":param code: str or list:param style: 'QA'/'ts'/'bs'/'jq'分别为QUANTAXIS/tushare/baostock/joinquant风格:return: 返回改变风格后的code"""if type(code) is str:code = change_codestyle(code, style)elif type(code) is list:code = [change_codestyle(i, style) for i in code]elif type(code) is pd.Series:index = code.indexcode = code.values.tolist()code = [change_codestyle(i, style) for i in code]code = pd.Series(code, index=index)return code# 定义日期格式转换函数,'2018-01-01'、'20180101'、'2018/01/01'、20180101等格式互相转化def change_dateformat(date, sep='-'):""":param date: 输入的日期:param sep: 日期中的间隔符号"""if type(date) is int:date = str(date)elif type(date) is datetime:date = datetime.strftime(date.date(), '%Y%m%d')elif date is None:date = dateelif type(date) is str:if sep == '-':if '-' not in date:if '/' not in date:date = date[:4] + '-' + date[4:6] + '-' + date[6:]elif '/' in date:date = date.replace('/', '-')if sep == '/':if '/' not in date:if '-' not in date:date = date[:4] + '/' + date[4:6] + '/' + date[6:]elif '-' in date:date = date.replace('-', '/')if sep is None:if '/' in date:date = date.replace('/', '')elif '-' in date:date = date.replace('-', '')if sep == '':if '/' in date:date = date.replace('/', '')elif '-' in date:date = date.replace('-', '')return date# 定义获取回测期间选股、换股日期节点的函数。def date_node(start, end, freq='m', n=None):""":param start: 回测起始日期:param end: 回测终止日期:param freq: 选股、换股频率,'d'/'w'/'m'/'q'/'y':分别表示日、周、月、季、年当freq='w'时,按照每5个交易日取样(避免某些周无交易日或只有一两个交易日)。当freq = None时,时间节点按n个交易日确定,n和freq不能同时为None。:param n: 选股、换股频率,为n个交易日。n和freq不能同时为None,必有一个为None。:return: 返回选股、换股时间节点的日期,即每个频率周期段的最后一个交易日"""# 将日期统一成QA风格形式start = change_dateformat(start, sep='-')end = change_dateformat(end, sep='-')date_range = QA.QA_util_get_trade_range(start, end)df_date_range = pd.DataFrame(data=date_range, index=date_range)df_date_range.index = pd.to_datetime(df_date_range.index)if n is None:if freq == 'w':df_date_range = df_date_range.reset_index()df_date_range = df_date_range[df_date_range.index % 5 == 0]df_date_range = df_date_range.set_index('index')elif freq == 'd':df_date_range = df_date_rangeelse:df_date_range = df_date_range.resample(rule=freq).first() # 以周期初为时间节点else:if n == 1:df_date_range = df_date_rangeelse:df_date_range = df_date_range.reset_index()df_date_range = df_date_range[df_date_range.index % n == 0]df_date_range = df_date_range.set_index('index')list_date_range = list(df_date_range.iloc[:, 0])df = pd.DataFrame(data=list_date_range, index=list_date_range)df.index = pd.to_datetime(df.index)df.columns = ['date']df = df.dropna() # 排除日线取样时的nanreturn df# 定期换股函数,按传入的选股序列,计算买股、卖股、换股的序列def exchange_stocks(df, start, end, buy_today=False, h_days=None):""":param df: 传入的股票序列,以日期为index,包含'code'列:param start: 开始日期:param end: 结束日期:param buy_today: 是否当天买:False为第二天买,True为当天买:param h_days: 为None时按照换股节点换股,为数字时按照数字代表的交易日数换股:return: 返回买股、卖股、换股的序列"""# 处理缺省值if end is None:end = now# 将日期统一成QA风格形式start = change_dateformat(start, sep='-')end = change_dateformat(end, sep='-')# 如果按照节点换股if h_days is None:# 根据每个节点所选股票列表,确定卖股节点为下一个日期节点# 注意并非推迟一个交易日,而是推迟一个节点!df = df.assign(sell_code=df.code.shift(1))# 如果不是选股当天买入或换股,而是第二个交易日if buy_today is False:# 如果换股频率不是每天(如果连续四个节点相差不止一个交易日,可确保节点不是每天换股):if df.index[4] - df.index[3] != timedelta(days=1) or \df.index[3] - df.index[2] != timedelta(days=1) or \df.index[2] - df.index[1] != timedelta(days=1) or \df.index[1] - df.index[0] != timedelta(days=1):# 恢复成日采样形式,便于推算后一个交易日df = df.resample('d').last()# 去掉非交易日行trade_day_index = QA.QA_util_get_trade_range(start, end)trade_day_index = pd.to_datetime(trade_day_index)df = df.reindex(trade_day_index)# 此处买入为推后一个交易日,并非推迟一个节点df = df.assign(buy_code=df.code.shift(1))# 卖股也推迟一个交易日,与买股同一个交易日,以便于判断是否需要新卖出df['sell_code'] = df.sell_code.shift(1)# 如果是选股当天买股或换股,很简单else:df = df.assign(buy_code=df.code)# 恢复成日采样形式,便于回测统一日期进度df = df.resample('d').last()# 去掉非交易日行trade_day_index = QA.QA_util_get_trade_range(start, end)trade_day_index = pd.to_datetime(trade_day_index)df = df.reindex(trade_day_index)# 对buy_code、sell_code分别和code进行去除交集运算,求出须实际买卖的股票代码列表df['new_sell_code'] = [np.nan] * len(df)df['new_buy_code'] = [np.nan] * len(df)df['new_sell_code'] = df['new_sell_code'].astype(object)df['new_buy_code'] = df['new_buy_code'].astype(object)for i in df.index:# 如果没有卖出股,只有买入股,则只买入股if (type(df.loc[i, 'sell_code']) is not list or df.loc[i, 'sell_code'] == []) and \(type(df.loc[i, 'buy_code']) is list and len(df.loc[i, 'buy_code']) > 0):df.at[i, 'new_buy_code'] = list(set(df.loc[i, 'buy_code']))# 如果买股卖股都有,则买卖都取差集elif (type(df.loc[i, 'sell_code']) is list and len(df.loc[i, 'sell_code']) > 0) \and (type(df.loc[i, 'buy_code']) is list and len(df.loc[i, 'buy_code']) > 0):df.at[i, 'new_sell_code'] = list(set(df.loc[i, 'sell_code']) - set(df.loc[i, 'buy_code']))df.at[i, 'new_buy_code'] = list(set(df.loc[i, 'buy_code']) - set(df.loc[i, 'sell_code']))# 如果没有买入股,只有卖出股,则只卖出股elif (type(df.loc[i, 'sell_code']) is list and len(df.loc[i, 'sell_code']) > 0) and \(type(df.loc[i, 'buy_code']) is not list or df.loc[i, 'buy_code'] == []):df.at[i, 'new_sell_code'] = list(set(df.loc[i, 'sell_code']))# 如果date列包含在df中,则抛弃掉if 'date' in df.columns:df = df.drop(columns='date')# 按固定交易日数持股,即持股h_days卖掉。else:date_range = pd.date_range(start, end)df_date_range = pd.DataFrame(data=date_range, index=date_range)df_date_range.index = pd.to_datetime(df_date_range.index)# 为原始code单独建立一个序列,深度复制df_code = copy.deepcopy(df)# 将code序列转化成日期为交易日的序列,此处原index为非交易日的转化后可能产生重叠的index,如周六、周日都转化为后一个周一等。df_code.index = [QA.QA_util_get_real_date(i, towards=1) for i in df_code.index]# 合并index重复项df_code = df_code.groupby(level=0, group_keys=False).apply(lambda x: x.values.tolist())df_code = pd.Series([flatten_list(i) for i in df_code], index=df_code.index)df_code = pd.DataFrame(df_code)df_code.columns = ['code']df_code.index = pd.to_datetime(df_code.index)#将原选股序列恢复成每天的形式df = df.reindex(df_date_range.index)df['buy_code'] = np.nandf['sell_code'] = np.nan# 将这两列转化为object类型,以便能够赋列表形式的值df[['buy_code', 'sell_code']] = df[['buy_code', 'sell_code']].astype(object)# 把df对应df_code日期位置的buy_code设置为df_code.buy_code的值if buy_today is True:# 为df添加(h_days)个交易日,便于最后一个交易日买的股能在h_days个交易日后卖added_trad_start = QA.QA_util_get_real_date(df.index[-1]) # 此处是real_date:如果是交易日就当天买added_trade_end = QA.QA_util_get_next_trade_date(added_trad_start, h_days)added_trade_days = QA.QA_util_get_trade_range(added_trad_start, added_trade_end)added_trade_days = pd.to_datetime(added_trade_days)df_added = pd.DataFrame([np.nan]*len(added_trade_days), index=added_trade_days)df = df.append(df_added)# 当天购买df.loc[df_code.index, 'buy_code'] = df_code.code.to_list()else:# 为df添加(1+h_days)个交易日,便于最后一天选出的个股能在下一个交易日购买,在(1+h_days)个交易日后卖added_trad_start = QA.QA_util_get_next_trade_date(df.index[-1]) # 此处是next_trade,下一个交易日买added_trade_end = QA.QA_util_get_next_trade_date(added_trad_start, h_days)added_trade_days = QA.QA_util_get_trade_range(added_trad_start, added_trade_end)added_trade_days = pd.to_datetime(added_trade_days)df_added = pd.DataFrame([np.nan]*len(added_trade_days), index=added_trade_days)df_added.columns = ['code']df = df.append(df_added, sort=True)# 推迟一个交易日购买next_trade_date = [QA.QA_util_get_next_trade_date(i) for i in df_code.index]next_trade_date = pd.to_datetime(next_trade_date)df.loc[next_trade_date, 'buy_code'] = df_code.code.to_list()# 将选股序列按h_days递延,作为卖出列表# 买入后持有h_days个交易日卖掉h_days_trade_date = [QA.QA_util_get_next_trade_date(i, h_days) for i in df_code.index]h_days_trade_date = pd.to_datetime(h_days_trade_date)df.loc[h_days_trade_date, 'sell_code'] = df_code.code.to_list()return df# 定义计算一段日期内个股日内或隔夜收益率函数(每天)def cal_stock_ret(start, end=None, code=None, method='normal'):""":param start: 开始日期:param end: 结束日期:param code: 个股代码,等于None时为市场内所有个股代码列表:param method: 计算收益率的方式,intraday为日内,overnight为隔夜,normal为正常, both为日内+隔夜的相反数:return: 返回个股收益率序列"""# 日期和代码预处理start = QA.QA_util_get_real_date(start)if end is None:end = nowif type(code) is str:code = [code]# 计算普通收益率if method == 'normal':start = QA.QA_util_get_last_day(start)# 获取个股行情数据df = get_stock_day_QA(code=code, start=start, end=end)# 将行情数据转化为QA的DataStruct数据结构,以便于复权data = QA.QA_DataStruct_Stock_day(df)# 前复权(仅计算今天盘中数据时,不能复权,其他情况都做复权处理)if end == now and QA.QA_util_if_trade(end) and \'09:30' < datetime.now().strftime('%H:%M') < '15:00' and start != end:data = data.to_qfq()ret = data.close / data.close.groupby(level=1).shift(1) - 1# 计算日内收益率elif method == 'intraday':# 获取个股行情数据df = get_stock_day_QA(code=code, start=start, end=end)# 将行情数据转化为QA的DataStruct数据结构,以便于复权data = QA.QA_DataStruct_Stock_day(df)# 前复权data = data.to_qfq()ret = data.close / data.open - 1# 计算隔夜收益率elif method == 'overnight':start = QA.QA_util_get_last_day(start)# 获取个股行情数据df = get_stock_day_QA(code=code, start=start, end=end)# 将行情数据转化为QA的DataStruct数据结构,以便于复权data = QA.QA_DataStruct_Stock_day(df)# 前复权data = data.to_qfq()ret = data.open / data.close.groupby(level=1).shift(1) - 1# 计算日内-隔夜:即日内+隔夜的相反数elif method == 'both':start = QA.QA_util_get_last_day(start)# 获取个股行情数据df = get_stock_day_QA(code=code, start=start, end=end)# 将行情数据转化为QA的DataStruct数据结构,以便于复权data = QA.QA_DataStruct_Stock_day(df)# 前复权data = data.to_qfq()# 日内收益率与隔夜收益率的差ret = data.close / data.open - data.open / data.close.groupby(level=1).shift(1)# 错误方式反馈else:ret = Noneprint('Only can calculate intraday or overnight return! Please retry!')ret = ret.dropna()return ret# 计算个股前n个交易日涨跌率的函数def cal_n_ret(code=None, n=14, m=0, date=None, assets='stock', method='normal'):""":param code: 用于计算动量的指数或个股代码列表:param n: 计算前n天的动量:param m: 排除最近m天的动量:param date: 基准日期:param assets: 计算动量的证券种类,'index' or 'stock' or 'future':param method: 计算动量的方法,'overnight' 、'intraday' or 'normal' or 'both':return: 返回各标的的动量序列"""if date is None:date = nowdate = QA.QA_util_get_real_date(date)# 过去n个交易日的日期(前一个交易日就是它本身)last_n_day = QA.QA_util_get_last_day(date=date, n=n - 1)if type(code) is str: # 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式code = [code]# 如果标的为指数if assets == 'index':pass# 如果标的为个股elif assets == 'stock':ret = cal_stock_ret(start=last_n_day, end=date, code=code, method=method)else: # 为期货等预留pass# 排除前m天的涨跌率if m != 0:index2 = ret.index.get_level_values(level=0).drop_duplicates()[:-m]ret = ret.reindex(index2, level=0)# 对过去n-m个交易日收益率进行加总n_ret = ret.groupby(level=1).sum()n_ret.name = 'sum_ret'return n_ret.sort_values() # 返回排序后的动量序列# 计算一段日期内,每天前n个交易日的涨跌率的函数(上个函数仅计算一个基准日,本函数计算一段日期内每个交易日做基准日)def cal_period_ret(start, end=None, code=None, n=14, m=0, assets='index', method='normal'):""":param start: 起始日期:param end: 结束日期:param code: 用于计算动量的指数或个股代码或代码列表:param n: 计算前n天的动量:param m: 排除最近m天的动量:param assets: 计算动量的证券种类,'index' or 'stock':param method: 计算动量的方法,'overnight'、'intraday' or 'normal' or 'both':return: 返回各标的的动量序列(multiindex)"""if end is None:end = nowend = QA.QA_util_get_real_date(end) #结束日期如果为非交易日,则向前找最近交易日做结束日start = QA.QA_util_get_real_date(start, towards=1) # 开始日期如果为非交易日,则向后找最近交易日做开始日# start之前的n个交易日的日期(前一个交易日就是它本身)last_n_day = QA.QA_util_get_last_day(date=start, n=n-1)if type(code) is str: # 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式code = [code]# 如果标的为指数if assets == 'index':pass# 如果标的为个股elif assets == 'stock':ret = cal_stock_ret(start=last_n_day, end=end, code=code, method=method)else: # 为期货预留passperiod_ret = ret.groupby(level=1).rolling(n - m).sum().dropna() # 按窗口移动加总收益率period_ret = period_ret.droplevel(level=2) # 上一步计算中,momentum会形成第三层index,所以抛弃第三层indexperiod_ret.name = 'sum_ret' # 设置序列名称period_ret = period_ret.swaplevel().sort_index(level=0) # 按日期排序# 下面的排序中增加了一层重复index,去掉。返回根据n日涨跌率排名后的multi_index序列(以日期和证券代码为index)。period_ret = period_ret.groupby(level=0).apply(lambda x: x.sort_values()).droplevel(0)if m != 0:period_ret = period_ret.groupby(level=1).shift(m).dropna().\groupby(level=0).apply(lambda x: x.sort_values()).droplevel(0)return period_ret# 定义计算一段日期个股涨跌幅的涨跌幅,即加速度accelerationdef cal_acceleration_period(start, end=None, code=None, n=14, m=0, assets='index', method='normal'):""":param start: 起始日期:param end: 结束日期:param code: 用于计算动量的指数或个股代码或代码列表:param n: 计算前n天每天的收益率:param m: 排除前m天的收益率:param assets: 计算动量的证券种类,'index' or 'stock' or 'future':param method: 计算动量的方法,'overnight'、'intraday' or 'normal' or 'both':return: 返回各标的的动量序列(multiindex)"""# 先处理起止时间if end is None:end = now# 结束日期如果为非交易日,则向前找最近交易日做结束日end = QA.QA_util_get_real_date(end)# 开始日期如果为非交易日,则向后找最近交易日做开始日start = QA.QA_util_get_real_date(start, towards=1)# start之前的n个交易日的日期(前一个交易日就是它本身)last_n_day = QA.QA_util_get_last_day(date=start, n=n-1)# 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式if type(code) is str:code = [code]# 先求前n天内,投资品的每天的收益率(当作速度)velocity = cal_period_ret(start=last_n_day, end=end, code=code, n=n, m=m,assets=assets, method=method)# 求每天的加速度acceleration = velocity / velocity.groupby(level=1).shift() - 1# 求过去n天加速度的和acceleration = acceleration.rolling(n).sum()# 修改名称acceleration.name = 'acceleration'return acceleration.dropna().sort_index().groupby(level=0, group_keys=False).apply(lambda x: x.sort_values())# 计算市场或板块或给定范围的个股,每天涨跌幅均值与其标准差的比值,命名为尹氏指数def cal_yin_period(start, end=None, code=None, assets='stock', method='normal'):""":param start: 开始日期:param end: 结束日期:param code: 计算范围, 个股或指数代码列表:param assets: 投资种类,'index'/'stock':param method: 涨跌幅计算方式:return: 返回每天涨跌幅均值与其标准差的比值的序列"""# 先处理起止时间if end is None:end = now# 结束日期如果为非交易日,则向前找最近交易日做结束日end = QA.QA_util_get_real_date(end)# 开始日期如果为非交易日,则向后找最近交易日做开始日start = QA.QA_util_get_real_date(start, towards=1)# 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式if type(code) is str:code = [code]# 求每天的收益率ret = cal_period_ret(start=start, end=end, code=code, n=1, m=0,assets=assets, method=method)# 求每天收益率的均值mean = ret.groupby(level=0).mean()std = ret.groupby(level=0).std()# 求尹氏指数yin = mean / stdreturn mean, std, yin# 定义一段日期内,求每天的净资产(或者公司其他的总量性质的统计量),当作个股的质量因子,用于构建动量、动能模型def cal_period_mass(start, end=None, code=None, kind='net_assets'):""":param start: 起始日期:param end: 结束日期:param code: 用于计算动量的指数或个股代码或代码列表:param kind: 质量类型,如净资产/营业收入/经营活动产生的现金流量净额等等总量性质的统计量,具体可参看tushare每日指标数据:return: 返回每天各个股的质量(multiindex)"""# 先处理起止时间if end is None:end = now# 结束日期如果为非交易日,则向前找最近交易日做结束日end = QA.QA_util_get_real_date(end)# 开始日期如果为非交易日,则向后找最近交易日做开始日start = QA.QA_util_get_real_date(start, towards=1)# 将起止日期内的交易日期做成序列df_date = QA.QA_util_get_trade_range(start=start, end=end)df_date = pd.to_datetime(df_date)df_date = pd.DataFrame(df_date, index=df_date, columns=['date'])# 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式if type(code) is str:code = change_codeformat(code, 'QA')code = [code]elif type(code) is list:code = change_codeformat(code, 'QA')# 逐日获取全市场质量因子df = pd.DataFrame()for i in df_date.date:try:# 从原版tushare获取指标df = df.append(pro.daily_basic(trade_date=i.strftime('%Y%m%d'), fields="ts_code,trade_date," + kind))# 从湘财tushare获取指标except Exception:df = df.append(xcpro.daily_basic(trade_date=i.strftime('%Y%m%d'), fields="ts_code,trade_date," + kind))# 整理数据df.trade_date = pd.to_datetime(df.trade_date)df.ts_code = change_codeformat(df.ts_code.values.tolist(), 'QA')df.columns = ['code', 'date', kind]df.set_index(['date', 'code'], inplace=True)df.sort_index(inplace=True)if code is not None:df = df.reindex(code, level=1)return df[kind]# 定义一段日期内,计算每天前n个交易日个股的物理动量的函数def cal_physics_momentum(start, end=None, code=None, n=14, m=0, method='normal', kind='net_assets'):""":param start: 起始日期:param end: 结束日期:param code: 用于计算动量的指数或个股代码或代码列表:param n: 计算前n天的动量:param m: 排除最近m天的动量:param method: 计算动量的方法,'overnight'、'intraday' or 'normal' or 'both':param kind: 质量类型,如净资产/营业收入/经营活动产生的现金流量净额等等,具体可参看tushare每日指标数据:return: 返回各标的的动量序列(multiindex)"""# 先求物理速度,即前n天的涨跌幅velocity = cal_period_ret(start=start, end=end, code=code, n=n, m=m,assets='stock', method=method)# 再求物理质量,即个股每天的净资产mass = cal_period_mass(start=start, end=end, code=code, kind=kind)# 最后求动量,p=mvmomentum = velocity * massreturn momentum.dropna()# 定义一段日期内,计算每天前n个交易日个股的物理动能的函数def cal_physics_energy(start, end=None, code=None, n=14, m=0, method='normal', kind='net_assets'):""":param start: 起始日期:param end: 结束日期:param code: 用于计算动量的指数或个股代码或代码列表:param n: 计算前n天的动量:param m: 排除最近m天的动量:param method: 计算动量的方法,'overnight'、'intraday' or 'normal' or 'both':param kind: 质量类型,如净资产/营业收入/经营活动产生的现金流量净额等等,具体可参看tushare每日指标数据:return: 返回各标的的动量序列(multiindex)"""# 先求物理速度,即前n天的涨跌幅velocity = cal_period_ret(start=start, end=end, code=code, n=n, m=m,assets='stock', method=method)# 再求物理质量,即个股每天的净资产mass = cal_period_mass(start=start, end=end, code=code, kind=kind)# 最后求动量,e=0.5mv**2energy = 0.5 * mass * velocity**2return energy.dropna()# 定义将序列数据分割成i段并选取第j段数据的函数(按pandas.qcut分割)def qcut_select(s, n, m):""":param s: 传入用于分割的序列数据,类型为pandas.Series/pandas.Dataframe:param n: 将序列数据切割成n份:param m: 选取第m份数据,0=<m<n:return: 返回分割i份后选取的第j份数据"""# 排序s = s.sort_values()# 如果序列长度小于等于m,则返回原序列if len(s) <= m:if len(s) == 1:passelse:s = s[0:1]# 如果序列长度小于n,大于m,则直接切取第m个元素elif m < len(s) < n:s = s[m:m+1]# 如果序列长度大于等于n,进行分割选取elif len(s) >= n:# 为分割设置从0到n-1的标签labels = [str(i) for i in range(n)]r = pd.qcut(s, n, labels=labels)# 取第m层数据r = r[r == str(m)]s = s[r.index]return s# 定义将序列数据分割成i段并选取第j段数据的函数(按pandas.cut分割)def cut_select(s, n, m):""":param s: 传入用于分割的序列数据,类型为pandas.Series/pandas.Dataframe:param n: 将序列数据切割成n份:param m: 选取第m份数据,0=<m<n:return: 返回分割i份后选取的第j份数据"""# 排序s = s.sort_values()# 如果序列长度小于等于m,则返回第一个元素if len(s) <= m:if len(s) == 1:passelse:s = s[0:1]# 如果序列长度小于n,大于m,则直接切取第m个元素elif m < len(s) < n:s = s[m:m+1]# 为分割设置从0到n-1的标签elif len(s) >= n:labels = [str(i) for i in range(n)]r = pd.cut(s, n, labels=labels)# 取第m层数据r = r[r == str(m)]s = s[r.index]return s# 根据个股加速度acceleration分层选取def acceleration_select(start, k, s, end=None, code=None, freq='m', freq_n=None,n=14,split_method='qcut', assets='stock'):""":param start: 选股开始日期:param end: 选股结束日期:param code: 选股范围,个股代码列表,None时为全市场范围:param freq: 换股频率,即持股天数,可以计算各换股节点,str type or None:param freq_n: 换股间隔天数,和freq不能同时为None:param n: 计算前n天的加速度:param k: 将动量按从大到小分成k组:param s: 从k组成选择第s组数据:param split_method: 分组方法,'qcut/cut',参见pandas.qcut和pandas.cut方法:param assets: 证券种类,'stok/index':return: 返回各选股时间节点上选出的个股列表序列"""# 先处理起止时间if end is None:end = now# 结束日期如果为非交易日,则向前找最近交易日做结束日end = QA.QA_util_get_real_date(end)# 开始日期如果为非交易日,则向后找最近交易日做开始日start = QA.QA_util_get_real_date(start, towards=1)# 计算加速度acceleration = cal_acceleration_period(start=start, end=end, code=code, n=n, assets=assets)# 获取时间节点序列df_date = date_node(start=start, end=end, freq=freq, n=freq_n)# 用时间节点过滤acceleration = acceleration.reindex(df_date.index, level=0)# 用分割函数分割k组后取第s组if split_method == 'qcut':acceleration = acceleration.groupby(level=0).rank(na_option='top', method='first').groupby(level=0).\apply(qcut_select, k, s).droplevel(0)elif split_method == 'cut':acceleration = acceleration.groupby(level=0).apply(cut_select, k, s).droplevel(0)# 仅返回每个时间点选出的板块的代码return acceleration.reset_index(level=1).groupby(level=0).\apply(lambda x: list(x.code.values.flatten()))# 根据个股物理模型分层选股函数def physics_select(start, k, s, end=None, code=None, freq='m', freq_n=None, n=14,m=0, model='acceleration', kind=None, method='normal'):""":param start: 选股开始日期:param end: 选股结束日期:param code: 选股范围,个股代码列表,None时为全市场范围:param freq: 换股频率,即持股天数,可以计算各换股节点,str type or None:param freq_n: 换股间隔天数,和freq不能同时为None:param n: 计算前n天的速度或加速度:param m: 计算速度时排除最近m天数据:param k: 将动量按从大到小分成k组:param s: 从k组成选择第s组数据:param kind: str type, 计算物理质量类型,如'net_assets'等:param model: 物理模型名称:param method: 计算速度或加速度的方式,如'normal'/'overnight'/'intraday'等:return: 返回各选股时间节点上选出的个股列表序列"""# 先处理起止时间if end is None:end = now# 结束日期如果为非交易日,则向前找最近交易日做结束日end = QA.QA_util_get_real_date(end)# 开始日期如果为非交易日,则向后找最近交易日做开始日start = QA.QA_util_get_real_date(start, towards=1)# 计算物理模型值if model == 'acceleration':physics = cal_acceleration_period(start=start, end=end, code=code, n=n, m=m, assets='stock', method=method)elif model == 'momentum':physics = cal_physics_momentum(start=start, end=end, code=code, n=n, m=m, kind=kind, method=method)elif model == 'energy':physics = cal_physics_energy(start=start, end=end, code=code, n=n, m=m, kind=kind, method=method)# 为其他物理模型预留else:physics = None# 获取时间节点序列df_date = date_node(start=start, end=end, freq=freq, n=freq_n)# 用时间节点过滤physics = physics.reindex(df_date.index, level=0)# 用分割函数分割k组后取第s组physics = physics.groupby(level=0).rank(na_option='top', method='first').groupby(level=0).\apply(qcut_select, k, s).droplevel(0)# 仅返回每个时间点选出的板块的代码return physics.reset_index(level=1).groupby(level=0).\apply(lambda x: list(x.code.values.flatten()))# 定义直接用物理模型选股的回测函数def stock_physics_backtest(start, end=None, k=300, s=299, code=None, freq='m', freq_n=None,n=7, m=3, model='momentum', kind='net_assets', method='intraday', cash=1000000):""":param start: 回测开始日期:param end: 回测结束日期:param k: 将个股动量分成k份:param s: 选取动量排名第s份的个股:param code: 指定个股范围,不指定时为所有A股:param freq: 换股频率,即持股天数,可以计算各换股节点,str type or None:param freq_n: 换股间隔天数,和freq不能同时为None:param n: 计算过去n天的数据:param m: 排除过去m天的数据:param model: 物理模型名称:param kind: 计算物理质量的类型,如'net_assets'等:param method: 计算速度/加速度的方法:param cash: 回测初始现金:return: 返回回测结果"""# 用模型选股ss = physics_select(start=start, k=k, s=s, end=end, code=code, freq=freq, freq_n=freq_n, n=n,m=m, model=model, kind=kind, method=method)ss.name ='code'df = pd.DataFrame(ss)# 根据每个节点的选股列表,确定各节点的买入卖出股票df = exchange_stocks(df=df, start=start, end=end, buy_today=False, h_days=None)stock_code = list() # 用于采集所有涉及到的股票交易数据for i in df.index:if type(df.loc[i, 'code']) is list:for j in df.loc[i, 'code']:stock_code.append(j)stock_code = list(set(stock_code))data = QA.QA_fetch_stock_day_adv(stock_code, df.index[0], df.index[-1])data = data.to_qfq()# 设置回测架构Account = QA.QA_Account(strategy_name='physics_model', user_cookie='yinxiuqu',portfolio_cookie=str(kind), start=start, end=end)Broker = QA.QA_BacktestBroker()Account.reset_assets(cash)Account.account_cookie = 'k:' + str(k) + 's:' + str(s) + 'n:' + str(n) + 'm:' +\'freq:' + str(freq) + 'freq_n:' + str(freq_n)# 找出选股代码列表里,最长的列表的长度long = df.code.dropna().groupby(level=0).apply(lambda x: len(x.values[0])).groupby(level=0).max()[0]cash_weight = 0.6 / long # 平均每只股票分配的资金比例for items in data.panel_gen:# 下行用于查找买卖日期节点if type(df.loc[items.date, 'sell_code'].values[0]) is list:# 到节点全部清仓for i in Account.sell_available.index.to_list():# 可卖的股票未停牌时if i in items.code:order = Account.send_order(code=i,time=items.date,amount=Account.sell_available.get(i, 0),towards=QA.ORDER_DIRECTION.SELL,price=0,order_model=QA.ORDER_MODEL.MARKET,amount_model=QA.AMOUNT_MODEL.BY_AMOUNT)# 能够成交时if order:Broker.receive_order(QA.QA_Event(order=order, market_data=items.select_code(i)))trade_mes = Broker.query_orders(Account.account_cookie, 'filled')res = trade_mes.loc[order.account_cookie, order.realorder_id]order.trade(res.trade_id, res.trade_price, res.trade_amount, res.trade_time)if type(df.loc[items.date, 'buy_code'].values[0]) is list:for j in df.loc[items.date, 'buy_code'].values[0]:if j in items.code: # 欲买的股票未停牌时order = Account.send_order(code=j,time=items.date,money=cash_weight * Account.cash_available,# 添加ams指数权重# money=cash_weight * df.loc[items.date, 'weight'].values[0] * Account.cash_available,towards=QA.ORDER_DIRECTION.BUY,price=items.select_code(j).open[0],order_model=QA.ORDER_MODEL.MARKET,amount_model=QA.AMOUNT_MODEL.BY_MONEY)if order: # 能够成交时Broker.receive_order(QA.QA_Event(order=order, market_data=items.select_code(j)))trade_mes = Broker.query_orders(Account.account_cookie, 'filled')res = trade_mes.loc[order.account_cookie, order.realorder_id]order.trade(res.trade_id, res.trade_price, res.trade_amount, res.trade_time)Account.settle()Risk = QA.QA_Risk(Account)return Account, Risk
选择2015年作为模型回测时间段。质量mass我选取的是营业收入(TTM)因子,速度计算方法,前两个模型用的是普通计算模式,第三个模型用的是隔夜计算模式。
1、先用加速度模型回测,每7个交易日计算前7个交易日的加速度,选取靠近最大值的个股。
account, risk = stock_physics_backtest(start=’2015-01-01’, end=’2016-01-01’, k=300, s=289, code=None, freq=None, freq_n=7,
n=7, m=0, model=’acceleration’, kind=’oper_rev_ttm’, method=’normal’, cash=1000000)
运行结束后,运行risk.plot_assets_curve(),效果图如下:

蓝色线是沪深300基准线,当年上涨3%,策略组合为红色线,当年上涨41%
2、用动能模型回测,每7个交易日计算前7个交易日的动能,选取靠近最大值的个股。
account, risk = stock_physics_backtest(start=’2015-01-01’, end=’2016-01-01’, k=300, s=289, code=None, freq=None, freq_n=7,
n=7, m=0, model=’energy’, kind=’oper_rev_ttm’, method=’normal’, cash=1000000)
再运行risk.plot_assets_curve(),效果图如下:
年收益率27%。
3、用动量模型回测,每7个交易日计算前7个交易日的动量,选取靠近最大值的个股。注意这里计算速度用的是隔夜模式。
account, risk = stock_physics_backtest(start=’2015-01-01’, end=’2016-01-01’, k=300, s=289, code=None, freq=None, freq_n=7,
n=7, m=0, model=’momentum’, kind=’oper_rev_ttm’, method=’overnight’, cash=1000000)
risk.plot_assets_curve()
效果图如下:
年收益为17%
仅以三个简单模型为例,说明用物理模型进行金融建模是可行的,参数也并未做优化,也有很多错误,请批指正。
