Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Beatsのつくりかた - ヒープとGCを取得するJstatbeatsを実装してみた

Hello, world! @ です!
このエントリーはElasitcsearch Advent Calendarの20日目になります。


先日、Elastic{ON} Tour 2015 Tokyoが行われました。

世界各地で行われたイベントの中で最後となる東京では、無料イベントにも関わらず、
セッションは充実しているし、資料が印刷されて、しかも翻訳までされているし、
ランチに人形町今半のお弁当が配られるし
帰るときにもらったノベルティグッズが、なんとElasticロゴ入りモバイルバッテリーだったり、
とんでもないホスピタリティあふれるイベントでした。

Acroquestもelastic社のパートナーとして提携して、今回もデモスポンサーとして参加しましたが
それ以外にも何かイベントの盛り上げをお手伝いができないかなと思い、リアルタイムブログに挑戦してみました。

当日のブログ一覧 : http://acro-engineer.hatenablog.com/search?q=%23elasticon

同僚のサポートもあり、いずれもセッション終了から数分以内にはレポートをあげることができました!


ということでこのAdvent Calendar、元々はElastic{ON}のことを書こうかと思っていたのですが、
既に全部レポートしちゃったので、代わりに、Beatsの作り方について紹介したいと思います。

Beatsのつくりかた

Beatsとは何ぞやというところは、以前にPINOKIOが書いたエントリーを見てください。
acro-engineer.hatenablog.com

Beatsの作り方は、公式ドキュメントのDeveloper Guideにまとまっています。
https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html

この手順に沿いつつ、JavaのヒープサイズやGCの回数をjstatコマンドで取得する、
「Jstatbeat」を作ってみたいと思います。


なお、今回作成したコードは、Githubで公開しています。
https://github.com/cero-t/jstatbeat

こんな感じのダッシュボードを作ることができます。
f:id:acro-engineer:20151220045735p:plain
サクッと作れる割に、良い感じです。


ちなみにBeatsシリーズはGo言語で書かれているため、Goの開発環境を事前に作っておく必要があります。
僕はIntelliJ + golang-pluginを使っています。

Goは文法的にも取っつきやすく、チュートリアルも充実しており、習得しやすいと感じています。
クロスプラットフォーム向けコンパイルや、メモリフットプリントの小ささなどメリットも大きいので、
まだGoを学んでない方は、この機会にぜひ習得してみてください。

1. 概要を把握する

まずは簡単にBeatsの実装を知るために、Topbeatのソースコードを確認します。
https://github.com/elastic/beats/tree/master/topbeat

Topbeatの本体ソースコードは、4つしかありません。
各ソースの代表的な部分とともに紹介します。

