西南财经大学《数据库原理》实验报告
    2021年11月5日

    实验题目 Mapreduce基础编程
    选课课号 CST327
    学 院 经济信息工程学院 班 级 2019级计算机科学与技术2班
    姓 名 李文基 学 号 41912064
    理论课教师 黄宇 上机指导教师 黄宇
    实验目的及要求:
    修改WordCount源代码,使得输出的词频统计满足
    数据清洗: 对每个词的格式进行规范化(去除不以英文字母开头的所有词)
    词频少于3次的数据不在结果中显示
    结果以有限数量的“+”表示词频统计
    自定义一个Combiner类,在其reduce方法中,将相同key的所有值进行相加后乘以2输出。
    自定义一个Partitioner类和getPartition()方法,将大写字母开头的词分配到一个reducer,将小写字母开头的词分配到另一个reducer。

    完成矩阵乘法的Map函数和Reduce函数
    设计两个矩阵(30×50,50×100),在每个单元格中填入一个0-99的随机数,并写入两个文件中,作为Map函数的输入
    测试运行矩阵乘法的MapReduce框架,并将结果输出到新的结果文件中
    完成实验报告

    完成倒排索引的MapReduce程序代码,最终输出以下键值对形式:
    词,(文件名@位置,次数)
    自定义Combine方法,用于压缩Map输出数量。
    自定义Partition方法,对“词”进行Hash值计算并以此进行分区。
    自定义停词表,用于筛选合理词汇。
    生成一个input文件夹,搜索一个文本文件的集合,拷贝到input文件夹中。
    运行MapReduce框架将生成的倒排文档索引放入目标文件中。
    完成实验报告。

    实验操作步骤:
    WordCount.java:
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    public class WordCount {
    public static class TokenizerMapper
    extends Mapper{
    private final static IntWritable one =new IntWritable(1);
    private Text word =new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer itr =new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
    String words = itr.nextToken();
    if(words.matches(“^[A-Za-z]+$”)) {
    word.set(words);
    context.write(word, one);
    }

    }
    }
    }

    public static class IntSumReducer extends Reducer {
    private IntWritable result =new IntWritable();

    public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
    int sum =0;
    StringBuffer b = new StringBuffer(“”);
    for (IntWritable val : values) {
    sum += val.get();
    for(int i = 0;i b.append(“+”); //以“+”代替单词出现的次数
    }

    }
    if(sum >3) {
    context.write(key, new Text(b.toString())); //输出出现次数大于3的单词
    }

    }
    }

    public static class MyCombiner extends Reducer {

    @Override
    protected void reduce(Text key, Iterable value, Reducer.Context context)
    throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : value) {
    sum += val.get();
    }
    context.write(key, new IntWritable(sum2)); //自定义Combiner,实现词频统计结果2
    }

    }

    public static class MyPartitioner extends Partitioner {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
    String preNum = key.toString();
    int partition = 2;
    char c = preNum.charAt(0);
    if(c>=’A’ && c<=’Z’) {
    partition = 0;
    } //第一个词大写输出part-r-00000
    if(c>=’a’ && c<=’z’) {
    partition = 1;
    } //第一个词为小写输出part-r-00001
    return partition;
    }
    }

    public static void main(String[] args) throws Exception {
    Configuration conf =new Configuration();
    String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();

    Job job =new Job(conf, “word count”); //设置一个用户定义的job名称
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class); //为job设置Mapper类
    job.setCombinerClass(MyCombiner.class); //为job设置Combiner类
    job.setReducerClass(IntSumReducer.class); //为job设置Reducer类
    job.setOutputKeyClass(Text.class); //为job的输出数据设置Key类
    job.setOutputValueClass(IntWritable.class); //为job输出设置value类
    job.setPartitionerClass(MyPartitioner.class);
    job.setNumReduceTasks(2);
    FileInputFormat.addInputPath(job, new Path(args[0])); //为job设置输入路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));//为job设置输出路径
    System.exit(job.waitForCompletion(true) ?0 : 1); //运行job
    }
    }

    运行步骤:
    pscp D:/PuTTY/WordCount.java root@47.100.188.84:/usr/local/hadoop/wordcount
    通过cmd和putty远程上传至服务器

    编译:javac -classpath /usr/local/hadoop/share/hadoop/common/hadoop-common-2.9.2.jar:/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.9.2.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar -d /usr/local/hadoop/wordcount/ /usr/local/hadoop/wordcount/WordCount.java

    打包:jar -cvf WordCount.jar ./wordcount/*.class

    运行:hadoop jar ./wordcount/WordCount.jar WordCount input output
    *注:打包和运行的路径都需要使用相对路径,否则会提示找不到主类main函数的报错!


    查看输出文件:hadoop fs -ls output 可判断运行输出是否成功以及选择结果集
    选择结果集:hadoop fs -cat output/part-r-00000
    删除输出文件:hadoop fs -rmr output 一定要进行的步骤,以免报错

    运行结果截图:

    MatrixMultiply.java:
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MatrixMultiply {
    / mapper和reducer需要的三个必要变量,由conf.get()方法得到 /
    public static int rowM = 0;
    public static int columnM = 0;
    public static int columnN = 0;

    public static class MatrixMapper extends Mapper {
    private Text mapKey = new Text();
    private Text mapValue = new Text();
    /
    执行map()函数前先由conf.get()得到main函数中提供的必要变量, 这也是MapReduce中共享变量的一种方式
    /
    public void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    columnN = Integer.parseInt(conf.get(“columnN”));
    rowM = Integer.parseInt(conf.get(“rowM”));
    };

    public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
    /
    得到输入文件名,从而区分输入矩阵M和N /
    FileSplit fileSplit = (FileSplit) context.getInputSplit();
    String fileName = fileSplit.getPath().getName();
    String line = value.toString();
    String[] tuple = line.split(“,”);
    if (tuple.length != 2) {
    throw new RuntimeException(“MatrixMapper tuple error”);
    }
    int row = Integer.parseInt(tuple[0]);
    String[] tuples = tuple[1].split(“\t”);
    if (tuples.length != 2) {
    throw new RuntimeException(“MatrixMapper tuples error”);
    } //
    if (fileName.contains(“M”)) {
    matrixM(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context);
    //TODO:行号i,列号j,数字Mij,根据矩阵N的任意列号k,输出(i,k)->(M,j,Mij)
    }

    else if (fileName.contains(“N”)) {
    matrixN(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context);
    //TODO:行号j,列号k,数字Njk,根据矩阵M的任意行号i,输出(i,k)->(N,j,Njk)
    }
    };

    private void matrixM(int row, int column, int value, Context context) throws IOException, InterruptedException {
    for (int i = 1; i < columnN + 1; i++) {
    mapKey.set(row + “,” + i);
    mapValue.set(“M,” + column + “,” + value);
    context.write(mapKey, mapValue);
    }
    }

    private void matrixN(int row, int column, int value, Context context) throws IOException, InterruptedException {
    for (int i = 1; i < rowM + 1; i++) {
    mapKey.set(i + “,” + column);
    mapValue.set(“N,” + row + “,” + value);
    context.write(mapKey, mapValue);
    }
    }
    }

    public static class MatrixReducer extends Reducer {
    private int sum = 0;

    public void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    columnM = Integer.parseInt(conf.get(“columnM”));
    };

    public void reduce(Text key, Iterable values, Context context)
    throws IOException, InterruptedException {
    int sum = 0;
    int[] M = new int[columnM + 1];
    int[] N = new int[columnM + 1];
    for (Text val : values) {
    String[] tuple = val.toString().split(“,”);
    if (tuple.length != 3) {
    throw new RuntimeException(“MatrixReducer tuple error”);
    }
    if (“M”.equals(tuple[0])) {
    M[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
    } else {
    N[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
    }
    }
    //获取同一个key=(i,k)下,Mij=M[j]和Njk=N[j]

    /
    根据j值,对M[j]和N[j]进行相乘累加得到乘积矩阵的数据 /
    for (int j = 1; j < columnM + 1; j++) {
    sum += M[j] * N[j];
    }
    context.write(key, new Text(Integer.toString(sum)));
    sum = 0;
    }
    }

    /

    main函数


    Usage:


    MatrixMultiply inputPathM inputPathN outputPath


    从输入文件名称中得到矩阵M的行数和列数,以及矩阵N的列数,作为重要参数传递给mapper和reducer

    @param args 输入文件目录地址M和N以及输出目录地址

    @throws Exception
    /

    public static void main(String[] args) throws Exception {

    if (args.length != 3) {
    System.err
    .println(“Usage: MatrixMultiply “);
    System.exit(2);
    } else {
    String[] infoTupleM = args[0].split(““);
    rowM = Integer.parseInt(infoTupleM[1]);
    columnM = Integer.parseInt(infoTupleM[2]);
    String[] infoTupleN = args[1].split(“
    “);
    columnN = Integer.parseInt(infoTupleN[2]);
    }

    Configuration conf = new Configuration();
    / 设置三个全局共享变量 /
    conf.setInt(“rowM”, rowM);
    conf.setInt(“columnM”, columnM);
    conf.setInt(“columnN”, columnN);

    Job job = new Job(conf, “MatrixMultiply”);
    job.setJarByClass(MatrixMultiply.class);
    job.setMapperClass(MatrixMapper.class);
    job.setReducerClass(MatrixReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]), new Path(args[1]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    }
    编译运行步骤同上,但需提前建好题目所需矩阵:
    1.sudo chmod +x ./genMatrix.sh (添加执行权限)
    2. ./genMatrix.sh 30 50 100 (生成30×50和50×100的两个矩阵文件:M_30_50, N_50_100)
    3.将两个矩阵文件从本地发送至hdfs文件系统:hdfs dfs -put ./M_30_50 input
    注:有两个输入hadoop jar MatrixMultiply.jar MatrixMultiply input/M_30_50 input/N_50_100 output


    运行结果截图:

    matrix.png



    SimpleInvertedIndex.java:
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class SimpleInvertedIndex {

    public static class Map extends Mapper {
    private Text keyInfo = new Text(); // 存储单词和URL组合
    private Text valueInfo = new Text(); // 存储词频
    private FileSplit split; // 存储Split对象
    // 实现map函数
    public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
    split = (FileSplit) context.getInputSplit();
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
    int splitIndex = split.getPath().toString().indexOf(“test”);
    keyInfo.set(itr.nextToken() + “:”
    + split.getPath().toString().substring(splitIndex));
    valueInfo.set(“1”);
    context.write(keyInfo, valueInfo);
    }
    }
    }
    public static class Combine extends Reducer {
    private Text info = new Text();
    // 实现reduce函数
    public void reduce(Text key, Iterable values, Context context)
    throws IOException, InterruptedException {
    // System.out.println(key);
    // 统计词频
    int sum = 0;
    for (Text value : values) {
    sum += Integer.parseInt(value.toString());
    }
    int splitIndex = key.toString().indexOf(“:”);
    // 重新设置value值由URL和词频组成
    info.set(key.toString().substring(splitIndex + 1) + “,” + sum);
    // 重新设置key值为单词
    key.set(key.toString().substring(0, splitIndex));
    context.write(key, info);
    //System.out.println(key+” “+info);
    }
    }
    public static class Reduce extends Reducer {
    private Text result = new Text();
    // 实现reduce函数
    public void reduce(Text key, Iterable values, Context context)
    throws IOException, InterruptedException {
    // System.out.println(key);
    // 生成文档列表
    String fileList = new String();
    for (Text value : values) {
    fileList += value.toString() + “;”;
    }
    result.set(fileList);
    context.write(key, result);
    System.out.println(key+” “+result);
    }
    }



    public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Job job = new Job(conf, “Inverted Index”);
    job.setJarByClass(SimpleInvertedIndex.class);
    job.setMapperClass(Map.class);
    job.setCombinerClass(Combine.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    }


    编译运行步骤同上

    运行结果截图:


    | | | | | | 教师评语 |
    | | | | | 成 绩 |
    | | | |