FileBeat+LogStash实现MySQL慢查询日志解析

写 bug 的大耳朵图图
·

背景是一个大型营销系统经常出现MySQL的慢查询,导致线上服务频繁出现故障,为了查看是哪些SQL有问题,并且要支持各种维度的统计查询,所以使用FileBeat+LogStash+ElasticSearch+Kibana实现此需求。本文仅描述如何配置FileBeat和LogStash实现MySQL慢查询日志解析。

FileBeat配置

filebeat.inputs:
- type: log
  enabled: true
  # Skip files whose last modification is older than this time span.
  ignore_older: 30000h
  # MySQL slow query log files to harvest; glob patterns are supported.
  paths:
    - /opt/slow-sql/*.log
  # The `document_type` option was removed in Filebeat 6.0, so the log type is
  # attached as a custom field instead (it shows up as `fields.log_type`).
  fields:
    log_type: mysqlslow
  # Every line that does NOT start with "# User@Host: " is appended to the
  # preceding line that does, so one slow-query entry becomes one event.
  multiline:
    pattern: "^# User@Host: "
    negate: true
    match: after
  # Read files from the beginning instead of only newly appended lines.
  tail_files: false

output.logstash:
  # Logstash endpoint that receives the events; here it runs on the same
  # machine as Filebeat.
  hosts:
    - "127.0.0.1:5044"

LogStash配置

input {
  # Listen on all interfaces for events pushed by Filebeat.
  beats {
    host => "0.0.0.0"
    port => 5044
  }
}

filter {
  # Parse one multiline slow-query entry. The four patterns differ only in
  # whether a "# Schema:" header and a "use <db>;" statement are present.
  grok {
    # With "use <db>;" and with "# Schema:"
    match => {"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Schema: (?<schema>\w+)([\s\S]*)\s+#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)use\s(?<dbname>\w+);([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}"}
    # Without "use <db>;", with "# Schema:"
    match => {"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Schema: (?<schema>\w+)([\s\S]*)\s+#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}"}
    # With "use <db>;", without "# Schema:"
    match => {"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)use\s(?<dbname>\w+);([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}"}
    # Without "use <db>;", without "# Schema:"
    match => {"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}"}
    overwrite => ["message"]
  }

  grok {
    # Pull an IP address out of the "source" field into server_ip.
    # NOTE(review): presumably the shipped log path embeds the DB server IP;
    # confirm, otherwise this grok only adds a parse-failure tag.
    match => {"source" => "(?m)\s*%{IP:server_ip}"}
  }

  # Both filters below need sql_time. When none of the grok patterns matched,
  # the field is absent and Time.at(nil) would raise inside the ruby filter,
  # so they only run for successfully parsed events.
  if [sql_time] {
    # Derive a compact yyyymmdd tag from the epoch seconds in sql_time.
    ruby {
      code => "
        event.set('date_tag', Time.at(event.get('sql_time')).strftime('%Y%m%d'))
      "
    }

    # Use the time recorded by MySQL as the event timestamp instead of the
    # time at which Logstash processed the event.
    # NOTE(review): "cn" is a country code rather than a language tag; it is
    # kept unchanged here, confirm whether "zh-CN" was intended.
    date {
      match => ["sql_time", "yyyy-MM-dd HH:mm:ss", "UNIX"]
      target => "@timestamp"
      locale => "cn"
    }
  }

}

output {
  # Debugging aid: print every parsed event to the console.
  stdout { codec => rubydebug {} }

  # Elasticsearch destination.
  elasticsearch {
    # Index name, suffixed with the event date.
    index => "slow-sql-%{+YYYY.MM.dd}"
    hosts => "localhost:9200"
  }
}

一键安装ELFK

#!/bin/bash
# Print the author banner, framed by one empty line above and below.
printf '%s\n' \
    '' \
    '#-----------------------------------------' \
    '#@Author: linvaux ' \
    '#@Email: linvaux@outlook.com' \
    '#@Desc: Auto install ELFK' \
    '#----------------------------------------' \
    ''
# Print an informational message to stdout in green.
# Arguments: the message words; they are joined with single spaces.
INFO() {
    local message="$*"
    # %b expands backslash escapes in the message the same way `echo -e` does.
    printf '\033[0;32m[INFO] %b \033[0m\n' "${message}"
}

# Print an error message in red.
# Arguments: the message words; they are joined with single spaces.
# Outputs:   the coloured message on stderr, so that error diagnostics do not
#            mix with regular output when stdout is captured or piped.
ERROR()
{
    # %b expands backslash escapes in the message the same way `echo -e` does.
    printf '\033[0;31m[ERROR] %b \033[0m\n' "$*" >&2
}

# Print a warning message in yellow.
# Arguments: the message words; they are joined with single spaces.
# Outputs:   the coloured message on stderr, keeping diagnostics out of stdout.
WARN()
{
    # %b expands backslash escapes in the message the same way `echo -e` does.
    printf '\033[0;33m[WARN] %b \033[0m\n' "$*" >&2
}

# Point the Docker daemon at a registry mirror to speed up image pulls.
# Arguments:
#   $1 - daemon config file to write (default: /etc/docker/daemon.json)
# An existing config file is moved to "<file>.bak" before the new one is
# written; a missing parent directory is created.
# Returns: 0 on success, 1 when the backup, the directory or the write fails.
booster()
{
    local daemon="${1:-/etc/docker/daemon.json}"
    local mirror_json='{"registry-mirrors" : ["https://hub-mirror.c.163.com"]}'

    if [[ -e "${daemon}" ]]; then
        # Report the backup only after it actually happened.
        if ! mv -- "${daemon}" "${daemon}.bak"; then
            ERROR Backup "${daemon}" failed!
            return 1
        fi
        INFO Backup "${daemon}" success!
    elif ! mkdir -p -- "$(dirname -- "${daemon}")"; then
        ERROR Create directory for "${daemon}" failed!
        return 1
    fi

    if ! printf '%s\n' "${mirror_json}" > "${daemon}"; then
        ERROR Config docker-hub booster failed!
        return 1
    fi
    INFO Config docker-hub booster success!
}

# Install Docker with the get.docker.com convenience script, configure the
# registry mirror and restart the daemon so the new config is picked up.
_install_docker()
{
    curl -fsSL https://get.docker.com | sh
    booster
    systemctl daemon-reload
    systemctl restart docker
}

# Make sure docker and java are available, installing whichever is missing.
# Java is checked independently of Docker, so a host that already has Docker
# still gets a JRE when none is present.
# Globals: sources /etc/os-release (defines ID among others) when something
#          has to be installed.
# Exits with status 1 on a platform other than ubuntu, debian or centos.
check_env()
{
    local need_docker=0 need_java=0
    command -v docker >/dev/null 2>&1 || need_docker=1
    command -v java >/dev/null 2>&1 || need_java=1

    # Nothing to install: leave the system untouched.
    if (( need_docker == 0 && need_java == 0 )); then
        return 0
    fi

    if (( need_docker )); then
        WARN  No docker were found,try to install!
        INFO  Start to install docker
    fi

    source /etc/os-release
    case "${ID}" in
        ubuntu|debian)
            apt update
            if (( need_docker )); then
                apt install curl wget -y
                _install_docker
            fi
            if (( need_java )); then
                apt install openjdk-8-jdk -y
            fi
            ;;
        centos)
            if (( need_docker )); then
                yum update -y
                yum install  wget curl net-tools -y
                _install_docker
            fi
            if (( need_java )); then
                yum install java-1.8.0-openjdk -y
            fi
            ;;
        *)
            ERROR  Could not support "${ID}" platform!
            exit 1
            ;;
    esac
}

# Raise vm.max_map_count and start a single-node Elasticsearch 6.5.4
# container named "es" publishing ports 9200 and 9300.
# Arguments:
#   $1 - sysctl configuration file to update and load
#        (default: /etc/sysctl.conf)
install_elasticsearch()
{
    local sysctl_conf="${1:-/etc/sysctl.conf}"
    local setting="vm.max_map_count=655360"
    local image="docker.elastic.co/elasticsearch/elasticsearch:6.5.4"

    INFO Start to install elasticsearch
    # Append the setting only when this exact line is not there yet, so
    # repeated runs do not pile up duplicates. grep's stderr is silenced
    # because the file may not exist before the first append.
    if ! grep -qxF -- "${setting}" "${sysctl_conf}" 2>/dev/null; then
        echo "${setting}" >> "${sysctl_conf}"
    fi
    sysctl -p "${sysctl_conf}"

    docker pull "${image}"
    docker run -d --restart=always -p 9200:9200 -p 9300:9300 --name es -h es \
        -e cluster.name=kiki -e node.name=node1 \
        -e http.cors.enabled=true -e http.cors.allow-origin="*" \
        -e xpack.security.enabled=false "${image}"
}

# Pull the kibana:6.5.4 image and start it detached as container "kibana"
# on the host network, pointed at Elasticsearch on 127.0.0.1:9200.
install_kibana() {
    local image="kibana:6.5.4"
    local -a run_opts=(
        --restart=always
        -p 5601:5601
        --name kibana
        -e ELASTICSEARCH_URL=http://127.0.0.1:9200
        --network=host
        -d
    )

    INFO Start to install kibana
    docker pull "${image}"
    docker run "${run_opts[@]}" "${image}"
}

# Register the Elastic 6.x package repository and install filebeat and
# logstash from it.
# Globals: sources /etc/os-release (defines ID among others).
# Exits with status 1 when the installation fails or the platform is not
# ubuntu, debian or centos.
install_filebeat_and_logstash()
{
    INFO Start to install filebeat and logstash
    source /etc/os-release
    case "${ID}" in
        ubuntu|debian)
            # No sudo: run() only proceeds as root, and sudo may not be
            # installed on the host.
            wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -
            apt-get install apt-transport-https -y
            # Overwrite instead of append so reruns do not duplicate the entry.
            echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" \
                | tee /etc/apt/sources.list.d/elastic-6.x.list
            if ! { apt-get update && apt install filebeat logstash -y; }; then
                ERROR  Install filebeat and logstash failed!
                exit 1
            fi
            ;;
        centos)
            rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
            printf '%s\n' \
                '[elastic-6.x]' \
                'name=Elastic repository for 6.x packages' \
                'baseurl=https://artifacts.elastic.co/packages/6.x/yum' \
                'gpgcheck=1' \
                'gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch' \
                'enabled=1' \
                'autorefresh=1' \
                'type=rpm-md' \
                > /etc/yum.repos.d/elastic.io.repo
            if ! { yum makecache && yum install filebeat logstash -y; }; then
                ERROR  Install filebeat and logstash failed!
                exit 1
            fi
            ;;
        *)
            ERROR  Could not support "${ID}" platform!
            exit 1
            ;;
    esac
}

# Replace the installed Filebeat config with the prepared one and restart the
# filebeat service.
# Arguments:
#   $1 - config file to install (default: ./filebeat.yml)
#   $2 - installed config to replace (default: /etc/filebeat/filebeat.yml)
# The previous config is kept as "<target>.bak".
# Exits with status 1 when either file is missing, the copy fails or the
# service does not restart.
start_filebeat()
{
    local new_conf="${1:-./filebeat.yml}"
    local filebeat_yaml="${2:-/etc/filebeat/filebeat.yml}"

    INFO Start to call filebeat
    if [[ ! -f "${filebeat_yaml}" ]]; then
        ERROR "${filebeat_yaml}" not found, please check!
        exit 1
    fi
    # Check the replacement before touching the installed config; otherwise a
    # missing source would leave the service without any config file.
    if [[ ! -f "${new_conf}" ]]; then
        ERROR "${new_conf}" not found, please check!
        exit 1
    fi

    if ! mv -- "${filebeat_yaml}" "${filebeat_yaml}.bak"; then
        ERROR Backup "${filebeat_yaml}" failed!
        exit 1
    fi
    if ! cp -- "${new_conf}" "${filebeat_yaml}"; then
        # Roll back so the previously working config stays in place.
        mv -- "${filebeat_yaml}.bak" "${filebeat_yaml}"
        ERROR Copy "${new_conf}" to "${filebeat_yaml}" failed!
        exit 1
    fi

    if ! systemctl restart filebeat; then
        ERROR  Start filebeat failed!
        exit 1
    fi
}

# Copy the slow-query pipeline definition into the Logstash config directory
# and restart the logstash service.
# Arguments:
#   $1 - pipeline file to install (default: ./slow_sql_by_query.conf)
#   $2 - Logstash config directory (default: /etc/logstash/conf.d/)
# Exits with status 1 when the directory is missing, the copy fails or the
# service does not restart.
start_logstash()
{
    local pipeline_conf="${1:-./slow_sql_by_query.conf}"
    local logstash_conf_dir="${2:-/etc/logstash/conf.d/}"

    INFO Start to call logstash
    if [[ ! -d "${logstash_conf_dir}" ]]; then
        ERROR "${logstash_conf_dir}" not found, please check!
        exit 1
    fi
    # Without this check a missing pipeline file went unnoticed: the service
    # was restarted and the script reported success.
    if ! cp -- "${pipeline_conf}" "${logstash_conf_dir}"; then
        ERROR Copy "${pipeline_conf}" to "${logstash_conf_dir}" failed!
        exit 1
    fi

    if ! systemctl restart logstash; then
        ERROR  Start logstash failed!
        exit 1
    fi
}


# Run the whole installation: environment check, Elasticsearch, Kibana,
# Filebeat and Logstash packages, then start Logstash and Filebeat.
# Must be executed as root (uid 0); otherwise an error is printed and the
# script exits with status 1 so callers can detect the failure.
run()
{
    # Compare the numeric uid rather than the user name.
    if [[ "$(id -u)" -ne 0 ]]; then
        ERROR Run as root please!
        exit 1
    fi

    INFO Start to run...
    check_env
    install_elasticsearch
    install_kibana
    install_filebeat_and_logstash
    # download_log
    start_logstash
    start_filebeat
    # check_index
    INFO Run success!
}

# Script entry point: execute the installation sequence defined in run().
run
社区准则 博客 联系 社区 状态
主题