flume

  1. A framework for collecting, moving, and aggregating log data.
  2. Event-based.

agent

  source   //receives data (the producer side)
           //put()
           //NetcatSource
           //ExecSource: real-time collection, e.g. tail -F xxx.txt
           //spooldir
           //seq
           //Stress
           //avroSource
  channel  //buffers events temporarily
           //non-durable: MemoryChannel
           //durable: FileChannel (disk)
           //SpillableMemoryChannel: memory + FileChannel (spills to disk when the memory capacity is exceeded)
  sink     //writes data out (the consumer side)
           //take() events from the channel and write() them to the destination
           //HdfsSink
           //HbaseSink
           //avroSink
  (a minimal agent config tying these together follows this list)
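
A minimal agent definition might look like the following sketch (the agent name a1 and the netcat / memory / logger choices are only illustrative, using the same property conventions as the Kafka configurations later in these notes):

    [a1.conf]
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # netcat source: listens on a TCP port and turns each line into an event
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # non-durable memory channel
    a1.channels.c1.type = memory

    # logger sink: writes events to the agent's log (useful for testing)
    a1.sinks.k1.type = logger

    # bind source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1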

JMS

  1. Java Message Service.
  2. queue //P2P (point-to-point) mode: each message is consumed by only one consumer.
  3. topic //publish-subscribe mode: every subscriber receives the message.
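
A rough sketch of the two models through the javax.jms API (ActiveMQ and its broker URL are assumed here purely for illustration; any JMS provider works the same way):

    import javax.jms.*;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class JmsDemo {
        public static void main(String[] args) throws JMSException {
            // assumption: a local ActiveMQ broker on the default port
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection conn = factory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // P2P: each message on the queue is delivered to exactly one consumer
            Queue queue = session.createQueue("demo.queue");
            session.createProducer(queue).send(session.createTextMessage("hello queue"));

            // publish-subscribe: every active subscriber of the topic receives the message
            Topic topic = session.createTopic("demo.topic");
            session.createProducer(topic).send(session.createTextMessage("hello topic"));

            conn.close();
        }
    }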

kafka

  1. A distributed streaming platform.
  2. Builds real-time data pipelines between systems.
  3. Records are stored categorized by topic.
  4. Each record consists of a key, a value, and a timestamp.
  5. Throughput on the order of a million messages per second.
  6. producer //message producer
  7. consumer //message consumer
  8. consumer group //a group of consumers that share the consumption of a topic's partitions
  9. kafka server //broker, the Kafka server process
  10. topic //topic: has a replication factor and partitions
  11. zookeeper //also used for hadoop namenode + RM HA | hbase | kafka

Installing Kafka

  0. Install Kafka on the three hosts s202 ~ s204
  1. Prepare zk
  2. JDK
  3. Extract the tar file
  4. Environment variables
  5. Configure Kafka
       [kafka/config/server.properties]
       ...
       broker.id=202
       ...
       listeners=PLAINTEXT://:9092
       ...
       log.dirs=/home/centos/kafka/logs
       ...
       zookeeper.connect=s201:2181,s202:2181,s203:2181
  6. Distribute server.properties to the other hosts and change broker.id in each copy
  7. Start the Kafka servers
       a) Start zk first
       b) Start Kafka
          [s202 ~ s204]
          $>bin/kafka-server-start.sh config/server.properties
       c) Verify that the Kafka server is listening
          $>netstat -anop | grep 9092
  8. Create a topic
       $>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test
  9. List topics
       $>bin/kafka-topics.sh --list --zookeeper s201:2181
  10. Start a console producer
       $>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test
  11. Start a console consumer
       $>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning
  12. Type hello world in the producer console and watch it arrive in the consumer console

Kafka cluster metadata in ZooKeeper

  /controller ===> {"version":1,"brokerid":202,"timestamp":"1490926369148"}
  /controller_epoch ===> 1
  /brokers
  /brokers/ids
  /brokers/ids/202 ===> {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}
  /brokers/ids/203
  /brokers/ids/204
  /brokers/topics/test/partitions/0/state ===> {"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}
  /brokers/topics/test/partitions/1/state ===> ...
  /brokers/topics/test/partitions/2/state ===> ...
  /brokers/seqid ===> null
  /admin
  /admin/delete_topics/test ===> topics that are marked for deletion
  /isr_change_notification
  /consumers/xxxx/
  /config
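
These nodes can be inspected directly with the ZooKeeper command-line client, for example:

    $>zkCli.sh -server s201:2181
      ls /brokers/ids          //list the ids of the live brokers
      get /brokers/ids/202     //show the registration data of broker 202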

Fault tolerance

Creating a topic

  replication-factor 2, partitions 5
  $>kafka-topics.sh --zookeeper s202:2181 --replication-factor 2 --partitions 5 --create --topic test2
  5 partitions x 2 replicas = 10 //ten partition directories in total, spread across the brokers
  [s202]
    test2-1
    test2-2
    test2-3
  [s203]
    test2-0
    test2-2
    test2-3
    test2-4
  [s204]
    test2-0
    test2-1
    test2-4
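
The same layout can be read back from Kafka itself; --describe prints the leader, replica list, and ISR for every partition of the topic:

    $>kafka-topics.sh --describe --zookeeper s202:2181 --topic test2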

Re-laying out partitions and replicas (manual rebalancing)

  $>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204
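
On newer Kafka versions, moving the replicas of an existing topic is normally done with kafka-reassign-partitions.sh and a JSON assignment file (the file name move.json below is only an example):

    [move.json]
    {"version":1,"partitions":[
      {"topic":"test2","partition":0,"replicas":[203,204]},
      {"topic":"test2","partition":1,"replicas":[203,204]}
    ]}

    $>kafka-reassign-partitions.sh --zookeeper s202:2181 --reassignment-json-file move.json --execute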

Replicas

  1. Brokers store messages in the order in which they arrive. Both production and consumption are replica-aware.
  2. Tolerates up to n-1 broker failures. Each partition has one leader and several followers.
  3. If the leader fails before the message has been written to the partition's local log, or before the acknowledgment has been returned, the producer sends the message again to the newly elected leader.
  4. Leader election is done through the ISR: the first follower registered in the ISR becomes the new leader.

Replication modes supported by Kafka

  [synchronous replication]
  1. The producer contacts zk to find the partition leader
  2. The producer sends the message to the leader
  3. The leader receives the message and writes it to its local log
  4. The followers pull the message from the leader
  5. The followers write it to their local logs
  6. The followers send an ack back to the leader
  7. The leader receives the acks from all followers
  8. The leader returns an ack to the producer
  [asynchronous replication]
  The difference from synchronous replication is that after the leader writes to its local log,
  it returns an ack to the client immediately, without waiting for all followers to finish replicating.
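
From the producer's point of view the two modes correspond roughly to the acks setting (request.required.acks in the old producer API used below, acks in the newer client):

    request.required.acks=0    //fire and forget, no ack at all
    request.required.acks=1    //ack as soon as the leader has written its local log (asynchronous replication)
    request.required.acks=-1   //ack only after all in-sync followers have replicated (synchronous replication)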

Implementing a message producer with the Java API and sending a message

    package com.it18zhang.kafkademo.test;

    import org.junit.Test;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import java.util.HashMap;
    import java.util.Properties;

    /**
     * Created by Administrator on 2017/3/31.
     */
    public class TestProducer {
        @Test
        public void testSend() {
            Properties props = new Properties();
            //broker list
            props.put("metadata.broker.list", "s202:9092");
            //serializer
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            //wait for the leader's ack
            props.put("request.required.acks", "1");
            //create the producer config object
            ProducerConfig config = new ProducerConfig(props);
            //create the producer
            Producer<String, String> producer = new Producer<String, String>(config);
            //topic, key, message
            KeyedMessage<String, String> msg = new KeyedMessage<String, String>("test3", "100", "hello world tomas100");
            producer.send(msg);
            System.out.println("send over!");
        }
    }
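
The class above uses the old kafka.javaapi producer. For reference only, a sketch of the same send through the newer org.apache.kafka.clients producer API could look like this:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewApiProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            //broker list and ack level
            props.put("bootstrap.servers", "s202:9092");
            props.put("acks", "1");
            //key/value serializers
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            //topic, key, value
            producer.send(new ProducerRecord<String, String>("test3", "100", "hello world tomas100"));
            producer.close();
        }
    }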

Message consumer

    /**
     * Consumer (additional imports needed: kafka.consumer.Consumer, kafka.consumer.ConsumerConfig,
     * kafka.consumer.ConsumerIterator, kafka.consumer.KafkaStream, java.util.List, java.util.Map)
     */
    @Test
    public void testConsumer() {
        //consumer properties
        Properties props = new Properties();
        props.put("zookeeper.connect", "s202:2181");
        props.put("group.id", "g3");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //create the consumer config object
        ConsumerConfig config = new ConsumerConfig(props);
        //topic -> number of streams
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test3", new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(config).createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test3");
        for (KafkaStream<byte[], byte[]> stream : msgList) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                byte[] message = it.next().message();
                System.out.println(new String(message));
            }
        }
    }
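
Again for reference only, the equivalent consumer with the newer org.apache.kafka.clients consumer API (which talks to the brokers directly instead of ZooKeeper) might look like this sketch:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewApiConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "s202:9092");
            props.put("group.id", "g3");
            //"earliest" plays the role of "smallest" in the old consumer
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("test3"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " : " + record.value());
                }
            }
        }
    }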

Flume integration with Kafka

  1. KafkaSink
     [Flume acts as the producer]

       a1.sources = r1
       a1.sinks = k1
       a1.channels = c1

       a1.sources.r1.type = netcat
       a1.sources.r1.bind = localhost
       a1.sources.r1.port = 8888

       a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
       a1.sinks.k1.kafka.topic = test3
       a1.sinks.k1.kafka.bootstrap.servers = s202:9092
       a1.sinks.k1.kafka.flumeBatchSize = 20
       a1.sinks.k1.kafka.producer.acks = 1

       a1.channels.c1.type = memory

       a1.sources.r1.channels = c1
       a1.sinks.k1.channel = c1

  2. KafkaSource
     [Flume acts as the consumer]

       a1.sources = r1
       a1.sinks = k1
       a1.channels = c1

       a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
       a1.sources.r1.batchSize = 5000
       a1.sources.r1.batchDurationMillis = 2000
       a1.sources.r1.kafka.bootstrap.servers = s202:9092
       a1.sources.r1.kafka.topics = test3
       a1.sources.r1.kafka.consumer.group.id = g4

       a1.sinks.k1.type = logger

       a1.channels.c1.type = memory

       a1.sources.r1.channels = c1
       a1.sinks.k1.channel = c1

  3. KafkaChannel
     [Flume acts as producer + consumer]

       a1.sources = r1
       a1.sinks = k1
       a1.channels = c1

       a1.sources.r1.type = avro
       a1.sources.r1.bind = localhost
       a1.sources.r1.port = 8888

       a1.sinks.k1.type = logger

       a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
       a1.channels.c1.kafka.bootstrap.servers = s202:9092
       a1.channels.c1.kafka.topic = test3
       a1.channels.c1.kafka.consumer.group.id = g6

       a1.sources.r1.channels = c1
       a1.sinks.k1.channel = c1
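
Each of the three configurations is started like any other Flume agent; assuming the file is saved as, say, kafka.conf (the file name is only an example):

    $>flume-ng agent -n a1 -c conf -f kafka.conf -Dflume.root.logger=INFO,console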