Word Frequency Count
Requirement:
Read the data in a specified directory and count how many times each word occurs.
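Implementation plan:
- A Spout reads the data in the specified directory and emits it as the input for the downstream Bolts
- One Bolt splits each input line into words; here we split on commas
- Another Bolt performs the final word count
- Output the results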
The task involves file operations, so for simplicity we use commons-io:
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.4</version>
</dependency>
Create the file test.txt in E:\stormtest to serve as test data.
Its content is:
a,b,c,d
a,b,c
a,c
a,d
f,b
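As a cross-check, the counts the topology should arrive at for test.txt can be computed in plain Java without Storm. The sketch below (the class name ExpectedWordCount is hypothetical) splits each line on commas and counts the words with Java streams; it mirrors the SplitBolt and CountBolt logic but is not the Storm code itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java (no Storm) reference computation of the word counts for test.txt.
// Mirrors the topology: lines (spout) -> split on "," (SplitBolt) -> count (CountBolt).
public class ExpectedWordCount {

    // Splits every line on commas and counts each word; TreeMap keeps keys sorted for readable output.
    public static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(",")))
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // The same five lines as test.txt above
        List<String> lines = Arrays.asList("a,b,c,d", "a,b,c", "a,c", "a,d", "f,b");
        System.out.println(countWords(lines)); // prints {a=4, b=3, c=3, d=2, f=1}
    }
}
```

The last "=====" block that CountBolt prints should contain the same counts, although its HashMap is not guaranteed to list them in sorted order.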
Topology design: DataSourceSpout → SplitBolt → CountBolt
1. Define the data source DataSourceSpout
Create DataSourceSpout by extending BaseRichSpout and overriding open, nextTuple, and declareOutputFields:
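// Nested inside the topology's main class. Needs java.io/java.util imports, org.apache.commons.io.FileUtils,
// and Storm's spout/task/topology/tuple classes (org.apache.storm.* in Storm 1.x+, backtype.storm.* in older releases).
public static class DataSourceSpout extends BaseRichSpout {

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // Keep the collector so nextTuple() can emit tuples
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // List all .txt files under E:\stormtest (recursively)
        Collection<File> files = FileUtils.listFiles(new File("E:\\stormtest"), new String[]{"txt"}, true);
        for (File file : files) {
            try {
                // Emit each line of the file as one tuple
                List<String> lines = FileUtils.readLines(file);
                for (String line : lines) {
                    spoutOutputCollector.emit(new Values(line));
                }
                // nextTuple() is called over and over, so rename each processed file;
                // the timestamp suffix means it no longer ends in .txt and is not listed again
                FileUtils.moveFile(file, new File(file.getAbsolutePath() + System.currentTimeMillis()));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each tuple carries one line of text in the field "lines"
        outputFieldsDeclarer.declare(new Fields("lines"));
    }
}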
2. Define SplitBolt
Create SplitBolt by extending BaseRichBolt and overriding prepare, execute, and declareOutputFields:
public static class SplitBolt extends BaseRichBolt {

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        // Keep the collector so execute() can emit tuples
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Read the line emitted by DataSourceSpout and split it on commas
        String line = tuple.getStringByField("lines");
        String[] words = line.split(",");
        // Emit one tuple per word
        for (String word : words) {
            outputCollector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
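SplitBolt relies on String.split(","), which handles the clean test data fine but has quirks worth knowing: empty tokens in the middle and surrounding spaces are kept, while trailing empty strings are dropped. The plain-Java sketch below shows this; splitWords is a hypothetical stricter variant that trims and skips empty tokens, not something the original SplitBolt does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Shows what String.split(",") (as used in SplitBolt.execute()) does with untidy input,
// plus a hypothetical stricter variant that trims tokens and skips empty ones.
public class SplitDemo {

    // Hypothetical helper, not part of the original SplitBolt.
    public static List<String> splitWords(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.split(",")) {
            String word = token.trim();
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Plain split keeps the inner empty token and the leading space of " c", but drops the trailing empty string
        System.out.println(Arrays.toString("a,,b, c,".split(","))); // prints [a, , b,  c]
        System.out.println(splitWords("a,,b, c,"));                   // prints [a, b, c]
    }
}
```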
3. Define CountBolt
Create CountBolt by extending BaseRichBolt and overriding prepare, execute, and declareOutputFields:
public static class CountBolt extends BaseRichBolt {

    // word -> number of occurrences seen so far by this task
    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        // Nothing to set up: this is the last bolt and it emits no tuples
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);

        // Print the current totals after every tuple
        System.out.println("=====");
        Set<Map.Entry<String, Integer>> entrySet = counts.entrySet();
        for (Map.Entry<String, Integer> entry : entrySet) {
            System.out.println(entry);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // No downstream bolt, so no output fields are declared
    }
}
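The get / null-check / put sequence in execute() can also be written as a single Map.merge call (Java 8+). A minimal sketch outside Storm, using a hypothetical MergeCountDemo class; the counting behavior is the same, only the update line changes.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of CountBolt's counting step using Map.merge instead of get / null check / put.
public class MergeCountDemo {

    private final Map<String, Integer> counts = new HashMap<>();

    // Same effect as: count = counts.get(word); if (count == null) count = 0; counts.put(word, count + 1);
    public void add(String word) {
        counts.merge(word, 1, Integer::sum);
    }

    public Map<String, Integer> counts() {
        return counts;
    }

    public static void main(String[] args) {
        MergeCountDemo demo = new MergeCountDemo();
        for (String word : new String[] {"a", "b", "a"}) {
            demo.add(word);
        }
        System.out.println(demo.counts().get("a")); // prints 2
    }
}
```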
4. Submit the topology
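TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("DataSourceSpout", new DataSourceSpout());
topologyBuilder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
// fieldsGrouping sends every tuple with the same "word" to the same CountBolt task, so the counts
// stay correct if CountBolt ever runs as more than one task (with a single task, shuffleGrouping also works)
topologyBuilder.setBolt("CountBolt", new CountBolt()).fieldsGrouping("SplitBolt", new Fields("word"));

// Run the topology in an in-process local cluster
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("LocalWordCountStormTopology", new Config(), topologyBuilder.createTopology());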
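The grouping used to feed CountBolt matters once CountBolt runs as more than one task, for example via the parallelism-hint argument of setBolt. The plain-Java simulation below is a sketch under stated assumptions: round-robin stands in for shuffleGrouping's random distribution, and Math.floorMod(word.hashCode(), n) stands in for the hashing fieldsGrouping performs (Storm uses its own hash function). It shows that shuffling can leave partial counts for the same word in several tasks, while routing by word keeps each word in exactly one task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulates several CountBolt tasks without Storm to compare the two groupings.
// Assumptions: round-robin approximates shuffleGrouping; floorMod(hashCode) approximates fieldsGrouping.
public class GroupingDemo {

    // Returns one word->count map per simulated task.
    public static List<Map<String, Integer>> route(List<String> words, int numTasks, boolean byField) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (int i = 0; i < words.size(); i++) {
            String word = words.get(i);
            int task = byField ? Math.floorMod(word.hashCode(), numTasks) : i % numTasks;
            tasks.get(task).merge(word, 1, Integer::sum);
        }
        return tasks;
    }

    // Counts how many tasks hold a (possibly partial) count for the given word.
    public static int tasksHolding(List<Map<String, Integer>> tasks, String word) {
        int n = 0;
        for (Map<String, Integer> task : tasks) {
            if (task.containsKey(word)) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "a", "b", "a");
        System.out.println(tasksHolding(route(words, 2, false), "a")); // prints 2: "a" is split across tasks
        System.out.println(tasksHolding(route(words, 2, true), "a"));  // prints 1: every "a" goes to one task
    }
}
```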
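5. Notes
In this example, each file has to be renamed once it has been processed, because Storm calls nextTuple() over and over. Renaming every file right after its lines are emitted keeps it from being read and sent again, which would inflate the counts. Appending a timestamp to the name also means the file no longer ends in .txt, so the listFiles filter skips it on later calls.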
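The read-then-rename idea can be tried outside Storm. This sketch uses java.nio instead of commons-io (the class RenameAfterReadDemo and its poll method are hypothetical): one poll reads every *.txt file in a directory and renames it with a timestamp suffix, so a second poll finds nothing. Unlike FileUtils.listFiles(..., true) in the spout, it does not descend into subdirectories, and it reads files as UTF-8 rather than the platform default charset.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the spout's "read, then rename" step.
// "test.txt" becomes e.g. "test.txt1700000000000", which no longer matches *.txt.
public class RenameAfterReadDemo {

    // One poll: read every *.txt file directly in dir, then rename it so it is not read again.
    public static List<String> poll(Path dir) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.txt")) {
            for (Path file : stream) {
                files.add(file);
            }
        }
        List<String> lines = new ArrayList<>();
        for (Path file : files) {
            lines.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
            // Append a timestamp to the file name, as the spout does
            Path renamed = file.resolveSibling(file.getFileName() + String.valueOf(System.currentTimeMillis()));
            Files.move(file, renamed);
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("stormtest");
        Files.write(dir.resolve("test.txt"),
                Arrays.asList("a,b,c,d", "a,b,c", "a,c", "a,d", "f,b"), StandardCharsets.UTF_8);
        System.out.println(poll(dir).size()); // prints 5: the first poll reads every line
        System.out.println(poll(dir).size()); // prints 0: the renamed file is skipped
    }
}
```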
