Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Stormデモでのコードを説明します


こんにちは。kimukimuです。

3月1日にAdvanced Tech Night No.4 BigDataの次のSTEP リアルタイム分散処理のイマドキを開催しました。
その際のデモのコードを説明します。

尚、どんなデモだったかについては下記のSlideを参照してください。

上記のデモに使用したコードを説明します。

尚、ソースコードは StormSample から取得してください。

まず、デモで使用したTopologyのソースは上記より、4つのソースからなります。

1.TwitterSingleSpout
2.HashTagFilterBolt
3.PrinterBolt
4.HashTagDetectTopology(トポロジ起動)

各々について説明します。

1.TwitterSingleSpout

TwitterStreamから検索条件を指定してTweetを取得するSpoutです。
実質的な実装箇所は下記の通りですね。

■TwitterSingleSpout.java

@Override
public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
  statusQueue_ = new LinkedBlockingQueue<Status>(10000);
  outputCollector_ = collector;

  // ★TwitterStreamのリスナーを設定
  StatusListener listener = new StatusListener() {

    @Override
    public void onStatus(Status status) {
      // ★取得したTweetをQueueにつめる
      statusQueue_.offer(status);
    }

    @Override
    public void onDeletionNotice(StatusDeletionNotice sdn) {
    }

    @Override
    public void onTrackLimitationNotice(int i) {
    }

    @Override
    public void onScrubGeo(long l, long l1) {
    }

    @Override
    public void onException(Exception e) {
    }

  };
  TwitterStreamFactory fact = new TwitterStreamFactory(
      new ConfigurationBuilder().setUser(userid_)
          .setPassword(password_).build());
  twitterStream_ = fact.getInstance();
  twitterStream_.addListener(listener);

  // ★検索条件を指定
  FilterQuery query = new FilterQuery();
  query.track(this.filterTrack_);
  twitterStream_.filter(query);
}

@Override
public void nextTuple() {

  // ★TweetがQueueに存在した場合取得してTopologyに流す
 // ★存在しなければ50ms待つ。
  Status ret = statusQueue_.poll();
  if (ret == null) {
    Utils.sleep(50);
  } else {
    outputCollector_.emit(new Values(ret));
  }
}
2.HashTagFilterBolt

HashTagを含まないツイートを除去するBoltです。
併せて、HashTagを含むツイートの場合はHashTagごとにメッセージを分割しています。

■HashTagFilterBolt.java

public void execute(Tuple tuple) {
  Status status = (Status) tuple.getValue(0);

  // ★Tweetからハッシュタグを取得
  HashtagEntity hashTags[] = status.getHashtagEntities();

  // ★ハッシュタグ毎にメッセージを分割
  // ★ハッシュタグを小文字に変換し、Topologyに流す
  if (hashTags != null && hashTags.length >= 1) {

    for (HashtagEntity hashtag : hashTags) {
      collector_.emit(tuple, new Values(hashtag.getText()
          .toLowerCase(), status));
    }
  }

  // ★メッセージを処理したことをTopologyに通知
  collector_.ack(tuple);
}
3.PrinterBolt

HashTagを判定し、出力するBoltです。
尚、各Taskで標準出力に出力した場合、Taskが所属するWorkerのログに出力されます。

■PrinterBolt.java

public void execute(Tuple tuple, BasicOutputCollector collector) {

  String hashtag = (String) tuple.getValue(0);
  Status status = (Status) tuple.getValue(1);

  if (hashtag.contains(this.target_)) {
    System.out.println("CollectTag! User:"
        + status.getUser().getScreenName() + " Tag:" + hashtag
        + " " + status.getText());
  } else {
    System.out.println("InvalidTag! User:"
        + status.getUser().getScreenName() + " Tag:" + hashtag
        + " " + status.getText());
  }
}
4.HashTagDetectTopology

起動引数を取得し、Topologyを起動するクラスです。
トポロジ名の指定があればStormクラスタで、なければLocalクラスタで動作します。

/**
 * トポロジを起動する。
 * 
 * 第1引数 : SpoutからTweet取得時のフィルタ
 * 第2引数 : Boltで判定する際のハッシュタグ
 * 第3引数 : TwitterUserid
 * 第4引数 : TwitterPassword
 * 第5引数 : トポロジ名(未入力の場合、ローカルクラスタで起動)
 * 
 * @param args 起動引数
 * @throws Exception 起動失敗時
 */
public static void main(String[] args) throws Exception {
  String spoutFilterkeyword = args[0];
  String boltFilterKeyword = args[1];
  String userid = args[2];
  String password = args[3];

  String topologyName = null;
  if (5 <= args.length) {
    topologyName = args[4];
  }

  TopologyBuilder builder = new TopologyBuilder();

  builder.setSpout("twitter", new TwitterSingleSpout(userid, password,
      new String[] { spoutFilterkeyword }));
  builder.setBolt("hashtagfilter", new HashTagFilterBolt(), 5)
      .shuffleGrouping("twitter");

  builder.setBolt("print", new PrinterBolt(boltFilterKeyword), 5)
      .fieldsGrouping("hashtagfilter", new Fields("hashtag"));

  StormTopology topology = builder.createTopology();

  // ★トポロジ名の指定があればStormクラスタで動作
  // ★ない場合はLocalクラスタで動作
  if (topologyName != null) {
    Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(1);

    // ★Stormクラスタ(Nimbus)のアドレスを指定
    conf.put(Config.NIMBUS_HOST, "192.168.2.101");
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);

    StormSubmitter.submitTopology(topologyName, conf, topology);

  } else {
    Config conf = new Config();

    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("test", conf, topology);

    // ★動作するのは100秒間
    Utils.sleep(100000);
    cluster.shutdown();
  }
}

・・・と、大体これだけで常時動作するリアルタイム処理が作れます。
かつ、何かややこしい定義があるわけでもなく、
特定のことしかできないわけでもありません。

尚、Stormクラスタで動作させる際にはJarファイルもStormクラスタにアップする必要があります。
その際の手順は前回の記事を参考にしてください。

Localモードであれば手軽に動かすことができるため、色々試してみるのがお勧めです。
それでは。

関連ページ:
Twitter Stormクラスタを構築します(その2