西南财经大学《数据库原理》实验报告 2021年11月5日
实验题目
Mapreduce基础编程
选课课号
CST327
学 院
经济信息工程学院
班 级
2019级计算机科学与技术2班
姓 名
李文基
学 号
41912064
理论课教师
黄宇
上机指导教师
黄宇
实验目的及要求: 修改WordCount源代码,使得输出的词频统计满足 数据清洗: 对每个词的格式进行规范化(去除不以英文字母开头的所有词) 词频少于3次的数据不在结果中显示 结果以有限数量的“+”表示词频统计 自定义一个Combiner类,在其reduce方法中,将相同key的所有值进行相加后乘以2输出。 自定义一个Partitioner类和getPartition()方法,将大写字母开头的词分配到一个reducer,将小写字母开头的词分配到另一个reducer。
完成矩阵乘法的Map函数和Reduce函数 设计两个矩阵(30×50, 50×100),在每个单元格中填入一个0-99的随机数,并写入两个文件中,作为Map函数的输入 测试运行矩阵乘法的MapReduce框架,并将结果输出到新的结果文件中 完成实验报告
完成倒排索引的MapReduce程序代码,最终输出以下键值对形式: 词,(文件名@位置,次数) 。自定义Combine方法,用于压缩Map输出数量。 自定义Partition方法,对“词”进行Hash值计算并以此进行分区。 自定义停词表,用于筛选合理词汇。 生成一个input文件夹,搜索一个文本文件的集合,拷贝到input文件夹中。 运行MapReduce框架将生成的倒排文档索引放入目标文件中。 完成实验报告。
| | | | |
| 实验操作步骤: WordCount.java: import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper{ private final static IntWritable one =new IntWritable(1); private Text word =new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr =new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { String words = itr.nextToken(); if(words.matches(“^[A-Za-z]+$”)) { word.set(words); context.write(word, one); } } } } public static class IntSumReducer extends Reducer { private IntWritable result =new IntWritable(); public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum =0; StringBuffer b = new StringBuffer(“”); for (IntWritable val : values) { sum += val.get(); for(int i = 0;i b.append(“+”); //以“+”代替单词出现的次数 } } if(sum >3) { context.write(key, new Text(b.toString())); //输出出现次数大于3的单词 } } } public static class MyCombiner extends Reducer { @Override protected void reduce(Text key, Iterable value, Reducer.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : value) { sum += val.get(); } context.write(key, new IntWritable(sum2)); //自定义Combiner,实现词频统计结果 2 } } public static class MyPartitioner extends Partitioner { @Override public int getPartition(Text key, IntWritable value, int numPartitions) { String preNum = key.toString(); int 
partition = 2; char c = preNum.charAt(0); if(c>=’A’ && c<=’Z’) { partition = 0; } //第一个词大写输出part-r-00000 if(c>=’a’ && c<=’z’) { partition = 1; } //第一个词为小写输出part-r-00001 return partition; } } public static void main(String[] args) throws Exception { Configuration conf =new Configuration(); String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs(); Job job =new Job(conf, “word count”); //设置一个用户定义的job名称 job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); //为job设置Mapper类 job.setCombinerClass(MyCombiner.class); //为job设置Combiner类 job.setReducerClass(IntSumReducer.class); //为job设置Reducer类 job.setOutputKeyClass(Text.class); //为job的输出数据设置Key类 job.setOutputValueClass(IntWritable.class); //为job输出设置value类 job.setPartitionerClass(MyPartitioner.class); job.setNumReduceTasks(2); FileInputFormat.addInputPath(job, new Path(args[0])); //为job设置输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1]));//为job设置输出路径 System.exit(job.waitForCompletion(true) ?0 : 1); //运行job } } 运行步骤: pscp D:/PuTTY/WordCount.java root@47.100.188.84:/usr/local/hadoop/wordcount 通过cmd和putty远程上传至服务器 编译:javac -classpath /usr/local/hadoop/share/hadoop/common/hadoop-common-2.9.2.jar:/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.9.2.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar -d /usr/local/hadoop/wordcount/ /usr/local/hadoop/wordcount/WordCount.java 打包:jar -cvf WordCount.jar ./wordcount/.class 运行:hadoop jar ./wordcount/WordCount.jar WordCount input output*注:打包和运行的路径都需要使用相对路径,否则会提示找不到主类main函数的报错! 查看输出文件:hadoop fs -ls output 可判断运行输出是否成功以及选择结果集 选择结果集:hadoop fs -cat output/part-r-00000 删除输出文件:hadoop fs -rmr output 一定要进行的步骤,以免报错 运行结果截图:
MatrixMultiply.java: import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MatrixMultiply { / mapper和reducer需要的三个必要变量,由conf.get()方法得到 / public static int rowM = 0; public static int columnM = 0; public static int columnN = 0; public static class MatrixMapper extends Mapper { private Text mapKey = new Text(); private Text mapValue = new Text(); / 执行map()函数前先由conf.get()得到main函数中提供的必要变量, 这也是MapReduce中共享变量的一种方式 / public void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); columnN = Integer.parseInt(conf.get(“columnN”)); rowM = Integer.parseInt(conf.get(“rowM”)); }; public void map(Object key, Text value, Context context) throws IOException, InterruptedException { / 得到输入文件名,从而区分输入矩阵M和N / FileSplit fileSplit = (FileSplit) context.getInputSplit(); String fileName = fileSplit.getPath().getName(); String line = value.toString(); String[] tuple = line.split(“,”); if (tuple.length != 2) { throw new RuntimeException(“MatrixMapper tuple error”); } int row = Integer.parseInt(tuple[0]); String[] tuples = tuple[1].split(“\t”); if (tuples.length != 2) { throw new RuntimeException(“MatrixMapper tuples error”); } // if (fileName.contains(“M”)) { matrixM(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context); //TODO:行号i,列号j,数字Mij,根据矩阵N的任意列号k,输出(i,k)->(M,j,Mij) } else if (fileName.contains(“N”)) { matrixN(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context); //TODO:行号j,列号k,数字Njk,根据矩阵M的任意行号i,输出(i,k)->(N,j,Njk) } }; private void matrixM(int row, int column, int value, Context context) throws IOException, 
InterruptedException { for (int i = 1; i < columnN + 1; i++) { mapKey.set(row + “,” + i); mapValue.set(“M,” + column + “,” + value); context.write(mapKey, mapValue); } } private void matrixN(int row, int column, int value, Context context) throws IOException, InterruptedException { for (int i = 1; i < rowM + 1; i++) { mapKey.set(i + “,” + column); mapValue.set(“N,” + row + “,” + value); context.write(mapKey, mapValue); } } } public static class MatrixReducer extends Reducer { private int sum = 0; public void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); columnM = Integer.parseInt(conf.get(“columnM”)); }; public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; int[] M = new int[columnM + 1]; int[] N = new int[columnM + 1]; for (Text val : values) { String[] tuple = val.toString().split(“,”); if (tuple.length != 3) { throw new RuntimeException(“MatrixReducer tuple error”); } if (“M”.equals(tuple[0])) { M[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]); } else { N[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]); } } //获取同一个key=(i,k)下,Mij=M[j]和Njk=N[j] / 根据j值,对M[j]和N[j]进行相乘累加得到乘积矩阵的数据 / for (int j = 1; j < columnM + 1; j++) { sum += M[j] * N[j]; } context.write(key, new Text(Integer.toString(sum))); sum = 0; } } / main函数
Usage:
MatrixMultiply inputPathM inputPathN outputPath
从输入文件名称中得到矩阵M的行数和列数,以及矩阵N的列数,作为重要参数传递给mapper和reducer @param args 输入文件目录地址M和N以及输出目录地址 @throws Exception / public static void main(String[] args) throws Exception { if (args.length != 3) { System.err .println(“Usage: MatrixMultiply “); System.exit(2); } else { String[] infoTupleM = args[0].split(““); rowM = Integer.parseInt(infoTupleM[1]); columnM = Integer.parseInt(infoTupleM[2]); String[] infoTupleN = args[1].split(“ “); columnN = Integer.parseInt(infoTupleN[2]); } Configuration conf = new Configuration(); / 设置三个全局共享变量 / conf.setInt(“rowM”, rowM); conf.setInt(“columnM”, columnM); conf.setInt(“columnN”, columnN); Job job = new Job(conf, “MatrixMultiply”); job.setJarByClass(MatrixMultiply.class); job.setMapperClass(MatrixMapper.class); job.setReducerClass(MatrixReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0]), new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 编译运行步骤同上,但需提前建好题目所需矩阵: 1.sudo chmod +x ./genMatrix.sh (添加执行权限) 2./genMatrix.sh 30 50 100 (生成3050和50 100的两个矩阵文件:M_30_50, N_50_100) 3.将两个矩阵文件从本地发送至hdfs文件系统:hdfs dfs -put ./M_30_50 input 注:有两个输入hadoop jar MatrixMultiply.jar MatrixMultiply input/M_30_50 input/N_50_100 output 运行结果截图:
SimpleInvertedIndex.java: import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class SimpleInvertedIndex { public static class Map extends Mapper { private Text keyInfo = new Text(); // 存储单词和URL组合 private Text valueInfo = new Text(); // 存储词频 private FileSplit split; // 存储Split对象 // 实现map函数 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { split = (FileSplit) context.getInputSplit(); StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { int splitIndex = split.getPath().toString().indexOf(“test”); keyInfo.set(itr.nextToken() + “:” + split.getPath().toString().substring(splitIndex)); valueInfo.set(“1”); context.write(keyInfo, valueInfo); } } } public static class Combine extends Reducer { private Text info = new Text(); // 实现reduce函数 public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // System.out.println(key); // 统计词频 int sum = 0; for (Text value : values) { sum += Integer.parseInt(value.toString()); } int splitIndex = key.toString().indexOf(“:”); // 重新设置value值由URL和词频组成 info.set(key.toString().substring(splitIndex + 1) + “,” + sum); // 重新设置key值为单词 key.set(key.toString().substring(0, splitIndex)); context.write(key, info); //System.out.println(key+” “+info); } } public static class Reduce extends Reducer { private Text result = new Text(); // 实现reduce函数 public void reduce(Text key, Iterable values, Context 
context) throws IOException, InterruptedException { // System.out.println(key); // 生成文档列表 String fileList = new String(); for (Text value : values) { fileList += value.toString() + “;”; } result.set(fileList); context.write(key, result); System.out.println(key+” “+result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, “Inverted Index”); job.setJarByClass(SimpleInvertedIndex.class); job.setMapperClass(Map.class); job.setCombinerClass(Combine.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 编译运行步骤同上 运行结果截图: | | | | |
| 教师评语 | | | | |
| 成 绩 | | | | |