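Requirement: use Kafka for log collection
Information to collect:
1. User ID (user_id)
2. Time (act_time)
3. Action (action; one of click: click, favorite: job_collect, send CV: cv_send, upload CV: cv_upload)
4. Target company code (job_code)
Setup:
1. The HTML page can be thought of as a Lagou job-browsing page
2. Nginx collects the user click stream; its log is at /usr/local/nginx/logs/access.log
3. The log data collected by Nginx is sent to the Kafka topic mytrack
Architecture: HTML + Nginx + ngx_kafka_module + Kafka
ngx_kafka_module repository: https://github.com/brg-liuwei/ngx_kafka_module
Note: ngx_kafka_module only accepts POST requests. In addition, the web server usually does not share a domain with the data-collecting Nginx, so the ajax requests run into cross-origin restrictions. These can be resolved by configuring CORS in nginx.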
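
The four fields listed above can be sketched as a small record builder. This is only an illustration: buildTrackMessage and ACTIONS are hypothetical names, not part of ngx_kafka_module or Kafka, and the ISO 8601 timestamp format is an assumption, since the requirements do not specify one.

```javascript
// Allowed action codes, taken from the requirements list.
const ACTIONS = ["click", "job_collect", "cv_send", "cv_upload"];

// Hypothetical helper (not part of any library): build one tracking record.
// The timestamp format (ISO 8601 string) is an assumption.
function buildTrackMessage(userId, action, jobCode, now = new Date()) {
  if (!ACTIONS.includes(action)) {
    throw new Error("unknown action: " + action);
  }
  return {
    user_id: userId,
    act_time: now.toISOString(),
    action: action,
    job_code: jobCode,
  };
}

// Example: the JSON string that a page could POST to the collector.
const body = JSON.stringify(buildTrackMessage(1001, "click", "lagou"));
console.log(body);
```

Rejecting unknown action codes on the page keeps malformed records out of the topic, where they would be harder to filter later.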

Installing nginx

    # Install git
    yum install git
    # Switch to /usr/local/src and download the source of the Kafka C client
    cd /usr/local/src
    git clone https://github.com/edenhill/librdkafka
    # Enter the librdkafka directory
    cd librdkafka
    # If the C build tools are missing, install gcc and friends first
    yum install -y gcc gcc-c++ pcre-devel zlib-devel
    # Build and install
    ./configure
    make && make install
    # Download the ngx_kafka_module source
    cd /usr/local/src
    git clone https://github.com/brg-liuwei/ngx_kafka_module
    # Enter the nginx source directory and build nginx together with the ngx_kafka_module plugin
    cd /usr/local/src/nginx-1.17.8
    ./configure --add-module=/usr/local/src/ngx_kafka_module/
    # Use &&, not &: install must run only after the build has finished successfully
    make && make install

Configuring nginx

Edit the nginx configuration file nginx.conf

    http {
        kafka;
        kafka_broker_list centos7-1:9092 centos7-2:9092 centos7-3:9092;
        server {
            # Forward messages to a Kafka topic
            # This is the request path that clients post to
            location = /kafka/track {
                # Enable CORS so that cross-origin ajax requests are allowed
                add_header 'Access-Control-Allow-Origin' '*';
                add_header 'Access-Control-Allow-Credentials' 'true';
                # Remember to create the topic in Kafka first
                kafka_topic mytrack;
            }
        }
    }
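
The location above only adds response headers, and the module accepts only POST, so it helps to keep the tracking request a CORS "simple" request that the browser sends without an OPTIONS preflight. The sketch below is a simplified version of that rule under stated assumptions: needsPreflight and SIMPLE_TYPES are hypothetical names, and the check looks only at the method and Content-Type, ignoring custom headers and the other conditions in the Fetch standard.

```javascript
// Body content types that are CORS-safelisted in the Fetch standard.
const SIMPLE_TYPES = [
  "application/x-www-form-urlencoded",
  "multipart/form-data",
  "text/plain",
];

// Hypothetical helper: simplified check of whether a cross-origin request
// would trigger an OPTIONS preflight. Custom headers are not considered.
function needsPreflight(method, contentType) {
  const simpleMethod = ["GET", "HEAD", "POST"].includes(method.toUpperCase());
  // Drop parameters such as "; charset=UTF-8" before comparing.
  const type = (contentType || "").split(";")[0].trim().toLowerCase();
  const simpleType = type === "" || SIMPLE_TYPES.includes(type);
  return !(simpleMethod && simpleType);
}

// jQuery's default contentType for $.ajax stays within the safelist.
console.log(needsPreflight("POST", "application/x-www-form-urlencoded; charset=UTF-8")); // false
// Switching to application/json would make the browser send OPTIONS first.
console.log(needsPreflight("POST", "application/json")); // true
```

Under this reading, posting the JSON string with jQuery's default content type avoids a preflight, which the POST-only location is presumably not set up to answer.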

Troubleshooting

    # Starting nginx fails because librdkafka.so.1 cannot be found:
    #   error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory
    # Fix: add the library directory to the dynamic linker configuration and reload it
    echo "/usr/local/lib" >> /etc/ld.so.conf
    ldconfig

Testing message delivery

  • Start zookeeper, Kafka, and nginx
  • Create a consumer

      kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytrack
  • Send a message with curl

      # Once everything is running, send a test message to the Kafka cluster
      curl localhost/kafka/track -d "message send to kafka topic"
  • The consumer prints the message
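
Each line the consumer prints is the raw POST body, so a downstream step may want to separate real tracking records from test strings like the curl message above. The following is a minimal sketch, assuming records are JSON objects that use the field names from the requirements list; parseTrackMessage, REQUIRED, and ACTIONS are hypothetical names.

```javascript
// Field names and action codes from the requirements list.
const REQUIRED = ["user_id", "act_time", "action", "job_code"];
const ACTIONS = ["click", "job_collect", "cv_send", "cv_upload"];

// Hypothetical helper: parse one line printed by the console consumer.
// Returns the record, or null when the line is not a valid tracking record.
function parseTrackMessage(line) {
  let record;
  try {
    record = JSON.parse(line);
  } catch (e) {
    return null; // not JSON, e.g. the plain-text curl test message
  }
  if (record === null || typeof record !== "object") return null;
  const complete = REQUIRED.every((key) => key in record);
  return complete && ACTIONS.includes(record.action) ? record : null;
}

console.log(parseTrackMessage("message send to kafka topic")); // null
```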

HTML page

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>kafkaTest</title>
        <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
    </head>
    <body>
        <button onclick="producer('click')">Click</button>
        <button onclick="producer('job_collect')">Favorite</button>
        <button onclick="producer('cv_send')">Send CV</button>
        <button onclick="producer('cv_upload')">Upload CV</button>
        <script>
            // Build one tracking record and POST it to the nginx location
            // that forwards to the Kafka topic mytrack.
            function producer(act) {
                // Field names follow the requirements list:
                // user_id, act_time, action, job_code
                let message = {};
                message.user_id = 1001;
                message.act_time = new Date();
                message.action = act;
                message.job_code = "lagou";
                $.ajax({
                    type: "post",
                    url: "/kafka/track",
                    // The body is the JSON string that ends up in Kafka
                    data: JSON.stringify(message),
                    success: function(res) {
                        console.log("Recorded successfully");
                    }
                });
            }
        </script>
    </body>
    </html>