storm
免费、开源、分布式、实时计算系统。 吞吐量高。 每秒每节点百万元组。
Spout //水龙头
Cloujr
JStorm
storm hadoop
实时流处理 批处理
无状态 有状态
使用zk协同的主 无zk的主从架构。
从架构
每秒处理数万消息 HDFS MR数分钟、数小时
不会主动停止 终有完成的时候。
storm优点
1.跨语言
2.可伸缩的
3.低延迟,秒级/分钟级
4.容错。
核心概念
1.Tuple
主要的数据结构,有序元素的列表。
2.Stream
Tuple的序列。
3.Spouts
数据流源头。可以读取kafka队列消息。可以自定义。
4.Bolts
转接头.
逻辑处理单元。spout的数据传递个bolt,bolt计算,完成后产生新的数据。
IBolt是接口。
Topology
Spout + bolt连接在一起形成一个top,形成有向图,定点就是计算,边是数据流。
task
Bolt中每个Spout或者bolt都是一个task.
Storm架构
1.Nimbus(灵气)
master节点。
核心组件,运行top。
分析top并收集运行task。分发task给supervisor.
监控top。
无状态,依靠zk监控top的运行状况。
2.Supervisor(监察)
每个supervisor有n个worker进程,负责代理task给worker。
worker在孵化执行线程最终运行task。
storm使用内部消息系统在nimbus和supervisor之间进行通信。
接受nimbus指令,管理worker进程完成task派发。
3.worker
执行特定的task,worker本身不执行任务,而是孵化executors,
让executors执行task。
4.Executor
本质上有worker进程孵化出来的一个线程而已。
executor运行task都属于同一spout或者bolt.
5.task
执行实际上的任务处理。或者是Spout或者是bolt.
storm工作流程
1.nimbus等待提交的top
2.提交top后,nimbus收集task,
3.nimbus分发task给所有可用的supervisor
4.supervisor周期性发送心跳给nimbus表示自己还活着。
5.如果supervisor挂掉,不会发送心跳给nimubs,nimbus将task发送给其他的supervisor
6.nimubs挂掉,super会继续执行自己task。
7.task完成后,supervisor等待新的task
8.同时,挂掉的nimbus可以通过监控工具软件自动重启。
安装storm集群
[s201 ~ s204]
1.jdk
2.tar
3.环境变量
4.验证安装
$>source /etc/profile
$>./storm version
5.分发安装文件到其他节点。
6.配置
[storm/conf/storm.yaml]
## Nimbus 和 Supervisor 后台进程都需要一个用于存放一些状态数据(比如 jar 包、配置文件等等)的目录
storm.local.dir: "/home/centos/storm"
## Storm 关联的 ZooKeeper 集群的地址列表
storm.zookeeper.servers:
- "s202"
- "s203"
storm.zookeeper.port: 2181
### nimbus.* configs are for the master
##用于配置主控节点的地址,可以配置多个。从Storm1.0开始,支持Nimbus的HA。
nimbus.seeds : ["s201"]
### ui.* configs are for the master
ui.host: 0.0.0.0
ui.port: 8080
##配置每个 Supervisor 机器能够运行的工作进程(worker)数。
每个 worker 都需要一个单独的端口来接收消息,
##这个配置项就定义了 worker 可以使用的端口列表。
如果你在这里定义了 5 个端口,那么 Storm 就会在该机器上分配最多 5 个worker。
##如果定义 3 个端口,那 Storm 至多只会运行三个 worker。
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
7.分发
8.启动进程
a)启动s201 nimbus进程
$>storm nimbus &
b)启动s202 ~ s204 supervisor进程
$>storm supervisor &
c)启动s201的ui进程
$>storm ui &
9.通过webui查看
http://s201:8080/
编程实现CallLog日志统计
0.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.it18zhang</groupId>
<artifactId>StormDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>
</project>
1.创建Spout
package com.it18zhang.stormdemo;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* Spout类,负责产生数据流
*/
public class CallLogSpout implements IRichSpout{
//Spout输出收集器
private SpoutOutputCollector collector;
//是否完成
private boolean completed = false;
//上下文
private TopologyContext context;
//随机发生器
private Random randomGenerator = new Random();
//
private Integer idx = 0;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
public void close() {
}
public void activate() {
}
public void deactivate() {
}
/**
* 下一个元组
*/
public void nextTuple() {
if (this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while (localIdx++ < 100 && this.idx++ < 1000) {
//取出主叫
String caller = mobileNumbers.get(randomGenerator.nextInt(4));
//取出被叫
String callee = mobileNumbers.get(randomGenerator.nextInt(4));
while (caller == callee) {
//重新取出被叫
callee = mobileNumbers.get(randomGenerator.nextInt(4));
}
//模拟通话时长
Integer duration = randomGenerator.nextInt(60);
//输出元组
this.collector.emit(new Values(caller, callee, duration));
}
}
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
/**
* 定义输出的字段名称
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
2.创建CreatorBolt
package com.it18zhang.stormdemo;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* 创建CallLog日志的Bolt
*/
public class CallLogCreatorBolt implements IRichBolt {
//
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector ;
}
public void execute(Tuple tuple) {
//处理通话记录
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
//产生新的tuple
collector.emit(new Values(from + " - " + to, duration));
}
public void cleanup() {
}
/**
* 设置输出字段的名称
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
3.创建CounterBolt
package com.it18zhang.stormdemo;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* 通话记录计数器Bolt
*/
public class CallLogCounterBolt implements IRichBolt{
Map<String, Integer> counterMap;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if (!counterMap.containsKey(call)) {
counterMap.put(call, 1);
} else {
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
public void cleanup() {
for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
4.App
package com.it18zhang.stormdemo;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
* App
*/
public class App {
public static void main(String[] args) throws InterruptedException {
TopologyBuilder builder = new TopologyBuilder();
//设置Spout
builder.setSpout("spout", new CallLogSpout());
//设置creator-Bolt
builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
//设置counter-Bolt
builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
Thread.sleep(10000);
//停止集群
cluster.shutdown();
}
}
5.在生产环境的集群上部署storm top
a)修改提交方式
[App.java]
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
//设置Spout
builder.setSpout("spout", new CallLogSpout());
//设置creator-Bolt
builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
//设置counter-Bolt
builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));
Config conf = new Config();
conf.setDebug(true);
/**
* 本地模式storm
*/
// LocalCluster cluster = new LocalCluster();
// cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
// Thread.sleep(10000);
StormSubmitter.submitTopology("mytop", conf, builder.createTopology());
}
b)导入jar包.
maven ...
c)在centos上运行top
$>storm jar xxx.jar com.it18zhang.stormdemo.App
使用storm流计算实现wordcount
1.WordCountSpout
...
2.SplitBolt
String line = ...
String[] str = line.split(" ");
for(String s : str){
collector.emit(new Values("word","1"))
}
3.CounterBolt
设置top的并发程度和任务
配置并发度.
1.设置worker数据
conf.setNumWorkers(1);
2.设置executors个数
//设置Spout的并发暗示 (executor个数)
builder.setSpout("wcspout", new WordCountSpout(),3);
//设置bolt的并发暗示
builder.setBolt("split-bolt", new SplitBolt(),4)
3.设置task个数
每个线程可以执行多个task.
builder.setSpout("wcspout", new WordCountSpout(),3).setNumTasks(2);
//
builder.setBolt("split-bolt", new SplitBolt(),4).shuffleGrouping("wcspout").setNumTasks(3);
4.并发度 ==== 所有的task个数的总和。