mamori017.log

歴史的クソブログ

ヤマハRTX1200から出力したログをLogstashを使用してelasticsearchに登録、Kibanaで可視化してみた

ヤマハRTX1200のsyslogにURLフィルターのログを出力するようにしています。 このURLフィルターのログをLogstashを使用してelasticsearchに登録し、kibanaで可視化してみました。

使用するログ

syslog全体から以下のログを抜き出しテストで使用しました。

2018/02/08 00:50:07: [SD] Logfile is opened("sd1:rt_syslog.log", maximum size: 455524352 bytes)
2018/02/08 00:50:32: [URL_FILTER] Passed at LAN1 IN(2) filter: 192.168.1.2 : http://www.hatena.ne.jp/
2018/02/08 00:50:32: [URL_FILTER] Passed at LAN1 OUT(1) filter: 192.168.1.2 : http://www.hatena.ne.jp/
2018/02/08 01:03:39: [URL_FILTER] Passed at LAN1 IN(2) filter: 192.168.1.2 : http://weathernews.jp/pinpoint/cgi/search_result.fcgi?service=11&lat=&lon=&ameno=67437&name=広島&pref=67
2018/02/08 01:03:39: [URL_FILTER] Passed at LAN1 OUT(1) filter: 192.168.1.2 : http://weathernews.jp/pinpoint/cgi/search_result.fcgi?service=11&lat=&lon=&ameno=67437&name=広島&pref=67
2018/02/13 19:43:51: LAN1: PORT8 link down
2018/02/13 19:43:53: LAN1: PORT8 link up (10BASE-T Full Duplex)
2018/02/13 19:44:33: LAN1: PORT8 link down
2018/02/13 19:44:35: LAN1: PORT8 link up (1000BASE-T Full Duplex)
2018/02/12 17:36:34: [URL_FILTER] Passed at LAN1 IN(2) filter: 192.168.1.2 : http://b.hatena.ne.jp/js/hatena_dfp2.js
2018/02/12 17:36:34: [URL_FILTER] Passed at LAN1 OUT(1) filter: 192.168.1.2 : http://b.hatena.ne.jp/js/hatena_dfp2.js
2018/02/14 19:10:32: [EXTMEMBOOT] Configuration file was not found.
2018/02/14 19:10:32: [EXTMEMBOOT] Firmware file was not found.
2018/02/15 01:14:05: [URL_FILTER] Passed at LAN1 IN(2) filter: 192.168.1.2 : http://www.yahoo.co.jp/
2018/02/15 01:14:05: [URL_FILTER] Passed at LAN1 OUT(1) filter: 192.168.1.2 : http://www.yahoo.co.jp/
2018/02/15 01:18:23: [URL_FILTER] Passed at LAN1 IN(2) filter: 192.168.1.2 : http://hatenablog.com/?via=201002
2018/02/15 01:18:23: [URL_FILTER] Passed at LAN1 OUT(1) filter: 192.168.1.2 : http://hatenablog.com/?via=201002

Grokフィルターパターンの作成

このログをGrokフィルターで項目ごとにパースします。 ログの中でYYYY/MM/DD HH:MM:SS: [URL_FILTER]...からはじまるデータのみ対象になるようフィルターパターンを作成します。

Grokのパターンは以下を参照しました。 logstash-patterns-core/grok-patterns at master · logstash-plugins/logstash-patterns-core · GitHub

フィルターパターンはGrok Debuggerを使用しながら作成しました。 Grok Debuggerは入力したいログとパターンを入力することでテストしながら作成ができて便利です。

Grok Debugger

今回は以下のようなフィルターパターンを作成しました。*1

%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}\s%{HOUR:hour}:%{MINUTE:min}:%{SECOND:sec}:\s*\[URL_FILTER\]\s*Passed\s*at\s*LAN%{INT:lan}\s*%{WORD:inout}\(%{INT:filter}\)\s*filter:\s*%{IP:clientip}\s*:\s*%{URIPROTO:uriproto}://(?:%{USER:user}(?::[^@]*)?@)?(?:%{URIHOST:urihost})?(?:%{URIPATHPARAM:uriparam})

Logstashの設定・起動

