flume
A framework for collecting, moving, and aggregating log data. Event-based.
agent
source        //receives data, the producer side
            //put()
            //NetcatSource
            //ExecSource, real-time collection: tail -F xxx.txt
            //spooldir
            //seq
            //Stress
            //avroSource
channel        //staging area for data, a buffer
            //non-durable: MemoryChannel
            //durable:     FileChannel, on disk
            //SpillableMemoryChannel: MemoryChannel + FileChannel, spills to disk past memory capacity
sink        //writes data out, the consumer side
            //take() data from the channel, write() to the destination
            //HdfsSink
            //HbaseSink
            //avroSink
JMS
Java Message Service. Two messaging models:
queue        //each message goes to at most one consumer; P2P (point-to-point) model
topic        //publish-subscribe (topic) model
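A minimal sketch of the two models through the javax.jms API. The broker URL and the ActiveMQConnectionFactory provider are assumptions for illustration, not from these notes:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsDemo {
    public static void main(String[] args) throws JMSException {
        // assumed broker URL; any JMS provider is driven the same way
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // P2P model: the message is delivered to at most one consumer
        Destination queue = session.createQueue("demo.queue");
        session.createProducer(queue).send(session.createTextMessage("to one consumer"));

        // publish-subscribe model: every active subscriber gets a copy
        Destination topic = session.createTopic("demo.topic");
        session.createProducer(topic).send(session.createTextMessage("to all subscribers"));

        conn.close();
    }
}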
kafka
A distributed streaming platform. Builds real-time data pipelines between systems.
Records are stored by topic; each record is a key-value pair plus a timestamp.
Throughput of millions of messages per second.
producer            //message producer
consumer            //message consumer
consumer group      //group of consumers sharing the partitions of a topic
kafka server        //broker, the kafka server process
topic               //topic: replication factor, partitions
zookeeper           //shared by hadoop namenode + RM HA | hbase | kafka
Installing kafka
0.Choose three hosts, s202 ~ s204, to install kafka on
1.Prepare zk
    (omitted)
2.jdk
    (omitted)
3.Untar the tar file
4.Environment variables
    (omitted)
5.Configure kafka
    [kafka/config/server.properties]
    ...
    broker.id=202
    ...
    listeners=PLAINTEXT://:9092
    ...
    log.dirs=/home/centos/kafka/logs
    ...
    zookeeper.connect=s201:2181,s202:2181,s203:2181
6.Distribute server.properties, changing broker.id in each copy
7.Start the kafka servers
    a)Start zk first
    b)Start kafka
        [s202 ~ s204]
        $>bin/kafka-server-start.sh config/server.properties
    c)Verify the kafka server is up
        $>netstat -anop | grep 9092
8.Create a topic
    $>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test
9.List topics
    $>bin/kafka-topics.sh --list --zookeeper s201:2181
10.Start a console producer
    $>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test
11.Start a console consumer (pass either --bootstrap-server or --zookeeper, not both)
    $>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning
12.Type hello world in the producer console; it should appear in the consumer console
kafka cluster state in zk
/controller                ===> {"version":1,"brokerid":202,"timestamp":"1490926369148"}
/controller_epoch          ===> 1
/brokers
/brokers/ids
/brokers/ids/202           ===> {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}
/brokers/ids/203
/brokers/ids/204
/brokers/topics/test/partitions/0/state ===> {"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}
/brokers/topics/test/partitions/1/state ===> ...
/brokers/topics/test/partitions/2/state ===> ...
/brokers/seqid             ===> null
/admin
/admin/delete_topics/test  ===> topics marked for deletion
/isr_change_notification
/consumers/xxxx/
/config
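These znodes can be inspected from Java with the plain ZooKeeper client; a minimal sketch, assuming the ensemble address s201:2181 from the install steps above (the code is an illustration, not from the original notes):

import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class ZkInspect {
    public static void main(String[] args) throws Exception {
        // connect to the zk ensemble used by the kafka cluster
        ZooKeeper zk = new ZooKeeper("s201:2181", 5000, null);

        // list the registered broker ids, e.g. [202, 203, 204]
        List<String> ids = zk.getChildren("/brokers/ids", false);
        System.out.println("brokers: " + ids);

        // dump each broker's registration JSON (host, port, endpoints, ...)
        for (String id : ids) {
            byte[] data = zk.getData("/brokers/ids/" + id, false, null);
            System.out.println(id + " ===> " + new String(data));
        }
        zk.close();
    }
}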
Fault tolerance
Create a topic
replication-factor 2, partitions 5
$>kafka-topics.sh --zookeeper s202:2181 --replication-factor 2 --partitions 5 --create --topic test2

2 x 5 = 10        //ten partition folders in total, spread across the brokers
[s202]
test2-1
test2-2
test2-3
[s203]
test2-0
test2-2
test2-3
test2-4
[s204]
test2-0
test2-1
test2-4
Re-laying out partitions and replicas: manual rebalancing
$>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204

Each comma-separated entry is the replica broker list for one partition, in partition order: here all five partitions of test2 are assigned replicas on brokers 203 and 204.
Replicas
Brokers store messages in the order they arrive. Both production and consumption are replica-aware. With n replicas, up to n-1 broker failures are tolerated. Each partition has one leader and several followers. If the leader dies before the message is written to its local log, or before the acknowledgment is returned, the producer sends the message to the new leader. The new leader is elected from the ISR (in-sync replica set): the first follower to register becomes the leader.
Replication modes supported by kafka
[synchronous replication]
    1.producer contacts zk to identify the leader
    2.producer sends the message to the leader
    3.leader receives the message and writes it to its local log
    4.followers pull the message from the leader
    5.followers write it to their local logs
    6.followers send an ack to the leader
    7.leader waits for the acks from all followers
    8.leader returns an ack to the producer
[asynchronous replication]
    Differs from synchronous replication in that the leader returns the ack
    to the client right after writing its local log, without waiting for
    all followers to finish replicating.
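In the old producer API used in the next section, the closest knobs are request.required.acks and producer.type; a hedged sketch, where the mapping of these values onto the two modes above is an interpretation:

import java.util.Properties;
import kafka.producer.ProducerConfig;

public class AckModes {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "s202:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // "-1": leader waits for acks from all in-sync replicas (synchronous replication)
        // "1" : leader acks right after its own local write (asynchronous replication)
        // "0" : producer does not wait for any ack at all
        props.put("request.required.acks", "-1");

        // "sync" sends on the calling thread; "async" batches in a background thread
        props.put("producer.type", "sync");

        ProducerConfig config = new ProducerConfig(props);
        System.out.println("acks=" + props.getProperty("request.required.acks"));
    }
}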
Producing messages with the java API
package com.it18zhang.kafkademo.test;

import org.junit.Test;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;

/**
 * Created by Administrator on 2017/3/31.
 */
public class TestProducer {
    @Test
    public void testSend() {
        Properties props = new Properties();
        //broker list
        props.put("metadata.broker.list", "s202:9092");
        //serializer
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //wait for the leader's ack only
        props.put("request.required.acks", "1");
        //create the producer configuration
        ProducerConfig config = new ProducerConfig(props);
        //create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        //message: topic, key, value
        KeyedMessage<String, String> msg = new KeyedMessage<String, String>("test3", "100", "hello world tomas100");
        producer.send(msg);
        System.out.println("send over!");
    }
}
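The same old API also accepts a list of messages in one call; a hedged variation of the test above (the keys and payloads are made up for illustration):

package com.it18zhang.kafkademo.test;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestBatchProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "s202:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // build ten keyed messages and hand them over in a single send() call
        List<KeyedMessage<String, String>> batch = new ArrayList<KeyedMessage<String, String>>();
        for (int i = 0; i < 10; i++) {
            batch.add(new KeyedMessage<String, String>("test3", String.valueOf(i), "message-" + i));
        }
        producer.send(batch);
        producer.close();
    }
}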
Consuming messages
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.junit.Test;

/**
 * Consumer
 */
@Test
public void testConsumer() {
    //consumer properties
    Properties props = new Properties();
    props.put("zookeeper.connect", "s202:2181");
    props.put("group.id", "g3");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    //start from the smallest offset when the group has no committed offset
    props.put("auto.offset.reset", "smallest");
    //create the consumer configuration
    ConsumerConfig config = new ConsumerConfig(props);
    //one stream for topic test3
    Map<String, Integer> map = new HashMap<String, Integer>();
    map.put("test3", new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(config).createMessageStreams(map);
    List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test3");
    for (KafkaStream<byte[], byte[]> stream : msgList) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            byte[] message = it.next().message();
            System.out.println(new String(message));
        }
    }
}
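With more than one stream, the usual pattern for this old high-level consumer is one thread per stream; a hedged sketch (the stream count of 3 and the executor setup are illustrative choices, not from the notes):

package com.it18zhang.kafkademo.test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiThreadConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "s202:2181");
        props.put("group.id", "g3");
        props.put("auto.offset.reset", "smallest");
        ConsumerConnector conn = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // ask for 3 streams; kafka spreads the topic's partitions across them
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("test3", 3);
        List<KafkaStream<byte[], byte[]>> streams = conn.createMessageStreams(topics).get("test3");

        // one worker thread per stream
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            pool.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        System.out.println(Thread.currentThread().getName()
                                + " : " + new String(it.next().message()));
                    }
                }
            });
        }
    }
}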
Integrating flume with kafka
1.KafkaSink    [producer]
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=8888

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = test3
    a1.sinks.k1.kafka.bootstrap.servers = s202:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1

    a1.channels.c1.type=memory

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2.KafkaSource    [consumer]
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = s202:9092
    a1.sources.r1.kafka.topics = test3
    a1.sources.r1.kafka.consumer.group.id = g4

    a1.sinks.k1.type = logger

    a1.channels.c1.type=memory

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

3.KafkaChannel    [producer + consumer]
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    a1.sources.r1.type = avro
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    a1.sinks.k1.type = logger

    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = s202:9092
    a1.channels.c1.kafka.topic = test3
    a1.channels.c1.kafka.consumer.group.id = g6

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
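For config 3, events can be pushed to the avro source from Java through the flume client SDK; a minimal sketch, assuming the agent from config 3 is listening on localhost:8888 (the payload string is illustrative):

import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class AvroSend {
    public static void main(String[] args) throws Exception {
        // connect to the avro source defined in config 3
        RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 8888);
        // the event travels: avro source -> KafkaChannel (topic test3) -> logger sink
        Event event = EventBuilder.withBody("hello from flume sdk", Charset.forName("UTF-8"));
        client.append(event);
        client.close();
    }
}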