ある研究者の手記

セキュリティとかゲームとかプログラミングとかそのへん

テキスト出力されたログファイルから元のログフォーマットを分析するツールを作った

タイトルの通りなのですが、昔ちょっとやっていたテーマに関連したツールをGo言語の練習がてら作ってみました*1

ログファイルから元のログフォーマットを分析するとは

ここで言うログのフォーマットというのは所謂フォーマット文のことを指します。

log.Printf("Requested from %s", ipAddr)

このコードから以下のようなログが出力されます。

2018/05/23 23:25:00 Requested from 10.0.2.1
2018/05/23 23:25:10 Requested from 192.168.1.5
2018/05/23 23:25:24 Requested from 10.0.1.5

元になったフォーマットは %s の部分にIPアドレスらしきものが埋め込まれて下図のようなテキストとして出力されます。この例は非常に簡単なので下から上を推測するのは容易ですが、内容が複雑になってくると「これ値なのか固定文なのかどっちだ?」ということがまれによくあります。この下の出力から上のフォーマット文(に近いもの)を推測するのが今回作成したツールになります。このツールは 1) すでに出力されたログファイルからフォーマットを推定する、そして 2) 推定したフォーマットを利用し、そのフォーマットに該当するログがログファイルのどのあたりに出現したのかを示す という2つの機能を実装しています。

なんでこんなツールが必要なのか

実際には、正規化・構造化されたログデータのみを扱う環境であればこのツールは不要ですが、以下のような状況で役立ちます。

  • ログの全体像を把握したい場合 : セキュリティ分析の文脈で特に多いと思いますが、今まで見たことのない大量のログをみてそこから知見を導き出さないと行けない場合があります。そういったときにひたすら less コマンドで眺めようとしても人間には厳しいので、全体としてどういうログがあるのか? そしてどういう分布をしているのか? ということがわかると分析のとっかかりが非常に楽になります。特にセキュリティ分析で必要なのは多くの場合全体の99%を占める通常のサービスに関するログではなく、何か異常が起こったポイントになります。異常が起こった際のログというのは通常見られないエラーや処理が発生しやすいため、異常なログ=珍しいフォーマットのログがどこに出現するのかを把握できると、そこにまず注目して分析するという足がかりを作ることができます。
  • テキスト形式で出力されるログを再利用しないとならない場合 : すでにサービスなどが稼働しておりテキストではログを出力するという場合、そのログを正規表現にかけるなどして中に含まれている値を抽出する必要があります。仕様書がある場合はいいですが、そうでない場合はソースコードを見るか、もしくは正規表現を書く→網羅でいているか確認する→正規表現を直す、みたいなことを繰り返さないとならずかなり面倒です*2。このツールだと抜くべき値の正規表現の推定まではしてくれませんが、既存のログから分かる範囲ではどこまでやればいいかを網羅できるので作業的に楽になります。

使い方

Go言語を使う環境が整っていれば go get github.com/m-mizutani/logptn でインストールされます。

GitHub - m-mizutani/logptn: Generate Log Format from real log data

例として(短いものですが)以下のようなログをツールに入力してみます。

