一、集群应用场景

第1节 消息传递

  • Kafka可以很好地替代传统邮件代理。消息代理的使用有多种原因(将处理与数据生产者分离,缓冲未处理的消息等)。与大多数邮件系统相比,Kafka具有更好的吞吐量,内置的分区,复制和容错功能,这使其成为大规模邮件处理应用程序的理想解决方案。 根据我们的经验,消息传递的使用通常吞吐量较低,但是可能需要较低的端到端延迟,并且通常取决于Kafka提供强大的持久性保证。在这个领域,Kafka与ActiveMQ或 RabbitMQ等传统消息传递系统相当。

第2节 网站活动路由

  • Kafka最初的用例是能够将用户活动跟踪管道重建为一组实时的发布-订阅。这意味着将网站活动(页面浏览,搜索或用户可能采取的其他操作)发布到中心主题,每种活动类型只有一个主题。这些提要可用于一系列用例的订阅,包括实时处理,实时监控,以及加载到Hadoop或脱机数据仓库系统中以进行脱机处理和报告。活动跟踪通常量很大,因为每个用户页面视图都会生成许多活动消息。

第3节 监控指标

  • Kafka通常用于操作监控数据。这涉及汇总来自分布式应用程序的统计信息,以生成操作数据的集中。

第4节 日志汇总

  • 许多人使用Kafka代替日志聚合解决方案。日志聚合通常从服务器收集物理日志文件,并将它们放在中央位置(也许是文件服务器或HDFS)以进行处理。Kafka提取文件的详细信息,并以日志流的形式更清晰地抽象日志或事件数据。这允许较低延迟的处理,并更容易支持多个数据源和分布式数据消耗。与以日志为中心的系统(例如Scribe或Flume)相比,Kafka具有同样出色的性能,由于复制而提供的更强的耐用性保证以及更低的端到端延迟。

第5节 流处理

  • Kafka的许多用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从Kafka主题中使用,然后进行汇总,充实或以其他方式转换为新主题,以供进一步使用或后续处理。例如,用于推荐新闻文章的处理管道可能会从RSS提要中检索文章内容,并将其发布到“文章”主题中。进一步的处理可能会使该内容规范化或重复数据删除,并将清洗后的文章内容发布到新主题中;最后的处理阶段可能会尝试向用户推荐此内容。这样的处理管道基于各个主题创建实时数据流的图形。从0.10.0.0开始,一个轻量但功能强大的流处理库称为Kafka Streams 可以在Apache Kafka中使用来执行上述数据处理。除了Kafka Streams以外,其他开源流处理工具还包括Apache Storm和 Apache Samza。

第6节 活动采集

  • 事件源是一种应用程序,其中状态更改以时间顺序记录记录。Kafka对大量存储的日志数据的支持使其成为以这种样式构建的应用程序的绝佳后端。

第7节 提交日志

  • Kafka可以用作分布式系统的一种外部提交日志。该日志有助于在节点之间复制数据,并充当故障节点恢复其数据的重新同步机制。Kafka中的日志压缩功能有助于支持此用法。在这种用法中,Kafka类似于Apache BookKeeper项目。
    1. 横向扩展,提高Kafka的处理能力
    2. 镜像,副本,提供高可用。

二、集群搭建

  • 前提:三台机器上均安装 JDK、zookeeper【搭建zookeeper集群】

