logstash進階實戰

前言

  • 前一篇我們介紹了logstash常用的一些pattern
  • 但對於logstash的一些應用以及grok的一些語法不是那麼熟悉
  • 本篇筆記將針對學到、使用到的做一個收斂

log解析實戰

[2021-02-26T16:58:29.716] [INFO] indexbackup - Backup /opt/auditbeat-self_monitor-2021.02.25.log successfully
[2021-02-26T16:58:30.052] [INFO] indexbackup - Compress /opt/auditbeat-self_monitor-2021.02.25.log successfully
[2021-02-26T17:01:49.332] [INFO] indexbackup - Backup /opt/fortinet-2021.02.25.log successfully
[2021-02-26T17:01:49.345] [INFO] indexbackup - Compress /opt/fortinet-2021.02.25.log successfully
[2021-02-26T17:01:51.095] [ERROR] indexbackup - Backup /opt/readonlyrest_audit-2021-02-25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.110] [ERROR] indexbackup - Compression: gzip: /opt/readonlyrest_audit-2021-02-25.log: No such file or directory

[2021-02-26T17:01:51.127] [ERROR] indexbackup - Backup /opt/metricbeat-self_monitor-2021.02.25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.135] [ERROR] indexbackup - Compression: gzip: /opt/metricbeat-self_monitor-2021.02.25.log: No such file or directory

[2021-02-26T17:01:51.149] [ERROR] indexbackup - Backup /opt/auditbeat-self_monitor-2021.02.25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.160] [ERROR] indexbackup - Compression: gzip: /opt/auditbeat-self_monitor-2021.02.25.log: No such file or directory

[2021-02-26T17:01:51.175] [ERROR] indexbackup - Backup /opt/clm-history-2021.02.25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.183] [ERROR] indexbackup - Compression: gzip: /opt/clm-history-2021.02.25.log: No such file or directory

[2021-02-26T17:01:51.195] [ERROR] indexbackup - Backup /opt/clm-self_monitor-2021.02.25.log:connect ECONNREFUSED 192.168.0.131:1859
[2021-02-26T17:01:51.205] [ERROR] indexbackup - Compression: gzip: /opt/clm-self_monitor-2021.02.25.log: No such file or directory

[2021-03-04T17:28:56.878] [ERROR] indexbackup - Backup /opt/kevin/indexBackup/auditbeat-self_monitor-2021.03.04:connect ECONNREFUSED

假設今天有一個log長這樣該怎麼解?

我的解法

  • 首先這個log可以被切成幾個部分: Time, Status, action, path, result
  • indexbackup則因為都相同所以不特別列項,沒意義。
input{
        file{
                path => "/root/indexbackup.log"
                start_position => "beginning"
                sincedb_path => "/dev/null"
        }
}
filter{
if [message] == ""{
        drop{}
}
        grok{
        match => {"message"=> "\[%{TIMESTAMP_ISO8601:time}\] \[%{DATA:status}\] %{DATA} \- %{DATA:action} %{PATH:route} %{GREEDYDATA:result}"}

}

}

output{
        stdout{}
}

常見的if使用

  1. if “xxx” in [message]{xxx}
  2. if [message] == “xxx”{xxx}
  3. if “xxx” in [message] and “xxx” in [xxx]{xxx}
  4. if “xxx” in [message] or “xxx” in [xxx]{xxx}

File

  • 適用場景: 將檔案餵給logstash
file{
                path => "" #absolute path
                type => "" #for free
                start_position => "beginning" #從頭還從尾
                sincedb_path => "" # 若設定/dev/null則每次都從頭
        }

Multiline

  • 適用情境: 讀取多行資料
# 放在file裡
codec => multiline {
          pattern => "正規表達式或grok語法"
          negate => "true" or "false"
          what => "previous" or "next"
        }

Date

  • 適用情境:將logstash的時間轉為log檔內的時間
date{
        target => "" # 指定的名稱,沒指定就是@datestamp
        locale => "en"
        match => ["", "MMM dd yyyy HH:mm:ss","ISO8601" ] #把哪個屬性丟到date裡面做轉換
        remove_field => [""] #移除項目
}

Elasticsearch

  • 適用情境: 將資料傳進ES讓整個ELK活絡
 elasticsearch{

                hosts => [""] # host domain name or ip
                index => "" # index name
                user => ""  # 沒有使用者跟密碼就不需要
                password => "" # 沒有使用者跟密碼就不需要
                ilm_enabled => flase # 有權限問題(401)的話就設定一下
        }