$ cat test.log
Feb  1 07:56:49 pylon sshd[5153]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.0.3  user=root
Feb  1 07:56:51 pylon sshd[5153]: Failed password for root from 192.168.0.3 port 7176 ssh2
Feb  1 07:56:51 pylon sshd[5153]: Connection closed by 192.168.0.3 [preauth]
Feb  1 08:01:26 pylon sshd[5156]: Invalid user upload from 192.168.0.3
Feb  1 08:01:26 pylon sshd[5156]: input_userauth_request: invalid user upload [preauth]
Feb  1 08:01:26 pylon sshd[5156]: pam_unix(sshd:auth): check pass; user unknown
Feb  1 08:01:26 pylon sshd[5156]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.0.3
Feb  1 08:01:28 pylon sshd[5156]: Failed password for invalid user upload from 192.168.0.3 port 51058 ssh2
Feb  1 08:01:28 pylon sshd[5156]: Connection closed by 192.168.0.3 [preauth]
Feb  1 08:05:01 pylon CRON[5159]: pam_unix(cron:session): session opened for user root by (uid=0)
Feb  1 08:05:01 pylon CRON[5159]: pam_unix(cron:session): session closed for user root
Feb  1 08:05:54 pylon sshd[5162]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.0.3  user=root
Feb  1 08:05:56 pylon sshd[5162]: Failed password for root from 192.168.0.3 port 33005 ssh2
Feb  1 08:05:56 pylon sshd[5162]: Connection closed by 192.168.0.3 [preauth]
Feb  1 08:10:28 pylon sshd[5165]: Invalid user mythtv from 192.168.0.3
Feb  1 08:10:28 pylon sshd[5165]: input_userauth_request: invalid user mythtv [preauth]
Feb  1 08:10:28 pylon sshd[5165]: pam_unix(sshd:auth): check pass; user unknown
Feb  1 08:10:28 pylon sshd[5165]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.0.3
Feb  1 08:10:30 pylon sshd[5165]: Failed password for invalid user mythtv from 192.168.0.3 port 59978 ssh2
Feb  1 08:10:30 pylon sshd[5165]: Connection closed by 192.168.0.3 [preauth]
Feb  1 08:15:01 pylon CRON[5168]: pam_unix(cron:session): session opened for user root by (uid=0)
Feb  1 08:15:01 pylon CRON[5168]: pam_unix(cron:session): session closed for user root
Feb  1 08:15:26 pylon sshd[5171]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=10.2.3.4  user=root
Feb  1 08:15:28 pylon sshd[5171]: Failed password for root from 10.2.3.4 port 60733 ssh2
Feb  1 08:15:28 pylon sshd[5171]: Connection closed by 10.2.3.4 [preauth]
Feb  1 08:17:01 pylon CRON[5173]: pam_unix(cron:session): session opened for user root by (uid=0)
Feb  1 08:17:01 pylon CRON[5173]: pam_unix(cron:session): session closed for user root
Feb  1 08:20:35 pylon sshd[5177]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=10.2.3.4  user=root
Feb  1 08:20:37 pylon sshd[5177]: Failed password for root from 10.2.3.4 port 44877 ssh2
Feb  1 08:20:37 pylon sshd[5177]: Connection closed by 10.2.3.4 [preauth]
Feb  1 08:25:01 pylon CRON[5180]: pam_unix(cron:session): session opened for user root by (uid=0)
Feb  1 08:25:01 pylon CRON[5180]: pam_unix(cron:session): session closed for user root
Feb  1 08:25:16 pylon sshd[5183]: Invalid user user from 10.2.3.4

このデータを入力させると以下のような出力をします。

./logptn test.log
2018/05/20 13:30:55 arg:test.log
     4 [4ffb267b] Feb  1 *:*:* pylon sshd[*]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=*  user=root
     4 [845f4659] Feb  1 *:*:* pylon sshd[*]: Failed password for root from * port * ssh2
     6 [847ccf35] Feb  1 *:*:* pylon sshd[*]: Connection closed by * [preauth]
     3 [de051cd9] Feb  1 08:*:* pylon sshd[*]: Invalid user * from *
     2 [8e9e2a13] Feb  1 08:*:* pylon sshd[*]: input_userauth_request: invalid user * [preauth]
     2 [22190c74] Feb  1 08:*:* pylon sshd[*]: pam_unix(sshd:auth): check pass; user unknown
     2 [83fba2bf] Feb  1 08:*:* pylon sshd[*]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.0.3
     2 [f1ba83ea] Feb  1 08:*:* pylon sshd[*]: Failed password for invalid user * from 192.168.0.3 port * ssh2
     4 [e4a6f815] Feb  1 08:*:01 pylon CRON[*]: pam_unix(cron:session): session opened for user root by (uid=0)
     4 [5256845b] Feb  1 08:*:01 pylon CRON[*]: pam_unix(cron:session): session closed for user root

この出力では、左から「そのフォーマットが出現した回数」「フォーマットID」「推定されたフォーマット」になっています。また、推定されたフォーマットにおいて、値として埋め込まれると思われる部分を * という記号に置き換えています。この例ではサンプルが少ないため、IPアドレスの部分が * になっていないものもありますが、サンプル数が増えるとこれも * に置き換わります。上記は人間が読みやすいテキスト形式での出力になっていますが、別のプログラムで扱えるようにjson形式でも出力できます。

