Parsing MySQL Slow Query Logs with FileBeat + LogStash
Background: a large marketing system kept running into MySQL slow queries, which caused frequent failures of the online service. To identify which SQL statements were at fault, and to support statistical queries across various dimensions, we set up FileBeat + LogStash + ElasticSearch + Kibana. This post only covers how to configure FileBeat and LogStash to parse the MySQL slow query log.
FileBeat Configuration
filebeat.inputs:
- type: log
  enabled: true
  # ignore files last modified before the given time span
  ignore_older: 30000h
  # MySQL slow-query log directory; glob patterns such as * are supported
  paths:
    - /opt/slow-sql/*.log
  # tag the events as mysqlslow
  document_type: mysqlslow
  # fold every line that does NOT start with "# User@Host: " into the
  # previous event, so one slow-log entry becomes one message
  multiline:
    pattern: "^# User@Host: "
    negate: true
    match: after
  tail_files: false

output.logstash:
  # logstash address; here it is deployed on the same machine
  hosts: ["127.0.0.1:5044"]
LogStash Configuration
input {
  # receive the logs pushed by filebeat
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  grok {
    # the four patterns cover every combination of the "use <db>;" statement
    # and the "# Schema:" header being present or absent; grok stops at the
    # first pattern that matches
    match => { "message" => [
      # dbname and Schema header both present
      "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Schema: (?<schema>\w+)([\s\S]*)\s+#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)use\s(?<dbname>\w+);([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}",
      # no dbname, Schema header present
      "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Schema: (?<schema>\w+)([\s\S]*)\s+#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}",
      # dbname present, no Schema header
      "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)use\s(?<dbname>\w+);([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}",
      # neither dbname nor Schema header
      "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}"
    ] }
    overwrite => ["message"]
  }
  grok {
    # extract the server IP embedded in the log file path (the source field),
    # e.g. when each log file is named after the host it came from
    match => { "source" => "(?m)\s*%{IP:server_ip}" }
  }
  # format the timestamp, keeping only the date (e.g. 20190101)
  ruby {
    code => "
      require 'time'
      ts = event.get('sql_time')
      event.set('date_tag', Time.at(ts).to_date.to_s.delete('-')) unless ts.nil?
    "
  }
  # use the timestamp of the SQL itself for the index timestamp
  # instead of the ingest time; sql_time is epoch seconds
  date {
    match => ["sql_time", "UNIX"]
    target => "@timestamp"
  }
}
output {
  # for debugging: print the parsed result to the console
  stdout {
    codec => rubydebug {}
  }
  # elasticsearch output
  elasticsearch {
    hosts => "localhost:9200"
    # index name, suffixed with the event date
    index => "slow-sql-%{+YYYY.MM.dd}"
  }
}
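Before wiring everything together, the pipeline definition can be checked in isolation. A quick sanity check, assuming the config is saved as /etc/logstash/conf.d/slow_sql_by_query.conf (the path the install script below copies it to) and logstash was installed from the official packages:

# parse the pipeline config and exit without starting logstash
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/slow_sql_by_query.conf --config.test_and_exit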
One-Click ELFK Installation
The script below installs the whole stack on Ubuntu/Debian or CentOS: ElasticSearch and Kibana run in Docker, while FileBeat and LogStash are installed from the Elastic 6.x package repositories.
#!/bin/bash
echo "
#-----------------------------------------
#@Author: linvaux
#@Email: [email protected]
#@Desc: Auto install ELFK
#----------------------------------------
"

INFO()
{
    echo -e "\033[0;32m[INFO] $* \033[0m"
}

ERROR()
{
    echo -e "\033[0;31m[ERROR] $* \033[0m"
}

WARN()
{
    echo -e "\033[0;33m[WARN] $* \033[0m"
}

# configure a registry mirror to speed up docker-hub pulls
booster()
{
    daemon="/etc/docker/daemon.json"
    if [[ -e ${daemon} ]]; then
        mv ${daemon} ${daemon}.bak
        INFO Backup ${daemon} success!
    fi
    echo "{\"registry-mirrors\" : [\"https://hub-mirror.c.163.com\"]}" > ${daemon}
    INFO Config docker-hub booster success!
}
check_env()
{
    if [[ -z "$(which docker)" ]]
    then
        WARN No docker found, trying to install!
        INFO Start to install docker
        source /etc/os-release
        if [[ "$ID" == "ubuntu" ]] || [[ "$ID" == "debian" ]]
        then
            apt update
            apt install curl wget -y
            curl -fsSL https://get.docker.com | sh
            booster
            systemctl daemon-reload
            systemctl restart docker
            if [[ -z "$(which java)" ]]; then
                apt install openjdk-8-jdk -y
            fi
        elif [[ "$ID" == "centos" ]]
        then
            yum update -y
            yum install wget curl net-tools -y
            curl -fsSL https://get.docker.com | sh
            booster
            systemctl daemon-reload
            systemctl restart docker
            if [[ -z "$(which java)" ]]; then
                yum install java-1.8.0-openjdk -y
            fi
        else
            ERROR Could not support $ID platform!
            exit 1
        fi
    fi
}
install_elasticsearch()
{
    INFO Start to install elasticsearch
    # elasticsearch requires a raised mmap count limit
    echo "vm.max_map_count=655360" >> /etc/sysctl.conf
    sysctl -p
    docker pull docker.elastic.co/elasticsearch/elasticsearch:6.5.4
    docker run -d --restart=always -p 9200:9200 -p 9300:9300 --name es -h es \
        -e cluster.name=kiki -e node.name=node1 \
        -e http.cors.enabled=true -e http.cors.allow-origin="*" \
        -e xpack.security.enabled=false \
        docker.elastic.co/elasticsearch/elasticsearch:6.5.4
}
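# Note: docker run returns immediately while elasticsearch may take a while to
# boot. The helper below is an optional sketch (hypothetical, not part of the
# original script) that blocks until the node answers on 9200; call it before
# install_kibana if the kibana container keeps restarting because
# elasticsearch is not ready yet.
wait_for_es()
{
    until curl -s http://127.0.0.1:9200 >/dev/null; do
        INFO Waiting for elasticsearch to come up...
        sleep 5
    done
}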
install_kibana()
{
    INFO Start to install kibana
    docker pull kibana:6.5.4
    # host networking lets kibana reach elasticsearch on 127.0.0.1:9200 and
    # publishes port 5601 directly, so no -p mapping is needed
    docker run -d --restart=always --name kibana --network=host \
        -e ELASTICSEARCH_URL=http://127.0.0.1:9200 kibana:6.5.4
}
install_filebeat_and_logstash()
{
    INFO Start to install filebeat and logstash
    source /etc/os-release
    if [[ "$ID" == "ubuntu" ]] || [[ "$ID" == "debian" ]]
    then
        wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
        apt-get install apt-transport-https -y
        echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-6.x.list
        apt-get update && apt install filebeat logstash -y
        if [[ $? -ne 0 ]]
        then
            ERROR Install filebeat and logstash failed!
            exit 1
        fi
    elif [[ "$ID" == "centos" ]]
    then
        rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
        # write the repo file via a heredoc so no leading whitespace sneaks in
        cat > /etc/yum.repos.d/elastic.io.repo <<'EOF'
[elastic-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
        yum makecache && yum install filebeat logstash -y
        if [[ $? -ne 0 ]]
        then
            ERROR Install filebeat and logstash failed!
            exit 1
        fi
    else
        ERROR Could not support $ID platform!
        exit 1
    fi
}
start_filebeat()
{
    INFO Start to call filebeat
    filebeat_yaml="/etc/filebeat/filebeat.yml"
    if [[ -f ${filebeat_yaml} ]]; then
        mv ${filebeat_yaml} ${filebeat_yaml}.bak
        cp ./filebeat.yml ${filebeat_yaml}
        systemctl restart filebeat
        if [[ $? -ne 0 ]]
        then
            ERROR Start filebeat failed!
            exit 1
        fi
    else
        ERROR ${filebeat_yaml} not found, please check!
        exit 1
    fi
}

start_logstash()
{
    INFO Start to call logstash
    logstash_conf_dir="/etc/logstash/conf.d/"
    if [[ -e ${logstash_conf_dir} ]]; then
        cp ./slow_sql_by_query.conf ${logstash_conf_dir}
        systemctl restart logstash
        if [[ $? -ne 0 ]]
        then
            ERROR Start logstash failed!
            exit 1
        fi
    else
        ERROR ${logstash_conf_dir} not found, please check!
        exit 1
    fi
}
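# run() below references a commented-out check_index step; a minimal sketch of
# what it could look like (hypothetical, assumes ES listens on 127.0.0.1:9200):
check_index()
{
    # list the slow-sql-* indices that the logstash output should create
    curl -s "http://127.0.0.1:9200/_cat/indices/slow-sql-*?v"
}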
run()
{
    if [[ "root" == "$(whoami)" ]]
    then
        INFO Start to run...
        check_env
        install_elasticsearch
        install_kibana
        install_filebeat_and_logstash
        # download_log
        start_logstash
        start_filebeat
        # check_index
        INFO Run success!
    else
        ERROR Run as root please!
    fi
}
run
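To use the script, place it in one directory together with the filebeat.yml and slow_sql_by_query.conf shown above (start_filebeat and start_logstash copy them from the current directory), then run it as root. A minimal sketch, assuming the script is saved as install_elfk.sh (the filename is illustrative):

chmod +x install_elfk.sh
sudo ./install_elfk.sh
# once filebeat ships the first slow-query entries, the indices should show up:
curl -s "http://127.0.0.1:9200/_cat/indices/slow-sql-*?v"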