基于ELK的AccessLog分析
通过分析Accesslog我们可以通过多种纬度进行分析线上网站的请求和相应情况但是常规的文件io已经不能适用于当前请求量,一般accesslog都会关闭或者保留一小段时间的log,如何优化解决这个问题?
在平时的开发中经常会需要通过各种纬度评测当前系统的状态
例如:
- 接口请求的频率 成功的可以统计到 ,但是异常失败的怎么办?
- 接口的速度响应时间、结果如何分析?目前只能统计程序里面只是程序执行的耗时,并不能对接口在整个请求期间的状态进行分析
- 目前用户的客户端浏览器的分部概况,业务是否有必要化功夫支持ie7 8 9
- 异常的请求 分析等等
通过分析Accesslog我们可以通过多种纬度进行分析线上网站的请求和相应情况但是常规的文件io已经不能适用于当前请求量,一般accesslog都会关闭或者保留一小段时间的log,如何优化解决这个问题?
本次采用的整体架构为
nginx(1.7+)+go(udpServer)+kafka+logstash+elasticsearch+kibana
简单描述小流程
1:ngingx产生log通过udp请求发送到指定server
2:server把通过udp请求过来的log暂存到kafka里面
3:通过logstash把kafka里面的日志拉取到elasticsearch里面
4:通过kibana管理es里面的数据
整体架构图如下
Nginx-accesslog上报
在nginx这一层通过UDP上报accesslog到server,这里要注意nginx版本要大于1.7否则,不支持上报
accesslog可以自定义格式
这里示例一段
http { include mime.types; default_type application/octet-stream; log_format main '$remote_addr $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for" "$upstream_response_time" '; access_log syslog:server=127.0.0.1:5000,facility=local6,severity=debug,tag=na main; sendfile on; #tcp_nopush on; #keepalive_timeout 0; keepalive_timeout 65;
UDPserver接收传入到Kafka
上报到后台server接收udp服务,这里可以用go或者swoole来写都可以,
这里在server 上报到kafka的最好不要一次一条转发分批次定时转发线上的代码就不发了发一个demo做的小测试 ,其他在此基础上进行分批(channel)定时(time.ticker)处理就好
package main import ( "os" "fmt" "net" "github.com/optiopay/kafka" "github.com/optiopay/kafka/proto" "log" ) const ( topic = "accesslog" partition = 0 ) var kafkaAddrs = []string{"localhost:9092"} func checkError(err error){ if err != nil { fmt.Println("Error: %s", err.Error()) os.Exit(1) } } func produceStdin(broker kafka.Client,logmsg string) { producer := broker.Producer(kafka.NewProducerConf()) msg := &proto.Message{Value: []byte(logmsg)} if _, err := producer.Produce(topic, partition, msg); err != nil { log.Fatalf("cannot produce message to %s:%d: %s", topic, partition, err) } } func recvUDPMsg(conn *net.UDPConn)(string){ var buf [1024]byte n, raddr, err := conn.ReadFromUDP(buf[0:]) if err != nil { return err.Error() } //fmt.Println("msg is ", string(buf[0:n])) //sendToKafka(string(buf[0:n])) //WriteToUDP //func (c *UDPConn) WriteToUDP(b []byte, addr *UDPAddr) (int, error) _, err = conn.WriteToUDP([]byte("nice to see u"), raddr) checkError(err) return string(buf[0:n]) } func main() { conf := kafka.NewBrokerConf("accesslog-client") conf.AllowTopicCreation = true // connect to kafka cluster broker, err := kafka.Dial(kafkaAddrs, conf) if err != nil { log.Fatalf("cannot connect to kafka cluster: %s", err) } defer broker.Close() //produceStdin(broker) // udp_addr, err := net.ResolveUDPAddr("udp", ":10001") checkError(err) conn, err := net.ListenUDP("udp", udp_addr) defer conn.Close() checkError(err) //go recvUDPMsg(conn) for { produceStdin(broker,recvUDPMsg(conn)) } }
Logstash从Kafka传入到ElasticSearch
对应的配置conf文件内容,这里主要要做好filter的格式化json处理
input { kafka { id => "accesslog" topics => ["accesslog"] } } filter { grok { match => [ "message", "%{IPV4:remote_addr} - \[%{HTTPDATE:log_timestamp}\] \"(%{WORD:request_method}|-) %{NOTSPACE:http_request} (?:HTTP/%{NUMBER:http_version})\" %{NUMBER:http_status_code} (?:%{NUMBER:bytes_read}|-) %{QS:referrer} %{QS:agent} \"(?:%{IPV4:http_x_forwarded_for}|-)\" \"(?:%{BASE16FLOAT:upstream_response_time}|-)\"" ] } } output { if [tags] and "_grokparsefailure" in [tags] { stdout { codec => rubydebug } } else { elasticsearch { index => "elk_access_log" document_type => "nginx_accesslog" } } }
Kibana分析
在kibana里面执行对应的分析报表查询和生成各种报表,这里举两个栗子
例如基于响应时间
查询时间最长和最小的