Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Amazon EventBridge Pipes の処理時間を確認してみた

最近、会社の人からプレゼントしてもらったビールグラスでビールを飲むのにはまっている、クラウドエンジニアのコガです。
ツマミも増えておなかの成長が止まりません。

この記事では、昨年の AWS re:Invent 2022 で発表された Amazon EventBridge Pipes について性能を確認した結果をまとめました。
Amazon EventBridge Pipes を利用することで簡単にサービス同士を結合することができ、
よりシンプルかつ短時間でのイベント駆動アプリケーション作成が可能になります。

Amazon EventBridge Pipes 公式ページより

一般的に、イベント駆動アプリケーションを実際に運用する場合、各イベントハンドラでの処理時間やイベントのトレーシングが重要です。
そこで今回は、Amazon EventBridge Pipes を利用して Amazon SQS と AWS Lambda を結合し、イベントのフィルタリング、エンリッチメント(情報付加)を行った際の処理時間の計測、AWS X-Ray との連携を試してみました。

Amazon EventBridge Pipes とは

イベントプロデューサーとコンシューマー間の統合を、簡単に作成するサービスです。
aws.amazon.com

通常、サービス同士を結合するためには結合のためのコード(グルーコード)を書く必要がありますが、
Amazon EventBridge Pipes を使えばグルーコードを書く必要がなくなり、より簡単にサービス同士を結合することができるようになります。

Amazon EventBridge Pipes は以下の4つの要素から成ります。

要素 説明
ソース(Source) イベントデータを受信する。
Amazon DynamoDB stream, Amazon Kinesis stream, Amazon SQS queue などをソースとして指定可能。
フィルタリング(Filtering) ソースからくるイベントをフィルタリングして、それらのイベントのサブセットのみを処理する。
エンリッチメント(Enrichment) ソースからのデータをターゲットに送信する前に、イベントの情報付加や変換、拡張を行う。
ターゲット(Target) パイプによって届けられるイベントを処理する。

検証の概要

Amazon SQS(ソース) → フィルタリング → AWS Lambda(ターゲット) の構成で作成したパイプ(SQSLambdaPipe)と、
SQSLambdaPipe の設定に加え、 エンリッチメントとして AWS Lambda を設定したパイプ(SQSLambdaEnrichmentPipe)を検証に使用します。
今回は、Amazon SQS に対してメッセージを送信する AWS Lambda を経由してパイプを動かし、フィルタリング処理にかかる時間を計測しました。


SQSLambdaPipe


SQSLambdaEnrichmentPipe

準備

ソースとして設定するキューを作成する

以下の手順を元に、SampleQue というスタンダードキューを作成します(設定はすべてデフォルト)。
docs.aws.amazon.com

Amazon SQS にメッセージを送る AWS Lambda の実装

SendToSQS
event から受け取ったメッセージを指定の Amazon SQS に送信する AWS Lambda を作成します。

import boto3
from logging import getLogger, INFO

from aws_xray_sdk.core import patch_all

QUEUE_URL = 'https://sqs.{region}.amazonaws.com/{account_id}/SampleQue'

patch_all() 

logger = getLogger()
logger.setLevel(INFO)

def lambda_handler(event, context):
    client = boto3.client('sqs')

    message = event['message']
    logger.info('Start sending message')

    response = client.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody='sqs test'
    )

エンリッチメント設定する AWS Lambda の実装

Enrichment
イベントの情報付加や変換、拡張を行う AWS Lambda を作成します。

from logging import getLogger, INFO

from aws_xray_sdk.core import patch_all

patch_all() 

logger = getLogger()
logger.setLevel(INFO)

def lambda_handler(event, context):
    message = event[0]['body']
    logger.info('Message was reached to enrichment.: message=%s' % message)
    
    enriched_message = f'{message} enriched.'
    event[0]['body'] = enriched_message
    
    return event

ターゲットに設定する AWS Lambda の実装

PipeTarget
メッセージを受け取るターゲットに設定する AWS Lambda を作成します。

from logging import getLogger, INFO

from aws_xray_sdk.core import patch_all

patch_all() 

logger = getLogger()
logger.setLevel(INFO)

def lambda_handler(event, context):
    message = event[0]['body']
    logger.info('Message was reached to target.: message=%s' % message)

EventBridge Pipes でのパイプの作成

SQSLambdaPipe, SQSLambdaEnrichmentPipe の二つのパイプを作成します。
二つのパイプの違いは「エンリッチメントの設定」を行うかどうかのみです。

パイプの作成

Amazon EventBridge のコンソールを開き、サイドバーから「パイプ」を選択します。
その後、「パイプの作成」を押下します。


パイプ名を入力し、「パイプを作成」を押下します。

ソースの設定

ソースに SQS を選択し、SQS キューに上で作成した SampleQue を選択します。

フィルタリングの設定

イベントパターンにて、プレフィックスマッチングを選択し、以下の JSON を記入します。
今回は、メッセージのプレフィックスに "sqs" という文字列が入っているときのみ、その後の処理を実行するよう設定しました。

{
  "body": [{
    "prefix": "sqs"
  }]
}

エンリッチメントの設定

AWS Lambda を選択し、機能に上で作成した Enrichment を選択します。
※設定はSQSLambdaEnrichmentPipeのみ

ターゲットの設定

ターゲットサービスに AWS Lambda を選択し、機能に上で作成した PipeTarget を選択します。

パイプの動作

SQSLambdaPipe の動作

フィルタリング処理を通過

①上で作成しした AWS Lambda SendToSQS にて、event に以下の JSON を設定して実行します。

{
  "message": "sqs test"
}

②CloudWatch Logs ログイベントにて、AWS Lambda PipeTarget の実行ログが記録されていることを確認します。
今回は、"Message was reached to target.: message=sqs test" というログが確認できたので、パイプは正常に動いているようです。

フィルタリング処理で除外

①上で作成しした AWS Lambda SendToSQS にて、event に以下の JSON を設定して実行します。
 ※プレフィックスに "sqs" を含まない

{
  "message": "test"
}

②CloudWatch Logs ログイベントにて、AWS Lambda PipeTarget の実行ログが記録されていないことを確認します。
 実行ログが記録されていなければ、フィルタリング処理が正常に動作し、メッセージがターゲットまで届かなかったことが確認できました。

SQSLambdaEnrichmentPipe の動作

①上で作成しした AWS Lambda SendToSQS にて、event に以下の JSON を設定して実行します。

{
  "message": "sqs test"
}

②CloudWatch Logs ログイベントにて、AWS Lambda PipeTarget の実行ログが記録されていることを確認します。
今回は、"Message was reached to target.: message=sqs test enriched." というログが確認できたので、
エンリッチメントの処理も含めてパイプは正常に動いているようです。

処理時間を検証

SQSLambdaPipe, SQSLambdaEnrichmentPipe それぞれにおいて、
Amazon SQS にメッセージを送る AWS Lambda(SendToSQS) がメッセージを送信する直前から、
ターゲットに設定する AWS Lambda(PipeTarget) がメッセージを受信するまでの経過時間を計測します。
それぞれのパイプに対して、5回メッセージを送った際の平均を計測しました。
※今回は、AWS Lambda のコールドスタートが発生した処理は計測対象から外しています。

パイプ名 処理時間平均(秒)
SQSLambdaPipe
Lambda(メッセージ送信) → SQS → フィルタリング → Lambda(ターゲット)
0.238
SQSLambdaEnrichmentPipe
Lambda(メッセージ送信) → SQS → フィルタリング → Lambda(エンリッチメント) → Lambda(ターゲット)
0.268

それほど大きなタイムラグなく、フィルタリング、エンリッチメントが実行できているようです。

AWS X-Ray との連携

EventBridge Pipes のコンソールからは、現状 AWS X-Ray の設定はできないようなので、
各 Lambda にて X-Ray SDK を使用して AWS X-Ray の設定を行いました。
AWS Lambda の以下の部分が、X-Ray との連携用のコードになります。
X-Ray SDK は標準では組み込まれていないので、今回は Lambda Layers を使用して読み込むよう設定しました(設定方法の詳細は割愛)。

from aws_xray_sdk.core import patch_all

patch_all() 

Service Map の結果は以下のようになりました。
※残念ながら、現状 EventBridge PipesAWS X-Ray とは連携していないようなので、処理全体の流れをサービスマップにて表示することはできませんでした。

Amazon SQS での処理に 平均 0.160 秒かかっているようです。
さきほどの結果を踏まえると、フィルタリング処理にかかっている時間(コールドスタート発生時を除く)は、約0.08秒ほどとなり、
大きなタイムラグなくフィルタリング処理できているようです。

