3月1日にAdvanced Tech Night No.4 BigDataの次のSTEP リアルタイム分散処理のイマドキを開催しました。
その際のデモのコードを説明します。
尚、どんなデモだったかについては下記のSlideを参照してください。
上記のデモに使用したコードを説明します。
尚、ソースコードは StormSample から取得してください。
まず、デモで使用したTopologyのソースは上記より、4つのソースからなります。
1.TwitterSingleSpout
2.HashTagFilterBolt
3.PrinterBolt
4.HashTagDetectTopology(トポロジ起動)
各々について説明します。
1.TwitterSingleSpout
TwitterStreamから検索条件を指定してTweetを取得するSpoutです。
実質的な実装箇所は下記の通りですね。
■TwitterSingleSpout.java
@Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { statusQueue_ = new LinkedBlockingQueue<Status>(10000); outputCollector_ = collector; // ★TwitterStreamのリスナーを設定 StatusListener listener = new StatusListener() { @Override public void onStatus(Status status) { // ★取得したTweetをQueueにつめる statusQueue_.offer(status); } @Override public void onDeletionNotice(StatusDeletionNotice sdn) { } @Override public void onTrackLimitationNotice(int i) { } @Override public void onScrubGeo(long l, long l1) { } @Override public void onException(Exception e) { } }; TwitterStreamFactory fact = new TwitterStreamFactory( new ConfigurationBuilder().setUser(userid_) .setPassword(password_).build()); twitterStream_ = fact.getInstance(); twitterStream_.addListener(listener); // ★検索条件を指定 FilterQuery query = new FilterQuery(); query.track(this.filterTrack_); twitterStream_.filter(query); } @Override public void nextTuple() { // ★TweetがQueueに存在した場合取得してTopologyに流す // ★存在しなければ50ms待つ。 Status ret = statusQueue_.poll(); if (ret == null) { Utils.sleep(50); } else { outputCollector_.emit(new Values(ret)); } }
2.HashTagFilterBolt
HashTagを含まないツイートを除去するBoltです。
併せて、HashTagを含むツイートの場合はHashTagごとにメッセージを分割しています。
■HashTagFilterBolt.java
public void execute(Tuple tuple) { Status status = (Status) tuple.getValue(0); // ★Tweetからハッシュタグを取得 HashtagEntity hashTags[] = status.getHashtagEntities(); // ★ハッシュタグ毎にメッセージを分割 // ★ハッシュタグを小文字に変換し、Topologyに流す if (hashTags != null && hashTags.length >= 1) { for (HashtagEntity hashtag : hashTags) { collector_.emit(tuple, new Values(hashtag.getText() .toLowerCase(), status)); } } // ★メッセージを処理したことをTopologyに通知 collector_.ack(tuple); }
3.PrinterBolt
HashTagを判定し、出力するBoltです。
尚、各Taskで標準出力に出力した場合、Taskが所属するWorkerのログに出力されます。
■PrinterBolt.java
public void execute(Tuple tuple, BasicOutputCollector collector) { String hashtag = (String) tuple.getValue(0); Status status = (Status) tuple.getValue(1); if (hashtag.contains(this.target_)) { System.out.println("CollectTag! User:" + status.getUser().getScreenName() + " Tag:" + hashtag + " " + status.getText()); } else { System.out.println("InvalidTag! User:" + status.getUser().getScreenName() + " Tag:" + hashtag + " " + status.getText()); } }
4.HashTagDetectTopology
起動引数を取得し、Topologyを起動するクラスです。
トポロジ名の指定があればStormクラスタで、なければLocalクラスタで動作します。
/** * トポロジを起動する。 * * 第1引数 : SpoutからTweet取得時のフィルタ * 第2引数 : Boltで判定する際のハッシュタグ * 第3引数 : TwitterUserid * 第4引数 : TwitterPassword * 第5引数 : トポロジ名(未入力の場合、ローカルクラスタで起動) * * @param args 起動引数 * @throws Exception 起動失敗時 */ public static void main(String[] args) throws Exception { String spoutFilterkeyword = args[0]; String boltFilterKeyword = args[1]; String userid = args[2]; String password = args[3]; String topologyName = null; if (5 <= args.length) { topologyName = args[4]; } TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("twitter", new TwitterSingleSpout(userid, password, new String[] { spoutFilterkeyword })); builder.setBolt("hashtagfilter", new HashTagFilterBolt(), 5) .shuffleGrouping("twitter"); builder.setBolt("print", new PrinterBolt(boltFilterKeyword), 5) .fieldsGrouping("hashtagfilter", new Fields("hashtag")); StormTopology topology = builder.createTopology(); // ★トポロジ名の指定があればStormクラスタで動作 // ★ない場合はLocalクラスタで動作 if (topologyName != null) { Config conf = new Config(); conf.setDebug(false); conf.setNumWorkers(1); // ★Stormクラスタ(Nimbus)のアドレスを指定 conf.put(Config.NIMBUS_HOST, "192.168.2.101"); conf.put(Config.NIMBUS_THRIFT_PORT, 6627); StormSubmitter.submitTopology(topologyName, conf, topology); } else { Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, topology); // ★動作するのは100秒間 Utils.sleep(100000); cluster.shutdown(); } }
・・・と、大体これだけで常時動作するリアルタイム処理が作れます。
かつ、何かややこしい定義があるわけでもなく、
特定のことしかできないわけでもありません。
尚、Stormクラスタで動作させる際にはJarファイルもStormクラスタにアップする必要があります。
その際の手順は前回の記事を参考にしてください。
Localモードであれば手軽に動かすことができるため、色々試してみるのがお勧めです。
それでは。
関連ページ:
Twitter Stormクラスタを構築します(その2