./logptn test.log -d sjson | jq . 
{
  "formats": [
    {
      "segments": [
        "Feb  1 ",
        null,
        ":",
        null,
        ":",
        null,
        " pylon sshd[",
        null,
        "]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=",
        null,
        "  user=root"
      ],
      "count": 4
    },
(snip)

さらに、そのフォーマットがログ全体のどの行数あたりに現れたのかをHTML形式で一覧にして表示することもできます。

$ ./logptn  ./var/log/secure -d heatmap -o secure.html

上記のコマンドでログのフォーマット、およびそれが何行目あたりに出現したのかを示すヒートマップを作成できます。ヒートマップは左が推定されたフォーマット、上のヘッダが行数(何行目〜何行目)、右が合計ログ数になっています。下の画像がちょっと小さくて見づらいですが、HTMLファイル自体はここからもダウンロード可能です。

f:id:mztnex:20180520141045p:plain 大きい画像

性能

計算量としては O(NM) になり、N がログファイルに含まれるログの総数、 M が推定されたフォーマットの数になります。いろいろなログファイルで試してみましたが M は10〜100ぐらいに収束するのでだいたいは N 、つまりログの総数が影響します。雑にしか計測していませんが、M=40 ほどになるデータに対して MacBookPro Early 2015 (2.7 GHz Intel Core i5) で動かして、おおよそ30,000 logs/sec 程度で動作しました。多分コード的にはもっと最適化できるんですが、まだそこまでは手を付けてません。

動作の仕組み

このあとはどうやってフォーマットを作成しているかという話なので、興味のある方だけどうぞ。

f:id:mztnex:20180524021502p:plain

すでに出力されているテキストログから元になったログフォーマットを推定する話は昔から研究としてありますが*3、今回 logptn で実装したのは非常に簡易なアルゴリズムになっています。昔は自分もいろいろとこねくり回した方法を考えたのですが、どれだけ複雑なアルゴリズムを使ったところで「まあ所詮は推測にすぎないよね」という割り切りを得たので、極力シンプルに実装しました。

手順

このフォーマット推定のアルゴリズムは4段階に別れており、それぞれ順番に解説します。

Phase1) Import logs

このアルゴリズムはバッチ型(ある程度の固まった量のログデータをまとめてから処理するタイプ)になります。一方で次々到着するログを逐次的に処理するオンライン型やストリーム型と呼ばれる手法もありますが、今回はもとになるデータセットは事前に決まっている(あとから増えない)ものとします。

取り込みに関しては全く難しいことはしておらず、現在は完全に1行1ログとして分割して切り分けています。全体の流れとしては複数行のログでも対応できないことはないアルゴリズムになっていますが、複数行で切り分ける基準がログごとに様々すぎるので、現状では対応していません。

Phase2) Chunking

データを1つずつのログ(現在は1行のテキスト、と同じ)に分割したあとは、そのログに含まれる単語などに分解する Chunking を実施します。これはログ内に埋め込まれる値は単語などの短い単位として出力される、という前提をおくことで、値として出力される単語の長さが違うことでフォーマットがばらつくのを防ぐのが目的です。例えば 0.0.0.0255.255.255.255 という2つが別のログに現れたとしても同じ「IPアドレス」として認識してほしいですが、これを一文字ずつ比較しようとすると2つはかなり異なる文字列長になります(0.0.0.0 が7文字、255.255.255.255 が15文字)もちろんこういう文字列長の違いをうまく吸収してくれるアルゴリズムなら気にしなくていいのですが、だいたいは問題を単純化するために事前にログを単語(ここではchunkとよんでいます)に分割しています。

分割については、これが自然言語的な英語であれば単純に空白で区切ればいいのですが、ログファイルというのは様々な記号が含まれてそれによって区切られている場合もあるので、なかなか空白だけというわけにはいきません。実際にはある特定の記号が出てきた場合に区切るという実装にしており、現状 logptn だと \t!,:;[]{}()<>=|\\*\"' がデフォルトの区切り文字となっています。これらのうちどれかがでてきたらchunkとして切り分けてきます。(詳しくはこちら を参照)この記号の選び方は完全に自分の経験に基づくヒューリスティックなものなので明確な根拠はないですが、まあだいたいうまくいっている感じです。一方、これらの記号にもとづいてこのフェーズである程度正しく文を分割できることが前提となっているため、日本語のようなマルチバイト文字のログについてはおそらくうまくいきません。