まとめ

Amazon EventBridge Pipes を利用することでソースとターゲットの結合や、フィルタリング、エンリッチメントが
とても簡単に実現できることがわかりました。

フィルタリングやエンリッチメントの処理も大きなタイムラグなく実行でき、
実際のイベント駆動アプリケーション構築の際にも利用できそうです。

ただ、現状は作成したパイプの定義をエクスポートすることができないので、
コンソールからパイプを構築した場合、IaCとして、定義を構成管理するのが難しそうです。
また、運用するうえでは AWS X-Ray との連携が簡単に設定できるようになるとより便利になりそうです。
この辺りは、さらなるバージョンアップを期待したいですね。それでは。

Acroquest Technologyでは、キャリア採用を行っています。
  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSクラウドサービスを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
  少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。 www.wantedly.com

Sentence BERTをFine TuningしてFAQを類似文書検索してみる

こんにちは。社内データサイエンスチームYAMALEXの@Ssk1029Takashiです。
最近はRTX4090のマシンを買って電気代が上がってきています。

昨今NLP界隈では事前学習モデルが出てからは、検索というのもキーワードでの検索だけではなく、文章を入力にして似たような文章を探す類似文書検索も使われるようになりました。
そんな中で、今回はFAQを対象にした類似文書検索をSentence BERTを使って試してみます。

FAQでよくある困りごと

FAQはあらゆる場面で重要な情報源ですが、いまいち検索がしづらい情報でもあります。
FAQを利用しようとするときはたいていは何か困ったことがあった時に調べに行くものですが、ユーザーが困っている内容をドキュメントと同じような単語で表現するのはときに難しいです。
例えば、何かシステムがあってログインできないという症状を説明するだけでも、「認証が通らない」とか「勝手にサインアウトする」など多様な説明ができてしまいます。

このようなことを解決するために、ユーザーの入力の意味を解釈して、近しい回答を出してくれる検索がFAQ検索では有効になります。

今回やること

今回は以下のステップに沿って進んでいきます。

  1. 既存のFAQデータセットを使ってSentence BERTをFine Tuning
  2. Fine TuningしたBERTを使ってFAQの回答をベクトル化
  3. 同じくFine TuningしたBERTを使って質問文をベクトル化して回答ベクトルを検索

まずは既存のFAQデータセットを使ってSentence BERTをFine Tuningします。
詳細は手法は後述しますが、FAQデータセットはQとAはセットで与えられるので、その組み合わせを類似した文章とみなして学習させます。

そのあと、学習したBERTを使用して、回答文と質問文の文章ベクトルを作成して、コサイン距離を測り類似文章を探します。

この手法で試したいのは、質問文同士の検索ではなく、直接質問文のベクトルから回答文のベクトルが探れるのかというポイントになります。
そのために、質問文同士をペアにするのではなく、質問文と回答文を類似文章としてペアにしてSentence BERTを学習してみます。

Sentence BERTとは

検証に入る前にSentence BERTについて簡単に説明します。
Sentence BERTとは、BERTをベースにより精度の高い文章の埋め込み表現を取得できるようFine Tuningする手法になります。

Fine Tuning方法の詳細は省きますが、簡単に言うと、類似する一組の文章からそれぞれ生成される文章ベクトルが近くなるように学習するという内容になります。

https://arxiv.org/pdf/1908.10084.pdfより引用

詳細が気になる方はぜひ読みやすいので元論文も読んでみてください。
arxiv.org

検証

それでは実際に検証してみましょう。

FAQデータセットから文章ベクトルを学習する

それでは実際に文章ベクトルを学習してみましょう。
今回使用するデータセットはJapanese FAQ dataset for e-learning systemというe-ラーニングシステムの日本語FAQをデータセットにしたものです。
データセットの内容は質問文が427個とその質問文に対する回答が全部で79個与えられているので、サイズとしては比較的小さいデータセットになります。
また、質問・回答のペアにはそれぞれ「資料」「課題」などのカテゴリが振られています。
zenodo.org


Sentence BERTはロス関数に何を使うかで入力が変わります。
例えば、Cosine Similarity Lossを使用する場合は、文章のペアとその文章の類似度が入力で与えられる必要があります。
今回はロス関数にはTriplet Lossを使用します。

Triplet Lossとは、簡単に言うと近しいものを近づけて、遠いものを遠ざけるように計算するロス関数になります。
詳細は以下のページが詳しく載っているのでぜひ読んでみてください。
qiita.com

Triplet Lossを計算する際には、Sentence BERTには入力として以下の三つを与える必要があります。

  • anchor: サンプルとなる文章。今回は質問文を与えます。
  • positive: anchorと意味的に近しい文章。今回はanchorに対する回答文を与えます。
  • negative: anchorと意味的に遠い文章。今回はanchorと異なるカテゴリの回答文を与えます。

本当はnegativeには同一のカテゴリの文章を与えたほうが、hard negative、つまり識別しづらい負例になり学習に有用なのですが、同一カテゴリ内にデータのバリエーションが少ないので今回は異なるカテゴリのデータを使用します。

上記の形式に合わせて以下のようなデータを用意します。

ここから一行を一つの組として学習していきます。
今回ライブラリにsentence-transformersを使っていきます。
sentence-transformersは文字通りSentence BERTを使って学習・推論するためのライブラリです。
huggingfaceに登録されている事前学習モデルを使用できるなどが便利な点です。
www.sbert.net

まずは、学習するモデルを定義します。
使用するモデルは汎用日本語BERTでもよいのですが、せっかくなので今回はhuggingfaceに登録されている日本語用のsentence bertの事前学習モデルを使用してみます。
huggingface.co

事前学習モデルをそのまま使用する場合は、文章ベクトルを取得するためにPooling層を追加する必要があります。
以下のようなコードになります。

from sentence_transformers import SentenceTransformer, SentencesDataset, InputExample, losses, models

bert = models.Transformer('sonoisa/sentence-bert-base-ja-mean-tokens-v2')
pooling = models.Pooling(bert.get_word_embedding_dimension())
model = SentenceTransformer(modules=[bert, pooling])

次にTriplet Lossで学習するために、dataset, dataloaderを定義して、model.fit()の入力にします。

from torch.utils.data import DataLoader

train_dataset = SentencesDataset([InputExample(texts=[row["anchor"], row["positive"], row["negative"]]) for index,row in datasets_df.iterrows()], model)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=8)
train_loss = losses.TripletLoss(model=model)

model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        epochs=5,
        evaluation_steps=1,
        warmup_steps=1,
        output_path="./sbert",
        )

今回epochsなどのハイパーパラメータは適当に決めているので、データ数などを見て調整してください。
これで事前学習モデルにFAQの回答と質問の類似度を学習させることができました。

回答文から文章ベクトルを生成する

次は回答文を文章ベクトルに変換します。
通常ベクトル検索にはFAISSやValdなどのベクトル検索ライブラリを使うのが一般的ですが、今回は対象データも少なくまずは簡単に試したいのでリストで持って逐次にコサイン類似度を計算することにします。

# 回答文書の一覧を取得
answers = list(datasets_df["positive"][~datasets_df["positive"].duplicated()])
# ベクトルに変換
corpus_embeddings = model.encode(answers, convert_to_tensor=True)

質問文から近しい回答を見つける

最後に質問文もベクトルに変換して、回答文のベクトルとのコサイン類似度を計算して近しい文章を取得します。

from sentence_transformers import util

query = "生徒登録がしたい"

query_embedding = model.encode(query, convert_to_tensor=True)
cos_scores = util.cos_sim(query_embedding , corpus_embeddings)
top_results =  torch.topk(cos_scores, k=3)

print("\n\n======================\n\n")
print("Query:", query)
print("\nTop 3 most similar sentences in corpus:")

for score, idx in zip(top_results[0][0], top_results[1][0]):
    print(answers[idx], "(Score: {:.4f})\n".format(score))

すると以下のような結果が得られます。

======================


Query: 生徒登録がしたい