linux121、linux122、linux123上安装 kafka
① 上传并解压 Kafka
② 配置kafka

  • 三台机器的配置区别在:broker.idadvertised.listeners ```shell

    配置环境变量,三台Linux都要配置

    vim /etc/profile

    添加以下内容:

    export KAFKA_HOME=/opt/kafka_2.12-1.0.2 export PATH=$PATH:$KAFKA_HOME/bin

    让配置生效

    source /etc/profile

linux121配置

vim /opt/kafka_2.12-1.0.2/config/server.properties

broker.id=0 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://linux121:9092 log.dirs=/var/lagou/kafka/kafka-logs zookeeper.connect=linux121:2181,linux122:2181,linux123:2181/myKafka

其他使用默认配置


linux122配置

vim /opt/kafka_2.12-1.0.2/config/server.properties

broker.id=1 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://linux122:9092 log.dirs=/var/lagou/kafka/kafka-logs zookeeper.connect=linux121:2181,linux122:2181,linux123:2181/myKafka

其他使用默认配置


linux123配置

vim /opt/kafka_2.12-1.0.2/config/server.properties

broker.id=2 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://linux123:9092 log.dirs=/var/lagou/kafka/kafka-logs zookeeper.connect=linux121:2181,linux122:2181,linux123:2181/myKafka

其他使用默认配置

  1. **③ 启动kafka**
  2. - 先打开zookeeper集群
  3. - 前台启动kafka,确认启动【确认无误后,方可后台启动】
  4. ```shell
  5. [root@linux121 ~]# kafka-server-start.sh /opt/kafka_2.12- 1.0.2/config/server.properties
  6. [root@linux122 ~]# kafka-server-start.sh /opt/kafka_2.12- 1.0.2/config/server.properties
  7. [root@linux123 ~]# kafka-server-start.sh /opt/kafka_2.12- 1.0.2/config/server.properties
  • 后台启动:kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

④ 验证集群 ID

  • 三台机器的 cluster id 均一样,表示属于同一个集群

image.png

  1. Cluster Id是一个唯一的不可变的标志符,用于唯一标志一个Kafka集群。
  2. 该Id最多可以有 22 个字符组成,字符对应于URL-safe Base64。
  3. Kafka 0.10.1版本及之后的版本中,在集群第一次启动的时候,Broker从 Zookeeper的/cluster/id节点获取。如果该Id不存在,就自动生成一个新的。

    image.png
    image.png


三、集群监控

1、监控度量指标

  • Kafka使用Yammer Metrics在服务器和Scala客户端中报告指标。Java客户端使用Kafka Metrics,它是一个内置的度量标准注册表,可最大程度地减少拉入客户端应用程序的传递依赖项。两者都通过 JMX 公开指标,并且可以配置为使用可插拔的统计报告器报告统计信息,以连接到您的监视系统。
  • 具体的监控指标可以查看 http://kafka.apache.org/10/documentation.html#monitoring

    JMX

  • Kafka开启Jmx端口

    • vim /opt/lagou/servers/kafka_2.12-1.0.2/bin/kafka-server-start.sh
    • 所有kafka机器上添加一个 JMX_PORT,并重启 kafka

      image.png

  • 验证JMX开启

    • 首先打印 9581 端口占用的进程信息,然后使用进程编号对应到 Kafka 的进程号【一一对应

image.png

  • 可以查看Kafka启动日志,确定启动参数

    • Dcom.sun.management.jmxremote.port=9581 存在即可

      使用 JConsole 链接 JMX 端口

    1. win/mac,找到 jconsole工具并打开,可以直接在命令行上输入 jconsole 来打开

image.png
image.png
image.png
image.png

