読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Stormデモでのコードを説明します

Storm ATN


こんにちは。kimukimuです。

3月1日にAdvanced Tech Night No.4 BigDataの次のSTEP リアルタイム分散処理のイマドキを開催しました。
その際のデモのコードを説明します。

尚、どんなデモだったかについては下記のSlideを参照してください。

上記のデモに使用したコードを説明します。

尚、ソースコードは StormSample から取得してください。

まず、デモで使用したTopologyのソースは上記より、4つのソースからなります。

1.TwitterSingleSpout
2.HashTagFilterBolt
3.PrinterBolt
4.HashTagDetectTopology(トポロジ起動)

各々について説明します。

1.TwitterSingleSpout

TwitterStreamから検索条件を指定してTweetを取得するSpoutです。
実質的な実装箇所は下記の通りですね。

■TwitterSingleSpout.java

@Override
public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
  statusQueue_ = new LinkedBlockingQueue<Status>(10000);
  outputCollector_ = collector;

  // ★TwitterStreamのリスナーを設定
  StatusListener listener = new StatusListener() {

    @Override
    public void onStatus(Status status) {
      // ★取得したTweetをQueueにつめる
      statusQueue_.offer(status);
    }

    @Override
    public void onDeletionNotice(StatusDeletionNotice sdn) {
    }

    @Override
    public void onTrackLimitationNotice(int i) {
    }

    @Override
    public void onScrubGeo(long l, long l1) {
    }

    @Override
    public void onException(Exception e) {
    }

  };
  TwitterStreamFactory fact = new TwitterStreamFactory(
      new ConfigurationBuilder().setUser(userid_)
          .setPassword(password_).build());
  twitterStream_ = fact.getInstance();
  twitterStream_.addListener(listener);

  // ★検索条件を指定
  FilterQuery query = new FilterQuery();
  query.track(this.filterTrack_);
  twitterStream_.filter(query);
}

@Override
public void nextTuple() {

  // ★TweetがQueueに存在した場合取得してTopologyに流す
 // ★存在しなければ50ms待つ。
  Status ret = statusQueue_.poll();
  if (ret == null) {
    Utils.sleep(50);
  } else {
    outputCollector_.emit(new Values(ret));
  }
}
2.HashTagFilterBolt

HashTagを含まないツイートを除去するBoltです。
併せて、HashTagを含むツイートの場合はHashTagごとにメッセージを分割しています。

■HashTagFilterBolt.java

public void execute(Tuple tuple) {
  Status status = (Status) tuple.getValue(0);

  // ★Tweetからハッシュタグを取得
  HashtagEntity hashTags[] = status.getHashtagEntities();

  // ★ハッシュタグ毎にメッセージを分割
  // ★ハッシュタグを小文字に変換し、Topologyに流す
  if (hashTags != null && hashTags.length >= 1) {

    for (HashtagEntity hashtag : hashTags) {
      collector_.emit(tuple, new Values(hashtag.getText()
          .toLowerCase(), status));
    }
  }

  // ★メッセージを処理したことをTopologyに通知
  collector_.ack(tuple);
}
3.PrinterBolt

HashTagを判定し、出力するBoltです。
尚、各Taskで標準出力に出力した場合、Taskが所属するWorkerのログに出力されます。

■PrinterBolt.java

public void execute(Tuple tuple, BasicOutputCollector collector) {

  String hashtag = (String) tuple.getValue(0);
  Status status = (Status) tuple.getValue(1);

  if (hashtag.contains(this.target_)) {
    System.out.println("CollectTag! User:"
        + status.getUser().getScreenName() + " Tag:" + hashtag
        + " " + status.getText());
  } else {
    System.out.println("InvalidTag! User:"
        + status.getUser().getScreenName() + " Tag:" + hashtag
        + " " + status.getText());
  }
}
4.HashTagDetectTopology

起動引数を取得し、Topologyを起動するクラスです。
トポロジ名の指定があればStormクラスタで、なければLocalクラスタで動作します。

/**
 * トポロジを起動する。
 * 
 * 第1引数 : SpoutからTweet取得時のフィルタ
 * 第2引数 : Boltで判定する際のハッシュタグ
 * 第3引数 : TwitterUserid
 * 第4引数 : TwitterPassword
 * 第5引数 : トポロジ名(未入力の場合、ローカルクラスタで起動)
 * 
 * @param args 起動引数
 * @throws Exception 起動失敗時
 */
public static void main(String[] args) throws Exception {
  String spoutFilterkeyword = args[0];
  String boltFilterKeyword = args[1];
  String userid = args[2];
  String password = args[3];

  String topologyName = null;
  if (5 <= args.length) {
    topologyName = args[4];
  }

  TopologyBuilder builder = new TopologyBuilder();

  builder.setSpout("twitter", new TwitterSingleSpout(userid, password,
      new String[] { spoutFilterkeyword }));
  builder.setBolt("hashtagfilter", new HashTagFilterBolt(), 5)
      .shuffleGrouping("twitter");

  builder.setBolt("print", new PrinterBolt(boltFilterKeyword), 5)
      .fieldsGrouping("hashtagfilter", new Fields("hashtag"));

  StormTopology topology = builder.createTopology();

  // ★トポロジ名の指定があればStormクラスタで動作
  // ★ない場合はLocalクラスタで動作
  if (topologyName != null) {
    Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(1);

    // ★Stormクラスタ(Nimbus)のアドレスを指定
    conf.put(Config.NIMBUS_HOST, "192.168.2.101");
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);

    StormSubmitter.submitTopology(topologyName, conf, topology);

  } else {
    Config conf = new Config();

    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("test", conf, topology);

    // ★動作するのは100秒間
    Utils.sleep(100000);
    cluster.shutdown();
  }
}

・・・と、大体これだけで常時動作するリアルタイム処理が作れます。
かつ、何かややこしい定義があるわけでもなく、
特定のことしかできないわけでもありません。

尚、Stormクラスタで動作させる際にはJarファイルもStormクラスタにアップする必要があります。
その際の手順は前回の記事を参考にしてください。

Localモードであれば手軽に動かすことができるため、色々試してみるのがお勧めです。
それでは。

関連ページ:
Twitter Stormクラスタを構築します(その2

広告を非表示にする