Top 3 most similar sentences in corpus:
履修申請した受講生は「授業情報」機能の履修状態が「本登録」と表示されます。一方、教員が受講生を手動登録、あるいは学生が自己登録することでコースに登録された場合「仮登録」と表示されます。仮登録でもkibacoのコースを利用することはできますが、別途履修申請しなければ単位を取得できません。なお、以下の場合、受講生は仮登録のままとなります。1. 科目等履修生や研究生などの非正規学生。表示を本登録へ変更する必要がある場合は、教務課や国際課などの事務あるいは授業担当の先生から システム管理室2(e-learning-ml●ml.tmu.ac.jp、●をアットマークに変えてください) へ連絡してください。2. 事務情報システムに登録されていないコースをkibaco上で作成し利用している場合。学生をTA/チューターとして登録する場合には、「授業情報」機能の「受講生・TAを登録」で学生を登録する際に「TA/チューター」権限を与えてください。詳細は授業担当者向けマニュアルの31~33ページをご覧ください。なお、資料は教員が作成する必要があります。" (Score: 0.8198)

科目に対応するコースへ学生を登録すれば、履修申請前でも受講生をコースへアクセスさせることができます。登録方法の一つに、コースを自己登録可能に設定して学生に自己登録させる方法があります。コースへの自己登録の可否は、各授業の担当教員が設定できます。学生が自己登録したいと思っても、授業担当の先生が設定しなければ自己登録できません。なお、学部1年生は、前期の情報リテラシー実践I/IAの初回授業で情報倫理講習を受講するまでkibacoを利用できません。詳細は授業担当者向けマニュアルの34ページをご覧ください。" (Score: 0.8008)

仮登録の受講生が履修申請した場合、翌日に本登録へ変更されます。履修申請しない場合は履修申請期間を過ぎても仮登録のままコースに残ります。授業担当教員は仮登録の受講生をコースから登録解除することができます。ただし、以下の場合、履修申請していても受講生は仮登録のままとなりますので注意してください。1. 科目等履修生や研究生などの非正規学生。表示を本登録へ変更する必要がある場合は、教務課や国際課などの事務あるいは授業担当の先生から システム管理室2(e-learning-ml●ml.tmu.ac.jp、●をアットマークに変えてください) へ連絡してください。2. 事務情報システムに登録されていないコースをkibaco上で作成し利用している場合。なお、受講生が履修申請を取り下げた場合のみ、本登録の受講生がコースから登録解除されます。 (Score: 0.6863)

上記の例では近しい回答を出せているように見えます。

単語が揺れてもいい感じに回答を取得できるのか検証

例として以下の回答が欲しい場合を考えてみます。

まず、パスワードの有効期限が過ぎたことでkibacoを利用できなくなった場合、利用停止後2週間以内ならTMUNER上で新しいパスワードを設定できます。TMUNER上で「利用者メニューログイン」から、「利用者メニューログインはこちら」でログインして確認します。それ以外の場合でユーザIDやパスワードを忘れてしまった場合はこちらを参照してください

データセット上では以下の質問文に対して上記の回答が与えられています。

  • ID、パスワードが分からない。
  • kibacoのログインパスワードを忘れてしまったので、パスワードの再発行をしたい。
  • 荒川キャンパスの学生ですが、kibakoを利用したいのですが、ユーザーIDとパスワードを忘れてしまいました。
  • ログインパスワードの再設定を行いたい。
  • 日野キャンパスの学生ですが、kibakoを利用したいのですが、ユーザーIDとパスワードを忘れてしまいました。

なので、検証として上記の質問と回答で使われている単語を避けて「アカウント情報を紛失した」という問い合わせにしてみます。

結果としては、以下になりました。

Query: アカウント情報を紛失した

Top 3 most similar sentences in corpus:
まず、パスワードの有効期限が過ぎたことでkibacoを利用できなくなった場合、利用停止後2週間以内ならTMUNER上で新しいパスワードを設定できます。TMUNER上で「利用者メニューログイン」から、「利用者メニューログインはこちら」でログインして確認します。それ以外の場合でユーザIDやパスワードを忘れてしまった場合はこちらを参照してください。" (Score: 0.5902)

新学期が始まる約一ヶ月までの情報を基にしているので、それ以降の変更は反映されていません。授業担当者ではなくなった場合、登録を解除する必要があります。登録解除は システム管理室2(e-learning-ml●ml.tmu.ac.jp、●をアットマークに変えてください)宛てに申請メール(科目名、授業番号、解除する教員氏名、解除する教員の教育研究用情報システムID)を主担当教員からお送りいただくか、主担当教員をCCに入れてお送りいただく必要があります。 (Score: 0.4353)

仮登録の受講生が履修申請した場合、翌日に本登録へ変更されます。履修申請しない場合は履修申請期間を過ぎても仮登録のままコースに残ります。授業担当教員は仮登録の受講生をコースから登録解除することができます。ただし、以下の場合、履修申請していても受講生は仮登録のままとなりますので注意してください。1. 科目等履修生や研究生などの非正規学生。表示を本登録へ変更する必要がある場合は、教務課や国際課などの事務あるいは授業担当の先生から システム管理室2(e-learning-ml●ml.tmu.ac.jp、●をアットマークに変えてください) へ連絡してください。2. 事務情報システムに登録されていないコースをkibaco上で作成し利用している場合。なお、受講生が履修申請を取り下げた場合のみ、本登録の受講生がコースから登録解除されます。 (Score: 0.4321)

ほしい回答が得られていますね。
このように、単語の揺れにはある程度対応できるFAQ検索がSentence BERTで実現できそうです。

まとめ

今回はSentence BERTを使って、FAQの類似文章検索を試してみました。
キーワード検索でもヒットしない回答が、類似文章検索を使えば取得できそうです。
Sentence BERT自体は、文章のペアを作れるデータがあれば適用可能なので、他の分野でも活用できそうですね。
ぜひ試してみてください。

それではまた。

Acroquest Technologyでは、キャリア採用を行っています。


  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。


www.wantedly.com



pyinfraを使ってWindows環境から自動構築を実行してみた

こんにちは。トシです。
本投稿はアクロクエスアドベントカレンダー12月25日の記事です。

今年も残りわずか、このアドベントカレンダーも終盤ですね。
今回は、普段、自動構築にAnsibleを利用しているのですが、pyinfraなるものがあると聞き、試してみたので紹介します。
なお、Ansibleなどのツールを利用したことがある前提で記載しています。
pyinfra.com

構築にあたっての構成管理、構築の自動化として、私はAnsibleを利用していますが、
Windows環境で、Playbookを作成し、Ansibleを利用(実行)しようと考えると、
WSL/Dockerを利用する形になるかと思います。

仕事柄、Windows環境でPlaybookを作成するケースがあり、そのまま実行したいということがありました。
pyinfraを見つけた時に、これを使えばWindows環境からでも構築を実行できるのではと思い使ってみました。

pyinfraとは?

詳細は、ドキュメント見ていただけるとよいのですが、使ってみた感触は、Ansibleと同じ感覚で利用できました。
docs.pyinfra.com

Webサイトには、
Super fast、Idempotent operationsなどの特徴が書かれていますが、

私の試した限りでは、以下のような特徴があるかな、と思いました。

  1. 対象のノードをインベントリとして定義できる、インベントリのホストやグループとしてパラメータを定義可能。
  2. Ansibleのモジュール相当のものが、Operationsという形で用意されています。
  3. パッケージをインストールして、設定を行うなど、よくある構築であればOperationを使って実行可能。
    Operations Index — pyinfra documentationに一覧があります。気になる方はチェックしてみてください。
  4. Ansibleと同様に、アドホックコマンドが使える
  5. 拡張性として、Ansibleの場合は、Pythonを書くなら、モジュールとして作成することになりますが、pyinfraの場合は、モジュールの代わりにOperationが作成できるのと、Playbook相当の処理を記述する際にPythonのライブラリが直接使えるのは便利ですね。
  6. 他にもコネクターも種類があり、便利そう。

構築時の環境

今回、構築に利用する環境は、以下になります。
PrometheusとNode Exporterをインストールしてセットアップします。
※PrometheusとNode Exporter自体は構築しただけで、pyinfraと関係しないため説明は省略します

自分の作業PCからnode01とnode02にSSH接続できる状態になっています。
Linux環境のテストに、Amazon Linuxを使っています。
途中のネットワークは省略しています。

さて、本題

pyinfraが、Ansibleと似たような形で、構築を自動化するのに利用できそうだなと思ってもらえたところで
実際に試したことを紹介します。

前提にも記載した通り、Linuxサーバ2台に、PrometheusとNode Exporterをインストールしてみました。

作成したファイルは以下のような形です。

