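Requirement: use Kafka for log collection
Information to collect:
1. User ID (user_id)
2. Time (act_time)
3. Action (action), one of: click (click), collect a job (job_collect), send a CV (cv_send), upload a CV (cv_upload)
4. Code of the hiring company (job_code)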
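A minimal sketch of one tracking record carrying exactly these four fields, assuming the record is sent as a JSON object. The helper name `buildTrackMessage` and the ISO-8601 UTC time format are illustrative assumptions, not part of the original setup.

```javascript
// Allowed values for the `action` field, as listed in the requirements.
const ACTIONS = new Set(["click", "job_collect", "cv_send", "cv_upload"]);

// Hypothetical helper: builds one tracking record with the four required fields.
// `now` is injectable so the timestamp can be fixed in tests.
function buildTrackMessage(userId, action, jobCode, now = new Date()) {
  if (!ACTIONS.has(action)) {
    throw new Error(`unknown action: ${action}`);
  }
  return {
    user_id: userId,
    act_time: now.toISOString(), // assumption: ISO-8601 UTC timestamp
    action: action,
    job_code: jobCode,
  };
}

// Example: the JSON string that would be POSTed to /kafka/track.
const msg = buildTrackMessage(1001, "cv_send", "lagou", new Date(Date.UTC(2020, 0, 1)));
console.log(JSON.stringify(msg));
// prints {"user_id":1001,"act_time":"2020-01-01T00:00:00.000Z","action":"cv_send","job_code":"lagou"}
```

Rejecting unknown actions at the source keeps malformed records out of the topic instead of leaving the downstream consumers to filter them.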
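1. The HTML page can be thought of as Lagou's job-browsing page
2. Nginx collects the users' click data stream; its log is at /usr/local/nginx/logs/access.log
3. The log data collected by Nginx is sent to the Kafka topic mytrack
Architecture: HTML + Nginx + ngx_kafka_module + Kafka
ngx_kafka_module repository: https://github.com/brg-liuwei/ngx_kafka_module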
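Caveat: ngx_kafka_module only accepts POST requests. In addition, the web server is usually not on the same domain as the Nginx that collects the data, so sending requests with ajax runs into cross-origin (CORS) restrictions. This can be solved by configuring CORS headers in nginx.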
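A minimal sketch of sending a record from a page on another domain, assuming the browser `fetch` API and a hypothetical collector URL `http://collector.example.com/kafka/track`. Sending the body as `text/plain` keeps it a CORS "simple" request (a POST with a safelisted content type), so the browser should not need an OPTIONS preflight, and the `Access-Control-Allow-Origin` header returned by nginx is what lets the page read the response.

```javascript
// Hypothetical collector address; replace with the host running nginx + ngx_kafka_module.
const COLLECTOR_URL = "http://collector.example.com/kafka/track";

// Sends one tracking record cross-origin. ngx_kafka_module only accepts POST,
// and a text/plain body keeps this a CORS "simple" request (no OPTIONS preflight).
// fetchImpl is injectable for testing; it defaults to the global fetch.
function sendTrack(message, fetchImpl = globalThis.fetch) {
  return fetchImpl(COLLECTOR_URL, {
    method: "POST",
    mode: "cors",
    // The default credentials mode ("same-origin") sends no cookies cross-origin,
    // which is compatible with a wildcard Access-Control-Allow-Origin.
    headers: { "Content-Type": "text/plain;charset=UTF-8" },
    body: JSON.stringify(message),
  });
}
```

In a page, a button handler would call `sendTrack({ user_id: 1001, act_time: new Date().toISOString(), action: "click", job_code: "lagou" })` and could inspect the returned promise's `status`.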
Installing nginx
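# Install git
yum install git
# Switch to /usr/local/src and download the source of librdkafka, the Kafka C client
cd /usr/local/src
git clone https://github.com/edenhill/librdkafka
# Enter librdkafka
cd librdkafka
# If the C build tools are missing, install gcc etc. first
yum install -y gcc gcc-c++ pcre-devel zlib-devel
# Build and install
./configure
make && make install
# Download the ngx_kafka_module source
cd /usr/local/src
git clone https://github.com/brg-liuwei/ngx_kafka_module
# Enter the nginx source directory and build nginx together with the ngx_kafka_module plugin
# (assumes the nginx-1.17.8 source has already been extracted into /usr/local/src)
cd /usr/local/src/nginx-1.17.8
./configure --add-module=/usr/local/src/ngx_kafka_module/
make && make install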
Configuring nginx
Edit the nginx configuration file nginx.conf:
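http {
    kafka;
    kafka_broker_list centos7-1:9092 centos7-2:9092 centos7-3:9092;

    server {
        # Forward messages to the Kafka topic
        # This is the path entered in the address bar
        location = /kafka/track {
            # Enable CORS so that cross-origin ajax requests are allowed
            add_header 'Access-Control-Allow-Origin' '*';
            # Note: browsers reject credentialed requests when Allow-Origin is '*';
            # the page's ajax call sends no credentials, so this works as is
            add_header 'Access-Control-Allow-Credentials' 'true';
            # Remember to create the topic in Kafka first
            kafka_topic mytrack;
        }
    }
}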
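Troubleshooting
# [Starting nginx fails because librdkafka.so.1 cannot be found]
error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory
# Fix: add /usr/local/lib to the shared library search path and rebuild the linker cache
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig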
Testing message sending
- Start ZooKeeper, Kafka and nginx
Create a consumer:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytrack
Send a message with curl:
# Once everything has started, send a test message to the Kafka cluster
curl localhost/kafka/track -d "message send to kafka topic"
The consumer prints the message.

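HTML page
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>kafkaTest</title>
    <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
</head>
<body>
    <button onclick="producer('click')">Click</button>
    <button onclick="producer('job_collect')">Collect</button>
    <button onclick="producer('cv_send')">Send CV</button>
    <button onclick="producer('cv_upload')">Upload CV</button>

    <script>
        // Send one tracking record to the collecting nginx, which forwards it to Kafka
        function producer(act) {
            // Field names match the required log fields
            let message = {};
            message.user_id = 1001;
            message.act_time = new Date();
            message.action = act;
            message.job_code = "lagou";
            $.ajax({
                type: "post", // ngx_kafka_module only accepts POST
                // Relative URL: works when this page is served by the collecting nginx;
                // otherwise use the collector's full URL (cross-origin, covered by the CORS headers)
                url: "/kafka/track",
                data: JSON.stringify(message),
                success: function (res) {
                    console.log("Recorded successfully");
                }
            });
        }
    </script>
</body>
</html>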
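When the page's buttons are clicked, each record printed by the console consumer should be one JSON body. A minimal sketch for checking such lines, assuming the field names from the requirements (`user_id`, `act_time`, `action`, `job_code`); `parseTrackRecord` is a hypothetical helper, and any line that is not a valid tracking record (such as the plain-text curl test message) is returned as `null`.

```javascript
const REQUIRED_FIELDS = ["user_id", "act_time", "action", "job_code"];
const VALID_ACTIONS = new Set(["click", "job_collect", "cv_send", "cv_upload"]);

// Hypothetical helper: parses one line printed by kafka-console-consumer.sh.
// Returns the record object, or null if the line is not a valid tracking record.
function parseTrackRecord(line) {
  let record;
  try {
    record = JSON.parse(line);
  } catch {
    return null; // not JSON, e.g. the plain-text message sent by the curl test
  }
  if (record === null || typeof record !== "object") return null;
  for (const field of REQUIRED_FIELDS) {
    if (!(field in record)) return null; // missing a required field
  }
  return VALID_ACTIONS.has(record.action) ? record : null;
}

const lines = [
  "message send to kafka topic",
  '{"user_id":1001,"act_time":"2020-01-01T00:00:00.000Z","action":"click","job_code":"lagou"}',
];
for (const line of lines) {
  console.log(parseTrackRecord(line));
}
```

Returning `null` instead of throwing lets a script scan a whole captured consumer log and simply count the lines that do not match the expected record shape.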