作成したフィルターパターンをlogstash/conf.d/pipeline.confに記述します。

$ sudo /etc/logstash/conf.d/pipeline.conf

ログファイルは先頭から読み込みます。 テストなのでインデックスも特に指定していません。

input {
    file {
        path => "/home/urlFilter.log"
        start_position => "beginning"
    }
}

filter{
  grok {
    match => ["message", "%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}\s%{HOUR:hour}:%{MINUTE:min}:%{SECOND:sec}:\s*\[URL_FILTER\]\s*Passed\s*at\s*LAN%{INT:lan}\s*%{WORD:inout}\(%{INT:filter}\)\s*filter:\s*%{IP:clientip}\s*:\s*%{URIPROTO:uriproto}://(?:%{USER:user}(?::[^@]*)?@)?(?:%{URIHOST:urihost})?(?:%{URIPATHPARAM:uriparam})"]
  }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
   }
}

Logstashを起動しelasticsearchにデータを登録します。

$ sudo service logstash start

elasticsearchの確認

elasticsearchにデータが登録されているか見てみます。

$ curl -XGET localhost:9200/_cat/indices?v

テストで使用したログ17行が取り込まれています。 インデックスは指定しなかったのでlogstash-2018.02.17で作成されています。

$ curl -XGET localhost:9200/_cat/indices?v
health status index               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   .kibana             9fT-tqMkRfq0mG30uGXBug   1   1          1            0      3.2kb          3.2kb
yellow open   logstash-2018.02.17 lvliGqYrSV6GFuVyW2DKcQ   5   1         17            0     57.5kb         57.5kb

kibanaで可視化

kibanaで取り込んだデータを見てみます。 Discoverでelasticsearchにデータを取り込んだ時間ごとの件数と生のデータを見ることができます。 f:id:mamori017:20180217194022p:plain

取り込んだデータを選択するとGrokフィルターに記述した通りにデータがパースされている事が確認できます。 f:id:mamori017:20180217194257p:plain

Grokフィルターでパース出来なかったデータについては、tagsに_grokparsefailureと記述されています。 取り込んだテストデータの1行目、6~9行目、12、13行目が該当していました。 f:id:mamori017:20180217194310p:plain

取り込んだデータが想定通りにパースされていたのでVisualizeで可視化してみます。 Pieチャートでデータを見てみたところ、初期状態では取り込んだ全17件がCountで表示されています。

f:id:mamori017:20180217200619p:plain

urlhost単位で件数を見るため、buckets項目でSplit Slicesを選択し、AggregationでTerms、Fieldでurlhost.keywordを選択します。 設定後、Apply Changes(▷ボタン)をクリックするとチャートが分割されます。 ここでチャート内にカーソルを当てると各データのカウントを見ることが出来ます。

このとき、テストで使用したデータの17件のうち7件はGrokフィルターに当てはまらなかったデータになるのでチャートの対象は10件になります。 逆にTermsでtags.keywordを選択すると、10件のデータにはtagsに値が無いので_grokparsefailureの7件が結果として表示されます。

f:id:mamori017:20180217200627p:plain

同じ要領でVisualizeからData Tableを選択します。 Split RowsでAggregationでTerms、Fieldでurlhost.keywordを選択すると各データの件数が表示されます。 f:id:mamori017:20180217200813p:plain

Visualizeは画面右上のSaveから設定を保存することができます。

さらにデータを取り込んだ結果

Grokフィルターでデータをパースできることを確認できたので、とりあえず手元にあるデータ150000件を登録してみました。 パッと見た感じやはりというか広告、CDNと思われる場所とのアクセス数が多い気がします。 f:id:mamori017:20180217235147p:plain

Pieチャートとは別にVertical Barで日ごとのアクセス先が可視化できるようにしました。*2 Kibanaの運用は当面これでやっていこうと思います。

f:id:mamori017:20180218031301p:plain

データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]

データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]

*1:syslogで対象としたいログはこれだけではないので、フィルターはGistで更新していく。https://gist.github.com/mamori017/14b3ea1b809a797042bb0232214e4a52

*2:横軸をSplit Chartでログ出力日、出力月ごとに、URIでグラフを分割するようにしています。