ファイル 概要
files\prometheus.repo Prometheus/Node Exporterをパッケージインストールするのに利用したrepoファイル
files\prometheus.yml Prometheusの設定ファイル(インストールしたNodeExporterを対象にするfile_sd_configs設定を追加)
tasks\prometheus.py Prometheusをインストールする処理を記述したファイル
tasks\target.py Node Exporterをインストールする処理を記述したファイル
templates\target.yml.j2 PrometheusがNode Exporterが入ったノードをターゲットと認識するファイルを出力するためのテンプレート
deploy.py 実行時に指定する。構築の処理を指定するファイル
inventory.py 実行時に指定する。対象ノードを指定するインベントリファイル


pyinfraのインストールは、pipxを利用して、

> pipx install pyinfra 

だけです。

pyinfraをインストール後、上記のファイルを用意して、

> pyinfra inventory.py deploy.py 

を実行すると、無事、PrometheusとNode Exporterが構築されて、
以下の画面が見られました。すばらしいですね。
作成したファイルの内容も以下に記載しますが、Ansibleで、Playbookを記述するのと同じような内容です。

インベントリや、処理内容を記述したファイルはどんなものか

作成したファイルをいくつか示します。接続情報など省いているのはご了承ください。
■インベントリ inventory.py
 Prometheusをインストールする対象と、Node Exporterをインストールする対象を分けてい書いています。

prometheus_servers = [
    ("node01", {"ssh_hostname": "<node01のホスト名>",
     "ssh_user": "<ユーザ>", "ssh_key": "<鍵ファイル>", "private_ip": "172.31.40.187"})
]

target_servers = [
    ("node01", {"ssh_hostname": "<node01のホスト名>",
     "ssh_user": "<ユーザ>", "ssh_key": "<鍵ファイル>", "private_ip": "172.31.40.187"}),
    ("node02", {"ssh_hostname": "<node02のホスト名>",
     "ssh_user": "<ユーザ>", "ssh_key": "<鍵ファイル>", "private_ip": "172.31.36.50"})
]

■処理内容を記述して実行コマンドで指定するファイル deploy.py
 インベントリにあわせて、実行する処理のPythonファイルを指定しています。

from pyinfra import host, local

if 'prometheus_servers' in host.groups:
    local.include('tasks/prometheus.py')

if 'target_servers' in host.groups:
    local.include('tasks/target.py')

■メインのPrometheusの構築内容を記述したファイル prometheus.py
 prometheusをインストールして、Node Exporterをターゲットと設定するために以下の記述になりました。

from pyinfra import inventory
from pyinfra.operations import server, files, yum, systemd

server.shell(
    name="Install prometheus",
    commands=["echo prometheus"],
)

files.put(
    name="Put prometheus repo file",
    src="files/prometheus.repo",
    dest="/etc/yum.repos.d/prometheus.repo",
    mode="644",
    _sudo=True,
)

yum.packages(
    name="Install prometheus2",
    packages=["prometheus2"],
    _sudo=True,
)

files.directory(
    name="Create directory",
    path="/etc/prometheus/targets",
    _sudo=True,
)

files.put(
    name="Put prometheus config file",
    src="files/prometheus.yml",
    dest="/etc/prometheus/prometheus.yml",
    mode="644",
    _sudo=True,
)

for target in inventory.get_group("target_servers"):
    files.template(
        name="Create target file",
        src="templates/target.yml.j2",
        dest="/etc/prometheus/targets/tg_" + target.name + ".yml",
        mode="644",
        private_ip=target.host_data.get("private_ip"),
        _sudo=True,
    )

systemd.service(
    name="Start prometheus service",
    service="prometheus.service",
    running=True,
    restarted=True,
    enabled=True,
    _sudo=True,
)

とてもシンプルな記述ですよね。これらのファイルを見れば、やっていることはだいたいわかるかと思います。

まとめ

Windows環境でも、Ansibleと同じような感覚で簡単に構築自動化が実現できました。
Pythonを書いているので、VSCodeでも補完ができたり、扱いやすかったです。

詳細にAnsibleと比較してはいないですが、十分利用できそうだという感触でした。
興味持たれた方は試してみてください。

それでは。


Acroquest Technologyでは、キャリア採用を行っています。

  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

顧客のビジネスをインフラから加速するエンジニア募集! - Acroquest Technology株式会社のインフラエンジニアの採用 - Wantedlywww.wantedly.com

Amazon Rekognition Streaming Video Eventsでリアルタイムに人検知を行う

この記事は AI/ML on AWS Advent Calendar 2022 12/23、および、アクロクエスアドベントカレンダー 12/23 の記事です。

qiita.com

こんにちは、Acroquest データサイエンスチーム YAMALEX メンバーの駿です。

早いもので2022年もあと一週間と少しになってしまいました。

皆さんは年を越す準備は万端でしょうか?
私が住む社員寮では慌てて年越しそばを買ったり、餅を買ったり、ぎりぎりになってバタバタしています。

さて、今回は 2022年4月に発表された Amazon Rekognition の Streaming Video Events を使って、リアルタイムの人検知を試してみました。
エッジデバイスには Raspberry Pi を使いました。


検出結果の例

1. はじめに

最初に今回のメインとなるサービスについて簡単に説明します。

(1) Amazon Rekognition とは

aws.amazon.com

Amazon Rekognition (以下 Rekognition ) は AWS が提供する、機械学習を使用した画像と動画の分析を行うサービスです。

画像・動画からあらかじめ用意された物体を検出するラベル検出、 顔を検出と表情分析、写真間の顔の比較など、様々な機能を 追加の学習なしに利用することができます。
また、カスタムラベルを利用することで、独自のオブジェクトを少ない枚数で学習し検出することも可能です。

今回使用する Rekognition Streaming Video Events は Rekognition に新しく追加された機能で、 既存の Rekognition の機能である Rekognition Video は一度 S3 に保存した動画を使う必要があったところ、 Streaming Video Events は Kinesis Video Streams を利用してストリーム動画に対応しています。

(2) Rekognition Streaming Video Events とは

aws.amazon.com

Rekognition Streaming Video Events は、デバイス等から送信されてきた映像をリアルタイムで解析し、特定の物体などを検知させ、それに対して何かしらの処理を連携させることが可能です。

今回はラベル(人/ペット/荷物)検出の内、人を検出してみます。
他にも、あらかじめ保存した顔を映像から検出する機能もありますが、今回は扱いません。
また、残念ながら Streaming Video Events を使ったカスタムラベルの検出はまだ対応していないようです。

Rekognition Streaming Video Events は Amazon Kinesis Video Streams から動画ストリームを取得し、処理を行います。
処理の結果検出された画像は S3 に保存され、 SNS で詳細のJSONが通知されます。

SNS からはKinesis Data Firehose 、 SQS 、 Lambda など様々なサービスがサブスクライブすることができます。

通知された処理結果は、例えば Lambda を使って加工し、玄関に訪れた人の画像をアプリ上に表示する、リアルタイムに防犯用のライトを点灯させる、などのアクションにつなげることができます。

想定されるユースケースとしては、監視カメラの動体検知と組み合わせることで、偽イベントを削減することが挙げられます。
ここで偽イベントとは、人がカメラ内に入ったことを検出したいのにもかかわらず、野生動物が横切った動きに反応してしまったイベントなどを指します。

すなわち、目的のオブジェクトが検出された時にタイムリーで実用的なアラートを配信し、誤ったアラートを最小限に抑えることができます。

2. 構成図

今回はエッジデバイスとして Raspberry Pi を使用しました。

構成図

大まかな流れは下記のとおりです。

  1. Raspberry Pi から Kinesis Video Streams へ動画を流す
  2. Rekognition Streaming Video Events の プロセッサを起動し、Kinesis Video Streams からストリームを取得し解析を行う
  3. 検出結果画像を S3 に保存し、検査結果詳細 JSONSNS に流す
  4. SNS からメール通知が行われる

3. セットアップ

ストリームを受け取って解析をする Kinesis Video Streams + Rekognition Streaming Video Events 側と、 動画をストリームする Raspberry Pi 側に分けて説明します。

(1) 前提

  1. 最新の AWSCLI v2 が Rekognition Video Stream Events のセットアップに使用するマシンにインストールされていること

    最新(2.9.5 2022/12/22現在)になっていないと、 Streaming Video Events が使う引数が不足している可能性があります。

  2. 各リソースにアクセスできる IAM ユーザがあること

    また、コマンドラインアクセスのため、アクセスキーが発行され、 aws configure で設定されていること

(2) Kinesis Video Streams + Rekognition Streaming Video Events