また、ヒューリスティックを入れていいと考えるなら正規表現などによって日付やURL、Eメールアドレスなど値として埋め込まれると考えられるような形式の値をを特定して切り出してしまえばより精度があげられます。実際、それを見越して正規表現でそういった機能も実装したのですが、Goの正規表現モジュールが想像以上に遅くて厳しかったので、現状デフォルトの機能としては外しています。(一応、 --enable-regex オプションを使うと有効化はされます)

実際にこの方法で文を分割すると、以下のようになります。

  • Before: pam_unix(cron:session): session closed for user root
  • After: pam_unix, (, cron, :, session, ), :, , session, , closed, , for, , user, , root

Phase3) Clustering

Chukingされたログが出揃ったら次は類似しているログをクラスタリングします。クラスタリングも非常にシンプルなアルゴリズムを使っています(名前知らないだけで既存のクラスタリング手法かもですが)。以下の手順をログ1つずつ順番に試します。

  1. Chunk長(1つのログから生成されたChunkの数)が同じクラスタがなかったら手順終了
  2. 同じChunk長のクラスタ全部に対して距離を計算する
  3. クラスタとログの距離は、クラスタの中心になっているログとどれくらい近いかで計算する。ログ間の距離は全体でChunkが一致する割合を見て、割合が高いほど近いと判断する。
  4. クラスとの距離がthreshold(デフォルトでは0.7)を超えていたら、もっとも近いクラスタに組み込まれて手順終了
  5. もしthresholdを下回るクラスタのみだったらそのまま終了

クラスタに組み込まれずに終了した場合は、そのログを元に新しいクラスタを生成します。これをすべてのログに対して実施します。

Phase4) Estimate Format

クラスタが生成されたらあとはフォーマットを推定するのみです。これも非常にシンプルなアプローチで、クラスタ内のすべてのログに対して積をとっているイメージです。クラスタ内のログを L1, L2, ... , Ln としたとき、まず L1L2 で積をとって、L' を生成し、そのあとは LiL' で積をとる、という処理を繰り返します。

  • L1: Requested from 10.0.2.3
  • L2: Requested from 192.168.0.1
  • L': Requested from null

同じクラスタだとChunkの長さがすべて同じになるので、互いのログのChunkを先頭から比較します。同じ内容であればそのまま、もし異なる内容であれば null とします。この null がフォーマットの中で値が入ると考えられる部分となります。このあと L3 と比較する際は null の部分はどのChunkと比較しても必ず null になります。

この操作をクラスタ内のすべてのログに対して実施すると、最終的にそのクラスタのすべてのログに適合できるフォーマットが生成(推定)できるということになります。

この手法の弱点

前述したとおり、このアルゴリズムはChunkingがある程度正しくできていて、かつ1 Chunk=埋め込まれる値になることが強い前提となっています。そのため、たとえばChunkに分割されやすい任意長の文字列が登場するようなログ(極端に言えば、例えばユーザによる入力をそのままログに書き出すようなログ)に対しては非常に低い精度になると見込まれます。また任意長でないとしても同様にChunk分割によって1つの値が複数に分割されてしまうようなログには耐性がないと言えます。

また、現状のアルゴリズムだと複数のクラスタから同じフォーマットが生成される可能性があります。クラスタを生成する時のアルゴリズムがわりと雑なのでたまたま距離の計算でしきい値を超えてしまったなどの場合に、本来同じクラスタであるべきログが2つ以上のクラスタに分離してしまいます。これについてはまだすっきりした方法を思いついてないですが、生成後のフォーマット同士を比較してマージするというような処理が必要かなと考えています。

参考文献

*1:言い訳がましいのですがまだGo言語まともに書き始めて1ヶ月ぐらいという有様なので実装物について、流儀的にこういうの違うよ、とかGoならこういうふうにも書けるよ、みたいなコメントは大歓迎です

*2:そんな環境のほうがおかしいだろというツッコミはあると思いますが、稀によくあるシチュエーションでした。特に前職

*3:参考文献として幾つか論文へのリンクを貼っておきました