本文涉及到的个股市场数据来自开源量化框架QUANTAXIS,涉及到的质量(mass)数据来自开源数据引擎原版tushare和湘财证券版tushare。需安装好QUANTAXIS,tushare,xcsc_tushare等模块,如果你不需要湘财证券版tushare的数据,也可以只安装前面两个模块。
源代码粘贴如下(回测效果图在源代码后面):
# -*- coding: utf-8 -*-
"""
@author: yinxiuqu
@license: GNU General Public License v3.0
@contact: yinxiuqu@qq.com
@file: physics.py
@time: 2020/7/7 下午9:48
@Software: PyCharm
本文件定义简单物理模型进行金融建模
共有三种动能计算方法,分别为:
1、日内(intraday):今收/今开 - 1
2、隔夜(overnight):今开/昨收 - 1
3、正常(normal):今收/昨收 - 1
"""
# 导入相应模块
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import copy
import QUANTAXIS as QA
import tushare as ts
import xcsc_tushare as xcts
# 预处理数据接口和常量
ts.set_token('bf1ec1f2e80391feae35a46db063501ad03accf588990dxxxxxxxxxxxxxx')
# 此处请用你自己的token
xcts.set_token('96e5167f90449e742407e3dc09de7795fd13edfee80xxxxxxxxxxxxxxx')
# 此处请用你自己的token
pro = ts.pro_api()
xcpro = xcts.pro_api(env='prd')
now = datetime.now().strftime('%Y-%m-%d')
# 定义从QUANTAXIS获取个股日线数据的函数
def get_stock_day_QA(start, end=None, code=None):
"""
:param code:str type,stock code
:param start:str type,start date
:param end:str type,end date
:return:multi-index sources-frame
"""
# 处理默认结束日期
if end is None or end > now or end is 'now':
end = now
# 如果结束日为今天且今天是交易日且没收盘
if end == now and QA.QA_util_if_trade(end) and '09:30' < datetime.now().strftime('%H:%M') < '15:00':
# 盘中在线获取个股列表
if code is None:
code = QA.QAFetch.QATdx.QA_fetch_get_stock_list('stock').index.get_level_values(0).to_list()
# 如果起止日期不是同一个交易日
if start != end:
df1 = QA.QA_fetch_stock_day_adv(code=code, start=start, end=QA.QA_util_get_last_day(end, 1)).data
df2 = QA.QAFetch.QATdx.QA_fetch_get_stock_latest(code=code)
if df2 is not None:
df2 = df2.loc[now].drop_duplicates()
df2.rename({'vol': 'volume'}, axis='columns', inplace=True)
df2.set_index([df2.index, 'code'], drop=True, inplace=True)
df2 = df2[list(df1.columns)]
df = df1.append(df2)
# 如果起止日期是同一个交易日,只获取盘中数据(因为还没收盘)
else:
df = QA.QAFetch.QATdx.QA_fetch_get_stock_latest(code=code)
df = df.loc[now].drop_duplicates()
df.rename({'vol': 'volume'}, axis='columns', inplace=True)
df.set_index([df.index, 'code'], drop=True, inplace=True)
df = df[['open', 'high', 'low', 'close', 'volume', 'amount']]
# 在非交易日或者交易日的非交易时间段内
else:
# 从数据库获取个股列表
if code is None:
code = QA.QA_fetch_stock_list_adv().index.to_list()
df = QA.QA_fetch_stock_day_adv(code=code, start=start, end=end).data
return df
# 将多维列表打平为一维列表的函数(因为python3已经不支持列表flatten了)
def flatten_list(input_list):
"""
将多维列表打平为一维列表的函数,因为python3已经不支持列表flatten。
:param input_list: 输入的多维列表
:return: 返回打通后的一维列表
"""
output_list = []
while True:
if input_list == []:
break
for index, value in enumerate(input_list):
# index :索引序列 value:索引序列对应的值
# enumerate() 函数用于将一个可遍历的数据对象(如列表、元组或字符串)组合为一个索引序列,
# 同时列出数据和数据下标,一般用在 for 循环当中。
if type(value) == list:
input_list = value + input_list[index+1:]
break # 这里跳出for循环后,从While循环进入的时候index是更新后的input_list新开始算的。
else:
output_list.append(value)
input_list.pop(index)
break
return output_list
# 定义股票代码风格变换函数,本函数仅改变单个代码,即字符串代码
def change_codestyle(code, style):
"""
:param code: str
:param style: 'QA'/'ts'/'bs'/'jq'分别为QUANTAXIS、tushare、baostock和joinquant风格
:return: 返回改变风格后的code
"""
# 特殊符号做code保持不变,例如
'',' '等
if len(code) <= 3:
code = code
else:
if style == 'QA':
if code.startswith('sh') or code.startswith('sz'):
code = code[3:]
elif code.endswith('SH') or code.endswith('SZ'):
code = code[:-3]
elif code[-1].isdigit() and code[0].isdigit():
code = code
elif code.endswith('XSHG') or code.endswith('XSHE'):
code = code[:-5]
elif style == 'ts':
if code.startswith('sh') or code.startswith('sz'):
code = code[3:] + '.' + code[:2].upper()
elif code.endswith('SH') or code.endswith('SZ'):
code = code
elif code[-1].isdigit() and code[0].isdigit():
if code[0] == '6':
code = code + '.SH'
else:
code = code + '.SZ'
elif code.endswith('XSHG') or code.endswith('XSHE'):
if code.endswith('XSHG'):
code = code[:-5] + '.SH'
else:
code = code[:-5] + '.SZ'
elif style == 'bs':
if code.startswith('sh') or code.startswith('sz'):
code = code
elif code.endswith('SH') or code.endswith('SZ'):
code = code[-2:].lower() + '.' + code[:6]
elif code[-1].isdigit() and code[0].isdigit():
if code[0] == '6':
code = 'sh.' + code
else:
code = 'sz.' + code
elif code.endswith('XSHG') or code.endswith('XSHE'):
if code.endswith('XSHG'):
code = 'sh.' + code[:-5]
else:
code = 'sz.' + code[:-5]
elif style == 'jq':
if code.startswith('sh') or code.startswith('sz'):
if code.startswith('sh'):
code = code[3:] + '.XSHG'
else:
code = code[3:] + '.XSHE'
elif code.endswith('SH') or code.endswith('SZ'):
if code.endswith('SH'):
code = code[:-3] + '.XSHG'
else:
code = code[:-3] + '.XSHE'
elif code[-1].isdigit() and code[0].isdigit():
if code[0] == '6':
code = code + '.XSHG'
else:
code = code + '.XSHE'
elif code.endswith('XSHG') or code.endswith('XSHE'):
code = code
return code
# 定义code风格改变函数,code可以是字符串或列表。
def change_codeformat(code, style):
"""
:param code: str or list
:param style: 'QA'/'ts'/'bs'/'jq'分别为QUANTAXIS/tushare/baostock/joinquant风格
:return: 返回改变风格后的code
"""
if type(code) is str:
code = change_codestyle(code, style)
elif type(code) is list:
code = [change_codestyle(i, style) for i in code]
elif type(code) is pd.Series:
index = code.index
code = code.values.tolist()
code = [change_codestyle(i, style) for i in code]
code = pd.Series(code, index=index)
return code
# 定义日期格式转换函数,'2018-01-01'、'20180101'、'2018/01/01'、20180101等格式互相转化
def change_dateformat(date, sep='-'):
"""
:param date: 输入的日期
:param sep: 日期中的间隔符号
"""
if type(date) is int:
date = str(date)
elif type(date) is datetime:
date = datetime.strftime(date.date(), '%Y%m%d')
elif date is None:
date = date
elif type(date) is str:
if sep == '-':
if '-' not in date:
if '/' not in date:
date = date[:4] + '-' + date[4:6] + '-' + date[6:]
elif '/' in date:
date = date.replace('/', '-')
if sep == '/':
if '/' not in date:
if '-' not in date:
date = date[:4] + '/' + date[4:6] + '/' + date[6:]
elif '-' in date:
date = date.replace('-', '/')
if sep is None:
if '/' in date:
date = date.replace('/', '')
elif '-' in date:
date = date.replace('-', '')
if sep == '':
if '/' in date:
date = date.replace('/', '')
elif '-' in date:
date = date.replace('-', '')
return date
# 定义获取回测期间选股、换股日期节点的函数。
def date_node(start, end, freq='m', n=None):
"""
:param start: 回测起始日期
:param end: 回测终止日期
:param freq: 选股、换股频率,'d'/'w'/'m'/'q'/'y':分别表示日、周、月、季、年
当freq='w'时,按照每5个交易日取样(避免某些周无交易日或只有一两个交易日)。
当freq = None时,时间节点按n个交易日确定,n和freq不能同时为None。
:param n: 选股、换股频率,为n个交易日。n和freq不能同时为None,必有一个为None。
:return: 返回选股、换股时间节点的日期,即每个频率周期段的最后一个交易日
"""
# 将日期统一成QA风格形式
start = change_dateformat(start, sep='-')
end = change_dateformat(end, sep='-')
date_range = QA.QA_util_get_trade_range(start, end)
df_date_range = pd.DataFrame(data=date_range, index=date_range)
df_date_range.index = pd.to_datetime(df_date_range.index)
if n is None:
if freq == 'w':
df_date_range = df_date_range.reset_index()
df_date_range = df_date_range[df_date_range.index % 5 == 0]
df_date_range = df_date_range.set_index('index')
elif freq == 'd':
df_date_range = df_date_range
else:
df_date_range = df_date_range.resample(rule=freq).first() # 以周期初为时间节点
else:
if n == 1:
df_date_range = df_date_range
else:
df_date_range = df_date_range.reset_index()
df_date_range = df_date_range[df_date_range.index % n == 0]
df_date_range = df_date_range.set_index('index')
list_date_range = list(df_date_range.iloc[:, 0])
df = pd.DataFrame(data=list_date_range, index=list_date_range)
df.index = pd.to_datetime(df.index)
df.columns = ['date']
df = df.dropna() # 排除日线取样时的nan
return df
# 定期换股函数,按传入的选股序列,计算买股、卖股、换股的序列
def exchange_stocks(df, start, end, buy_today=False, h_days=None):
"""
:param df: 传入的股票序列,以日期为index,包含'code'列
:param start: 开始日期
:param end: 结束日期
:param buy_today: 是否当天买:False为第二天买,True为当天买
:param h_days: 为None时按照换股节点换股,为数字时按照数字代表的交易日数换股
:return: 返回买股、卖股、换股的序列
"""
# 处理缺省值
if end is None:
end = now
# 将日期统一成QA风格形式
start = change_dateformat(start, sep='-')
end = change_dateformat(end, sep='-')
# 如果按照节点换股
if h_days is None:
# 根据每个节点所选股票列表,确定卖股节点为下一个日期节点
# 注意并非推迟一个交易日,而是推迟一个节点!
df = df.assign(sell_code=df.code.shift(1))
# 如果不是选股当天买入或换股,而是第二个交易日
if buy_today is False:
# 如果换股频率不是每天(如果连续四个节点相差不止一个交易日,可确保节点不是每天换股):
if df.index[4] - df.index[3] != timedelta(days=1) or \
df.index[3] - df.index[2] != timedelta(days=1) or \
df.index[2] - df.index[1] != timedelta(days=1) or \
df.index[1] - df.index[0] != timedelta(days=1):
# 恢复成日采样形式,便于推算后一个交易日
df = df.resample('d').last()
# 去掉非交易日行
trade_day_index = QA.QA_util_get_trade_range(start, end)
trade_day_index = pd.to_datetime(trade_day_index)
df = df.reindex(trade_day_index)
# 此处买入为推后一个交易日,并非推迟一个节点
df = df.assign(buy_code=df.code.shift(1))
# 卖股也推迟一个交易日,与买股同一个交易日,以便于判断是否需要新卖出
df['sell_code'] = df.sell_code.shift(1)
# 如果是选股当天买股或换股,很简单
else:
df = df.assign(buy_code=df.code)
# 恢复成日采样形式,便于回测统一日期进度
df = df.resample('d').last()
# 去掉非交易日行
trade_day_index = QA.QA_util_get_trade_range(start, end)
trade_day_index = pd.to_datetime(trade_day_index)
df = df.reindex(trade_day_index)
# 对buy_code、sell_code分别和code进行去除交集运算,求出须实际买卖的股票代码列表
df['new_sell_code'] = [np.nan] * len(df)
df['new_buy_code'] = [np.nan] * len(df)
df['new_sell_code'] = df['new_sell_code'].astype(object)
df['new_buy_code'
] = df['new_buy_code'].astype(object)
for i in df.index:
# 如果没有卖出股,只有买入股,则只买入股
if (type(df.loc[i, 'sell_code']) is not list or df.loc[i, 'sell_code'] == []) and \
(type(df.loc[i, 'buy_code']) is list and len(df.loc[i, 'buy_code']) > 0):
df.at[i, 'new_buy_code'] = list(set(df.loc[i, 'buy_code']))
# 如果买股卖股都有,则买卖都取差集
elif (type(df.loc[i, 'sell_code']) is list and len(df.loc[i, 'sell_code']) > 0) \
and (type(df.loc[i, 'buy_code']) is list and len(df.loc[i, 'buy_code']) > 0):
df.at[i, 'new_sell_code'] = list(set(df.loc[i, 'sell_code']) - set(df.loc[i, 'buy_code']))
df.at[i, 'new_buy_code'] = list(set(df.loc[i, 'buy_code']) - set(df.loc[i, 'sell_code']))
# 如果没有买入股,只有卖出股,则只卖出股
elif (type(df.loc[i, 'sell_code']) is list and len(df.loc[i, 'sell_code']) > 0) and \
(type(df.loc[i, 'buy_code']) is not list or df.loc[i, 'buy_code'] == []):
df.at[i, 'new_sell_code'] = list(set(df.loc[i, 'sell_code']))
# 如果date列包含在df中,则抛弃掉
if 'date' in df.columns:
df = df.drop(columns='date')
# 按固定交易日数持股,即持股h_days卖掉。
else:
date_range = pd.date_range(start, end)
df_date_range = pd.DataFrame(data=date_range, index=date_range)
df_date_range.index = pd.to_datetime(df_date_range.index)
# 为原始code单独建立一个序列,深度复制
df_code = copy.deepcopy(df)
# 将code序列转化成日期为交易日的序列,此处原index为非交易日的转化后可能产生重叠的index,如周六、周日都转化为后一个周一等。
df_code.index = [QA.QA_util_get_real_date(i, towards=1) for i in df_code.index]
# 合并index重复项
df_code = df_code.groupby(level=0, group_keys=False).apply(lambda x: x.values.tolist())
df_code = pd.Series([flatten_list(i) for i in df_code], index=df_code.index)
df_code = pd.DataFrame(df_code)
df_code.columns = ['code']
df_code.index = pd.to_datetime(df_code.index)
#
将原选股序列恢复成每天的形式
df = df.reindex(df_date_range.index)
df['buy_code'] = np.nan
df['sell_code'] = np.nan
# 将这两列转化为object类型,以便能够赋列表形式的值
df[['buy_code', 'sell_code']] = df[['buy_code', 'sell_code']].astype(object)
# 把df对应df_code日期位置的buy_code设置为df_code.buy_code的值
if buy_today is True:
# 为df添加(h_days)个交易日,便于最后一个交易日买的股能在h_days个交易日后卖
added_trad_start = QA.QA_util_get_real_date(df.index[-1]) # 此处是real_date:如果是交易日就当天买
added_trade_end = QA.QA_util_get_next_trade_date(added_trad_start, h_days)
added_trade_days = QA.QA_util_get_trade_range(added_trad_start, added_trade_end)
added_trade_days = pd.to_datetime(added_trade_days)
df_added = pd.DataFrame([np.nan]*len(added_trade_days), index=added_trade_days)
df = df.append(df_added)
# 当天购买
df.loc[df_code.index, 'buy_code'] = df_code.code.to_list()
else:
# 为df添加(1+h_days)个交易日,便于最后一天选出的个股能在下一个交易日购买,在(1+h_days)个交易日后卖
added_trad_start = QA.QA_util_get_next_trade_date(df.index[-1]) # 此处是next_trade,下一个交易日买
added_trade_end = QA.QA_util_get_next_trade_date(added_trad_start, h_days)
added_trade_days = QA.QA_util_get_trade_range(added_trad_start, added_trade_end)
added_trade_days = pd.to_datetime(added_trade_days)
df_added = pd.DataFrame([np.nan]*len(added_trade_days), index=added_trade_days)
df_added.columns = ['code']
df = df.append(df_added, sort=True)
# 推迟一个交易日购买
next_trade_date = [QA.QA_util_get_next_trade_date(i) for i in df_code.index]
next_trade_date = pd.to_datetime(next_trade_date)
df.loc[next_trade_date, 'buy_code'] = df_code.code.to_list()
# 将选股序列按h_days递延,作为卖出列表
# 买入后持有h_days个交易日卖掉
h_days_trade_date = [QA.QA_util_get_next_trade_date(i, h_days) for i in df_code.index]
h_days_trade_date = pd.to_datetime(h_days_trade_date)
df.loc[h_days_trade_date, 'sell_code'] = df_code.code.to_list()
return df
# 定义计算一段日期内个股日内或隔夜收益率函数(每天)
def cal_stock_ret(start, end=None, code=None, method='normal'):
"""
:param start: 开始日期
:param end: 结束日期
:param code: 个股代码,等于None时为市场内所有个股代码列表
:param method: 计算收益率的方式,intraday为日内,overnight为隔夜,normal为正常, both为日内+隔夜的相反数
:return: 返回个股收益率序列
"""
# 日期和代码预处理
start = QA.QA_util_get_real_date(start)
if end is None:
end = now
if type(code) is str:
code = [code]
# 计算普通收益率
if method == 'normal':
start = QA.QA_util_get_last_day(start)
# 获取个股行情数据
df = get_stock_day_QA(code=code, start=start, end=end)
# 将行情数据转化为QA的DataStruct数据结构,以便于复权
data = QA.QA_DataStruct_Stock_day(df)
# 前复权(仅计算今天盘中数据时,不能复权,其他情况都做复权处理)
if end == now and QA.QA_util_if_trade(end) and \
'09:30' < datetime.now().strftime('%H:%M') < '15:00' and start != end:
data = data.to_qfq()
ret = data.close / data.close.groupby(level=1).shift(1) - 1
# 计算日内收益率
elif method == 'intraday':
# 获取个股行情数据
df = get_stock_day_QA(code=code, start=start, end=end)
# 将行情数据转化为QA的DataStruct数据结构,以便于复权
data = QA.QA_DataStruct_Stock_day(df)
# 前复权
data = data.to_qfq()
ret = data.close / data.open - 1
# 计算隔夜收益率
elif method == 'overnight':
start = QA.QA_util_get_last_day(start)
# 获取个股行情数据
df = get_stock_day_QA(code=code, start=start, end=end)
# 将行情数据转化为QA的DataStruct数据结构,以便于复权
data = QA.QA_DataStruct_Stock_day(df)
# 前复权
data = data.to_qfq()
ret = data.open / data.close.groupby(level=1).shift(1) - 1
# 计算日内-隔夜:即日内+隔夜的相反数
elif method == 'both':
start = QA.QA_util_get_last_day(start)
# 获取个股行情数据
df = get_stock_day_QA(code=code, start=start, end=end)
# 将行情数据转化为QA的DataStruct数据结构,以便于复权
data = QA.QA_DataStruct_Stock_day(df)
# 前复权
data = data.to_qfq()
# 日内收益率与隔夜收益率的差
ret = data.close / data.open - data.open / data.close.groupby(level=1).shift(1)
# 错误方式反馈
else:
ret = None
print('Only can calculate intraday or overnight return! Please retry!')
ret = ret.dropna()
return ret
# 计算个股前n个交易日涨跌率的函数
def cal_n_ret(code=None, n=14, m=0, date=None, assets='stock', method='normal'):
"""
:param code: 用于计算动量的指数或个股代码列表
:param n: 计算前n天的动量
:param m: 排除最近m天的动量
:param date: 基准日期
:param assets: 计算动量的证券种类,'index' or 'stock' or 'future'
:param method: 计算动量的方法,'overnight' 、'intraday' or 'normal' or 'both'
:return: 返回各标的的动量序列
"""
if date is None:
date = now
date = QA.QA_util_get_real_date(date)
# 过去n个交易日的日期(前一个交易日就是它本身)
last_n_day = QA.QA_util_get_last_day(date=date, n=n - 1)
if type(code) is str: # 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式
code = [code]
# 如果标的为指数
if assets == 'index':
pass
# 如果标的为个股
elif assets == 'stock':
ret = cal_stock_ret(start=last_n_day, end=date, code=code, method=method)
else: # 为期货等预留
pass
# 排除前m天的涨跌率
if m != 0:
index2 = ret.index.get_level_values(level=0).drop_duplicates()[:-m]
ret = ret.reindex(index2, level=0)
# 对过去n-m个交易日收益率进行加总
n_ret = ret.groupby(level=1).sum()
n_ret.name = 'sum_ret'
return n_ret.sort_values() # 返回排序后的动量序列
# 计算一段日期内,每天前n个交易日的涨跌率的函数(上个函数仅计算一个基准日,本函数计算一段日期内每个交易日做基准日)
def cal_period_ret(start, end=None, code=None, n=14, m=0, assets='index', method='normal'):
"""
:param start: 起始日期
:param end: 结束日期
:param code: 用于计算动量的指数或个股代码或代码列表
:param n: 计算前n天的动量
:param m: 排除最近m天的动量
:param assets: 计算动量的证券种类,'index' or 'stock'
:param method: 计算动量的方法,'overnight'、'intraday' or 'normal' or 'both'
:return: 返回各标的的动量序列(multiindex)
"""
if end is None:
end = now
end = QA.QA_util_get_real_date(end) #
结束日期如果为非交易日,则向前找最近交易日做结束日
start = QA.QA_util_get_real_date(start, towards=1) # 开始日期如果为非交易日,则向后找最近交易日做开始日
# start之前的n个交易日的日期(前一个交易日就是它本身)
last_n_day = QA.QA_util_get_last_day(date=start, n=n-1)
if type(code) is str: # 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式
code = [code]
# 如果标的为指数
if assets == 'index':
pass
# 如果标的为个股
elif assets == 'stock':
ret = cal_stock_ret(start=last_n_day, end=end, code=code, method=method)
else: # 为期货预留
pass
period_ret = ret.groupby(level=1).rolling(n - m).sum().dropna() # 按窗口移动加总收益率
period_ret = period_ret.droplevel(level=2) # 上一步计算中,momentum会形成第三层index,所以抛弃第三层index
period_ret.name = 'sum_ret' # 设置序列名称
period_ret = period_ret.swaplevel().sort_index(level=0) # 按日期排序
# 下面的排序中增加了一层重复index,去掉。返回根据n日涨跌率排名后的multi_index序列(以日期和证券代码为index)。
period_ret = period_ret.groupby(level=0).apply(lambda x: x.sort_values()).droplevel(0)
if m != 0:
period_ret = period_ret.groupby(level=1).shift(m).dropna().\
groupby(level=0).apply(lambda x: x.sort_values()).droplevel(0)
return period_ret
# 定义计算一段日期个股涨跌幅的涨跌幅,即加速度acceleration
def cal_acceleration_period(start, end=None, code=None, n=14, m=0, assets='index', method='normal'):
"""
:param start: 起始日期
:param end: 结束日期
:param code: 用于计算动量的指数或个股代码或代码列表
:param n: 计算前n天每天的收益率
:param m: 排除前m天的收益率
:param assets: 计算动量的证券种类,'index' or 'stock' or 'future'
:param method: 计算动量的方法,'overnight'、'intraday' or 'normal' or 'both'
:return: 返回各标的的动量序列(multiindex)
"""
# 先处理起止时间
if end is None:
end = now
# 结束日期如果为非交易日,则向前找最近交易日做结束日
end = QA.QA_util_get_real_date(end)
# 开始日期如果为非交易日,则向后找最近交易日做开始日
start = QA.QA_util_get_real_date(start, towards=1)
# start之前的n个交易日的日期(前一个交易日就是它本身)
last_n_day = QA.QA_util_get_last_day(date=start, n=n-1)
# 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式
if type(code) is str:
code = [code]
# 先求前n天内,投资品的每天的收益率(当作速度)
velocity = cal_period_ret(start=last_n_day, end=end, code=code, n=n, m=m,
assets=assets, method=method)
# 求每天的加速度
acceleration = velocity / velocity.groupby(level=1).shift() - 1
# 求过去n天加速度的和
acceleration = acceleration.rolling(n).sum()
# 修改名称
acceleration.name = 'acceleration'
return acceleration.dropna().sort_index().groupby(level=0, group_keys=False).apply(lambda x: x.sort_values())
# 计算市场或板块或给定范围的个股,每天涨跌幅均值与其标准差的比值,命名为尹氏指数
def cal_yin_period(start, end=None, code=None, assets='stock', method='normal'):
"""
:param start: 开始日期
:param end: 结束日期
:param code: 计算范围, 个股或指数代码列表
:param assets: 投资种类,'index'/'stock'
:param method: 涨跌幅计算方式
:return: 返回每天涨跌幅均值与其标准差的比值的序列
"""
# 先处理起止时间
if end is None:
end = now
# 结束日期如果为非交易日,则向前找最近交易日做结束日
end = QA.QA_util_get_real_date(end)
# 开始日期如果为非交易日,则向后找最近交易日做开始日
start = QA.QA_util_get_real_date(start, towards=1)
# 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式
if type(code) is str:
code = [code]
# 求每天的收益率
ret = cal_period_ret(start=start, end=end, code=code, n=1, m=0,
assets=assets, method=method)
# 求每天收益率的均值
mean = ret.groupby(level=0).mean()
std = ret.groupby(level=0).std()
# 求尹氏指数
yin = mean / std
return mean, std, yin
# 定义一段日期内,求每天的净资产(或者公司其他的总量性质的统计量),当作个股的质量因子,用于构建动量、动能模型
def cal_period_mass(start, end=None, code=None, kind='net_assets'):
"""
:param start: 起始日期
:param end: 结束日期
:param code: 用于计算动量的指数或个股代码或代码列表
:param kind: 质量类型,如净资产/营业收入/经营活动产生的现金流量净额等等总量性质的统计量,具体可参看tushare每日指标数据
:return: 返回每天各个股的质量(multiindex)
"""
# 先处理起止时间
if end is None:
end = now
# 结束日期如果为非交易日,则向前找最近交易日做结束日
end = QA.QA_util_get_real_date(end)
# 开始日期如果为非交易日,则向后找最近交易日做开始日
start = QA.QA_util_get_real_date(start, towards=1)
# 将起止日期内的交易日期做成序列
df_date = QA.QA_util_get_trade_range(start=start, end=end)
df_date = pd.to_datetime(df_date)
df_date = pd.DataFrame(df_date, index=df_date, columns=['date'])
# 如果code为字符串,即为单独一个代码而非列表,则转化为列表形式
if type(code) is str:
code = change_codeformat(code, 'QA')
code = [code]
elif type(code) is list:
code = change_codeformat(code, 'QA')
# 逐日获取全市场质量因子
df = pd.DataFrame()
for i in df_date.date:
try:
# 从原版tushare获取指标
df = df.append(pro.daily_basic(trade_date=i.strftime('%Y%m%d'), fields="ts_code,trade_date," + kind))
# 从湘财tushare获取指标
except Exception:
df = df.append(xcpro.daily_basic(trade_date=i.strftime('%Y%m%d'), fields="ts_code,trade_date," + kind))
# 整理数据
df.trade_date = pd.to_datetime(df.trade_date)
df.ts_code = change_codeformat(df.ts_code.values.tolist(), 'QA')
df.columns = ['code', 'date', kind]
df.set_index(['date', 'code'], inplace=True)
df.sort_index(inplace=True)
if code is not None:
df = df.reindex(code, level=1)
return df[kind]
# 定义一段日期内,计算每天前n个交易日个股的物理动量的函数
def cal_physics_momentum(start, end=None, code=None, n=14, m=0, method='normal', kind='net_assets'):
"""
:param start: 起始日期
:param end: 结束日期
:param code: 用于计算动量的指数或个股代码或代码列表
:param n: 计算前n天的动量
:param m: 排除最近m天的动量
:param method: 计算动量的方法,'overnight'、'intraday' or 'normal' or 'both'
:param kind: 质量类型,如净资产/营业收入/经营活动产生的现金流量净额等等,具体可参看tushare每日指标数据
:return: 返回各标的的动量序列(multiindex)
"""
# 先求物理速度,即前n天的涨跌幅
velocity = cal_period_ret(start=start, end=end, code=code, n=n, m=m,
assets='stock', method=method)
# 再求物理质量,即个股每天的净资产
mass = cal_period_mass(start=start, end=end, code=code, kind=kind)
# 最后求动量,p=mv
momentum = velocity * mass
return momentum.dropna()
# 定义一段日期内,计算每天前n个交易日个股的物理动能的函数
def cal_physics_energy(start, end=None, code=None, n=14, m=0, method='normal', kind='net_assets'):
"""
:param start: 起始日期
:param end: 结束日期
:param code: 用于计算动量的指数或个股代码或代码列表
:param n: 计算前n天的动量
:param m: 排除最近m天的动量
:param method: 计算动量的方法,'overnight'、'intraday' or 'normal' or 'both'
:param kind: 质量类型,如净资产/营业收入/经营活动产生的现金流量净额等等,具体可参看tushare每日指标数据
:return: 返回各标的的动量序列(multiindex)
"""
# 先求物理速度,即前n天的涨跌幅
velocity = cal_period_ret(start=start, end=end, code=code, n=n, m=m,
assets='stock', method=method)
# 再求物理质量,即个股每天的净资产
mass = cal_period_mass(start=start, end=end, code=code, kind=kind)
# 最后求动量,e=0.5mv**2
energy = 0.5 * mass * velocity**2
return energy.dropna()
# 定义将序列数据分割成i段并选取第j段数据的函数(按pandas.qcut分割)
def qcut_select(s, n, m):
"""
:param s: 传入用于分割的序列数据,类型为pandas.Series/pandas.Dataframe
:param n: 将序列数据切割成n份
:param m: 选取第m份数据,0=<m<n
:return: 返回分割i份后选取的第j份数据
"""
# 排序
s = s.sort_values()
# 如果序列长度小于等于m,则返回原序列
if len(s) <= m:
if len(s) == 1:
pass
else:
s = s[0:1]
# 如果序列长度小于n,大于m,则直接切取第m个元素
elif m < len(s) < n:
s = s[m:m+1]
# 如果序列长度大于等于n,进行分割选取
elif len(s) >= n:
# 为分割设置从0到n-1的标签
labels = [str(i) for i in range(n)]
r = pd.qcut(s, n, labels=labels)
# 取第m层数据
r = r[r == str(m)]
s = s[r.index]
return s
# 定义将序列数据分割成i段并选取第j段数据的函数(按pandas.cut分割)
def cut_select(s, n, m):
"""
:param s: 传入用于分割的序列数据,类型为pandas.Series/pandas.Dataframe
:param n: 将序列数据切割成n份
:param m: 选取第m份数据,0=<m<n
:return: 返回分割i份后选取的第j份数据
"""
# 排序
s = s.sort_values()
# 如果序列长度小于等于m,则返回第一个元素
if len(s) <= m:
if len(s) == 1:
pass
else:
s = s[0:1]
# 如果序列长度小于n,大于m,则直接切取第m个元素
elif m < len(s) < n:
s = s[m:m+1]
# 为分割设置从0到n-1的标签
elif len(s) >= n:
labels = [str(i) for i in range(n)]
r = pd.cut(s, n, labels=labels)
# 取第m层数据
r = r[r == str(m)]
s = s[r.index]
return s
# 根据个股加速度acceleration分层选取
def acceleration_select(start, k, s, end=None, code=None, freq='m', freq_n=None,
n=14,
split_method='qcut', assets='stock'):
"""
:param start: 选股开始日期
:param end: 选股结束日期
:param code: 选股范围,个股代码列表,None时为全市场范围
:param freq: 换股频率,即持股天数,可以计算各换股节点,str type or None
:param freq_n: 换股间隔天数,和freq不能同时为None
:param n: 计算前n天的加速度
:param k: 将动量按从大到小分成k组
:param s: 从k组成选择第s组数据
:param split_method: 分组方法,'qcut/cut',参见pandas.qcut和pandas.cut方法
:param assets: 证券种类,'stok/index'
:return: 返回各选股时间节点上选出的个股列表序列
"""
# 先处理起止时间
if end is None:
end = now
# 结束日期如果为非交易日,则向前找最近交易日做结束日
end = QA.QA_util_get_real_date(end)
# 开始日期如果为非交易日,则向后找最近交易日做开始日
start = QA.QA_util_get_real_date(start, towards=1)
# 计算加速度
acceleration = cal_acceleration_period(start=start, end=end, code=code, n=n, assets=assets)
# 获取时间节点序列
df_date = date_node(start=start, end=end, freq=freq, n=freq_n)
# 用时间节点过滤
acceleration = acceleration.reindex(df_date.index, level=0)
# 用分割函数分割k组后取第s组
if split_method == 'qcut':
acceleration = acceleration.groupby(level=0).rank(na_option='top', method='first').groupby(level=0).\
apply(qcut_select, k, s).droplevel(0)
elif split_method == 'cut':
acceleration = acceleration.groupby(level=0).apply(cut_select, k, s).droplevel(0)
# 仅返回每个时间点选出的板块的代码
return acceleration.reset_index(level=1).groupby(level=0).\
apply(lambda x: list(x.code.values.flatten()))
# 根据个股物理模型分层选股函数
def physics_select(start, k, s, end=None, code=None, freq='m', freq_n=None, n=14,
m=0, model='acceleration', kind=None, method='normal'):
"""
:param start: 选股开始日期
:param end: 选股结束日期
:param code: 选股范围,个股代码列表,None时为全市场范围
:param freq: 换股频率,即持股天数,可以计算各换股节点,str type or None
:param freq_n: 换股间隔天数,和freq不能同时为None
:param n: 计算前n天的速度或加速度
:param m: 计算速度时排除最近m天数据
:param k: 将动量按从大到小分成k组
:param s: 从k组成选择第s组数据
:param kind: str type, 计算物理质量类型,如'net_assets'等
:param model: 物理模型名称
:param method: 计算速度或加速度的方式,如'normal'/'overnight'/'intraday'等
:return: 返回各选股时间节点上选出的个股列表序列
"""
# 先处理起止时间
if end is None:
end = now
# 结束日期如果为非交易日,则向前找最近交易日做结束日
end = QA.QA_util_get_real_date(end)
# 开始日期如果为非交易日,则向后找最近交易日做开始日
start = QA.QA_util_get_real_date(start, towards=1)
# 计算物理模型值
if model == 'acceleration':
physics = cal_acceleration_period(start=start, end=end, code=code, n=n, m=m, assets='stock', method=method)
elif model == 'momentum':
physics = cal_physics_momentum(start=start, end=end, code=code, n=n, m=m, kind=kind, method=method)
elif model == 'energy':
physics = cal_physics_energy(start=start, end=end, code=code, n=n, m=m, kind=kind, method=method)
# 为其他物理模型预留
else:
physics = None
# 获取时间节点序列
df_date = date_node(start=start, end=end, freq=freq, n=freq_n)
# 用时间节点过滤
physics = physics.reindex(df_date.index, level=0)
# 用分割函数分割k组后取第s组
physics = physics.groupby(level=0).rank(na_option='top', method='first').groupby(level=0).\
apply(qcut_select, k, s).droplevel(0)
# 仅返回每个时间点选出的板块的代码
return physics.reset_index(level=1).groupby(level=0).\
apply(lambda x: list(x.code.values.flatten()))
# 定义直接用物理模型选股的回测函数
def stock_physics_backtest(start, end=None, k=300, s=299, code=None, freq='m', freq_n=None,
n=7, m=3, model='momentum', kind='net_assets', method='intraday', cash=1000000):
"""
:param start: 回测开始日期
:param end: 回测结束日期
:param k: 将个股动量分成k份
:param s: 选取动量排名第s份的个股
:param code: 指定个股范围,不指定时为所有A股
:param freq: 换股频率,即持股天数,可以计算各换股节点,str type or None
:param freq_n: 换股间隔天数,和freq不能同时为None
:param n: 计算过去n天的数据
:param m: 排除过去m天的数据
:param model: 物理模型名称
:param kind: 计算物理质量的类型,如'net_assets'等
:param method: 计算速度/加速度的方法
:param cash: 回测初始现金
:return: 返回回测结果
"""
# 用模型选股
ss = physics_select(start=start, k=k, s=s, end=end, code=code, freq=freq, freq_n=freq_n, n=n,
m=m, model=model, kind=kind, method=method)
ss.name =
'code'
df = pd.DataFrame(ss)
# 根据每个节点的选股列表,确定各节点的买入卖出股票
df = exchange_stocks(df=df, start=start, end=end, buy_today=False, h_days=None)
stock_code = list() # 用于采集所有涉及到的股票交易数据
for i in df.index:
if type(df.loc[i, 'code']) is list:
for j in df.loc[i, 'code']:
stock_code.append(j)
stock_code = list(set(stock_code))
data = QA.QA_fetch_stock_day_adv(stock_code, df.index[0], df.index[-1])
data = data.to_qfq()
# 设置回测架构
Account = QA.QA_Account(strategy_name='physics_model', user_cookie='yinxiuqu',
portfolio_cookie=str(kind), start=start, end=end)
Broker = QA.QA_BacktestBroker()
Account.reset_assets(cash)
Account.account_cookie = 'k:' + str(k) + 's:' + str(s) + 'n:' + str(n) + 'm:' +\
'freq:' + str(freq) + 'freq_n:' + str(freq_n)
# 找出选股代码列表里,最长的列表的长度
long = df.code.dropna().groupby(level=0).apply(lambda x: len(x.values[0])).groupby(level=0).max()[0]
cash_weight = 0.6 / long # 平均每只股票分配的资金比例
for items in data.panel_gen:
# 下行用于查找买卖日期节点
if type(df.loc[items.date, 'sell_code'].values[0]) is list:
# 到节点全部清仓
for i in Account.sell_available.index.to_list():
# 可卖的股票未停牌时
if i in items.code:
order = Account.send_order(
code=i,
time=items.date,
amount=Account.sell_available.get(i, 0),
towards=QA.ORDER_DIRECTION.SELL,
price=0,
order_model=QA.ORDER_MODEL.MARKET,
amount_model=QA.AMOUNT_MODEL.BY_AMOUNT
)
# 能够成交时
if order:
Broker.receive_order(QA.QA_Event(order=order, market_data=items.select_code(i)))
trade_mes = Broker.query_orders(Account.account_cookie, 'filled')
res = trade_mes.loc[order.account_cookie, order.realorder_id]
order.trade(res.trade_id, res.trade_price, res.trade_amount, res.trade_time)
if type(df.loc[items.date, 'buy_code'].values[0]) is list:
for j in df.loc[items.date, 'buy_code'].values[0]:
if j in items.code: # 欲买的股票未停牌时
order = Account.send_order(
code=j,
time=items.date,
money=cash_weight * Account.cash_available,
# 添加ams指数权重
# money=cash_weight * df.loc[items.date, 'weight'].values[0] * Account.cash_available,
towards=QA.ORDER_DIRECTION.BUY,
price=items.select_code(j).open[0],
order_model=QA.ORDER_MODEL.MARKET,
amount_model=QA.AMOUNT_MODEL.BY_MONEY
)
if order: # 能够成交时
Broker.receive_order(QA.QA_Event(order=order, market_data=items.select_code(j)))
trade_mes = Broker.query_orders(Account.account_cookie, 'filled')
res = trade_mes.loc[order.account_cookie, order.realorder_id]
order.trade(res.trade_id, res.trade_price, res.trade_amount, res.trade_time)
Account.settle()
Risk = QA.QA_Risk(Account)
return Account, Risk
选择2015年作为模型回测时间段。质量mass我选取的是营业收入(TTM)因子,速度计算方法,前两个模型用的是普通计算模式,第三个模型用的是隔夜计算模式。
1、先用加速度模型回测,每7个交易日计算前7个交易日的加速度,选取靠近最大值的个股。
account, risk = stock_physics_backtest(start=’2015-01-01’, end=’2016-01-01’, k=300, s=289, code=None, freq=None, freq_n=7,
n=7, m=0, model=’acceleration’, kind=’oper_rev_ttm’, method=’normal’, cash=1000000)
运行结束后,运行risk.plot_assets_curve(),效果图如下:
蓝色线是沪深300基准线,当年上涨3%,策略组合为红色线,当年上涨41%
2、用动能模型回测,每7个交易日计算前7个交易日的动能,选取靠近最大值的个股。
account, risk = stock_physics_backtest(start=’2015-01-01’, end=’2016-01-01’, k=300, s=289, code=None, freq=None, freq_n=7,
n=7, m=0, model=’energy’, kind=’oper_rev_ttm’, method=’normal’, cash=1000000)
再运行risk.plot_assets_curve(),效果图如下:
年收益率27%。
3、用动量模型回测,每7个交易日计算前7个交易日的动量,选取靠近最大值的个股。注意这里计算速度用的是隔夜模式。
account, risk = stock_physics_backtest(start=’2015-01-01’, end=’2016-01-01’, k=300, s=289, code=None, freq=None, freq_n=7,
n=7, m=0, model=’momentum’, kind=’oper_rev_ttm’, method=’overnight’, cash=1000000)
risk.plot_assets_curve()
效果图如下:
年收益为17%
仅以三个简单模型为例,说明用物理模型进行金融建模是可行的,参数也并未做优化,也有很多错误,请批指正。