AWS 上で必要なリソースを作成していきます。

  1. 出力先

    1. S3 バケット作成

      検出結果などが出力されるバケットです。

      • 今回は 202212-rekognition-bucket という名前で作成しました。
      • 設定はすべてデフォルトのままです。

      S3 バケット作成

    2. SNS トピック作成

      Rekognition が検出した物体をメールやSQSに通知するための トピックです。

      • 今回はスタンダードなトピックを 202212-rekognition-topic という名前で作成しました。
      • Eメール通知を行うサブスクリプションを作成しました。

        作成した後、メールアドレス確認のためのメールが届くので、リンクをクリックして確認してください。

      SNS トピック作成

      サブスクリプション作成

  2. 入力元

    1. Kinesis ビデオストリーム

      Rekognition が参照するビデオストリームを作成します。

      • 今回は 202212-rekognition-stream という名前で作成しました。
      • 設定はすべてデフォルトのままです。

      ビデオストリーム作成

  3. IAM ロール

    Rekognition Streaming Video Events のプロセッサが上で作成した入力と出力のリソースにアクセスするためのロールを作成します。

    1. ポリシー

      こちらのドキュメント に書かれた JSON を参考にして、 202212-rekognition-policy という名前のポリシーを作成しました。

      ステートメントの Resource には上で作成したリソースの Arn をコピペしてください。

    2. ロール

      ユースケースに Rekognition を選択し、 202212-rekognition-role という名前のロールを作成しました。

      こちらのロールに先ほど作成したポリシーをアタッチします。

  4. Stream Processor

    最後に Rekognition Streaming Video Events でビデオを解析するため、Stream Processor を作成します。

    Stream Processor は Rekognition Streaming Video Events が行う一連の処理を管理します。 どのビデオストリームから取得するか、 S3 と SNS はどこに出力するか、 Rekognition での解析は何を行うのか、などの設定を行うことができます。

    ここで検出対象(人/ペット/荷物)の指定と、通知する確度の閾値を設定します。

    ここからは AWS コンソールから実行できないため(2022/12/22 現在)、 AWSCLI を利用します。

    1. CreateStreamProcessor へのリクエストを定義する JSON ファイルを作成

       // create_stream_processor_request.json
       {
         // 作成する Stream Processor の名前
         "Name": "202212-rekognition-processor",
         // 入力元。上で作成した Kinesis ビデオストリーム の Arn を指定
         "Input": {
           "KinesisVideoStream": {
             "Arn": "arn:aws:kinesisvideo:region:accountID:stream/202212-rekognition-stream/id"
           }
         },
         // 出力先。上で作成した S3 バケットと任意の出力先プレフィックスを指定
         "Output": {
           "S3Destination": {
             "Bucket": "202212-rekognition-bucket",
             "KeyPrefix": "processor_output"
           }
         },
         // 出力先。上で作成した SNS トピックの Arn を指定
         "NotificationChannel": {
           "SNSTopicArn": "arn:aws:sns:region:accountID:202212-rekognition-topic"
         },
         // 入出力のリソースにアクセスするために作成したロールの Arn を指定
         "RoleArn": "arn:aws:iam::accountID:role/202212-rekognition-role",
         // 通知する条件を指定
         "Settings": {
           "ConnectedHome": {
             // 対象を指定( PERSON / PET / PACKAGE )
             "Labels": [
               "PERSON"
             ],
             // 通知する確度の下限を指定
             "MinConfidence": 80
           }
         }
       }
      

      人の検出ではなく、顔の検出を行いたい場合は、上記の Settings 内を適宜変更する必要があります。

    2. CreateStreamProcessor を実行

       $ aws rekognition create-stream-processor --cli-input-json file://create_stream_processor_request.json
      
       {
         "StreamProcessorArn": "arn:aws:rekognition:region:accountID:streamprocessor/202212-rekognition-processor"
       }
      
    3. 作成された Stream Processor を確認

       $ aws rekognition describe-stream-processor --name 202212-rekognition-processor
      
       {
         "Name": "202212-rekognition-processor",
         "Status": "STOPPED",
         (以下略)
       }
      

      上の JSON で指定した内容で Stream Processor が作成できていることを確認できました。

(3) Raspberry Pi

Raspberry Pi 上で Kinesis Video Streams に動画を流す設定を行います。

C++ と GStreamer プラグインを含む Producer SDK を利用するため、 Docker を使いました。

AWSKinesis Video Streams の C++ プロデューサのデモとして、 Raspberry Pi 用の Dockerfile を提供しているため、それを利用します。

  1. レポジトリのクローン

     $ git clone https://github.com/aws-samples/amazon-kinesis-video-streams-demos.git
     $ cd amazon-kinesis-video-streams-demos/producer-cpp/docker-raspberry-pi
    
  2. ビルド

     $ docker build -t kinesis-video-producer-sdk-cpp-raspberry-pi .
    
  3. コンテナ起動

     $ docker run -it --rm --device=/dev/video0 --device=/dev/vchiq -v /opt/vc:/opt/vc kinesis-video-producer-sdk-cpp-raspberry-pi:latest /bin/bash
    
  4. GStreamer で配信

     # gst-launch-1.0 v4l2src do-timestamp=TRUE ! \
         video/x-raw,width=640,height=480,framerate=30/1 ! \
         videoconvert ! \
         x264enc bframes=0 key-int-max=45 bitrate=512 ! \
         video/x-h264,profile=baseline,stream-format=avc,alignment=au ! \
         # ここで配信先の Kinesis Video Streams のストリームを指定する
         kvssink stream-name="202212-rekognition-stream" \
         access-key="xxxx" secret-key="xxxx"
    

4. 実行

監視カメラが動体を検知し、Streaming Video Events のトリガーを引いた、という想定で実行してみます。

検出時間は10秒で設定しました。

  1. StartStreamProcessor へのリクエストを定義する JSON ファイルを作成

     // start_stream_processor_request.json
     {
       "Name": "202212-rekognition-processor",
       "StartSelector": {
         "KVSStreamStartSelector": {
           // 解析を開始する日時をプロデューサのタイムスタンプで指定します
           // Unixtime (milliseconds)
           "ProducerTimestamp": 1671447005000
         }
       },
       "StopSelector": {
         // 解析を行う長さを指定します
         // 最長 120 秒(2分)
         "MaxDurationInSeconds": 10
       }
     }
    
  2. StartStreamProcessor を実行

     $ aws rekognition start-stream-processor --cli-input-json file://start_stream_processor_request.json
    

通知結果

StartStreamProcessor を実行するとすぐに2通のメールが届きました。
一つは LABEL_DETECTED (人を検出した)、もう一つは STREAM_PROCESSING_COMPLETE ( Stream Processor が正常終了した)の通知です。

LABEL_DETECTED の メール通知は下記のデータが含まれていました。

// 一部省略
{
  "labels": [
    {
      "id": "xxx",
      "confidence": 89.94747,
      "name": "PERSON",
      "frameImageUri": "s3://202212-rekognition-bucket/processor_output/202212-rekognition-processor/xxxx/notifications/xxx.jpg",
      "croppedImageUri": "s3://202212-rekognition-bucket/processor_output/202212-rekognition-processor/xxxx/notifications/xxx_heroimage.jpg",
      "videoMapping": {
        "kinesisVideoMapping": {
          "producerTimestamp": 1671445924948
        },
        "boundingBox": {
          "left": 0.35534027,
          "top": 0.26913863,
          "height": 0.53154695,
          "width": 0.11012024
        }
      }
    }
  ]
}

画像は S3 に保存されているので、通知に含まれている S3 のリンクをたどると、画像を見ることができます。

検出されたPERSON (私)。 boundingBox の情報をもとに青い枠を後からつけている

今回はメール通知だけでしたが、接続先に Lambda を追加すれば、 Lambda を起動して、検出情報を加工して携帯のアプリで検出された人物を表示する、防犯用のライトを点灯させる、なども容易に行えます。

5. 後片付け

無駄な料金がかからないように今回作成したリソースを削除します。

  1. Rekognition Stream Processor を削除

     $ aws rekognition delete-stream-processor --name 202212-rekognition-processor
    
  2. Kinesis ビデオストリーム を削除

  3. SNS サブスクリプションおよびトピックを削除

  4. S3 のバケットを削除

  5. 作成した IAM ロールおよび ポリシーを削除

6. Pricing

  1. Rekognition Streaming Video Events

    今回使用したラベル検出( PERSON / PET / PACKAGE )は 0.00817USD/分 で利用可能です。
    なので今回実施した内容だけならほぼ 0USD で実施可能です。

    Kinesis Video Streams の料金はこれとは別にかかるので注意してください。

  2. Kinesis Video Streams

    • 取り込まれたデータ 0.00850USD/1GB
    • 消費されたデータ 0.00850USD/1GB

    640x480 の動画 10 秒間のサイズは、せいぜい 800KB 程度なので、 こちらも今回の実施範囲ではほぼお金はかかりません。

