flume
收集日志、移动、聚合框架。
基于事件。
agent
source //接收数据,生产者
//put()
//NetcatSource
//ExecSource,实时收集 tail -F xxx.txt
//spooldir
//seq
//Stress
//avroSource
channel //暂存数据,缓冲区,
//非永久性:MemoryChannel
//永久性 :FileChannel,磁盘.
//SpillableMemoryChannel :Mem + FileChannel.Capacity
sink //输出数据,消费者
//从channel提取take()数据,write()destination.
//HdfsSink
//HbaseSink
//avroSink
JMS
java message service,java消息服务。
queue //只有能有一个消费者。P2P模式(点对点).
//发布订阅(publish-subscribe,主题模式),
kafka
分布式流处理平台。
在系统之间构建实时数据流管道。
以topic分类对记录进行存储
每个记录包含key-value+timestamp
每秒钟百万消息吞吐量。
producer //消息生产者
consumer //消息消费者
consumer group //消费者组
kafka server //broker,kafka服务器
topic //主题,副本数,分区.
zookeeper //hadoop namenoade + RM HA | hbase | kafka
安装kafka
0.选择s202 ~ s204三台主机安装kafka
1.准备zk
略
2.jdk
略
3.tar文件
4.环境变量
略
5.配置kafka
[kafka/config/server.properties]
...
broker.id=202
...
listeners=PLAINTEXT://:9092
...
log.dirs=/home/centos/kafka/logs
...
zookeeper.connect=s201:2181,s202:2181,s203:2181
6.分发server.properties,同时修改每个文件的broker.id
7.启动kafka服务器
a)先启动zk
b)启动kafka
[s202 ~ s204]
$>bin/kafka-server-start.sh config/server.properties
c)验证kafka服务器是否启动
$>netstat -anop | grep 9092
8.创建主题
$>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test
9.查看主题列表
$>bin/kafka-topics.sh --list --zookeeper s201:2181
10.启动控制台生产者
$>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test
11.启动控制台消费者
$>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:2181
12.在生产者控制台输入hello world
kafka集群在zk的配置
/controller ===> {"version":1,"brokerid":202,"timestamp":"1490926369148"
/controller_epoch ===> 1
/brokers
/brokers/ids
/brokers/ids/202 ===> {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}
/brokers/ids/203
/brokers/ids/204
/brokers/topics/test/partitions/0/state ===>{"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}
/brokers/topics/test/partitions/1/state ===>...
/brokers/topics/test/partitions/2/state ===>...
/brokers/seqid ===> null
/admin
/admin/delete_topics/test ===>标记删除的主题
/isr_change_notification
/consumers/xxxx/
/config
容错
创建主题
repliation_factor 2 partitions 5
$>kafka-topic.sh --zookeeper s202:2181 --replication_factor 3 --partitions 4 --create --topic test3
2 x 5 = 10 //是个文件夹
[s202]
test2-1 //
test2-2 //
test2-3 //
[s203]
test2-0
test2-2
test2-3
test2-4
[s204]
test2-0
test2-1
test2-4
重新布局分区和副本,手动再平衡
$>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204
副本
broker存放消息以消息达到顺序存放。生产和消费都是副本感知的。
支持到n-1故障。每个分区都有leader,follow.
leader挂掉时,消息分区写入到本地log或者,向生产者发送消息确认回执之前,生产者向新的leader发送消息。
新leader的选举是通过isr进行,第一个注册的follower成为leader。
kafka支持副本模式
[同步复制]
1.producer联系zk识别leader
2.向leader发送消息
3.leadr收到消息写入到本地log
4.follower从leader pull消息
5.follower向本地写入log
6.follower向leader发送ack消息
7.leader收到所有follower的ack消息
8.leader向producer回传ack
[异步副本]
和同步复制的区别在与leader写入本地log之后,
直接向client回传ack消息,不需要等待所有follower复制完成。
通过java API实现消息生产者,发送消息
package com.it18zhang.kafkademo.test;
import org.junit.Test;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.HashMap;
import java.util.Properties;
/**
* Created by Administrator on 2017/3/31.
*/
public class TestProducer {
@Test
public void testSend(){
Properties props = new Properties();
//broker列表
props.put("metadata.broker.list", "s202:9092");
//串行化
props.put("serializer.class", "kafka.serializer.StringEncoder");
//
props.put("request.required.acks", "1");
//创建生产者配置对象
ProducerConfig config = new ProducerConfig(props);
//创建生产者
Producer<String, String> producer = new Producer<String, String>(config);
KeyedMessage<String, String> msg = new KeyedMessage<String, String>("test3","100" ,"hello world tomas100");
producer.send(msg);
System.out.println("send over!");
}
}
消息消费者
/**
* 消费者
*/
@Test
public void testConumser(){
//
Properties props = new Properties();
props.put("zookeeper.connect", "s202:2181");
props.put("group.id", "g3");
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
//创建消费者配置对象
ConsumerConfig config = new ConsumerConfig(props);
//
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("test3", new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)).createMessageStreams(map);
List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test3");
for(KafkaStream<byte[],byte[]> stream : msgList){
ConsumerIterator<byte[],byte[]> it = stream.iterator();
while(it.hasNext()){
byte[] message = it.next().message();
System.out.println(new String(message));
}
}
}
flume集成kafka
1.KafkaSink
[生产者]
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test3
a1.sinks.k1.kafka.bootstrap.servers = s202:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.KafkaSource
[消费者]
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = s202:9092
a1.sources.r1.kafka.topics = test3
a1.sources.r1.kafka.consumer.group.id = g4
a1.sinks.k1.type = logger
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.Channel
生产者 + 消费者
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sinks.k1.type = logger
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = s202:9092
a1.channels.c1.kafka.topic = test3
a1.channels.c1.kafka.consumer.group.id = g6
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1