参考:一:场景描述对于线上大流量服务或者需要上报日志的nginx服务,每天会产生大量的日志,这些日志非常有价值。可用于计数上报、用户行为分析、接口质量、性能监控等需求。但传统nginx记录日志的方式数据会散落在各自nginx上,而且大流量日志本身对磁盘也是一种冲击。 二:技术方案得益于openresty和kafka的高性能,我们可以非常轻量高效的实现当前需求,架构如下: 三:相关技术 kafka: http://kafka. (版本:2.0.0) lua-resty-kafka: https://github.com/doujiang24/lua-resty-kafka 四:安装配置 brew install openresty 参考:http:///cn/installation.html 2)安装lua-resty-kafka wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip #拷贝lua-resty-kafka到openresty cp -rf /opt/nginx/lua-resty-kafka-master/lib/resty /usr/local/Cellar/openresty/1.13.6.2/lualib/kafka/ 4):安装单机kafka brew install kafka # 启动kafka服务
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties # 查看所有topic
/usr/local/Cellar/kafka/2.0.0/bin/kafka-topics --zookeeper localhost:2181 --list # 创建测试topic /usr/local/Cellar/kafka/2.0.0/bin/kafka-topics --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 1# 产生数据到topic /usr/local/Cellar/kafka/2.0.0/bin/kafka-console-producer --broker-list localhost:9092 --topic test 命令行输入测试数据 >aaa # 查看测试topic数据
/usr/local/Cellar/kafka/2.0.0/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning 编辑/usr/local/etc/openresty/nginx.conf 实现kafka记录nginx日志功能,源码如下: #其他不变,添加如下代码
lua_package_path "/usr/local/Cellar/openresty/1.13.6.2/lualib/kafka/?.lua;;"; #引入lua-resty-kafka server { listen 8080; root /data1/XXXX; index index.html; server_name test.com; access_log /data0/www/logs/test.com-access_log; error_log /data0/www/logs/test.com-error_log; lua_code_cache off; #关闭lua代码缓存,可以实时调试代码 location / { default_type text/html; content_by_lua 'ngx.say(package.path)'; } location ^~ /test/{ default_type text/html; content_by_lua_file $document_root/test/index.lua; }
} } 2 index.lua --ngx.say([[{"status":{"code":100000,"msg":"SUCCESS"},"result":{"version":"1.0.01"}}]]) --ngx.say(package.path) --引入 kafka 生产者 类库 local producer = require("resty.kafka.producer") --引入json 解析类库 local cjson = require("cjson") --构造kafka 集群节点 broker local broker_list = { { host = "127.0.0.1", port = 9092}, -- { host = "192.168.0.17", port = 9092}, -- { host = "192.168.0.18", port = 9092} } --定义上报对象 local log_obj = {} --自定义模块名称 log_obj["request_module"] = "product_detail_info" --获取请求头信息 log_obj["headers"] = ngx.req.get_headers() --获取请求uri 参数 log_obj["uri_args"] = ngx.req.get_uri_args() --获取请求body log_obj["body"] = ngx.req.read_body() --获取请求的http协议版本 log_obj["http_version"] = ngx.req.http_version() --获取请求方法 log_obj["method"] = ngx.req.get_method() --获取未解析的请求头字符串 log_obj["raw_reader"] = ngx.req.raw_header() --获取解析的请求body体内容字符串 log_obj["body_data"] = ngx.req.get_body_data() --上报对象json 字符串编码 local message = cjson.encode(log_obj) local uri_args = ngx.req.get_uri_args() local product_id = uri_args["productId"] --创建kafka producer 连接对象,producer_type = "async" 异步 local async_producer = producer:new(broker_list, {producer_type = "async"}) --请求上报kafka,kafka 生产者发送数据,async_prodecer:send(a,b,c),a : 主题名称,b:分区(保证相同id,全部到相同的kafka node 去,并且顺序一致),c:消息(上报数据) local ok, err = async_producer:send("test", product_id, message) --上报异常处理 if not ok then ngx.log(ngx.ERR, "kafka send err:", err) return end 六:检测&运行检测配置,只检测nginx配置是否正确,lua错误日志在nginx的error.log文件中 七:测试1:使用任意http请求发送给当前nginx,如: 2:查看upstream代理是否工作正常 效果监测: #单nginx+upstream测试: #结果 引用 #单nginx+upstream+log_lua_kafka接入测试: #结果 |
|