もともとコネクテッドホームで大量のカメラが接続されて、それぞれが動体検知時にプロセッサを実行するようなユースケースが想定されているようで、個々の実行は安価で行えるように料金が設定されているようです。

まとめ

Amazon Rekognition Streaming Video Events を利用してリアルタイムの人検出をやってみました。

常に検知し続ける、というよりもイベントに反応して検知を始めて、それが誤検知でないことを確認する、といったユースケースが想定されているようです。

自宅の監視カメラと接続すると相性がよさそうだと思った半面、 工場のラインなどの製品検査には向かないだろうと思います。

Acroquest Technologyでは、キャリア採用を行っています。
  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
  少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。 www.wantedly.com

SpringBoot/Quarkus/Micronautの性能を検証してみた ~その1 起動編~

こんにちは。@phonypianistです。
本投稿はアクロクエスアドベントカレンダー 12月21日 の記事です。

最近、Quarkusアプリを本番適用しました。
QuarkusはJavaアプリを作るための軽量なフレームワークで起動が速いって聞くけど、実際どれくらい速いんだろう?と気になったので、Spring Bootや、類似OSSのMicronautと比べてみました。

背景

JavaフレームワークといえばSpringBootが主流ですが、起動が遅かったり、必要なメモリが多かったりしています。 これは、アプリ起動時にリフレクションを用いてDI(Dependency Injection)を行っているのが要因の1つです。

マイクロサービス、コンテナネイティブなアプリケーションは、負荷の状況に応じて、シームレスにスケールアウトできる必要があります。 アプリケーションの起動速度が遅かったり、メモリ消費量が多かったりすると、負荷に対してシステムの性能が追随できません。

この問題を解決し高速化するために、QuarkusやMicronautといった新しいフレームワークが登場しています。

ja.quarkus.io

micronaut.io

QuarkusもMicronautも、コンパイル時にDIの解決を行うことで、起動速度や処理速度の向上、およびメモリ使用量の削減ができます。

簡単に、SpringBoot、Quarkus、Micronautの特徴を以下にまとめます。

フレームワーク 特徴
SpringBoot
Quarkus
Micronaut
  • メモリ使用量が少なく、起動時間が短くなるようにしたJavaフレームワーク
  • SpringBootと一部互換性を持っており、SpringBootから移行しやすいように作られている

QuarkusやMicronautでは、GraalVMを使用してネイティブアプリも作ることができます。 これにより、アプリの起動時間を飛躍的に短縮し、メモリも節約することができます。
※Spring Bootもネイティブアプリが作れるようになりましたが、いくつか課題があるため今回は対象外としています。

今回は、アプリの起動における性能について、SpringBoot、Quarkus、Micronautでどれくらい異なるのか、調べてみました。 QuarkusとMicronautは、ネイティブアプリも計測してみました。

前提

環境構成

AWS Fargateを用いて、以下のアプリを起動します(それぞれ1コンテナずつ、計5コンテナを起動)。

  1. SpringBoot (Java)
  2. Quarkus (Java)
  3. Quarkus (Native)
  4. Micronaut (Java)
  5. Micronaut (Native)

性能比較構成

アプリの1コンテナあたりのリソースは、1vCPU、2GBメモリを割り当てました。

検証に使用するアプリ

データベースのテーブルにあらかじめ入っているレコードを取得・返却するアプリです。 REST APIを提供し、クライアントからそのAPIが呼び出されると、JPAを用いてデータベースからレコードを取得し、クライアントに返却します。 SpringBoot、Quarkus、Micronautそれぞれ同じ処理内容にしてあります。

使用するミドルウェア・ライブラリのバージョンは以下の通りです。

  • Java:17
  • SpringBoot:3.0.0
  • Quarkus:2.13.1.Final
  • Micronaut:3.7.1

それぞれのソースコードは以下の通りです。

SpringBoot

UserController.java

@RestController
@RequestMapping("user")
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    @GetMapping
    Iterable<User> getAllUsers() {
        return userService.getAllUsers();
    }

    @GetMapping("/{userId}")
    User getUser(@PathVariable("userId") String userId) {
        return userService.getUser(userId);
    }

}

UserService.java

@Component
public class UserService {

    private final UserRepository userRepository;

    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Iterable<User> getAllUsers() {
        return userRepository.findAll();
    }

    public User getUser(String userId) {
        return userRepository.findById(userId).orElseThrow(() -> new RuntimeException(userId));
    }

}

UserRepository.java

@Repository
public interface UserRepository extends CrudRepository<User, String> {

}

Quarkus

UserResource.java

@Path("/user")
@Produces("application/json")
public class UserResource {

    @Inject
    UserService userService;

    @GET
    public List<User> getAllUsers() {
        return userService.getAllUsers();
    }

    @GET
    @Path("/{userId}")
    public User getUser(@PathParam("userId") String userId) {
        return userService.getUser(userId);
    }

}

UserService.java

@ApplicationScoped
public class UserService {

    @Inject
    UserRepository userRepository;

    public List<User> getAllUsers() {
        return userRepository.findAllUsers();
    }

    public User getUser(String userId) {
        return userRepository.findByUserId(userId);
    }

}

UserReposirory.java

@ApplicationScoped
public class UserRepository implements PanacheRepository<User> {

    public List<User> findAllUsers() {
        return findAll().list();
    }

    public User findByUserId(String userId) {
        return find("user_id", userId).firstResult();
    }

}

Micronaut

UserController.java

@Controller("/user")
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    @Get
    Iterable<User> getUsers() {
        return userService.getUsers();
    }

    @Get("/{userId}")
    User getUser(@PathVariable("userId") String userId) {
        return userService.getUser(userId);
    }

}

UserService.java

@Singleton
public class UserService {

    private final UserRepository userRepository;

    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Iterable<User> getUsers() {
        return userRepository.findAll();
    }

    public User getUser(String userId) {
        return userRepository.findById(userId).orElseThrow(() -> new RuntimeException(userId));
    }

}

UserRepository.java

@Repository
public interface UserRepository extends CrudRepository<User, String> {

}

計測内容

5つのアプリを起動して、それぞれの起動速度(起動にかかった時間)と、メモリ使用量を計測します。

起動にかかった時間は、CloudWatchログに出力される、アプリ(各フレームワーク)の起動時間を確認します。 メモリ使用量は、CloudWatchメトリクスの値を確認します。

1つのアプリに対して5回起動し、最大と最小を除く3回分の、起動にかかった時間とメモリ使用量の平均値を計算しました。

計測結果

起動完了までの時間は、以下のようになりました。

フレームワーク別起動完了までの時間

起動完了までの時間は、やはりQuarkusもMicronautも、ネイティブアプリが爆速です。
Javaアプリの中では、Quarkusが他のフレームワークと比べると速くなっています。

続いて、起動直後のメモリ使用量です。

起動直後のメモリ使用量

メモリ使用量も、やはりネイティブアプリがJavaアプリの半分の量の消費で抑えられています。特にQuarkusのネイティブアプリはたった12MBの消費で抑えられています。
Javaアプリでは、SpringBootよりQuarkusやMicronautの方が、少し消費量が低くなっています。

まとめ

やはり、ネイティブアプリは起動時間もメモリ使用量もパフォーマンスが良いですね。 Javaアプリの中では、Quarkusが他のフレームワークよりもパフォーマンスが良いようです。
※アプリの構成や環境によって、変わることがあります。

今回は、起動時のパフォーマンスについて検証しました。
次回は、リクエスト処理性能について検証したいと思います。

それでは。

Acroquest Technologyでは、キャリア採用を行っています。
  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
  少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。 www.wantedly.com

Kibana Custom Mapで地図可視化 #2 独自の地図にスタイルを設定

この記事は Elastic Stack (Elasticsearch) Advent Calendar18日目の記事です。

こんにちは、Elastic認定アナリストのshiroiです。

前回のブログでは、「Elasticsearch」からCustom Mapを読み込む方法について紹介しました。

参考:
acro-engineer.hatenablog.com

おさらいをすると、KibanaでCustom Mapを読み込む場合には、以下の2つの方法があります。

  1. 「Elasticsearch」からCustom Mapを読み込む
  2. 「GeoServer」からCustom Mapを読み込む

第二弾では、「GeoServer」からCustom Mapを読み込む方法を紹介します。

Geoserverとは

