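Word Count

Requirement:

Read the data in a specified directory and count how many times each word occurs.

Implementation plan:

  1. A Spout reads the data in the specified directory and serves as the input for the downstream Bolts
  2. One Bolt splits each input line into words, using a comma as the delimiter
  3. Another Bolt keeps the running count for each word
  4. Output the result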
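Before wiring this into Storm, the four steps can be sketched in plain Java, with no Storm dependency, to make the expected result concrete. This is only an illustration: the class `WordCountSketch` and its methods are hypothetical names, and the input is the five sample lines used as test data in this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class; not part of Storm or of the original example.
final class WordCountSketch {

    // Step 2 of the plan: split one input line on commas.
    static List<String> split(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split(",")) {
            words.add(word);
        }
        return words;
    }

    // Step 3 of the plan: add one to the running count of each word.
    static Map<String, Integer> count(List<String> lines) {
        // LinkedHashMap keeps the words in first-seen order for readable output.
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : split(line)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Step 1 stand-in: the lines a spout would read from the directory.
        List<String> lines = Arrays.asList("a,b,c,d", "a,b,c", "a,c", "a,d", "f,b");
        // Step 4: output. Prints {a=4, b=3, c=3, d=2, f=1}
        System.out.println(count(lines));
    }
}
```

The Storm version should arrive at the same totals; the difference is that each step runs as a separate component and the data flows between them as tuples.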

File operations are involved, so to keep things simple we use commons-io:

    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.4</version>
    </dependency>

Create the file test.txt in E:\stormtest to serve as the test data.

Its contents are:

a,b,c,d
a,b,c
a,c
a,d
f,b
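If you prefer to generate the test file rather than create it by hand, a small helper along these lines could do it. It is a sketch, not part of the original example: `TestDataWriter` and `writeSample` are hypothetical names, and the target directory is passed in so that it also works on machines without an `E:` drive.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that writes the sample data used in this example.
final class TestDataWriter {

    static final List<String> SAMPLE_LINES =
            Arrays.asList("a,b,c,d", "a,b,c", "a,c", "a,d", "f,b");

    // Creates the directory if needed and writes test.txt into it.
    static Path writeSample(Path directory) throws IOException {
        Files.createDirectories(directory);
        Path file = directory.resolve("test.txt");
        return Files.write(file, SAMPLE_LINES, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Pass the directory as the first argument, e.g. E:\stormtest;
        // the fallback "stormtest" is a relative directory chosen for illustration.
        Path directory = Paths.get(args.length > 0 ? args[0] : "stormtest");
        System.out.println("Wrote " + writeSample(directory));
    }
}
```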

Topology design: DataSourceSpout -> SplitBolt -> CountBolt

1. Define the data source: DataSourceSpout

Create DataSourceSpout by extending BaseRichSpout and overriding open, nextTuple, and declareOutputFields.

    // Imports needed: the Storm classes (org.apache.storm.* from Storm 1.0 on,
    // backtype.storm.* in 0.9.x), org.apache.commons.io.FileUtils, java.io.*, java.util.*
    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector spoutOutputCollector;

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            // Keep the collector so that nextTuple() can emit tuples
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            // Recursively list every .txt file under the data directory
            Collection<File> files = FileUtils.listFiles(new File("E:\\stormtest"), new String[]{"txt"}, true);
            for (File file : files) {
                try {
                    List<String> lines = FileUtils.readLines(file);
                    // Emit each line of the file as one tuple
                    for (String words : lines) {
                        spoutOutputCollector.emit(new Values(words));
                    }
                    // nextTuple() is called in a loop, so rename each file once it has been processed;
                    // the new name no longer ends in .txt and is skipped on the next call
                    FileUtils.moveFile(file, new File(file.getAbsolutePath() + System.currentTimeMillis()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // Each emitted tuple has a single field named "lines"
            outputFieldsDeclarer.declare(new Fields("lines"));
        }
    }

2. Define SplitBolt

Create SplitBolt by extending BaseRichBolt and overriding prepare, execute, and declareOutputFields.

    public static class SplitBolt extends BaseRichBolt {

        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            // Keep the collector so that execute() can emit tuples
            this.outputCollector = outputCollector;
        }

        @Override
        public void execute(Tuple tuple) {
            // Read the line emitted by DataSourceSpout and split it on commas
            String line = tuple.getStringByField("lines");
            String[] words = line.split(",");
            // Emit each word as its own tuple
            for (String word : words) {
                outputCollector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // Each emitted tuple has a single field named "word"
            outputFieldsDeclarer.declare(new Fields("word"));
        }
    }
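The test data is clean, so a plain `split(",")` is enough here. With messier input, such as spaces around the commas or two commas in a row, the split would produce words like `" b"` or empty strings, each of which would then be counted separately. The following is a sketch of a more tolerant split in plain Java; `LineSplitter` and `splitWords` are hypothetical names, and whether trimming and skipping blanks is the right policy depends on your data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing a more tolerant version of the split step.
final class LineSplitter {

    // Splits on commas, trims surrounding whitespace, and drops empty tokens.
    static List<String> splitWords(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.split(",")) {
            String word = token.trim();
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Prints [a, b, c]
        System.out.println(splitWords("a, b,,c "));
    }
}
```

Inside SplitBolt, the same trimming and empty check could be applied in the loop before calling `emit`.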

3. Define CountBolt

Create CountBolt by extending BaseRichBolt and overriding prepare, execute, and declareOutputFields.

    public static class CountBolt extends BaseRichBolt {

        // Running count for each word seen so far
        private Map<String, Integer> map = new HashMap<>();

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            // Nothing to set up: this bolt does not emit tuples
        }

        @Override
        public void execute(Tuple tuple) {
            // Increment the count for the incoming word, starting from 0 if it is new
            String word = tuple.getStringByField("word");
            Integer count = map.get(word);
            if (null == count) {
                count = 0;
            }
            count++;
            map.put(word, count);

            // Print all current counts after every tuple
            System.out.println("=====");
            Set<Map.Entry<String, Integer>> entrySet = map.entrySet();
            for (Map.Entry<String, Integer> entry : entrySet) {
                System.out.println(entry);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // This is the last bolt in the topology, so it declares no output fields
        }
    }

4. Submit the topology

    // Wire the topology: DataSourceSpout -> SplitBolt -> CountBolt
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("DataSourceSpout", new DataSourceSpout());
    topologyBuilder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
    // shuffleGrouping is sufficient while CountBolt runs as a single task (the default parallelism)
    topologyBuilder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

    // Run in-process on a local cluster for testing
    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("LocalWordCountStormTopology", new Config(), topologyBuilder.createTopology());
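One point worth knowing if you later raise the parallelism of CountBolt: with `shuffleGrouping`, tuples for the same word can land on different CountBolt tasks, and each task then holds only a partial count in its own map. Storm's `fieldsGrouping("SplitBolt", new Fields("word"))` avoids this by sending all tuples with the same `word` value to the same task. The plain-Java simulation below illustrates the difference. It is a sketch only: `GroupingSketch` is a hypothetical class, round-robin stands in for Storm's randomized shuffle, and the hash-modulo routing is a simplification rather than Storm's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of how grouping affects per-task word counts.
final class GroupingSketch {

    // Creates one empty counter map per simulated CountBolt task.
    private static List<Map<String, Integer>> newCounters(int tasks) {
        List<Map<String, Integer>> counters = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            counters.add(new HashMap<>());
        }
        return counters;
    }

    // Stand-in for shuffle grouping: words are spread over tasks regardless of value.
    static List<Map<String, Integer>> countRoundRobin(List<String> words, int tasks) {
        List<Map<String, Integer>> counters = newCounters(tasks);
        for (int i = 0; i < words.size(); i++) {
            counters.get(i % tasks).merge(words.get(i), 1, Integer::sum);
        }
        return counters;
    }

    // Stand-in for fields grouping: the word's hash decides the task.
    static List<Map<String, Integer>> countByField(List<String> words, int tasks) {
        List<Map<String, Integer>> counters = newCounters(tasks);
        for (String word : words) {
            int task = Math.floorMod(word.hashCode(), tasks);
            counters.get(task).merge(word, 1, Integer::sum);
        }
        return counters;
    }

    // Number of tasks that hold a (possibly partial) count for the word.
    static int tasksHolding(List<Map<String, Integer>> counters, String word) {
        int holders = 0;
        for (Map<String, Integer> counter : counters) {
            if (counter.containsKey(word)) {
                holders++;
            }
        }
        return holders;
    }

    public static void main(String[] args) {
        // The words of the sample file, in the order SplitBolt would emit them.
        List<String> words = Arrays.asList(
                "a", "b", "c", "d", "a", "b", "c", "a", "c", "a", "d", "f", "b");
        System.out.println("round-robin: " + countRoundRobin(words, 2));
        System.out.println("by field:    " + countByField(words, 2));
    }
}
```

With two simulated tasks, the round-robin run splits the four occurrences of `a` across both maps, while the by-field run keeps every word in exactly one map.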

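5. Note

In this example, each file must be renamed once it has been processed, because Storm calls nextTuple() over and over. If a file kept its name, the next call would read and emit it again and the counts would be wrong.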
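The effect of the rename can be checked without Storm. The sketch below mimics one nextTuple() pass using only the JDK: it reads every `*.txt` file in a directory and then appends a timestamp to the file name, as the spout does. `RenameAfterReadSketch` and `pollOnce` are hypothetical names, and unlike the spout's `FileUtils.listFiles(..., true)` call this version does not descend into subdirectories.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the spout's read-then-rename loop.
final class RenameAfterReadSketch {

    // One polling pass, analogous to a single nextTuple() call.
    static List<String> pollOnce(Path directory) throws IOException {
        // Collect the matching files first so that renaming does not disturb the listing.
        List<Path> pending = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, "*.txt")) {
            for (Path file : stream) {
                pending.add(file);
            }
        }
        List<String> emitted = new ArrayList<>();
        for (Path file : pending) {
            emitted.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
            // Append a timestamp: the name no longer matches *.txt on the next pass.
            String renamed = file.getFileName().toString() + System.currentTimeMillis();
            Files.move(file, file.resolveSibling(renamed));
        }
        return emitted;
    }

    public static void main(String[] args) throws IOException {
        Path directory = Files.createTempDirectory("stormtest");
        Files.write(directory.resolve("test.txt"),
                Arrays.asList("a,b,c,d", "a,b,c", "a,c", "a,d", "f,b"),
                StandardCharsets.UTF_8);
        // Prints "first pass: 5 lines" and then "second pass: 0 lines"
        System.out.println("first pass: " + pollOnce(directory).size() + " lines");
        System.out.println("second pass: " + pollOnce(directory).size() + " lines");
    }
}
```

Without the `Files.move` call, the second pass would return the same five lines again, which is the double counting the note warns about.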