1. /main.go (https://github.com/elastic/beats/blob/master/topbeat/main.go)
メイン関数です。実質的にはbeatを起動する処理を1行書くだけです。

func main() {
	beat.Run(Name, Version, topbeat.New())
}

2. /beat/config.go (https://github.com/elastic/beats/blob/master/topbeat/beat/config.go)
設定ファイルと1:1のパラメータを持つ構造体です。

type TopConfig struct {
	Period *int64
	Procs  *[]string
	Stats  struct {
		System     *bool `yaml:"system"`
		Proc       *bool `yaml:"process"`
		Filesystem *bool `yaml:"filesystem"`
		CpuPerCore *bool `yaml:"cpu_per_core"`
	}
}

3. /beat/sigar.go (https://github.com/elastic/beats/blob/master/topbeat/beat/sigar.go)
Topの情報を作成するための、gosigarのラッパー関数群です。詳細は割愛します。

4. /beat/topbeat.go (https://github.com/elastic/beats/blob/master/topbeat/beat/topbeat.go)
処理の本体です。一番重要な関数群です。

func (tb *Topbeat) Config(b *beat.Beat) error {
	// 略
}

func (tb *Topbeat) Setup(b *beat.Beat) error {
	// 略
}

func (t *Topbeat) Run(b *beat.Beat) error {
	// 略
}

func (tb *Topbeat) Cleanup(b *beat.Beat) error {
	return nil
}

func (t *Topbeat) Stop() {
	close(t.done)
}

この5つのメソッドを実装することで、beat本体となるBeaterを実装することができます。

参考: https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html#_developing_the_beat_specific_code


自分でbeatを作る際には、libbeatという共通ライブラリを利用することで実装量を抑えることができるため、
1、2、4に相当するコードだけを書けば良いわけです。

参考: https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html#_overview


要するに周辺作業はlibbeatがすべてやってくれるので、
個別のbeatはデータを取るところに注力すれば良いわけです。

2. libbeatのソースをgo get

よし作ってみよう、という気になったら、まずはlibbeatの取得をします。
参考: https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html#_getting_ready

公式ドキュメントの通りにtopbeatをgo getで取ってくれば、libbeatも一緒に取得できます。

go get github.com/elastic/topbeat

これで開発の準備ができました。

3. 設定ファイルと、設定の構造体を作る

まずは設定ファイルの作成と、その設定を読み込む処理を作成します。
参考: https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html#config-method


Jstatbeatでは「GC情報を取得する間隔(ミリ秒)」と、jpsを実行した時に取れる「プロセス名」の
2つを設定できるようにします。

設定ファイルはtopbeatのものをコピーして、設定部分だけ変えるのが良いでしょう。
https://github.com/elastic/beats/blob/master/topbeat/etc/topbeat.yml

/etc/jstatbeat/jstatbeat.yml

input:
  interval: 5000
  name: "Elasticsearch"

  # 略


そして、この設定ファイルに対応する構造体を作ります。

/beat/config.go

package beat

type JstatConfig struct {
	Interval *string `yaml:"interval"`
	Name     *string `yaml:"name"`
}

type ConfigSettings struct {
	Input JstatConfig
}


続いて、本体となるJstatbeatクラスのうち、設定ファイルを読み込むConfig関数を作成します。

/beat/jstatbeat.go

package beat

// import文は省略

type Jstatbeat struct {
	// from configuration
	interval string
	name     string // (1)

	// state
	pid     string
	isAlive bool
}

func (jsb *Jstatbeat) Config(b *beat.Beat) error { // (2)
	var config ConfigSettings
	err := cfgfile.Read(&config, "") // (3)
	if err != nil {
		logp.Err("Error reading configuration file: %v", err)
		return err
	}

	jsb.name = *config.Input.Name // (4)

	if config.Input.Interval != nil {
		jsb.interval = *config.Input.Interval
	} else {
		jsb.interval = "5000"
	}

	return nil
}


(1) Jstatbeat自身のフィールドとして interval と name を定義しておき
(2) Config関数の中で
(3) 設定ファイルを読み込んで
(4) 読み込んだ値を、フィールドに代入します。

これでConfig関数は完成です。
ね、簡単でしょう?

4. 起動時の処理を作る

続いて、起動時の処理を行うSetup関数を作成します。
起動時の処理では、jpsコマンドを実行して、監視対象となるJavaプロセスのプロセスIDを取得します。
参考: https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html#setup-method


/beat/jstatbeat.go

type Jstatbeat struct {
	// from configuration
	interval string
	name     string

	// state
	pid     string // (1)
	isAlive bool
}

func (jsb *Jstatbeat) Setup(b *beat.Beat) error { // (2)
	cmd := exec.Command("jps") // (3)

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		logp.Err("Error get stdout pipe: %v", err)
		return err
	}

	cmd.Start()

	// TODO: handle error when 'jps' command cannot be executed.

	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		line := scanner.Text()
		items := strings.Split(line, " ")

		if len(items) == 2 && items[1] == jsb.name { // (4)
			jsb.pid = items[0]
			break
		}
	}
	cmd.Wait()

	if len(jsb.pid) == 0 {
		logp.Err("No target process: %v", jsb.name)
		return errors.New("No target process: " + jsb.name)
	}

	return nil
}

(1) Jstatbeatのフィールドとして pid を定義しておき
(2) Setup関数の中で
(3) jpsコマンドを実行して
(4) 設定で指定した name に一致するプロセスの pid を取得します

5. メインの処理を作る。

いよいよ、一番のメイン処理となるRun関数の実装です。
https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html#run-method


処理が少し長いので、いくつかのブロックに分けて説明します。

/beat/jstatbeat.go

func (jsb *Jstatbeat) Run(b *beat.Beat) error {
	jsb.isAlive = true

	cmd := exec.Command("jstat", "-gc", "-t", jsb.pid, jsb.interval) // (1)
	stdout, err := cmd.StdoutPipe()

	if err != nil {
		logp.Err("Error get stdout pipe: %v", err)
		return err
	}

	cmd.Start()
	defer killProcess(cmd) // (2)

	// 略

	return nil
}

func killProcess(cmd *exec.Cmd) {
	if cmd.Process != nil {
		err := cmd.Process.Kill()
		if err != nil {
			logp.Err("Error killing jstat process: %v", err)
		}
	}
}

(1) jstatコマンドに-gcオプションと-tオプションをつけて実行します。
また、pidやintervalは、それぞれSetupとConfigで取得したものを渡します。

(2) コマンドを実行した後は、エラーなどで処理が止まってしまって良いように
deferでプロセス停止の関数を呼ぶようにしておきます。


/beat/jstatbeat.go

var blanks = regexp.MustCompile(`\s+`)

func (jsb *Jstatbeat) Run(b *beat.Beat) error {
	jsb.isAlive = true

	// 略

	scanner := bufio.NewScanner(stdout)
	var keys []string
	var version string
	for jsb.isAlive && scanner.Scan() { // (3)
		line := scanner.Text()

		values := blanks.Split(line, -1) // (4)

		if len(values) > 2 && values[0] == "Timestamp" {
			keys = values // (5)

			if strings.Contains(line, "CCSC") { // (6)
				version = "java8"
			} else {
				version = "java5"
			}

			continue
		}

		// 略
	}

	return nil
}

(3) jstatの結果を一行ずつ読み出しながら
(4) 正規表現を使って、空白で値を分割しています。
(5) 1行目(見出し行)の場合は、分割した値をkeysとして保持しておきます。
(6) また念のため、jstatで取れた情報から、JVMJava5〜7なのか、Java8だったのかを判断しています。
この判断結果は、elasticsearchのtypeとして利用します。


/beat/jstatbeat.go

func (jsb *Jstatbeat) Run(b *beat.Beat) error {
	// 略

	for jsb.isAlive && scanner.Scan() {
		line := scanner.Text()

		values := blanks.Split(line, -1)

		// 略

		event := common.MapStr{ // (7)
			"@timestamp": common.Time(time.Now()),
			"type":       version,
		}

		for i, key := range keys { // (8)
			if len(key) == 0 {
				continue
			}
			event[key] = toFloat(values[i+1]) // (9)
		}
		b.Events.PublishEvent(event) // (10)
	}

	return nil
}

func toFloat(str string) float64 {
	value, err := strconv.ParseFloat(str, 64)

	if err != nil {
		logp.Err("Cannot parser to float. Ignore this value: %v", err)
		return 0
	}

	return value
}

(7) jstat実行結果の2行目以降に対して、行に対応する内容をMapとして作成し、
(8) すべての項目に対して
(9) 項目 = 値となるように、Mapに格納しています。また値はstringではなくfloat64に変換してから渡しています。
(10) そして、それをlibbeatのPublishEvent関数を使って送信します。


要はRun関数では、Mapを作ってPublishEventする、ということです。

Java5とJava8の両方に対応させるため、若干、処理がややこしくなっていますが、
簡単なbeatを作るだけなら、ホントに十数行で済むんじゃないかと思います。

6. 終了時の処理を作る

残るCleanup関数とStop関数は、まとめて紹介します。
https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html#cleanup-method
https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html#stop-method


Cleanup関数は処理が終わるとき(正常時、異常時とも)の処理を書きますが、
ここでは特にやることがないので、何の処理も書いていません。

Stop関数は、SIGTERMを受信した時の処理を書きます。
ここではフラグを落として、ループが終わるようにしています。

/beat/jstatbeat.go

type Jstatbeat struct {
	// from configuration
	interval string
	name     string

	// state
	pid     string
	isAlive bool // (1)
}

func (jsb *Jstatbeat) Run(b *beat.Beat) error {
	jsb.isAlive = true // (2)

	// 略

	for jsb.isAlive && scanner.Scan() { // (3)

	// 略
}

func (jsb *Jstatbeat) Cleanup(b *beat.Beat) error {
	return nil
}

func (jsb *Jstatbeat) Stop() {
	jsb.isAlive = false // (4)
}

(1) フィールドとして定義した isAlive を
(2) Run関数で処理の前に true にしておき
(3) ループ中は必ず確認するようにして
(4) Stop関数が呼ばれた際に false にします。
こうすることで、SIGTERM受信時には処理が中断できるようになります。

7. main関数の作成

最後にメイン関数の作成です。
Jstatbeatのインスタンスを作成して、それをlibbeatに渡すだけです。
参考: https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html#_the_main_function


/beat/jstatbeat.go

func New() *Jstatbeat { // (1)
	return &Jstatbeat{}
}

/main.go

package main

import (
	jsatbeat "./beat" // (2)
	"github.com/elastic/libbeat/beat"
)

func main() {
	beat.Run("jstatbeat", "0.1", &jsatbeat.Jstatbeat{}) // (3)
}

(1) go言語お作法に従って、Jstatbeatのインスタンスを作成するNew関数を用意しておきます。
(2) また、beatという名前空間がlibbeat被るため、jstatbeatというエイリアスをつけておき
(3) main関数では、実質1行で処理を書いてしまいます。
このRunの第一引数で渡した"jstatbeat"は、設定ファイル名や、Elasticsearchのindex名などでも
利用するため重要です。index名で使う都合上、名前は英語の小文字だけにしてください。


これで本体ソースコードは完成です。
ほら、簡単でしょう?

8. いざ実行!

main.go、beat/config.go、beat/jstatbeat.goの3つのソースコードと、
jstatbeat.ymlを /etc/jstatbeat/jstatbeat.yml におけば、いよいよ実行です。

go run main.go

これでエラーなく動作するようであれば、Elasticsearchで確認しましょう。

http://(Elasticsearchのアドレス):9200/_cat/indices にアクセスして
jstatbeat-yyyy.MM.dd なindexができていれば成功です。

動作しない場合には、elasticsearchのログか、
Macの場合は /var/log/system.log 辺りを見ると、エラーが出ている可能性があります。


これだけでも動作自体には問題ないのですが、index定義をjsonとして作っておいたほうがより親切です。
topbeatのtemplateを参考にすると良いです。
https://github.com/elastic/beats/blob/master/topbeat/etc/topbeat.template.json

9. ダッシュボード

最後に、kibanaでダッシュボードを作りましょう。
今回は、こんな感じのダッシュボードを作ってみました。

f:id:acro-engineer:20151220045735p:plain

なお手動でダッシュボードを作ったあと、dashboardやvisualizationを
kibana上でexportしておくと、別の環境でもimportすることができます。

なのでexportしたものを配布物と一緒に提供するのが良いかと思います。

公開しました。

ということで、意外とさっくりとbeatを作れることが分かってもらえたかと思います。

今回は簡単にjstatを実行しただけなので、言ってしまえば別に、
jstatをファイルに出力してfilebeatやlogstashで送ってしまえばいいって話になります。

もう少し役立つものにするためには、複数プロセスや複数サーバに対応するとか
あるいはJMX経由でもう少し情報を取ってくるとか、そういうことが必要かと思います。
ただ逆に、そういうものを集める方法を統一するために、Beatsがピタっとハマりそうです。


冒頭にも書きましたが、今回作成したコードは、Githubで公開しています。
https://github.com/cero-t/jstatbeat

また、Windows/Mac/Linuxのそれぞれ向けのバイナリや
ダッシュボードの設定ファイルも含んだファイルをリリースしています。
https://github.com/cero-t/jstatbeat/releases

よかったら参考にしてください。


それでは、
Stay beats, See you!


Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、AWSHadoop、Storm、NoSQL、SpringBoot、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