GeoServerは、地理情報を編集し、地図情報を配信することが可能なOSSです。
このGeoServerを使えば、XML形式でスタイル設定をすることで、複数の地図に対して、
デザインを統一することや、 拡大縮小に合わせて、表示するラベルの制御を行うことが可能です。

注意点として、ブラウザが直接GeoServerと通信して地図情報を読み込む仕様であるため、
不特定多数の人が利用するにはGeoServerを一般公開する必要があります。
どちらかというとイントラネットでの利用に向いています。

利用したい環境に合わせて、以下のサイトから構築用のファイルをダウンロードできます。

GeoServer:https://geoserver.org/

今回書かないこと

以下の内容は、対象外とします。

  • Shapeファイルの作り方
  • GeoJSONの作り方
  • GeoServerに地図データ(Shapeファイル)を登録する方法の詳細
  • 登録した地図データにデザインを適用する方法の詳細

GeoJSONとは、位置情報とその位置についてのラベル情報や形状(点や矩形など)情報を持ったJSONです。

GeoServerでShapeファイルを登録する方法と地図データにデザインを適用する方法は、以下のドキュメントを参照ください。

Shapeファイルを登録:
https://docs.geoserver.org/latest/en/user/gettingstarted/shapefile-quickstart/index.html

地図データにデザインを適用:
https://docs.geoserver.org/latest/en/user/styling/index.html#styling

実際に試してみる

今回も、コワーキングスペースの可視化を想定し、1週間の平均利用時間を可視化します。
インデックスは以下です。

インデックス名:room-availability-weekly-yyyy-MM-dd

フィールド名 概要
room_name ルーム名
use_count ルームの利用回数
use_date ルームの利用日時
use_hours ルームの利用時間

GeoServerに登録した地図をKibana Mapsに登録

GeoServer構築には以下のDockerイメージを利用しました。
https://hub.docker.com/r/kartoza/geoserver

GeoServerに、前回のブログで記載した時と同じShapeファイルを登録します。
以下3つの独自の地図を表示できるようにします。

  • 部屋の枠と床の部分
  • 机と椅子
  • ルーム名

GeoServerを使用した場合、Powerpointのオブジェクトのグループ化のように、レイヤーをグループ化することができます。
ここでは、「部屋の枠と床の部分」と「机と椅子」のレイヤーグループを作成しました。

GeoServerでレイヤーグループを設定:
https://docs.geoserver.org/latest/en/user/data/webadmin/layergroups.html

また、GeoServerでXML形式のスタイルを設定することで、地図の各レイヤーごとに色を塗ることができます。

GeoServerに登録後、Kibana Maps画面の「Add layer」を選択し、「Web Maps Service」を選択します。

「Web Map Service」を選択

Kibana Mapsの「Web Maps Service」を使用する場合、以下のようなURLを指定することで、GeoServerに登録した地図のレイヤーを呼び出すことができます。

http://localhost:8600/geoserver/{登録した地図の名前}/wms

「Web Maps Service」を選択後、以下のような画面となるので、使用したい地図のURLを入力します。

GeoServerに登録した地図を呼び出す

「Load capabilities」をクリックすると、GeoServerに登録した地図のレイヤーを選択することができます。
まずは、GeoServerで事前にレイヤーをグループ化した、コワーキングスペースの「部屋の枠と床の部分」と「机と椅子」を表示するためのレイヤーを登録します。

表示したいレイヤーを選択

レイヤー名を入力し、設定を保存します。

レイヤー名を設定

これで地図部分は完成です。
ルーム名が一番上に表示されるようにする必要があるため、ルーム名を表示するレイヤーを同じ操作で登録します。

登録後、以下のような地図になります。

レイヤーの登録完了後のKibana Maps

GeoServerでスタイルを設定することで、地図が拡大されたらラベルを表示し、地図が縮小されたらラベルを非表示にすることができます。

地図が縮小されると、ラベルが非表示になる。

このスタイルを有効活用することで、Google Mapsのように、地図の表示を制御していくことができます。

以下は、ラベルの表示/非表示を設定する場合のスタイル(XML)の一例です。

<?xml version="1.0" encoding="ISO-8859-1"?>
<StyledLayerDescriptor version="1.0.0"
  xsi:schemaLocation="http://www.opengis.net/sld http://schemas.opengis.net/sld/1.0.0/StyledLayerDescriptor.xsd"
  xmlns="http://www.opengis.net/sld" xmlns:ogc="http://www.opengis.net/ogc"
  xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <NamedLayer>
        <Name>room name</Name>
        <UserStyle>
            <Title>room name style</Title>
                <FeatureTypeStyle>
                <!-- ポリゴンサイズに合わせて、四角が塗られないように透明色をセット -->
                <Rule>
                    <PolygonSymbolizer>
                        <Fill>
                            <CssParameter name="fill">#F9F9F9</CssParameter>
                            <CssParameter name="fill-opacity">0</CssParameter>
                        </Fill>
                        <Stroke>
                            <CssParameter name="stroke">#EFEFEF</CssParameter>
                            <CssParameter name="stroke-width">0</CssParameter>
                        </Stroke>
                    </PolygonSymbolizer>
                </Rule>
                <!-- ズームレベルが3以上4未満の場合は、ラベルをfont size:10で表示。 -->
                <Rule>
                    <Name>Medium</Name>
                    <MinScaleDenominator>20000000</MinScaleDenominator>
                    <MaxScaleDenominator>30000000</MaxScaleDenominator>
                    <TextSymbolizer>
                        <Label>
                            <ogc:PropertyName>room_name</ogc:PropertyName>
                        </Label>
                        <Font>
                            <CssParameter name="font-family">Arial</CssParameter>
                            <CssParameter name="font-style">normal</CssParameter>
                            <CssParameter name="font-weight">bold</CssParameter>
                            <CssParameter name="font-size">10</CssParameter>
                        </Font>
                        <LabelPlacement>
                            <PointPlacement>
                                <AnchorPoint>
                                    <AnchorPointX>0.5</AnchorPointX>
                                    <AnchorPointY>0.5</AnchorPointY>
                                 </AnchorPoint>
                            </PointPlacement>
                        </LabelPlacement>
                        <VendorOption name="partials">true</VendorOption>
                        <VendorOption name="polygonAlign">ortho</VendorOption>
                    </TextSymbolizer>
                </Rule>
            </FeatureTypeStyle>
        </UserStyle>
    </NamedLayer>
</StyledLayerDescriptor>

GeoServerを利用した地図のスタイル設定の詳細は、以下を参照してください。

地図データにデザインを適用:
https://docs.geoserver.org/latest/en/user/styling/index.html#styling

ドキュメントを登録

Kibana MapsからGeoJSONを一度登録すると、登録したデータをindexとして参照することができます。
「Add Layer」から「Documents」を選択し、前回登録したindex(GeoJSONデータ)を呼び出します。

「Add Layer」から「Documents」を選択

登録済みのドキュメントを呼び出す

レイヤー名や不透明度を設定します。
Terms joinsで登録したドキュメントで指定されたエリアをデータの値別に色が塗られるようにします。

ドキュメントで指定したエリアとコワーキングスペース利用時間のデータを紐づける

平均時間を表示するため「avg」を選択

色設定では、「By Value」にすることで、値別に塗り色を変えることができます。

「By Value」で値別に色を塗る

これで設定は完了です。

データを表示すると以下のようになります。
この図面では、各部屋の利用時間が長ければ長いほど、濃い赤になり、利用時間が短ければ短いほど、薄い赤になります。

独自の地図の可視化完成

まとめ

今回は、GeoServerを使用して、より高度なCustom Mapを作成する方法を解説しました。
GeoServerを使用することで、より細かなスタイル設定ができます。
使用したい地図の複雑さにもよりますが、それぞれのメリット・デメリットも踏まえて、最適な地図可視化を選択していきたいですね。

それでは。



Acroquest Technologyでは、キャリア採用を行っています。

  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
Kaggle Grandmasterと一緒に働きたエンジニアWanted! - Acroquest Technology株式会社のデータサイエンティストの採用 - Wantedlywww.wantedly.com

メモリリークを検出するmemlabをAngularで試してみた

アクロクエスアドベントカレンダー 12月15日 の記事です。

こんにちは。 最近の趣味がAtCoderのmaron8676です。パズルを解くみたいに楽しめて、勉強にもなるので毎週コンテストに参加しています。

さて、2022/09にMeta社からmemlabというメモリリーク検出のためのフレームワークがリリースされました。

facebook.github.io

