Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Amazon EventBridge Pipes の処理時間を確認してみた

最近、会社の人からプレゼントしてもらったビールグラスでビールを飲むのにはまっている、クラウドエンジニアのコガです。
ツマミも増えておなかの成長が止まりません。

この記事では、昨年の AWS re:Invent 2022 で発表された Amazon EventBridge Pipes について性能を確認した結果をまとめました。
Amazon EventBridge Pipes を利用することで簡単にサービス同士を結合することができ、
よりシンプルかつ短時間でのイベント駆動アプリケーション作成が可能になります。

Amazon EventBridge Pipes 公式ページより

一般的に、イベント駆動アプリケーションを実際に運用する場合、各イベントハンドラでの処理時間やイベントのトレーシングが重要です。
そこで今回は、Amazon EventBridge Pipes を利用して Amazon SQS と AWS Lambda を結合し、イベントのフィルタリング、エンリッチメント(情報付加)を行った際の処理時間の計測、AWS X-Ray との連携を試してみました。

Amazon EventBridge Pipes とは

イベントプロデューサーとコンシューマー間の統合を、簡単に作成するサービスです。
aws.amazon.com

通常、サービス同士を結合するためには結合のためのコード(グルーコード)を書く必要がありますが、
Amazon EventBridge Pipes を使えばグルーコードを書く必要がなくなり、より簡単にサービス同士を結合することができるようになります。

Amazon EventBridge Pipes は以下の4つの要素から成ります。

要素 説明
ソース(Source) イベントデータを受信する。
Amazon DynamoDB stream, Amazon Kinesis stream, Amazon SQS queue などをソースとして指定可能。
フィルタリング(Filtering) ソースからくるイベントをフィルタリングして、それらのイベントのサブセットのみを処理する。
エンリッチメント(Enrichment) ソースからのデータをターゲットに送信する前に、イベントの情報付加や変換、拡張を行う。
ターゲット(Target) パイプによって届けられるイベントを処理する。

検証の概要

Amazon SQS(ソース) → フィルタリング → AWS Lambda(ターゲット) の構成で作成したパイプ(SQSLambdaPipe)と、
SQSLambdaPipe の設定に加え、 エンリッチメントとして AWS Lambda を設定したパイプ(SQSLambdaEnrichmentPipe)を検証に使用します。
今回は、Amazon SQS に対してメッセージを送信する AWS Lambda を経由してパイプを動かし、フィルタリング処理にかかる時間を計測しました。


SQSLambdaPipe


SQSLambdaEnrichmentPipe

準備

ソースとして設定するキューを作成する

以下の手順を元に、SampleQue というスタンダードキューを作成します(設定はすべてデフォルト)。
docs.aws.amazon.com

Amazon SQS にメッセージを送る AWS Lambda の実装

SendToSQS
event から受け取ったメッセージを指定の Amazon SQS に送信する AWS Lambda を作成します。

import boto3
from logging import getLogger, INFO

from aws_xray_sdk.core import patch_all

QUEUE_URL = 'https://sqs.{region}.amazonaws.com/{account_id}/SampleQue'

patch_all() 

logger = getLogger()
logger.setLevel(INFO)

def lambda_handler(event, context):
    client = boto3.client('sqs')

    message = event['message']
    logger.info('Start sending message')

    response = client.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody='sqs test'
    )

エンリッチメント設定する AWS Lambda の実装

Enrichment
イベントの情報付加や変換、拡張を行う AWS Lambda を作成します。

from logging import getLogger, INFO

from aws_xray_sdk.core import patch_all

patch_all() 

logger = getLogger()
logger.setLevel(INFO)

def lambda_handler(event, context):
    message = event[0]['body']
    logger.info('Message was reached to enrichment.: message=%s' % message)
    
    enriched_message = f'{message} enriched.'
    event[0]['body'] = enriched_message
    
    return event

ターゲットに設定する AWS Lambda の実装

PipeTarget
メッセージを受け取るターゲットに設定する AWS Lambda を作成します。

from logging import getLogger, INFO

from aws_xray_sdk.core import patch_all

patch_all() 

logger = getLogger()
logger.setLevel(INFO)

def lambda_handler(event, context):
    message = event[0]['body']
    logger.info('Message was reached to target.: message=%s' % message)

EventBridge Pipes でのパイプの作成

SQSLambdaPipe, SQSLambdaEnrichmentPipe の二つのパイプを作成します。
二つのパイプの違いは「エンリッチメントの設定」を行うかどうかのみです。

パイプの作成

Amazon EventBridge のコンソールを開き、サイドバーから「パイプ」を選択します。
その後、「パイプの作成」を押下します。


パイプ名を入力し、「パイプを作成」を押下します。

ソースの設定

ソースに SQS を選択し、SQS キューに上で作成した SampleQue を選択します。

