- ES
- logstash
- yum安装
- 重要插件配置
- grok
- date插件
- mutate
- input => filter => output
- input常用支持插件
- file读取
- beats监听
- output输出插件
- elasticsearch
- file
- 示例
- kibana
- tar安装
- 如何管理索引
- **Advanced Settings**
- filebeat
- ELK配置示例
- 日志文件中分隔出普通日志和错误日志
- 日志源文件
- logstash
- filebeat
- kibana
- kibana搜索示例
es的安装忽略, 这一块随便搜索资料安装即可,或者参考之前写的elasticsearch安装
logstash yum安装Download and install the public signing key:
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
进入到/etc/yum.repos.d/目录下,新建文件logstash.repo,填入以下内容,注意不需要更改
[logstash-7.x] name=Elastic repository for 7.x packages baseurl=https://artifacts.elastic.co/packages/7.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
# 查看可以安装的版本 yum list logstash --showduplicates | sort -r # 安装7.15.1 yum install logstash-7.15.1 # 启动 sudo systemctl start logstash.service
- /etc/logstash/ 配置文件目录
- jvm.options jvm配置,默认启动-Xms -Xmx都是1g,可以按需调整
- logstash.yml logstash主配置文件
- path.logs: /var/log/logstash 日志存放目录
- path.data: /var/lib/logstash 数据存放目录
- path.config 配置文件存放目录
- pipelines.yml
- path.config: “/etc/logstash/conf.d/*.conf” 可以把收集每个内容的配置问价写到这个目录下,以.conf结尾
- /var/log/logstash/ 日志目录
- /usr/share/logstash/ 脚本执行目录
标准输入输出测试
先停掉服务 systemctl stop logstash.service cd /usr/share/logstash/bin # 需要等待一会 ./logstash -e 'input { stdin {} } output { stdout {} }'
在控制台输入的字符会被输出出来
主要是用来提炼消息内容,如果有些消息内容我们需要取出来后面要用到,可以用到表达式将内容解析出来
一个在线debug工具http://grokdebug.herokuapp.com/
日志原文
[2022-08-03 14:05:51.528] [2#2-1554710116417277952] INFO [http-nio-8086-exec-9] c.d.b.c.c.l.AccessLogAspect - [com.ddf.group.purchase.core.controller.AuthController]-[currentUser]请求参数: {}, 执行返回结果: {"userId":"2","username":"18356784598","credit":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36","lastModifyPasswordTime":null,"remarks":null,"disabled":false,"detail":null}, 共耗时: [0ms]
现在我想将这段文字结构化,内容如下,格式和正则很像,匹配到的内容后续都可以通过[]直接引用, 如后面会用到的output模块
- 转义字符
- s匹配空格
- ()用来包裹一个表达式的解析还有对应解析内容后的变量命名
?<>被包裹的内容即是变量名,后面跟解析表达式
- .匹配任意字符
[%{TIMESTAMP_ISO8601:logtime}]s[(?date插件.*)#(? .*)]s(? [A-Z]{4,5})s[(? .*)]s(? .*)s-s(? .*)
https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
date插件用于对日期进行匹配和替换,拿前面grok到的数据来说,日志中我们打印的日期格式是认为很容易认识的yyyy-MM-dd HH:mm:ss.SSS,这种格式丢失了时区,由于es存储时间都统一存储为UTC格式,如果我们不对时间做处理,时间存储进去之后就已经乱掉了。
接上面grok到的内容,现在我需要匹配到时间,并重新将这个时间重写到@timestamp字段中
date { match=> [ "logtime", "yyyy-MM-dd HH:mm:ss.SSS"] target => "@timestamp" timezone => "Asia/Shanghai" }
其实match的真正含义就是去匹配字符而已, 它是一个数组, 如果你有多种时间格式,这里可以写多个,每个参数之间没有前后关系,像上面这个并不是代表logtime必须是后面那个 yyyy-MM-dd HH:mm:ss.SSS格式才算match.
target就是要将匹配的这个时间重写到es字段的字段@timestamp中去,这个字段会是后面非常重要的各种时间过滤用到的
timezone,可用的时区列表可以在这个链接都看到,http://joda-time.sourceforge.net/timezones.html, 这个timezone字段在别的文章里我都没看到,不明白怎么解决时区的问题的,这个还是在官方文档中看到有这个属性,然后由于我们日志打印的字符根本没有时区概念,所以这个时间打印出来的时候就已经确定是什么时区的,这里要指定,然后es才能正确转换为UTC存储。
经过上面两个步骤后,时间已经被写入到@timestamp了,那么我们自己的字段是不是就没有意义了,可以通过这个插件删掉,当然这不是必须的
mutate { remove_field => ["logtime"] }input => filter => output
https://www.cnblogs.com/JetpropelledSnake/p/9889275.html
logstash的处理分类三个阶段, 即输入(数据源) 、 过滤、 输出
通过这三个标签, 数据从哪个地方采集,经过什么样的处理,最终输出到哪里。对应的配置标签
input { } filter { } output { }
其中每个标签又都支持不同的插件
input常用支持插件 file读取用来抓取文件变化,然后发送数据
配置示例
input file { path => ["/data/logs/*.log", "/var/log/message"] type => "app" start_position => "beginning" } }
- path 配置格式array, 指定从哪个目录读取文件,支持通配符
- type 配置格式string, 类型, 如果输出插件为elasticsearch的话, 默认情况作为es的type
- start_position 配置格式string, 参考值"beginning"和"end", 从文件开头还是结尾开始导入数据,默认结尾。只有第一次导入有效, 如果已经导入,则该配置无效
- tags 配置格式array, 用来增加标签,以便于在后续的处理流程中使用
- delimiter 配置格式string,默认"n", 文件内容的行分隔符
Beats插件用于建立监听服务,接收Filebeat或者其他beat发送的Events
由于做日志收集时,logstash比filebeat占用更多的资源,因此logstash一般都使用beats作为输入源,然后其它主机安装filebeat收集自己的日志然后输出到logstash所在的主机
配置示例
input { beats { port => 5044 } }
- host 配置格式string, 监听的ip地址,默认"0.0.0.1"
- port 配置格式number, 监听服务监听的端口
用于将事件信息写入到Elasticsearch中
配置示例
output { elasticsearch { hosts => ["127.0.0.1:9200"] user => "elastic" password => "sssssss" index => "filebeat-%{type}-%{+yyyy.MM.dd}" template_overwrite => true } }
- hosts 配置格式array, 如果有多个配置为[“127.0.0.1:9200”,“127.0.0.1:9400”]
- user 配置格式string, 访问es集群的用户名
- password 配置格式string, 访问es集群的密码
- index 将数据解析到哪个索引,比如可以按照天创建索引, 方便删除历史数据或者查询指定范围内数据
- template_overwrite 是否使用覆盖先有模板
这里也可以写一些判断语法,引用前面或者filebeat传输过来的字段值,然后用来一些动态处理。比如可以使用grok解析出日志的级别, 如果是ERROR级别,可以将日志再单独写入到一个错误日志索引里去。
再比如一个logstash要监听很多个filebeat传输过来的文件,但是索引不同,那就需要filebeat传输的时候添加自定义字段进行标识,然后再这里进行判断写入到哪个索引里。
如下提供一个示例
- fields.idx 是filebeat添加的自定义字段,如果是层级关系,在logstash里的语法要写成[fields][idx]
- [level]这个则是通过grok插件解析出来的日志级别的变量,这里可以直接引用
output { # stdout { # codec => rubydebug # } if [fields][idx] == "boot-quick-error" { elasticsearch { hosts => "www.snowball.fans:9200" user => "elastic" password => "Aa&123456" # 因为索引无法有大写字母,这里没有直接使用%{level}引用 index => "boot-quick-error-%{+YYYY.MM.dd}" } } if [fields][idx] == "boot-quick" { elasticsearch { hosts => "www.snowball.fans:9200" user => "elastic" password => "Aa&123456" # 因为索引无法有大写字母,这里没有直接使用%{level}引用 index => "boot-quick-log-%{+YYYY.MM.dd}" } } if [fields][idx] == "group-purchase-manage" { elasticsearch { hosts => "192.168.0.7:9200" user => "elastic" password => "Aa&123456" # 因为索引无法有大写字母,这里没有直接使用%{level}引用 index => "group-purchase-manage-log-%{+YYYY.MM.dd}" } } if [fields][idx] == "group-purchase-manage" and [level] == "ERROR" { elasticsearch { hosts => "192.168.0.7:9200" user => "elastic" password => "111111" # 因为索引无法有大写字母,这里没有直接使用%{level}引用 index => "group-purchase-manage-error-%{+YYYY.MM.dd}" } } }file
用户将事件数据输出到文件内
output { file { path => ... codec => line { format => "custom format: %{message}"} } }示例
在logstash/conf.d下新建一个boot-quick.conf,用来手机日志直接投递到es
input{ file { path => "/opt/services/boot-quick/logs/boot-quick.log" start_position => "beginning" type => "boot-quick.log" add_field => { "log-type" => "log" } } file { path => "/opt/services/boot-quick/logs/boot-quick-error.log" start_position => "beginning" type => "boot-quick-error.log" add_field => { "log-type" => "error" } } } output{ elasticsearch { hosts => "www.snowball.fans:9200" user => "elastic" password => "Aa&123456" # 索引格式即为boot-quick-上面的log或者error-当天时间格式,如boot-quick-log-2022.03.30 index => "boot-quick-%{log-type}-%{+YYYY.MM.dd}" } }kibana tar安装
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.15.0-linux-x86_64.tar.gz tar -zxvf kibana-7.15.0-linux-x86_64.tar.gz # 启动,默认前台启动,默认不能用root账号启动,按照之前创建的es用户使用就行了 ./bin/kibana
Kibana 默认情况下从 $KIBANA_HOME/config/kibana.yml 加载配置文件。该配置文件的格式在 配置 Kibana 中做了说明。
.tar.gz安装目录
类型 | 描述 | 默认位置 |
---|---|---|
home | Kibana home 目录或 $KIBANA_HOME | 解压包时创建的目录 |
bin | 二进制脚本,包括 kibana 启动 Kibana 服务和 kibana-plugin 安装插件 | $KIBANA_HOMEbin |
config | 配置文件,包括 kibana.yml 。 | $KIBANA_HOMEconfig |
data | Kibana 和其插件写入磁盘的数据文件位置 | |
$KIBANA_HOMEdata | ||
plugins | 插件文件位置。每一个插件都有一个单独的二级目录 | $KIBANA_HOMEplugins |
optimize | 编译过的源码。某些管理操作(如,插件安装)导致运行时重新编译源码。 | $KIBANA_HOMEoptimize |
**kibana.yml**示例配置
# kibana http服务端口 server.port: 5601 # 默认localhost以及其它配置的回环地址, 都不能被远程访问。要配置成内网或外网ip server.host: xxxxx # es集群地址 elasticsearch.hosts: ["http://localhost:9200","http://localhost:9400"] # 如果es是收到基本权限保护的话,这里提供用户名和密码 elasticsearch.username: "elastic" elasticsearch.password: "xxxxxxx" # 配置国际化 # Specifies locale to be used for all localizable strings, dates and number formats. # Supported languages are the following: English - en , by default , Chinese - zh-CN . i18n.locale: "zh_CN"如何管理索引
左上角菜单展开->翻到最下面Management->Stack Management
进入Stack Management菜单后,选择Index Management, 这里即索引管理, 添加到es中的索引都会在这个列表展示。包括我们用filebeat或者logstash使用index指定的索引
进入Stack Management菜单后,选择Index Patterns,即索引匹配, 我们可以定义一个表达式来匹配已有的索引,这样可以将一个索引或多个索引的数据通过索引匹配给映射起来,这样再通过kibana就可以可视化对应索引规则里的索引里的数据了。
比如对应着上面索引管理里的索引,我们创建索引匹配规则
使用boot-quick-log* 去匹配所有boot-quick-log开头索引数据
使用boot-quick-error*去匹配所有boot-quick-error开头的索引数据
同时选择一个用来做数据过滤的字段,如下@timestamp
查看索引匹配规则到的数据
从左侧菜单展开,选择Discover即可
查看文档内容, 如下图,详细解释了每个布局的作用
如上图, 发现@timestamp的格式不太对,我们想要修改一下格式。除了logstash grok插件解析时间外, 也还是在这个地方要修改一下字段的格式
进入Stack Management菜单后,选择Index Patterns,在列表里找到我们的索引,然后点击进去,找到@timestamp字段的编辑按钮
打开Set format, 指定自己需要的格式, 如YYYY-MM-DD HH:mm:ss.SSS
如果每个索引匹配都改一下岂不是很麻烦,后面会在Advanced Setting统一修改
Advanced Settings在Kibana的Stack Management菜单下,有一个选项Advanced Setting, 这里可以修改一些全局的配置,如上面索引的时间格式
版本列表: https://www.elastic.co/cn/downloads/past-releases#filebeat
一般用来收集日志然后输出到logstash中,而不是直接输出到es,统一由logstash做格式转换和输出到es
tar安装
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.15.0-linux-x86_64.tar.gz tar -zxvf filebeat-7.15.0-linux-x86_64.tar.gz
解压后文件目录
filebeat | 启动文件 |
---|---|
filebeat.yml | 配置文件 |
modules.d | 简化一些日志配置的模块 |
fields.yml | 好像是一些预定义字段,用于收集到es |
启动
nohup /opt/filebeat/filebeat/filebeat -e -c /opt/filebeat/filebeat/filebeat.yml -path.logs /opt/filebeat/logs &
不过这种启动方式, 如果日志变动不频繁,隔断时间之后filebeat就会自动终止, 建议使用supervistor 来管理进程
收集日志发送到logstash
filebeat.yml
filebeat.inputs: - type: log # false禁用采集工作 enable: true # 指定采集目录 paths: - /opt/services/boot-quick/logs/boot-quick.log # 日志多行合并采集表达式, 一般用来匹配异常堆栈, at.... at... at... multiline.pattern: '^[s]+at|^org.*|^java.*|^c[a-zA-Z]+.[a-zA-Z]+.+Exception:.*' # 是否否定多行日志参数,false代表不否定,即如果匹配到上面的表达式就要做什么 multiline.negate: false # 用来确定匹配的多行日志是属于上一行的后面还是下一行的前面, 结合前两个参数则代表匹配的文本数据还是属于上一行的 multiline.match: after # 添加自定义字段 fields: env: dev author: ddf # 是否将上面添加的字段定义到根属性上,默认false, 即显示时字段都挂载到根属性fields下,如果为true,每个自定义字段都是在根属性下独立的属性 fields_under_root: false output: # 输出到logstash中, logstash更换为自己的ip logstash: hosts: ["logstash:5044"]
收集的日志附带了非常多的无用属性,都是一些主机信息的,比如host. agent``ecs等,这些属性太多严重影响查看关键字段,可以把这些字段去掉。如下图
进入filebeat.yml
找到如下内容,修改如下后重启filebeat即可
processors: # 注释掉默认元数据处理器,默认启用就注释,没有就算 # - add_host_metadata: # when.not.contains.tags: forwarded # - add_cloud_metadata: ~ # - add_docker_metadata: ~ # - add_kubernetes_metadata: ~ # 按需删除不要字段 - drop_fields: fields: ["ecs","host","agent"]
modules
默认在安装目录会有一个modules目录,里面有许多预制的模块日志采集模板,可以通过一下命令操作开启和关闭
# 查看模块列表 ./filebeat modules list # 关闭 ./filebeat modules disable nginx # 开启 ./filebeat modules enable nginxELK配置示例 日志文件中分隔出普通日志和错误日志 日志源文件
- /opt/services/boot-quick/logs/ 这个目录下采集两个日志,一个普通日志,一个错误日志,然后添加自定义字段,logstash输出的时候根据自定义字段判断,然后输出到不同的索引
- /data/logs/group-purchase-manage/group-purchase.log, 直接收集日志到logstash, 然后logstash根据grok解析出日志级别,根据日志级别决定将内容输出到哪个索引
启动beats监听, 配置output
进入logstash配置目录, 如/etc/logstash,在conf.d新建一个beats.conf,名字无所谓,主要是后缀和所在目录
日志原文内容
[2022-08-03 14:05:51.528] [2#2-1554710116417277952] INFO [http-nio-8086-exec-9] c.d.b.c.c.l.AccessLogAspect - [com.ddf.group.purchase.core.controller.AuthController]-[currentUser]请求参数: {}, 执行返回结果: {"userId":"2","username":"18356784598","credit":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36","lastModifyPasswordTime":null,"remarks":null,"disabled":false,"detail":null}, 共耗时: [0ms]
通过grok表达式去结构化数据,在线工具http://grokdebug.herokuapp.com/
表达式如下
[%{TIMESTAMP_ISO8601:logtime}]s[(?.*)#(? .*)]s(? [A-Z]{4,5})s[(? .*)]s(? .*)s-s(? .*)
结构化出来的数据如下
{ "logtime": [ [ "2022-08-03 14:05:51.528" ] ], "YEAR": [ [ "2022" ] ], "MONTHNUM": [ [ "08" ] ], "MONTHDAY": [ [ "03" ] ], "HOUR": [ [ "14", null ] ], "MINUTE": [ [ "05", null ] ], "SECOND": [ [ "51.528" ] ], "ISO8601_TIMEZONE": [ [ null ] ], "user": [ [ "2" ] ], "traceId": [ [ "2-1554710116417277952" ] ], "level": [ [ "INFO" ] ], "thread": [ [ "http-nio-8086-exec-9" ] ], "class": [ [ "c.d.b.c.c.l.AccessLogAspect" ] ], "message": [ [ "[com.ddf.group.purchase.core.controller.AuthController]-[currentUser]请求参数: {}, 执行返回结果: {"userId":"2","username":"18356784598","credit":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36","lastModifyPasswordTime":null,"remarks":null,"disabled":false,"detail":null}, 共耗时: [0ms]" ] ] }
beats.conf配置内容如下
input { beats { port => 5044 client_inactivity_timeout => 36000 } } filter { grok { match => { # [user#traceId] [2022-03-30 23:35:58.988] INFO [http-nio-8082-exec-5] c.d.b.c.c.l.AccessLogAspect - 消息内容blabla # "message" => "[%{TIMESTAMP_ISO8601:logtime}]s(?filebeat[A-Z]{4,5})s[(? .*)]s(? .*)s-s(? .*)" "message" => "[%{TIMESTAMP_ISO8601:logtime}]s[(? .*)#(? .*)]s(? [A-Z]{4,5})s[(? .*)]s(? .*)s-s(? .*)" } add_field => ["user","traceId"] # remove_field => ["@timestamp"] } date { match=> [ "logtime", "yyyy-MM-dd HH:mm:ss.SSS"] # match=> [ "logtime", "ISO8601"] target => "@timestamp" timezone => "Asia/Shanghai" } mutate { # 删除我们的字段。不删除的话es:如果正则匹配成功,一条记录就会有2个时间字段 # remove_field => ["logtime"] } } output { # stdout { # codec => rubydebug # } if [fields][idx] == "boot-quick-error" { elasticsearch { hosts => "www.snowball.fans:9200" user => "elastic" password => "Aa&123456" # 因为索引无法有大写字母,这里没有直接使用%{level}引用 index => "boot-quick-error-%{+YYYY.MM.dd}" } } if [fields][idx] == "boot-quick" { elasticsearch { hosts => "www.snowball.fans:9200" user => "elastic" password => "Aa&123456" # 因为索引无法有大写字母,这里没有直接使用%{level}引用 index => "boot-quick-log-%{+YYYY.MM.dd}" } } if [fields][idx] == "group-purchase-manage" { elasticsearch { hosts => "www.snowball.fans:9200" user => "elastic" password => "Aa&123456" # 因为索引无法有大写字母,这里没有直接使用%{level}引用 index => "group-purchase-manage-log-%{+YYYY.MM.dd}" } } if [fields][idx] == "group-purchase-manage" and [level] == "ERROR" { elasticsearch { hosts => "www.snowball.fans:9200" user => "elastic" password => "Aa&123456" # 因为索引无法有大写字母,这里没有直接使用%{level}引用 index => "group-purchase-manage-error-%{+YYYY.MM.dd}" } } }
修改filebeat.yml内容如下
filebeat.inputs: - type: log enabled: true paths: - /opt/services/boot-quick/logs/boot-quick.log multiline.pattern: '^[s]+at|^org.*|^java.*|^c[a-zA-Z]+.[a-zA-Z]+.+Exception:.*' multiline.max_lines: 500 # 不否定多行模式,即使用多行匹配模式 multiline.negate: false # 正则表达式匹配的数据是上一行记录的后半部分数据,即匹配到的数据和上面的记录要合并>到同一行 multiline.match: after # 添加自定义字段 fields: idx: boot-quick author: ddf env: dev - type: log enabled: true paths: - /opt/services/boot-quick/logs/boot-quick-error.log multiline.pattern: '^[s]+at|^org.*|^java.*|^c[a-zA-Z]+.[a-zA-Z]+.+Exception:.*' multiline.max_lines: 500 # 不否定多行模式,即使用多行匹配模式 multiline.negate: false # 正则表达式匹配的数据是上一行记录的后半部分数据,即匹配到的数据和上面的记录要合并>到同一行 multiline.match: after # 添加自定义字 fields: idx: boot-quick-error author: ddf env: dev - type: log enabled: true paths: - /data/logs/group-purchase-manage/group-purchase.log multiline.pattern: '^s+at|^org.*|^java.*|^c[a-zA-Z]+.[a-zA-Z]+.+Exception:.*' multiline.max_lines: 500 # 不否定多行模式,即使用多行匹配模式 multiline.negate: false # 正则表达式匹配的数据是上一行记录的后半部分数据,即匹配到的数据和上面的记录要合并>到同一行 multiline.match: after # 添加自定义字 fields: idx: group-purchase-manage author: ddf filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: falsekibana
按照kibana相关的配置增加索引,参考Kibana章节
kibana搜索示例特殊符号转移用
搜索字段如果要连贯要用""包裹
一段话,中间部分无法确定,可以使用and连接一个搜索满足多个条件