任务介绍
英雄联盟(League of Legends,LoL)是一个多人在线竞技游戏,由拳头游戏(Riot Games)公司出品。在游戏中,每位玩家控制一位有独特技能的英雄,红蓝两支队伍各有五位玩家进行对战,目标是摧毁对方的基地水晶。水晶有多座防御塔保护,通常需要先摧毁一些防御塔再摧毁水晶。玩家所控制的英雄起初非常弱,需要不断击杀小兵、野怪和对方英雄来获得金币、经验。经验可以提升英雄等级和技能等级,金币可以用来购买装备提升攻击、防御等属性。对战过程中一般没有己方单位在附近的地点是没有视野的,即无法看到对面单位,双方可以通过使用守卫来监视某个地点,洞察对面走向、制定战术。
本数据集来自Kaggle,包含了9879场钻一到大师段位的单双排对局,对局双方几乎是同一水平。每条数据是前10分钟的对局情况,每支队伍有19个特征,红蓝双方共38个特征。这些特征包括英雄击杀、死亡,金钱、经验、等级情况等等。一局游戏一般会持续30至40分钟,但是实际前10分钟的局面很大程度上影响了之后胜负的走向。作为最成功的电子竞技游戏之一,对局数据、选手数据的量化与研究具有重要意义,可以启发游戏将来的发展和改进。
Powered by a@antior.cn
本任务是希望同学们依据注释的要求,对代码中空缺部分进行填写,完成决策树模型的详细实现,根据已有的对局前10分钟特征信息,预测最后获胜方是蓝色方还是红色方,了解执行一个机器学习任务的大致流程,并提交代码和实验报告。第一次作业也是一个机器学习小实验的例子,之后的作业可能不再提供预处理等流程代码,由同学们自己设计实验完成代码编写。
导入工具包
pandas是数据分析和处理常用的工具包,非常适合处理行列表格数据。numpy是数学运算工具包,支持高效的矩阵、向量运算。sklearn是机器学习常用工具包,包括了一些已经实现好的简单模型和一些常用数据处理方法、评价指标等函数。
from collections import Counter
import pandas as pd # 数据处理
import numpy as np # 数学运算
from sklearn.model_selection import train_test_split, cross_validate # 划分数据集函数
from sklearn.metrics import accuracy_score # 准确率函数
RANDOM_SEED = 2020 # 固定随机种子
读入数据
假设数据文件放在./data/
目录下,标准的csv文件可以用pandas里的read_csv()
函数直接读入。文件共有40列,38个特征(红蓝方各19),1个标签列(blueWins),和一个对局标号(gameId)。对局标号不是标签也不是特征,可以舍去。
csv_data = './data/high_diamond_ranked_10min.csv' # 数据路径
data_df = pd.read_csv(csv_data, sep=',') # 读入csv文件为pandas的DataFrame
data_df = data_df.drop(columns='gameId') # 舍去对局标号列
数据概览
对于一个机器学习问题,在拿到任务和数据后,首先需要观察数据的情况,比如我们可以通过.iloc[0]
取出数据的第一行并输出。不难看出每个特征都存成了float64浮点数,该对局蓝色方开局10分钟有小优势。同时也可以发现有些特征列是重复冗余的,比如blueGoldDiff表示蓝色队金币优势,redGoldDiff表示红色方金币优势,这两个特征是完全对称的互为相反数。blueCSPerMin是蓝色方每分钟击杀小兵数,它乘10就是10分钟所有小兵击杀数blueTotalMinionsKilled。在之后的特征处理过程中可以考虑去除这些冗余特征。
另外,pandas有非常方便的describe()
函数,可以直接通过DataFrame进行调用,可以展示每一列数据的一些统计信息,对数据分布情况有大致了解,比如blueKills蓝色方击杀英雄数在前十分钟的平均数是6.14、方差为2.93,中位数是6,百分之五十以上的对局中该特征在4-8之间,等等。
print(data_df.iloc[0]) # 输出第一行数据
data_df.describe() # 每列特征的简单统计信息
增删特征
传统的机器学习模型大部分都是基于特征的,因此特征工程是机器学习中非常重要的一步。有时构造一个好的特征比改进一个模型带来的提升更大。这里简单展示一些特征处理的例子。首先,上面提到,特征列中有些特征信息是完全冗余的,会给模型带来不必要的计算量,可以去除。其次,相比于红蓝双方击杀、助攻的绝对值,可能双方击杀英雄的差值更能体现出当前对战的局势。因此,我们可以构造红蓝双方对应特征的差值。数据文件中已有的差值是金币差GoldDiff和经验差ExperienceDiff,实际上每个对应特征都可以构造这样的差值特征。
drop_features = ['blueGoldDiff', 'redGoldDiff',
'blueExperienceDiff', 'redExperienceDiff',
'blueCSPerMin', 'redCSPerMin',
'blueGoldPerMin', 'redGoldPerMin'] # 需要舍去的特征列
df = data_df.drop(columns=drop_features) # 舍去特征列
info_names = [c[3:] for c in df.columns if c.startswith('red')] # 取出要作差值的特征名字(除去red前缀)
drop_features = []
for info in info_names: # 对于每个特征名字
df['br' + info] = df['blue' + info] - df['red' + info] # 构造一个新的特征,由蓝色特征减去红色特征,前缀为br
drop_features.append('red' + info)
# 其中FirstBlood为首次击杀最多有一只队伍能获得,brFirstBlood=1为蓝,0为没有产生,-1为红
drop_features.extend(['blueFirstBlood','redFirstBlood'])
#print(drop_features)
for c in drop_features:
print(c)
try:
df.drop(columns=c, inplace=True)
except:
continue
#df = df.drop(columns=drop_features_red)
#df = df.drop(columns=['blueFirstBlood', 'redFirstBlood']) # 原有的FirstBlood可删除
重新打印数据
print(df.iloc[0]) # 输出第一行数据
df.describe() # 每列特征的简单统计信息
特征离散化
决策树ID3算法一般是基于离散特征的,本例中存在很多连续的数值特征,例如队伍金币。直接应用该算法每个值当作一个该特征的一个取值可能造成严重的过拟合,因此需要对特征进行离散化,即将一定范围内的值映射成一个值,例如对用户年龄特征,将0-10映射到0,11-18映射到1,19-25映射到2,25-30映射到3,等等类似,然后在决策树构建时使用映射后的值计算信息增益。
本小节要求实现特征离散化,请补全相关代码
discrete_df = df.copy() # 先复制一份数据
def get_discrete_conf(df):
Sheet_discrete_list = []
for c in df.columns: # 遍历每一列特征,跳过标签列(label)
if c == 'blueWins':
continue
Sheet_discrete_list.append([c, get_discrete_conf_c(df, c)])
Sheet_discrete_dict = dict(Sheet_discrete_list)
print(Sheet_discrete_dict)
return Sheet_discrete_dict
def get_discrete_conf_c(df, c, field_num = 5):
min_ = min(df[c])
max_ = max(df[c])
field = []
action = 1
normal = 0
if len(set(list(df[c])))<=5:
action = 0
discrete_conf = {
"action" : action,
"normal" : normal,
"min" : min_,
"max" : max_,
#"type" : type_,
"field" : field,
}
return discrete_conf
field = [-500, min(min(df[c]), df.quantile(.25)[c]), max(min(df[c]), df.quantile(.25)[c]), df.quantile(.5)[c], df.quantile(.75)[c]]
discrete_conf = {
"action" : action,
"normal" : normal,
"min" : min_,
"max" : max_,
#"type" : type_,
"field" : field,
}
return discrete_conf
def get_discrete_column(discrete_conf, c, df_c):
if discrete_conf[c]['action'] == 0:
return df_c
values = discrete_conf[c]["field"]
for i in range(len(df_c)):
for j in values:
if df_c[i]>=j:
df_c[i] = j
return df_c
def get_discrete_data(discrete_conf, df):
for c in df.columns:
if c not in discrete_conf:
continue
discrete_column = get_discrete_column(discrete_conf, c, list(df[c]))
df[c] = discrete_column
return df
discrete_conf = get_discrete_conf(df)
discrete_df = get_discrete_data(discrete_conf, df)
print(discrete_df)
#discrete_df.drop(labels='blueWins',axis=1)
'''for c in df.columns[1:]: # 遍历每一列特征,跳过标签列(label)
print(c)
print(Sheet_discrete_dict[c])
for index, row in df.iterrows():
#print(index)
#print(row[c])
tmp = get_discrete_value(Sheet_discrete_dict[c], row[c])
#print(tmp)
discrete_df[c][index] = tmp
discrete_df.to_csv('data.csv')'''
数据集准备
构建机器学习模型前要构建训练和测试的数据集。在本例中首先需要分开标签和特征,标签是不能作为模型的输入特征的,就好比作业和试卷答案不能在做题和考试前就告诉学生。测试一个模型在一个任务上的效果至少需要训练集和测试集,训练集用来训练模型的参数,好比学生做作业获得知识,测试集用来测试模型效果,好比期末考试考察学生学习情况。测试集的样本不应该出现在训练集中,否则会造成模型效果估计偏高,好比考试时出的题如果是作业题中出现过的,会造成考试分数不能准确衡量学生的学习情况,估计值偏高。划分训练集和测试集有多种方法,下面首先介绍的是随机取一部分如20%作测试集,剩下作训练集。sklearn提供了相关工具函数train_test_split
。sklearn的输入输出一般为numpy的array矩阵,需要先将pandas的DataFrame取出为numpy的array矩阵。
all_y = discrete_df['blueWins'].values # 所有标签数据
feature_names = discrete_df.columns[1:] # 所有特征的名称
print("feature_names:",feature_names)
print("feature_len:",len(feature_names))
all_x = discrete_df[feature_names].values # 所有原始特征值,pandas的DataFrame.values取出为numpy的array矩阵
# 划分训练集和测试集
x_train, x_test, y_train, y_test = train_test_split(all_x, all_y, test_size=0.2, random_state=RANDOM_SEED)
all_y.shape, all_x.shape, x_train.shape, x_test.shape, y_train.shape, y_test.shape # 输出数据行列信息
决策树模型的实现
本小节要求实现决策树模型,请补全算法代码
# 定义决策树类
from math import log
import operator
def majorityCnt(classList):
classCount = {}
for vote in classList: #统计classList中每个元素出现的次数
if vote not in classCount.keys():
classCount[vote] = 0
classCount[vote] += 1
sortedClassCount = sorted(classCount.items(), key = operator.itemgetter(1), reverse = True) #根据字典的值降序排序
return sortedClassCount[0][0] #返回classList中出现次数最多的元素
def splitDataSet(dataSet, axis, value):
retDataSet = [] #创建返回的数据集列表
for featVec in dataSet: #遍历数据集
if featVec[axis] == value:
reducedFeatVec = featVec[:axis] #去掉axis特征
reducedFeatVec.extend(featVec[axis+1:]) #将符合条件的添加到返回的数据集
retDataSet.append(reducedFeatVec)
return retDataSet #返回划分后的数据集
def calcShannonEnt(dataSet):
numEntires = len(dataSet) #返回数据集的行数
labelCounts = {} #保存每个标签(Label)出现次数的字典
for featVec in dataSet: #对每组特征向量进行统计
currentLabel = featVec[-1] #提取标签(Label)信息
if currentLabel not in labelCounts.keys(): #如果标签(Label)没有放入统计次数的字典,添加进去
labelCounts[currentLabel] = 0
labelCounts[currentLabel] += 1 #Label计数
shannonEnt = 0.0 #经验熵(香农熵)
for key in labelCounts: #计算香农熵
prob = float(labelCounts[key]) / numEntires #选择该标签(Label)的概率
shannonEnt -= prob * log(prob, 2) #利用公式计算
return shannonEnt #返回经验熵(香农熵)
def chooseBestFeatureToSplit(dataSet):
numFeatures = len(dataSet[0]) - 1 #特征数量
baseEntropy = calcShannonEnt(dataSet) #计算数据集的香农熵
bestInfoGain = 0.0 #信息增益
bestFeature = -1 #最优特征的索引值
for i in range(numFeatures): #遍历所有特征
#获取dataSet的第i个所有特征
featList = [example[i] for example in dataSet]
uniqueVals = set(featList) #创建set集合{},元素不可重复
newEntropy = 0.0 #经验条件熵
for value in uniqueVals: #计算信息增益
subDataSet = splitDataSet(dataSet, i, value) #subDataSet划分后的子集
#print(subDataSet)
prob = len(subDataSet) / float(len(dataSet)) #计算子集的概率
newEntropy += prob * calcShannonEnt(subDataSet) #根据公式计算经验条件熵
infoGain = baseEntropy - newEntropy #信息增益
#print("第%d个特征的增益为%.3f" % (i, infoGain)) #打印每个特征的信息增益
if (infoGain > bestInfoGain): #计算信息增益
bestInfoGain = infoGain #更新信息增益,找到最大的信息增益
bestFeature = i #记录信息增益最大的特征的索引值
return bestFeature #返回信息增益最大的特征的索引值
def traverse_node(inputTree, featLabels, testVec):
firstStr = next(iter(inputTree)) #获取决策树结点
secondDict = inputTree[firstStr] #下一个字典
featIndex = featLabels.index(firstStr)
for key in secondDict.keys():
if testVec[featIndex] == key:
if type(secondDict[key]).__name__ == 'dict':
classLabel = traverse_node(secondDict[key], featLabels, testVec)
else:
classLabel = secondDict[key]
return classLabel
class DecisionTree(object):
def __init__(self, classes, features,
max_depth=10, min_samples_split=10,
impurity_t='entropy'
):
'''
传入一些可能用到的模型参数,也可能不会用到
classes表示模型分类总共有几类
features是每个特征的名字,也方便查询总的共特征数
max_depth表示构建决策树时的最大深度
min_samples_split表示构建决策树分裂节点时,如果到达该节点的样本数小于该值则不再分裂
impurity_t表示计算混杂度(不纯度)的计算方式,例如entropy或gini
'''
self.classes = classes
self.features = features
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.impurity_t = impurity_t
self.root = None # 定义根节点,未训练时为空
'''
请实现决策树算法,使得fit函数和predict函数可以正常调用,跑通之后的测试代码,
要求之后测试代码输出的准确率大于0.6。
提示:
可以定义额外一些函数,例如
impurity()用来计算混杂度
gain()调用impurity用来计算信息增益
expand_node()训练时递归函数分裂节点,考虑不同情况
1. 无需分裂 或 达到分裂阈值
2. 调用gain()找到最佳分裂特征,递归调用expand_node
3. 找不到有用的分裂特征
fit函数调用该函数返回根节点
traverse_node()预测时遍历节点,考虑不同情况
1. 已经到达叶节点,则返回分类结果
2. 该特征取值在训练集中未出现过
3. 依据特征取值进入相应子节点,递归调用traverse_node
当然也可以有其他实现方式。
'''
def fit(self, dataSet, labels, featLabels = []):
print(len(labels))
print(len(dataSet[0]))
print(dataSet[0])
assert len(labels) == (len(dataSet[0])-1) # 输入数据的特征数目应该和模型定义时的特征数目相同
classList = [example[-1] for example in dataSet]
if len(dataSet[0]) <= self.min_samples_split: #遍历完所有特征时返回出现次数最多的类标签
return majorityCnt(classList)
if (len(self.features)-len(labels))>=self.max_depth-2:
return majorityCnt(classList)
if classList.count(classList[0]) == len(classList): #如果类别完全相同则停止继续划分
return classList[0]
if len(dataSet[0]) == 1: #遍历完所有特征时返回出现次数最多的类标签
return majorityCnt(classList)
bestFeat = chooseBestFeatureToSplit(dataSet) #选择最优特征
bestFeatLabel = labels[bestFeat] #最优特征的标签
featLabels.append(bestFeatLabel)
myTree = {bestFeatLabel:{}} #根据最优特征的标签生成树
del(labels[bestFeat]) #删除已经使用特征标签
featValues = [example[bestFeat] for example in dataSet] #得到训练集中所有最优特征的属性值
uniqueVals = set(featValues) #去掉重复的属性值
for value in uniqueVals:
subLabels=labels[:]
#递归调用函数createTree(),遍历特征,创建决策树。
myTree[bestFeatLabel][value] = DT.fit(splitDataSet(dataSet, bestFeat, value), subLabels, featLabels)
return myTree
def predict(self, inputTree, features, dataSet):
assert len(dataSet.shape) == 1 or len(dataSet.shape) == 2 # 只能是1维或2维
if len(dataSet.shape) == 1:
return traverse_node(inputTree, features, dataSet)
else:
return np.array([traverse_node(inputTree, features, f) for f in dataSet])
'''
预测
输入feature可以是一个一维numpy数组也可以是一个二维numpy数组
如果是一维numpy(m)数组则是一个样本,包含m个特征,返回一个类别值
如果是二维numpy(n*m)数组则表示n个样本,每个样本包含m个特征,返回一个numpy一维数组
提示:一种可能的实现方式为
if len(feature.shape) == 1: # 如果是一个样本
return self.traverse_node(self.root, feature) # 从根节点开始路由
return np.array([self.traverse_node(self.root, f) for f in feature]) # 如果是很多个样本
'''
# 定义决策树模型,传入算法参数
DT = DecisionTree(classes=[0,1], features=feature_names.tolist(), max_depth=5, min_samples_split=10, impurity_t='gini')
featLabels = []
#myTree = createTree(dataSet, label, features, featLabels)
#features = feature_names
#print(len(features))
#dataSet = list(x_train)
#print(len(dataSet[0]))
dataSet = np.hstack([np.matrix(x_train), np.matrix(y_train).T])
print(dataSet.shape)
dataSet = dataSet.tolist()
print(len(feature_names))
myTree = DT.fit(dataSet, feature_names.tolist()) # 在训练集上训练
print(myTree)
p_test = DT.predict(myTree, list(feature_names), x_test) # 在测试集上预测,获得预测值
print(p_test) # 输出预测值
#test_acc = accuracy_score(p_test, y_test) # 将测试预测值与测试集标签对比获得准确率
#print('accuracy: {:.4f}'.format(test_acc)) # 输出准确率
模型效果计算
第一次模型测试结果可能不够好,可以先检查调试代码是否有bug,再尝试调整参数或者优化计算方法。
Err = p_test-y_test
ErrNum = 0
for i in range(len(Err)):
if Err[i] != 0:
ErrNum = ErrNum + 1
ErrRate = float(ErrNum)/len(Err)
print(ErrRate)
print(1-ErrRate)