気になって調べたところチュートリアルを試していた方はいましたが、他にもスナップショットファイルの解析などmemlabが提供している機能があるようだったため、この機会に試してみた結果を書いていきます。

memlabとメモリリーク

memlabJavaScriptのヒープ解析とメモリリーク検出を行うためのフレームワークです。

JavaScriptにはガベージコレクションの機能があり、基本的に使い終わったメモリ領域は自動的に解放されていきます。しかし、プログラムの書き方によってはメモリが自動的に解放されず、メモリ使用量が増えていってしまう「メモリリーク」と呼ばれる問題が発生します。メモリリークの問題があるサイトは、操作しているうちにだんだん動作が重くなってしまい、場合によってはダウンしてしまうこともあります。

このように避けたい問題であるメモリリークですが、memlabのドキュメントに書かれているように、原因を探すのは大変な作業です。そのため、フレームワークを活用して原因調査を効率化しつつ、メモリリークの検出も自動化できるとよいですね。

memlabの機能

memlabのGitHubページでは以下の機能が書かれています。

  1. メモリリークの検知
  2. オブジェクト指向のヒープトラバースAPI
  3. メモリ最適化のためのツールボックス
  4. メモリ内容のアサーション

Angularのメモリリークを検出してみる

検出対象のWebアプリ

検証のためのアプリとして、Angular公式のチュートリアルアプリをベースに、setIntervalを消し忘れる問題を追加したWebアプリを作成しました。 問題を追加したのはheroes.component.tsで、以下の通りです。

export class HeroesComponent implements OnInit, OnDestroy {
  heroes: Hero[] = [];
  leakObjects: number[] = [];

  constructor(private heroService: HeroService) { }

  ngOnInit(): void {
    window.setInterval(() => {
      for (let i = 0; i < 1000; i++) {
        this.leakObjects.push(Math.random());
      }
    }, 100);
    this.getHeroes();
  }
...

このコードには「setIntervalで設定した定期実行スケジュールの解除を忘れているため、SPA内の別ページに切り替えてもfunctionオブジェクトが残ってしまう」という問題があります。

memlabを使った検出

memlabを使ってメモリリークを検出してみましょう。 まずは、memlabがメモリリークを検出する方式について説明します。memlabは以下の流れでメモリリークを検出します。

  1. ヘッドレスブラウザのChromiumを使ってテスト対象のページを開く ...①
  2. 新たにメモリを確保するような画面操作を行う ...②
  3. ページを開いた初期状態に戻るための画面操作を行う ...③
  4. それぞれのタイミングで取得したメモリダンプファイルを参照して、②のタイミングに増えたが、③のタイミングで減っていないようなメモリ領域を検出し、所定の条件(後述)を満たすかチェックする。

memlabがメモリリークと判断する条件は、以下の通りです。

  1. オブジェクトは action 関数でトリガーされたインタラクションによって割り当てられている
  2. オブジェクトは back 関数実行後も開放されていない
  3. オブジェクトは切り離された DOM 要素またはアンマウントされた React Fiber ノードである

次に実際に書いたテストコードを紹介します。今回は以下のようなテストコードtests/test-memory.jsを作成しました。

/**
 * シナリオとして最初に開くURLを書く
 */
function url() {
    return 'http://localhost:4200/dashboard';
}

/**
 * メモリリークを起こすきっかけとなる処理を書く
 *
 * @param page - Puppeteer's page object:
 * https://pptr.dev/api/puppeteer.page/
 */
async function action(page) {
    await page.click('a[href="/heroes"]'); // 問題のコンポーネントを表示
    await page.click('button.clear'); // 画面表示しているメッセージを削除
}

/**
 * action関数の結果をリセットするための処理を書く
 *
 * @param page - Puppeteer's page object:
 * https://pptr.dev/api/puppeteer.page/
 */
async function back(page) {
    await page.click('a[href="/dashboard"]'); // 初期状態のコンポーネントを表示
    await page.click('button.clear'); // 画面表示しているメッセージを削除
}

/**
 * リークしたオブジェクトかどうか判定する条件を書く
 *
 * @param node 判定対象オブジェクト
 * @param _snapshot スナップショット
 * @param _leakedNodeIds 判定対象オブジェクトのID
 */
function leakFilter(node, _snapshot, _leakedNodeIds) {
    return node.retainedSize > 1024 * 1024;
}

module.exports = { action, back, leakFilter, url };

このテストコードは、url関数、action関数、back関数、leakFilter関数で構成されています。それぞれの役割は以下の通りです。 url, action, back, leakFilterがそれぞれ、メモリリークを検出する方式の1, 2, 3, 4に対応しています。 今回見つけたい「解放されないfunctionオブジェクト」はmemlabが設定している初期条件を満たさないため、1MiB以上のオブジェクトを追加条件としました。

関数名 役割
url (必須)テスト対象のURL文字列を指定する
action (必須)新たにメモリを確保するような画面操作について記載する
back (必須)ページを開いた初期状態に戻るための画面操作について記載する
leakFilter (任意)リークと判断する追加条件を記載する

コンソールで npx memlab run --scenario .\tests\test-memory.js --work-dir memlab_resultsと実行すると、テスト結果が表示されます。 workディレクトリとして指定したmemlab_resultsにはスナップショットなどの結果が保存されます。 以下の図を見ると、メモリリークを検出することができました。

メモリリークを検出したテスト結果

memlabを使った原因調査

memlabを使って原因調査をしていきます。 テスト結果からは、3MBのクロージャが問題になっているということまでしか分かりませんが、 memlabのメモリ解析ツールを使うことでもう少し詳しい情報を得ることができます。 今回は以下のようなコード(analyze.js)を作成しました。

const memlab = require('memlab');

(async function () {
    const analysis = new memlab.ObjectSizeAnalysis();
    const result = await analysis.analyzeSnapshotFromFile("tests/s3.heapsnapshot");
})()

memlab workディレクトリの構成にあるs3.heapsnapshotファイルをtestsにコピーしてから実行しています*1。 初期状態に戻すback関数実行後である、s3.heapsnapshotの解析結果は以下のようになりました。

メモリ解析結果

問題のクロージャ内で参照している、leakObjectsという名前のArray変数が3MBで検出されています。 今回使ったObjectSizeAnalysisはスナップショットの中で大きいサイズのオブジェクトを表示してくれるため、問題の特定につなげることができました。 他にもドキュメントを見ると、いろいろな解析パターンがあるため試してみるとよいと思います。 例えばGlobalVariableAnalysisは、意図せずグローバル変数となってしまった問題を見つけるのに役立ちそうです。

修正後の確認

原因が分かったため、以下のようにHeroesComponentのデストラクタでclearIntervalを実行するようにして、テストを再実行してみましょう。

export class HeroesComponent implements OnInit, OnDestroy {
  heroes: Hero[] = [];
  leakObjects: number[] = [];
  private timer: number | undefined;

  constructor(private heroService: HeroService) { }

  ngOnInit(): void {
    this.timer = window.setInterval(() => {
      for (let i = 0; i < 1000; i++) {
        this.leakObjects.push(Math.random());
      }
    }, 100);
    this.getHeroes();
  }

  ngOnDestroy(): void {
    clearInterval(this.timer);
  }
...

以下のように、メモリリークが解決していることを確認できます*2

問題修正後のテスト結果

まとめ

memlabを使って、Angularアプリのコンポーネント内に含まれたメモリリークを検出しました。 また、memlabのスナップショット解析ツールを使って、メモリリークの原因を特定する作業の一例を紹介しました。変数名レベルまで出してくれるため、便利そうです。 メモリリークはいったん起きてしまうと、解析が難しく、時間もかかる問題であるため、フレームワークで調査を助けてもらえるのはありがたいですね。

参考サイト

  1. memlab
    https://facebook.github.io/memlab/
  2. memlab Docs
    https://facebook.github.io/memlab/docs/intro
  3. memlab GitHub
    https://github.com/facebook/memlab
  4. memlab を使って Web サイトのメモリリークを検出しよう
    https://zenn.dev/sa2knight/articles/analyze_javascript_heap_with_memlab
Acroquest Technologyでは、キャリア採用を行っています。
  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
  少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。 www.wantedly.com

*1:analyzeSnapshotFromFileにmemlab_results/cur/s3.heapsnapshotと指定できればよかったのですが、なぜか読み込みエラーになってしまうためファイルを移動しています。もう少し調査が必要そうです

*2:完全に初期状態と同じメモリ使用量ではありませんが、全て元に戻せるとも限らないため、大きく増えていなければ問題なしとしています。