编程手段来获取监控指标

  • 后续不用手写,有专门的工具使用

    image.png

    public class JMXMonitorDemo { 
      public static void main(String[] args) throws IOException, MalformedObjectNameException, AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException { 
          String jmxServiceURL = "service:jmx:rmi:///jndi/rmi://192.168.100.103:9581/jmxrmi"; 
          JMXServiceURL jmxURL = null; 
          JMXConnector jmxc = null; 
          MBeanServerConnection jmxs = null; 
          ObjectName mbeanObjName = null; 
          Iterator sampleIter = null; 
          Set sampleSet = null;
          // 创建JMXServiceURL对象,参数是 
          jmxURL = new JMXServiceURL(jmxServiceURL); 
          // 建立到指定URL服务器的连接 
          jmxc = JMXConnectorFactory.connect(jmxURL); 
          // 返回代表远程MBean服务器的MBeanServerConnection对象 
          jmxs = jmxc.getMBeanServerConnection(); 
          // 根据传入的字符串,创建ObjectName对象 
          mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=t p_eagle_01"); 
          // 获取指定ObjectName对应的MBeans 
          sampleSet = jmxs.queryMBeans(null, mbeanObjName); 
          // 迭代器 
          sampleIter = sampleSet.iterator(); 
          if (sampleSet.isEmpty()) { 
    
          } else { 
              // 如果返回了,则打印信息 
              while (sampleIter.hasNext()) { 
                  // Used to represent the object name of an MBean and its class name. 
                  // If the MBean is a Dynamic MBean the class name should be retrieved from the MBeanInfo it provides. 
                  // 用于表示MBean的ObjectName和ClassName 
                  ObjectInstance sampleObj = (ObjectInstance) sampleIter.next();
                  ObjectName objectName = sampleObj.getObjectName(); 
                  // 查看指定MBean指定属性的值 
                  String count = jmxs.getAttribute(objectName, "Count").toString(); 
                  System.out.println(count); 
              } 
          }
          // 关闭 jmxc.close(); 
      } 
    }
    

    2、监控工具 Kafka Eagle

    我们可以使用Kafka-eagle管理Kafka集群

核心模块:

  • 面板可视化
  • 主题管理,包含创建主题、删除主题、主题列举、主题配置、主题查询等
  • 消费者应用:对不同消费者应用进行监控,包含Kafka API、Flink API、Spark API、Storm、API、Flume API、LogStash API等
  • 集群管理:包含对Kafka集群和Zookeeper集群的详情展示,其内容包含Kafka启动时间、Kafka端口号、Zookeeper Leader角色等。同时,还有多集群切换管理,Zookeeper Client 操作入口
  • 集群监控:包含对Broker、Kafka核心指标、Zookeeper核心指标进行监控,并绘制历史趋势图
  • 告警功能:对消费者应用数据积压情况进行告警,以及对Kafka和Zookeeper监控度进行告警。同时,支持邮件、微信、钉钉告警通知
  • 系统管理:包含用户创建、用户角色分配、资源访问进行管理

架构:

  • 可视化:负责展示主题列表、集群健康、消费者应用等
  • 采集器:数据采集的来源包含Zookeeper、Kafka JMX & 内部Topic、Kafka API(Kafka 2.x以后版本)
  • 数据存储:目前Kafka Eagle存储采用MySQL或SQLite,数据库和表的创建均是自动完成的,按照官方文档进行配置好,启动Kafka Eagle就会自动创建,用来存储元数据和监控数据
  • 监控:负责见消费者应用消费情况、集群健康状态
  • 告警:对监控到的异常进行告警通知,支持邮件、微信、钉钉等方式
  • 权限管理:对访问用户进行权限管理,对于管理员、开发者、访问者等不同角色的用户,分配不用的访问权限

前面讲过了,需要Kafka节点开启JMX

# 下载编译好的包 
wget http://pkgs-linux.cvimer.com/kafka-eagle.zip 

# 配置kafka-eagle 
unzip kafka-eagle.zip 
cd kafka-eagle/kafka-eagle-web/target 
mkdir -p test 
cp kafka-eagle-web-2.0.1-bin.tar.gz test/ 
tar xf kafka-eagle-web-2.0.1-bin.tar.gz 
cd kafka-eagle-web-2.0.1

需要配置环境变量:

  • KE_HOME=
  • PATH=

conf 下的配置文件:system-config.properties
image.png
image.png
image.png
image.png
image.png
image.png

  • 也可以自行编译,https://github.com/smartloli/kafka-eagle 创建Eagel的存储目录:mkdir -p /hadoop/kafka-eagle

  • 启动kafka-eagle

    • ./bin/ke.sh ``start
  • 会提示我们登陆地址和账号密码