flume
收集日志、移动、聚合框架。基于事件。
agent
source //接收数据,生产者 //put() //NetcatSource //ExecSource,实时收集 tail -F xxx.txt //spooldir //seq //Stress //avroSourcechannel //暂存数据,缓冲区, //非永久性:MemoryChannel //永久性 :FileChannel,磁盘. //SpillableMemoryChannel :Mem + FileChannel.Capacitysink //输出数据,消费者 //从channel提取take()数据,write()destination. //HdfsSink //HbaseSink //avroSink
JMS
java message service,java消息服务。queue //只有能有一个消费者。P2P模式(点对点). //发布订阅(publish-subscribe,主题模式),
kafka
分布式流处理平台。在系统之间构建实时数据流管道。以topic分类对记录进行存储每个记录包含key-value+timestamp每秒钟百万消息吞吐量。producer //消息生产者consumer //消息消费者consumer group //消费者组kafka server //broker,kafka服务器topic //主题,副本数,分区.zookeeper //hadoop namenoade + RM HA | hbase | kafka
安装kafka
0.选择s202 ~ s204三台主机安装kafka1.准备zk 略2.jdk 略3.tar文件4.环境变量 略5.配置kafka [kafka/config/server.properties] ... broker.id=202 ... listeners=PLAINTEXT://:9092 ... log.dirs=/home/centos/kafka/logs ... zookeeper.connect=s201:2181,s202:2181,s203:21816.分发server.properties,同时修改每个文件的broker.id7.启动kafka服务器 a)先启动zk b)启动kafka [s202 ~ s204] $>bin/kafka-server-start.sh config/server.properties c)验证kafka服务器是否启动 $>netstat -anop | grep 90928.创建主题 $>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test9.查看主题列表 $>bin/kafka-topics.sh --list --zookeeper s201:218110.启动控制台生产者 $>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test11.启动控制台消费者 $>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:218112.在生产者控制台输入hello world
kafka集群在zk的配置
/controller ===> {"version":1,"brokerid":202,"timestamp":"1490926369148"/controller_epoch ===> 1/brokers/brokers/ids/brokers/ids/202 ===> {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}/brokers/ids/203/brokers/ids/204 /brokers/topics/test/partitions/0/state ===>{"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}/brokers/topics/test/partitions/1/state ===>.../brokers/topics/test/partitions/2/state ===>.../brokers/seqid ===> null/admin/admin/delete_topics/test ===>标记删除的主题/isr_change_notification/consumers/xxxx//config
容错
创建主题
repliation_factor 2 partitions 5$>kafka-topic.sh --zookeeper s202:2181 --replication_factor 3 --partitions 4 --create --topic test32 x 5 = 10 //是个文件夹[s202]test2-1 //test2-2 //test2-3 //[s203]test2-0test2-2test2-3test2-4[s204]test2-0test2-1test2-4
重新布局分区和副本,手动再平衡
$>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204
副本
broker存放消息以消息达到顺序存放。生产和消费都是副本感知的。 支持到n-1故障。每个分区都有leader,follow. leader挂掉时,消息分区写入到本地log或者,向生产者发送消息确认回执之前,生产者向新的leader发送消息。 新leader的选举是通过isr进行,第一个注册的follower成为leader。
kafka支持副本模式
[同步复制] 1.producer联系zk识别leader 2.向leader发送消息 3.leadr收到消息写入到本地log 4.follower从leader pull消息 5.follower向本地写入log 6.follower向leader发送ack消息 7.leader收到所有follower的ack消息 8.leader向producer回传ack[异步副本] 和同步复制的区别在与leader写入本地log之后, 直接向client回传ack消息,不需要等待所有follower复制完成。
通过java API实现消息生产者,发送消息
package com.it18zhang.kafkademo.test;import org.junit.Test;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;import java.util.HashMap;import java.util.Properties;/** * Created by Administrator on 2017/3/31. */public class TestProducer { @Test public void testSend(){ Properties props = new Properties(); //broker列表 props.put("metadata.broker.list", "s202:9092"); //串行化 props.put("serializer.class", "kafka.serializer.StringEncoder"); // props.put("request.required.acks", "1"); //创建生产者配置对象 ProducerConfig config = new ProducerConfig(props); //创建生产者 Producer<String, String> producer = new Producer<String, String>(config); KeyedMessage<String, String> msg = new KeyedMessage<String, String>("test3","100" ,"hello world tomas100"); producer.send(msg); System.out.println("send over!"); }}
消息消费者
/** * 消费者 */@Testpublic void testConumser(){ // Properties props = new Properties(); props.put("zookeeper.connect", "s202:2181"); props.put("group.id", "g3"); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); //创建消费者配置对象 ConsumerConfig config = new ConsumerConfig(props); // Map<String, Integer> map = new HashMap<String, Integer>(); map.put("test3", new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)).createMessageStreams(map); List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test3"); for(KafkaStream<byte[],byte[]> stream : msgList){ ConsumerIterator<byte[],byte[]> it = stream.iterator(); while(it.hasNext()){ byte[] message = it.next().message(); System.out.println(new String(message)); } }}
flume集成kafka
1.KafkaSink [生产者] a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = test3 a1.sinks.k1.kafka.bootstrap.servers = s202:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.channels.c1.type=memory a1.sources.r1.channels = c1 a1.sinks.k1.channel = c12.KafkaSource [消费者] a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = s202:9092 a1.sources.r1.kafka.topics = test3 a1.sources.r1.kafka.consumer.group.id = g4 a1.sinks.k1.type = logger a1.channels.c1.type=memory a1.sources.r1.channels = c1 a1.sinks.k1.channel = c13.Channel 生产者 + 消费者 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 a1.sinks.k1.type = logger a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = s202:9092 a1.channels.c1.kafka.topic = test3 a1.channels.c1.kafka.consumer.group.id = g6 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1