需求:使用Kafka做日志收集
需要收集的信息:
1、用户ID(user_id)
2、时间(act_time)
3、操作(action,可以是:点击:click,收藏:job_collect,投简历:cv_send,上传简历:cv_upload)
4、对方企业编码(job_code)
1、HTML可以理解为拉勾的职位浏览页面
2、Nginx用于收集用户的点击数据流,日志在/usr/local/nginx/logs/access.log
3、将Nginx收集的日志数据发送到Kafka主题:mytrack
架构:HTML+Nginx+ngx_kafka_module+Kafka
ngx_kafka_module网址:https://github.com/brg-liuwei/ngx_kafka_module
注意问题:由于使用ngx_kafka_module,只能接收POST请求,同时一般Web服务器不会和数据收集的Nginx在同一个域名,会涉及到使用ajax发送请求的跨域问题,可以在nginx中配置跨域来解决
nginx的安装
# 安装git
yum install git
# 切换到/usr/local/src目录,下载Kafka的c客户端源码
cd /usr/local/src
git clone https://github.com/edenhill/librdkafka
# 进入到librdkafka
cd librdkafka
# 如果缺少c的编译软件,先安装gcc等
yum install -y gcc gcc-c++ pcre-devel zlib-devel
# 编译
./configure
make && make install
#下载ngx_kafka_module的源码
cd /usr/local/src
git clone https://github.com/brg-liuwei/ngx_kafka_module
#进入到nginx的源码包目录下,同时编译nginx 及 ngx_kafka_module插件
cd /usr/local/src/nginx-1.17.8
./configure --add-module=/usr/local/src/ngx_kafka_module/
make & make install
nginx配置
修改nginx的配置文件nginx.conf
http {
kafka;
kafka_broker_list centos7-1:9092 centos7-2:9092 centos7-3:9092;
server {
# topic 消息转发到kafka
# 地址栏输入的路径
location = /kafka/track {
# 开启跨域 解决adjx
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Credentials' 'true';
# 记得在Kafka中先创建主题
kafka_topic mytrack;
}
}
}
异常处理
#【启动nginx,报错,找不到 librdkafka.so.1的文件】
error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory
# 解决办法:加载so库
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig
测试消息发送
- 启动zookeeper,Kafka,nginx
创建一个消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytrack
使用curl发送消息
# 启动成功后,向kafka集群发送消息测试
curl localhost/kafka/track -d "message send to kafka topic"
消费者打印出消息
html页面
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>kafkaTest</title>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
</head>
<body>
<button onclick="producer('click')">点击</button>
<button onclick="producer('job_collect')">收藏</button>
<button onclick="producer('cv_send')">投简历</button>
<button onclick="producer('cv_upload')">上传简历</button>
</body>
<script>
function producer(act) {
let message = {};
message.userid = 1001;
message.time = new Date();
message.action = act;
message.jobCode = "lagou";
$.ajax({
type: "post",
url: "/kafka/track",
data: JSON.stringify(message),
success: function(res) {
console.log("记录成功");
}
});
}
</script>
</html>