Aggregate

  • 適用情境: 有一組unique id 想進行差值計算時使用
  • 例如有一份log 如以下狀況 ``` 2021-02-24 00:02:19,054 - [[info]] - from accessLogger in application-akka.actor.default-dispatcher-1333571 [7482e260-9b45-4a9b-9c74-e7ed47d598af START][192.168.28.53][POST][/api/user/verifyEcToken] AHC/2.0

2021-02-24 00:02:19,054 - [[debug]] - from application in application-akka.actor.default-dispatcher-1335575 [dca02456-0f09-4706-a627-948a4a07208c]Send Request to UUPay http://epap:8080/uupay-api-appsrv/member/verify/verifyEcToken [AUTH eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ7XCJ0eXBlXCI6XCJFQ1RcIixcInVzZXJcIjpcIkIyMzg1RERBQ0M0NDRFQkM1RTYzMEVFRDIzNkQ0MURCNDYxNkIzRUQxQkZDQjc5RDkxNjRFNEJEM0ExMjAxQzlcIixcImlzc1wiOlwiQXByIDUsIDIwMjAsIDI6MjU6MDIgUE1cIixcInJlZlwiOlwiNTExYzNjNzQtYjc3Ny00ZjFkLTg3NjgtYmJiMzJhMDE1ZjFmXCIsXCJkZXZpY2VJZFwiOlwiNmQyMTU3NzgtZWRhZC00MTk0LWFiM2YtNjJiOTE3MWVhOGZlXCJ9In0.FbSLeCAk0MBtD_jyCIqBGZqpdChYEqR-S8aNRxAD5zs] {“clientHostIp”:”112.78.74.244”,”requestDateTime”:”20210224000217”,”deviceId”:”6d215778-edad-4194-ab3f-62b9171ea8fe”,”locale”:”zh_TW”,”requestUuid”:”dca02456-0f09-4706-a627-948a4a07208c”,”encryptMemberId”:”B2385DDACC444EBC5E630EED236D41DB4616B3ED1BFCB79D9164E4BD3A1201C9”,”token”:”eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ7XCJ0eXBlXCI6XCJFQ1RcIixcInVzZXJcIjpcIkIyMzg1RERBQ0M0NDRFQkM1RTYzMEVFRDIzNkQ0MURCNDYxNkIzRUQxQkZDQjc5RDkxNjRFNEJEM0ExMjAxQzlcIixcImlzc1wiOlwiQXByIDUsIDIwMjAsIDI6MjU6MDIgUE1cIixcInJlZlwiOlwiNTExYzNjNzQtYjc3Ny00ZjFkLTg3NjgtYmJiMzJhMDE1ZjFmXCIsXCJkZXZpY2VJZFwiOlwiNmQyMTU3NzgtZWRhZC00MTk0LWFiM2YtNjJiOTE3MWVhOGZlXCJ9In0.FbSLeCAk0MBtD_jyCIqBGZqpdChYEqR-S8aNRxAD5zs”}

2021-02-24 00:02:19,057 - [[debug]] - from application in ForkJoinPool.commonPool-worker-3 [dca02456-0f09-4706-a627-948a4a07208c][3 ms]Received response from UUPay http://epap:8080/uupay-api-appsrv/member/verify/verifyEcToken 200 application/json;charset=UTF-8 {“returnCode”:”00000”,”returnMsg”:”成功”,”data”:null}

2021-02-24 00:02:19,057 - [[info]] - from accessLogger in ForkJoinPool.commonPool-worker-3 [7482e260-9b45-4a9b-9c74-e7ed47d598af END][200][3ms] /api/user/verifyEcToken

* 我們想計算以上log從start到stop的時間
* 作法如下: 

if [status] == “START” { aggregate{ task_id => “%{key}” code => “map[‘start_to_end’] = Time.parse((event.get(‘time’))).to_f” map_action => “create” } } if [status] == “END” { aggregate{ task_id => “%{key}” code => “event.set(‘start_to_end_period’,Time.parse((event.get(‘time’))).to_f - map[‘start_to_end’])” map_action => “update” end_of_task => true timeout => 120 } } ```

  • 說明:
    1. aggregate中的task_id代表你想處理的物件
    2. code裡面是ruby的語法,用map的方式保存一個叫start_to_end的變數,透過用event.get抓值, 最後再用Time.parse轉成ruby看得懂的時間再轉成浮點數
    3. map_action可以告訴aggregate要create or update
    4. event.set的第一個值是你要創造的變數名,然後後面是他的內容,有點類似 var start_to_end_period = Time.parse((event.get(‘time’))).to_f - map[‘start_to_end’])
    5. end_of_task告知是否結束
    6. timeout 宣告timeout的時間