フィルタリングの設定

イベントパターンにて、プレフィックスマッチングを選択し、以下の JSON を記入します。
今回は、メッセージのプレフィックスに "sqs" という文字列が入っているときのみ、その後の処理を実行するよう設定しました。

{
  "body": [{
    "prefix": "sqs"
  }]
}

エンリッチメントの設定

AWS Lambda を選択し、機能に上で作成した Enrichment を選択します。
※設定はSQSLambdaEnrichmentPipeのみ

ターゲットの設定

ターゲットサービスに AWS Lambda を選択し、機能に上で作成した PipeTarget を選択します。

パイプの動作

SQSLambdaPipe の動作

フィルタリング処理を通過

①上で作成しした AWS Lambda SendToSQS にて、event に以下の JSON を設定して実行します。

{
  "message": "sqs test"
}

②CloudWatch Logs ログイベントにて、AWS Lambda PipeTarget の実行ログが記録されていることを確認します。
今回は、"Message was reached to target.: message=sqs test" というログが確認できたので、パイプは正常に動いているようです。

フィルタリング処理で除外

①上で作成しした AWS Lambda SendToSQS にて、event に以下の JSON を設定して実行します。
 ※プレフィックスに "sqs" を含まない

{
  "message": "test"
}

②CloudWatch Logs ログイベントにて、AWS Lambda PipeTarget の実行ログが記録されていないことを確認します。
 実行ログが記録されていなければ、フィルタリング処理が正常に動作し、メッセージがターゲットまで届かなかったことが確認できました。

SQSLambdaEnrichmentPipe の動作

①上で作成しした AWS Lambda SendToSQS にて、event に以下の JSON を設定して実行します。

{
  "message": "sqs test"
}

②CloudWatch Logs ログイベントにて、AWS Lambda PipeTarget の実行ログが記録されていることを確認します。
今回は、"Message was reached to target.: message=sqs test enriched." というログが確認できたので、
エンリッチメントの処理も含めてパイプは正常に動いているようです。

処理時間を検証

SQSLambdaPipe, SQSLambdaEnrichmentPipe それぞれにおいて、
Amazon SQS にメッセージを送る AWS Lambda(SendToSQS) がメッセージを送信する直前から、
ターゲットに設定する AWS Lambda(PipeTarget) がメッセージを受信するまでの経過時間を計測します。
それぞれのパイプに対して、5回メッセージを送った際の平均を計測しました。
※今回は、AWS Lambda のコールドスタートが発生した処理は計測対象から外しています。

パイプ名 処理時間平均(秒)
SQSLambdaPipe
Lambda(メッセージ送信) → SQS → フィルタリング → Lambda(ターゲット)
0.238
SQSLambdaEnrichmentPipe
Lambda(メッセージ送信) → SQS → フィルタリング → Lambda(エンリッチメント) → Lambda(ターゲット)
0.268

それほど大きなタイムラグなく、フィルタリング、エンリッチメントが実行できているようです。

AWS X-Ray との連携

EventBridge Pipes のコンソールからは、現状 AWS X-Ray の設定はできないようなので、
各 Lambda にて X-Ray SDK を使用して AWS X-Ray の設定を行いました。
AWS Lambda の以下の部分が、X-Ray との連携用のコードになります。
X-Ray SDK は標準では組み込まれていないので、今回は Lambda Layers を使用して読み込むよう設定しました(設定方法の詳細は割愛)。

from aws_xray_sdk.core import patch_all

patch_all() 

Service Map の結果は以下のようになりました。
※残念ながら、現状 EventBridge PipesAWS X-Ray とは連携していないようなので、処理全体の流れをサービスマップにて表示することはできませんでした。

Amazon SQS での処理に 平均 0.160 秒かかっているようです。
さきほどの結果を踏まえると、フィルタリング処理にかかっている時間(コールドスタート発生時を除く)は、約0.08秒ほどとなり、
大きなタイムラグなくフィルタリング処理できているようです。

まとめ

Amazon EventBridge Pipes を利用することでソースとターゲットの結合や、フィルタリング、エンリッチメントが
とても簡単に実現できることがわかりました。

フィルタリングやエンリッチメントの処理も大きなタイムラグなく実行でき、
実際のイベント駆動アプリケーション構築の際にも利用できそうです。

ただ、現状は作成したパイプの定義をエクスポートすることができないので、
コンソールからパイプを構築した場合、IaCとして、定義を構成管理するのが難しそうです。
また、運用するうえでは AWS X-Ray との連携が簡単に設定できるようになるとより便利になりそうです。
この辺りは、さらなるバージョンアップを期待したいですね。それでは。

Acroquest Technologyでは、キャリア採用を行っています。
  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSクラウドサービスを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
  少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。 www.wantedly.com