Data acquisition
package com.jdxia.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MySplitBolt extends BaseBasicBolt {

    // Processing method, called once per tuple
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // 1. Fetch the data from the tuple.
        //    With Kafka as the source, the KafkaSpout emits a single field named "bytes".
        byte[] juzi = (byte[]) tuple.getValueByField("bytes");
        // 2. Split the sentence on spaces
        String[] strings = new String(juzi).split(" ");
        // 3. Emit one (word, 1) pair per word
        for (String string : strings) {
            basicOutputCollector.emit(new Values(string, 1));
        }
    }

    // Declare this bolt's output fields
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "num"));
    }
}
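Since the KafkaSpout declares "bytes" as its only output field, the same payload can also be fetched by position; a minimal equivalent sketch:

    // Equivalent to getValueByField("bytes"): index 0 is the spout's only field
    byte[] juzi = tuple.getBinary(0);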
Data computation
package com.jdxia.storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Map;

public class MyWordCountAndPrintBolt extends BaseBasicBolt {

    private Map<String, String> wordCountMap = new HashMap<String, String>();
    private Jedis jedis;

    // Initialization: open the Redis connection
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Connect to Redis
        jedis = new Jedis("0.0.0.0", 6379);
        jedis.auth("root");
        // Call the parent implementation
        super.prepare(stormConf, context);
    }

    // Processing method, called once per tuple
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Fetch the values by the field names "word" and "num" declared upstream,
        // casting them back to their concrete types
        String word = (String) tuple.getValueByField("word");
        Integer num = (Integer) tuple.getValueByField("num");

        // 1. Check whether a count already exists for this word
        Integer integer = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));
        if (integer == 0) {
            // If not, store the new count directly
            wordCountMap.put(word, num + "");
        } else {
            // Otherwise add the increment to the existing count
            wordCountMap.put(word, (integer + num) + "");
        }

        // 2. Persist the counts to Redis as a hash: key "wordCount" -> map
        jedis.hmset("wordCount", wordCountMap);
    }

    // Terminal bolt: nothing to emit, so no output fields are declared
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
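To verify the counts, the hash written by hmset can be read back with hgetAll. A minimal sketch, assuming the same Redis host and password as the bolt above (the class name is illustrative):

    import java.util.Map;
    import redis.clients.jedis.Jedis;

    public class WordCountReader {
        public static void main(String[] args) {
            Jedis jedis = new Jedis("0.0.0.0", 6379);
            jedis.auth("root");
            // hgetAll returns the whole hash written by the bolt's hmset
            Map<String, String> counts = jedis.hgetAll("wordCount");
            for (Map.Entry<String, String> e : counts.entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
            jedis.close();
        }
    }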
Topology submission
package com.jdxia.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class StormTopologyDriver {

    public static void main(String[] args) {
        // 1. Describe the topology (component names are user-defined)
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // SpoutConfig arguments: the ZooKeeper quorum the Kafka brokers register in,
        // the topic to consume, the ZK root where offsets are stored (arg 3),
        // and a consumer id (arg 4): a ZK sub-path that keeps these offsets from
        // being overwritten by or colliding with other consumers
        topologyBuilder.setSpout("kafkaSpout", new KafkaSpout(new SpoutConfig(new ZkHosts("master:2181"), "wordCount", "/wc", "wc")));
        // shuffleGrouping wires a bolt to the upstream component; a bolt may subscribe to several streams
        topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("kafkaSpout");
        topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1");

        // 2. Submit the topology: to whom, and with what?
        Config config = new Config(); // Config actually extends HashMap
        // Number of workers (JVM processes) to run on; the default is a single worker
        // config.setNumWorkers(2);
        StormTopology stormTopology = topologyBuilder.createTopology();

        // Local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordCount", config, stormTopology);

        // Cluster mode would use StormSubmitter instead:
        // StormSubmitter.submitTopology("wordCount", config, stormTopology);
    }
}
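In local mode the topology keeps running inside the current JVM, so a common pattern is to let it run for a while and then shut the local cluster down. A sketch of what could follow submitTopology (the one-minute window is arbitrary):

    // Let the local topology run for a while, then clean up
    backtype.storm.utils.Utils.sleep(60 * 1000);
    localCluster.killTopology("wordCount");
    localCluster.shutdown();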
Testing
We create the corresponding topic and then write data into it, with the words separated by spaces.
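A minimal producer sketch for feeding the topic, assuming a Kafka broker on master:9092 and the kafka-clients library on the classpath (the class name, broker address, and sample sentence are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092"); // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            // Words separated by spaces, as the split bolt expects
            producer.send(new ProducerRecord<String, String>("wordCount", "hello storm hello kafka"));
            producer.close();
        }
    }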