Data acquisition
package com.jdxia.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MySplitBolt extends BaseBasicBolt {

    // Processing callback
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // 1. Read the data from the tuple.
        //    Since the Kafka spout feeds this bolt, the field name is "bytes".
        byte[] juzi = (byte[]) tuple.getValueByField("bytes");

        // 2. Split the sentence on spaces.
        String[] strings = new String(juzi).split(" ");

        // 3. Emit a (word, 1) pair for each word.
        for (String string : strings) {
            basicOutputCollector.emit(new Values(string, 1));
        }
    }

    // Declare this bolt's output fields
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "num"));
    }
}
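The "bytes" field name comes from the KafkaSpout's default scheme, which hands each message over as raw bytes. If you would rather receive ready-made strings, the classic storm-kafka StringScheme can be set on the SpoutConfig, in which case the tuple field is called "str". A minimal sketch (the factory class StringSpoutFactory is hypothetical, added only for illustration):

package com.jdxia.storm;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class StringSpoutFactory {
    // Builds a KafkaSpout that emits each Kafka message as a UTF-8 String
    // under the field "str" instead of raw bytes under "bytes".
    public static KafkaSpout create() {
        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts("master:2181"), "wordCount", "/wc", "wc");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        return new KafkaSpout(spoutConfig);
    }
}

With that in place, the read in execute() would become String juzi = tuple.getStringByField("str"); and the byte[] cast disappears.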
Data computation
package com.jdxia.storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Map;

public class MyWordCountAndPrintBolt extends BaseBasicBolt {

    private Map<String, String> wordCountMap = new HashMap<String, String>();
    private Jedis jedis;

    // Set up the Redis connection once, when the bolt is initialized
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Open the Redis connection
        jedis = new Jedis("0.0.0.0", 6379);
        jedis.auth("root");
        // Call the parent implementation
        super.prepare(stormConf, context);
    }

    // Processing callback
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Read the "word" and "num" fields declared by the split bolt,
        // casting them back to their concrete types
        String word = (String) tuple.getValueByField("word");
        Integer num = (Integer) tuple.getValueByField("num");

        // 1. Check whether we already have a count for this word
        Integer integer = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));
        if (integer == 0) {
            // First occurrence: store the count directly
            wordCountMap.put(word, num + "");
        } else {
            // Seen before: add to the running total
            wordCountMap.put(word, (integer + num) + "");
        }

        // Persist the counts to Redis as a hash: key "wordCount" -> Map
        jedis.hmset("wordCount", wordCountMap);
    }

    // Terminal bolt: nothing downstream, so there are no output fields to declare
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
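Once the topology is running, the totals can be read back from the "wordCount" hash with the same Jedis client. A minimal sketch (WordCountReader is a hypothetical helper, reusing the host, port, and password from prepare() above):

package com.jdxia.storm;

import java.util.Map;

import redis.clients.jedis.Jedis;

public class WordCountReader {
    public static void main(String[] args) {
        // Connect with the same settings the bolt uses
        Jedis jedis = new Jedis("0.0.0.0", 6379);
        jedis.auth("root");

        // Fetch the whole hash and print each word with its count
        Map<String, String> counts = jedis.hgetAll("wordCount");
        for (Map.Entry<String, String> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
        jedis.close();
    }
}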
Topology submission
package com.jdxia.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class StormTopologyDriver {
    public static void main(String[] args) {
        // 1. Describe the topology
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // Component names are up to you. The SpoutConfig arguments are:
        //   1st: the ZooKeeper quorum the Kafka brokers register with
        //   2nd: the topic to consume
        //   3rd: zkRoot, the ZooKeeper path where the consumer offsets are stored
        //   4th: an id (sub-node under zkRoot) so the offsets are not
        //        overwritten by, or in conflict with, other consumers
        topologyBuilder.setSpout("kafkaSpout",
                new KafkaSpout(new SpoutConfig(new ZkHosts("master:2181"), "wordCount", "/wc", "wc")));

        // shuffleGrouping wires a bolt to the upstream component;
        // a bolt can subscribe to several components this way
        topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("kafkaSpout");
        topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1");

        // 2. Submit the topology
        // To whom do we submit, and what do we submit?
        Config config = new Config(); // Config actually extends HashMap
        // How many workers (i.e. JVMs) to run on; defaults to one if unset
        // config.setNumWorkers(2);
        StormTopology stormTopology = topologyBuilder.createTopology();

        // Local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordCount", config, stormTopology);

        // Cluster mode would be:
        // StormSubmitter.submitTopology("worldCount1", config, stormTopology);
    }
}
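One caveat about the groupings: shuffleGrouping distributes tuples randomly, so if bolt2 were given a parallelism hint above one, occurrences of the same word could land on different bolt instances and the per-word totals would be split across them. The usual remedy is fieldsGrouping on the "word" field, which always routes equal words to the same instance. A sketch of the changed wiring (the parallelism hint of 2 is illustrative, and backtype.storm.tuple.Fields must be imported):

// Route all tuples carrying the same "word" value to the same bolt
// instance, so each instance owns a disjoint set of words.
topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(), 2)
        .fieldsGrouping("bolt1", new Fields("word"));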
Testing
We create the corresponding topic and then write some data into it, with the words separated by spaces.
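This can be done with Kafka's console producer, or with a few lines of Java. A minimal sketch (assuming the Kafka 0.8-era producer API that this storm-kafka version pairs with; the broker address master:9092 and the class WordCountProducer are illustrative):

package com.jdxia.storm;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class WordCountProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Point this at your Kafka broker(s)
        props.put("metadata.broker.list", "master:9092");
        // Encode message payloads as plain strings
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // Each message is a space-separated sentence, as the split bolt expects
        producer.send(new KeyedMessage<String, String>("wordCount", "hello storm hello kafka"));
        producer.close();
    }
}

Shortly after the message is consumed, the word counts should appear in the Redis "wordCount" hash.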
