Logstash Advanced Practice
Preface
- In the previous post we went through some of the patterns that are commonly used with logstash
- but we were still not very familiar with some logstash use cases or with grok syntax
- this note consolidates what I have learned and actually used
Log parsing in practice
[2021-02-26T16:58:29.716] [INFO] indexbackup - Backup /opt/auditbeat-self_monitor-2021.02.25.log successfully
[2021-02-26T16:58:30.052] [INFO] indexbackup - Compress /opt/auditbeat-self_monitor-2021.02.25.log successfully
[2021-02-26T17:01:49.332] [INFO] indexbackup - Backup /opt/fortinet-2021.02.25.log successfully
[2021-02-26T17:01:49.345] [INFO] indexbackup - Compress /opt/fortinet-2021.02.25.log successfully
[2021-02-26T17:01:51.095] [ERROR] indexbackup - Backup /opt/readonlyrest_audit-2021-02-25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.110] [ERROR] indexbackup - Compression: gzip: /opt/readonlyrest_audit-2021-02-25.log: No such file or directory
[2021-02-26T17:01:51.127] [ERROR] indexbackup - Backup /opt/metricbeat-self_monitor-2021.02.25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.135] [ERROR] indexbackup - Compression: gzip: /opt/metricbeat-self_monitor-2021.02.25.log: No such file or directory
[2021-02-26T17:01:51.149] [ERROR] indexbackup - Backup /opt/auditbeat-self_monitor-2021.02.25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.160] [ERROR] indexbackup - Compression: gzip: /opt/auditbeat-self_monitor-2021.02.25.log: No such file or directory
[2021-02-26T17:01:51.175] [ERROR] indexbackup - Backup /opt/clm-history-2021.02.25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.183] [ERROR] indexbackup - Compression: gzip: /opt/clm-history-2021.02.25.log: No such file or directory
[2021-02-26T17:01:51.195] [ERROR] indexbackup - Backup /opt/clm-self_monitor-2021.02.25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.205] [ERROR] indexbackup - Compression: gzip: /opt/clm-self_monitor-2021.02.25.log: No such file or directory
[2021-03-04T17:28:56.878] [ERROR] indexbackup - Backup /opt/kevin/indexBackup/auditbeat-self_monitor-2021.03.04:connect ECONNREFUSED
Suppose we have a log that looks like the one above; how do we parse it?
My solution
- First, each line can be split into a few parts: Time, Status, action, path, result
- indexbackup is identical in every line, so there is no point extracting it as its own field.
input {
  file {
    path => "/root/indexbackup.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"   # forget the read position, always start from the top
  }
}
filter {
  # drop empty lines
  if [message] == "" {
    drop {}
  }
  grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:time}\] \[%{DATA:status}\] %{DATA} \- %{DATA:action} %{PATH:route} %{GREEDYDATA:result}" }
  }
}
output {
  stdout {}
}
Common uses of if
- if "xxx" in [message] { xxx }
- if [message] == "xxx" { xxx }
- if "xxx" in [message] and "xxx" in [xxx] { xxx }
- if "xxx" in [message] or "xxx" in [xxx] { xxx }
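- As a concrete (hypothetical) illustration, the equality form and the "in ... and ..." form applied to the indexbackup log above; the status field reuses the grok output from the earlier example:
filter {
  # drop empty events (equality form)
  if [message] == "" {
    drop {}
  }
  # substring test combined with another field check;
  # tag error lines whose message mentions a refused connection
  if "ECONNREFUSED" in [message] and [status] == "ERROR" {
    mutate { add_tag => ["backup_failed"] }
  }
}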
File
- Use case: feed a file into logstash
file {
  path => ""                      # absolute path
  type => ""                      # arbitrary label, set whatever you like
  start_position => "beginning"   # read from the beginning or from the end
  sincedb_path => ""              # set to /dev/null to re-read from the beginning every run
}
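- A filled-in version (hypothetical values, reusing the indexbackup log from the parsing example above; type is just an arbitrary label):
input {
  file {
    path => "/root/indexbackup.log"   # absolute path to the log
    type => "indexbackup"             # label attached to every event read from this file
    start_position => "beginning"
    sincedb_path => "/dev/null"       # always start over from the first line
  }
}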
Multiline
- Use case: read records that span multiple lines
# goes inside the file input block
codec => multiline {
  pattern => ""        # a regex or grok pattern
  negate => "true"     # or "false": whether to negate the pattern match
  what => "previous"   # or "next": which event the matching line is merged into
}
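- A common concrete setup, sketched here under the assumption that continuation lines (e.g. stack traces) do not start with a bracketed timestamp like the indexbackup log lines do; such lines get merged into the previous event:
input {
  file {
    path => "/root/indexbackup.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => multiline {
      # lines NOT starting with "[<ISO8601 timestamp>]" are treated as continuations
      pattern => "^\[%{TIMESTAMP_ISO8601}\]"
      negate => true
      what => "previous"
    }
  }
}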
Date
- Use case: set the event time in logstash to the time recorded inside the log
date {
  target => ""          # field to write the parsed time into; defaults to @timestamp
  locale => "en"
  match => ["", "MMM dd yyyy HH:mm:ss", "ISO8601"]   # which field to parse, followed by the expected formats
  remove_field => [""]  # fields to remove afterwards
}
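- Applied to the time field extracted by the grok in the parsing example (an ISO8601 value such as 2021-02-26T16:58:29.716), a sketch could look like this:
filter {
  date {
    match => ["time", "ISO8601"]   # parse the grok-extracted "time" field
    target => "@timestamp"         # overwrite the event timestamp with it
    remove_field => ["time"]       # drop the raw string once it has been parsed
  }
}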
Elasticsearch
- Use case: ship the events into Elasticsearch so the rest of the ELK stack can work with them
elasticsearch {
  hosts => [""]          # host domain name or IP
  index => ""            # index name
  user => ""             # omit if no user/password is configured
  password => ""         # omit if no user/password is configured
  ilm_enabled => false   # set this if you run into permission (401) problems
}
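- A filled-in sketch; the host address, credentials, and index name below are placeholders, and the daily date suffix is just one common convention:
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]      # placeholder ES endpoint
    index => "indexbackup-%{+YYYY.MM.dd}"   # one index per day
    user => "elastic"                       # remove user/password if security is disabled
    password => "changeme"
    ilm_enabled => false
  }
}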
Aggregate
- Use case: when events share a unique id and you want to compute a delta (such as elapsed time) across them
- For example, suppose we have a log like the following:
2021-02-24 00:02:19,054 - [[[37minfo[0m]] - from accessLogger in application-akka.actor.default-dispatcher-1333571 [7482e260-9b45-4a9b-9c74-e7ed47d598af START][192.168.28.53][POST][/api/user/verifyEcToken] AHC/2.0
2021-02-24 00:02:19,054 - [[[36mdebug[0m]] - from application in application-akka.actor.default-dispatcher-1335575 [dca02456-0f09-4706-a627-948a4a07208c]Send Request to UUPay http://epap:8080/uupay-api-appsrv/member/verify/verifyEcToken [AUTH eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ7XCJ0eXBlXCI6XCJFQ1RcIixcInVzZXJcIjpcIkIyMzg1RERBQ0M0NDRFQkM1RTYzMEVFRDIzNkQ0MURCNDYxNkIzRUQxQkZDQjc5RDkxNjRFNEJEM0ExMjAxQzlcIixcImlzc1wiOlwiQXByIDUsIDIwMjAsIDI6MjU6MDIgUE1cIixcInJlZlwiOlwiNTExYzNjNzQtYjc3Ny00ZjFkLTg3NjgtYmJiMzJhMDE1ZjFmXCIsXCJkZXZpY2VJZFwiOlwiNmQyMTU3NzgtZWRhZC00MTk0LWFiM2YtNjJiOTE3MWVhOGZlXCJ9In0.FbSLeCAk0MBtD_jyCIqBGZqpdChYEqR-S8aNRxAD5zs] {“clientHostIp”:”112.78.74.244”,”requestDateTime”:”20210224000217”,”deviceId”:”6d215778-edad-4194-ab3f-62b9171ea8fe”,”locale”:”zh_TW”,”requestUuid”:”dca02456-0f09-4706-a627-948a4a07208c”,”encryptMemberId”:”B2385DDACC444EBC5E630EED236D41DB4616B3ED1BFCB79D9164E4BD3A1201C9”,”token”:”eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ7XCJ0eXBlXCI6XCJFQ1RcIixcInVzZXJcIjpcIkIyMzg1RERBQ0M0NDRFQkM1RTYzMEVFRDIzNkQ0MURCNDYxNkIzRUQxQkZDQjc5RDkxNjRFNEJEM0ExMjAxQzlcIixcImlzc1wiOlwiQXByIDUsIDIwMjAsIDI6MjU6MDIgUE1cIixcInJlZlwiOlwiNTExYzNjNzQtYjc3Ny00ZjFkLTg3NjgtYmJiMzJhMDE1ZjFmXCIsXCJkZXZpY2VJZFwiOlwiNmQyMTU3NzgtZWRhZC00MTk0LWFiM2YtNjJiOTE3MWVhOGZlXCJ9In0.FbSLeCAk0MBtD_jyCIqBGZqpdChYEqR-S8aNRxAD5zs”}
2021-02-24 00:02:19,057 - [[[36mdebug[0m]] - from application in ForkJoinPool.commonPool-worker-3 [dca02456-0f09-4706-a627-948a4a07208c][3 ms]Received response from UUPay http://epap:8080/uupay-api-appsrv/member/verify/verifyEcToken 200 application/json;charset=UTF-8 {“returnCode”:”00000”,”returnMsg”:”成功”,”data”:null}
2021-02-24 00:02:19,057 - [[[37minfo[0m]] - from accessLogger in ForkJoinPool.commonPool-worker-3 [7482e260-9b45-4a9b-9c74-e7ed47d598af END][200][3ms] /api/user/verifyEcToken
- We want to compute the time elapsed from START to END in the log above
- The approach is as follows (assuming an earlier grok has already extracted the time, key, and status fields; see the sketch after the explanation below):
if [status] == "START" {
  aggregate {
    task_id => "%{key}"
    code => "map['start_to_end'] = Time.parse(event.get('time')).to_f"
    map_action => "create"
  }
}
if [status] == "END" {
  aggregate {
    task_id => "%{key}"
    code => "event.set('start_to_end_period', Time.parse(event.get('time')).to_f - map['start_to_end'])"
    map_action => "update"
    end_of_task => true
    timeout => 120
  }
}
- Explanation:
  - task_id in aggregate identifies the object (the correlation key) you want to group events on
  - the code option contains Ruby: it saves a value called start_to_end into the map, reads field values with event.get, and uses Time.parse to turn the timestamp into a Ruby time before converting it to a float
  - map_action tells aggregate whether to create or update the map
  - the first argument of event.set is the name of the field to create and the second is its value, roughly like var start_to_end_period = Time.parse(event.get('time')).to_f - map['start_to_end']
  - end_of_task signals that the task is finished
  - timeout sets how long (in seconds) to wait before the map is discarded
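- For completeness, a hedged sketch of the grok that could produce the time, key, and status fields used by the aggregate above; the pattern is an assumption based on the sample accessLogger lines (the debug lines will simply fail to match):
filter {
  grok {
    # hypothetical pattern: grab the timestamp, the request UUID (key),
    # and the START/END marker from the accessLogger lines
    match => { "message" => "%{TIMESTAMP_ISO8601:time} - .*\[%{UUID:key} %{WORD:status}\]" }
  }
}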