Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Amazon EventBridge Schedulerの使いどころを整理してみた

Amazon/楽天ブラックフライデーで例年通り散財した、@kojiisdです。
今回、光学30倍ズーム、という謳い文句に惹かれて、10年ぶりくらいにパナソニックのコンデジを買ってしまいました。
これから写真を撮るのが楽しみになりそうです(^^
みなさんは散財しましたか?

さて、アクロクエストでは、エンジニアたちが仕事の中で気になっていたことを調査・検討・検証した際に、
その内容を社内勉強会でフィードバックしています。
今月は12月ということで、そのような調べたことを、アドベントカレンダーとしてブログに公開していくことにしました~!

初日はタイトルにもあるとおり、「Amazon EventBridge Schedulerの使いどころを整理してみた」です。

Amazon EventBridge Schedulerとは

AWSでイベント/スケジュールドリブンな処理の管理、というとAmazon EventBridgeが代表的ですが、先日、スケジュール機能に特化したAmazon EventBridge Schedulerが発表されました。
スケジュールベースのイベントトリガ管理を行う場合、従来のEventBridgeでは、Rulesという機能を使ってcronやrateを使用しつつ処理実行のタイミングを指定していましたが、以下のような制約がありました。


1. UTCベースのスケジュール指定が必要(誤ってJSTのつもりで時刻を設定してしまって、失敗した苦い経験ないですか?私はあります)
2. 実行できる機能は20程度のAWSサービスのみ


これらのスケジュール指定方法、実行サービスの幅などが広がったのがEventBridge Schedulerになります。



EventBridge Rulesとの比較は主に以下のようになっています。(抜粋)

Amazon EventBridge Scheduler Amazon EventBridge Rules
スケジュール上限 アカウント毎に100万 アカウント内リージョン毎に300
イベント実行スループット 秒間1,000トランザクション 秒間5トランザクション
指定可能なターゲット AWS SDKを使用した270以上のサービスと6,000以上のAPI 20以上のターゲット
時刻指定方法とタイムゾーン at(), cron(), rate()に対応
すべてのタイムゾーンとDST(サマータイム)に対応
cron(), rate()に対応
タイムゾーンUTCのみ、DSTはサポート外
1回限りのスケジュール 指定可能 指定不可
ウィンドウ時間を指定した設定 指定可能 指定不可

参考:Introducing Amazon EventBridge Scheduler | AWS Compute Blog


これを見ただけでも、より細かなスケジュール設定が可能になった、ということがわかりますね。


ちなみに従来のEventBridge Rulesでも、スケジュールベースのイベント作成はできますが、現在AWSコンソールにアクセスすると、下図のようにEventBridge Schedulerへの遷移を促されるので、今後スケジュールドリブンなトリガ管理はEventBridge Scheduler、イベントドリブンなトリガ管理はEventBridge Rules、と棲み分けがされていくかもしれないですね。

EventBridge Rulesでスケジュールドリブンイベント作成をしようとすると、EventBridge Schedulerに移動を促される

また、特に今回、大きな機能追加になりそうなものは以下かな、と感じました。

EventBridge Schedulerの特長的な機能追加

詳細を挙げればキリがないですが、私として気になった大きな機能追加は主に以下の3点ですね。

1. ウィンドウ時間(フレックスタイムウィンドウ)の指定が可能になった

予定した起動時刻に対して、指定した時間内の任意のタイミングに処理を起動させることが可能になりました。
これにより、同時刻に大量の処理が発生してしまいAWSのクォータ(制限)に引っかかりにくくすることができるようになりました。
例:12:00起動に設定してフレックスタイムウィンドウを15分と指定すると、12:00 - 12:15の間のどこかで実行されることになります。

2. 1回限りのスケジュールの指定が可能になった

今までのEventBridgeでは、基本的に「定期処理」または「イベントドリブンな処理」での指定が必要でした。
今回のリリースで、単発の実行のみを行う設定が可能になりました。(EventBridge Rulesでも1回限りの指定をするような小技はありますが、Rulesのスケジュールドリブンなイベント管理はあくまで「定期実行」が考え方のベースになっています。)
これにより、1度のみ実行すれば十分、というようなユースケースに対応することができるようになりました。

3. 直接AWSサービスのAPIコールをすることが可能になった

従来のEventBridge Rulesでは、どんなにシンプルな処理であっても、AWSサービスのAPIを呼び出すためにはLambdaを一度経由するなど、ワンクッションを置く必要がありました。
例:指定した時刻に、定期的に決まったDynamoDBのレコードのある値を書き換える。

EventBridge Schedulerでは、6000以上ものAWSサービスのAPIが直接コールできるようになったので、今までAPIを呼ぶためだけにLambdaを準備していたようなユースケースに対して、開発が不要になりました。

EventBridge Schedulerの使いどころを整理してみた

これらの機能追加を踏まえて、今回のサービスリリースで、どんなことが可能になりそうか、整理してみました。

1. 毎日、運用中のサービスの状態チェックを行う
2. 「1回限りの実行」を使用し、夜間のデータ移行処理を実行する
3. 短期間で、大量の処理を分散させて終わらせる
4. 業務時間外のEC2サーバーへのアクセスを制御する

1. 毎日、運用中のサービスの状態チェックを行う

運用しているサービスの状態チェックは、定期的に行っていると思います。このような定期的な処理は、従来のEventBridge Rulesでも指定可能でしたが、もちろんEventBridge Schedulerでも実現が可能です。EventBridge Schedulerでは、実行タイミングの指定時に、タイムゾーンの指定が可能になっているので、毎回UTCに変換した時刻を考える必要もなくなります。(従来のEventBridge Rulesでは、時刻指定の際にはUTCタイムゾーンが固定で指定されていました。)

タイムゾーンをPSTで指定した場合の実行時刻サンプル


2. 「1回限りの実行」を使用し、夜間のデータ移行処理を実行する

サービスの更改などによるデータ移行が必要になった場合、基本的にサービスに影響を出さないように、夜間に実施するケースが多いですが、EventBridge Schedulerで1回限りの実行イベントを定義すれば、夜間の指定した時間帯にデータ移行処理を実行することが可能になります。

深夜3時(日本時間)に1回限りの実行を行う


3. 短期間で、大量の処理を分散させて終わらせる

例えばLambdaで実施される処理を大量に並列で実行し、短期間で結果を得たい、というユースケースの場合、分割の粒度や実行タイミング、並列度を考えながら、Step Functionsを組むことがあると思います。

大量の処理を並列に実施するワークフロー


この場合、前処理のLambdaはどうしても実装が必要になってしまいますし、メイン処理も同時タイミングでの実行はサービスへの負荷が上がってしまい、サービスの運用に影響を出す恐れがあるため、実行タイミングにばらつきを発生させるよう、Step Functionsの設定で並列度を調整したり、場合によってはメイン処理のLambdaにWait処理を入れることがあると思います。

このようなユースケースの場合、フレックスタイムウィンドウを使って処理を分散させることが可能になります。

定期的なスケジュールの選択の上で、フレックスタイムウィンドウを「1時間」に設定すれば、指定した「毎日1時(JST)~2時」の間のいずれかで実行するイベントの作成が可能になり、ばらつきを制御する苦労から解放されます。

フレックスタイムウィンドウを利用した処理のばらつきの実現


4. 業務時間外のEC2サーバーへのアクセスを制御する

たとえばEC2サーバーで運用している社内サービスが以下の要件で稼働しているとします。
1. 業務時間中は、Webアクセスはすべて受け付ける。
2. 業務時間外は、メンテナンス用のIPアドレス以外からのアクセスを遮断する。

このような場合、今まではLambdaによりEC2のセキュリティグループを定期的に変更する処理の実装が必要でした。EventBridge Schedulerを利用すると、EC2のModifyInstanceAttribute API直接コールするように設定することで、Lambdaの実装なしに要件を満たすことが可能となります。

特定のAPIをEventBridge Schedulerから直接コールすることが可能

まとめ

今回、Amazon EventBridge Schedulerの用途について整理をしてみました。スケジュールドリブンなイベント管理の機能強化により、
さらに効率的にサービスの開発や運用ができそうですね。私もより深く調べて、活用していきたいと思います。


Acroquest Technologyでは、キャリア採用を行っています。


  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSクラウドサービスを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。


www.wantedly.com



ElasticON Tokyo 2022 最速レポート

この記事は Elastic Stack (Elasticsearch) Advent Calendar 1日目の記事です。

こんにちは、@shin0higuchiです😊

昨日11/30(水)に、ElasticON Tokyo 2022 が開催されました。
今年は2019年以来、3年振りのオフライン開催で、私もウェスティンホテル東京の現地に行って参加してきました。

午前中は基調講演やゲストスピーカー落合陽一氏の講演などが、
午後は2並列でテクニカルセッションとユーザ事例のセッションがありました。

私はテクニカルセッションを中心に聴講していましたが、
今年はクラウドベンダー各社との連携が強まった点が印象的で、GoogleMicrosoftAWSのそれぞれから発表がありました。

今回参加した中でも特に印象的だったセッションは「サーバーレスアーキテクチャへの道」という発表です。
Elasticsearch社のプリンシパルソリューションアーキテクト古久保さんによる発表で、Elasticsearchのアーキテクチャの変遷と今後の展望について話されました。

このブログでは、このセッションについて詳しく紹介します。

「サーバーレスアーキテクチャへの道」

概要

セッションの動画等は公開されていませんが、下記のブログが元ネタとなっています。
www.elastic.co

ブログでは現在のアーキテクチャと、今後のアーキテクチャの2つに分けて違いが説明されていますが、セッションの中ではさらに細分化して4つに分けて説明されました。

  • Act-one:Elasticsearch初期のアーキテクチャ (シャード分割)
  • Act-two:現在のアーキテクチャ(Data Tierを用いた時系列データ管理)
  • Act-three:ステートレス
  • Act-four:サーバレス/フルマネージド


こちらが、ブログで説明されている画像ですが、Existing ArchtectureがAct-two、New ArchitectureがAct-threeに相当することになります。

https://www.elastic.co/jp/blog/stateless-your-new-state-of-find-with-elasticsearchより

話の趣旨としては、現在のAct-twoからAct-threeを目指して移行中で、更に先の将来はAct-fourを目指したいということでした。

Act-two (Existing Architecture )の特徴/課題

Act-twoでは、いわゆるHot-Warm Architectureとも言われるように、データの鮮度に応じてノード間を移動させ、新しいデータ程良いスペックのマシンに載せることで従来よりもコストを抑えつつパフォーマンスを維持することができていました。さらに最近ではレプリカをオブジェクトストレージに配置する Searchable Snapshot 機能も公開されています。
しかし、課題として下記の点が挙げられます。

  • レプリカを保持するためストレージコストがかかる
  • ノード間のデータ移動が頻繁に発生し、また、移動に時間がかかる
  • 検索とインデクシングが同じリソースを共有するので、互いの性能に影響する
Act-three (New Architecture)の特徴

Act-twoの課題を改善すべく、新たに「ステートレス」な構造が提唱されています。
ポイントとしては、データ投入と検索を分離することで、リソースをより効率化できる点です。

書き込み時には、書き込み用のノードがオブジェクトストレージに書き込み、検索時には検索用のノードがオブジェクトストレージから読み込む。こうすることでAct-twoで挙がっていた課題が悉く解消される形になります。現行のアーキテクチャでは、データ投入と検索がCPUリソースなどを食い合うので、必要スペックの正確な見積もりが困難でしたが、その辺りも解消される形になりそうです。

オブジェクトストレージからの検索になったことで検索速度の低下が懸念されることと思いますが、キャッシュを上手く使うことで速度を改善しているとの説明でした。初回の検索が遅くなってしまう等はあるかと思うので、場合によってはSearch Tier側の暖機運転が必要になるかもしれませんが、その辺りはElastic社としても検討中かと思いますので今後の情報が楽しみです。

# Act-fourのフルマネージドはもう少し先の話になると思いますので、ここでは割愛します

まとめ

Elasticsearchを触って8年近くになりますが、時代に合わせて日々変わって来ているのを感じます。
特にクラウド連携や、アーキテクチャの変化など、当初のコンセプトから切り替わるダイナミックな変更もありました。
今後の動向から目が離せません。

今回の記事は以上となります、最後までお読みいただきありがとうございました。


Acroquest Technologyでは、キャリア採用を行っています。

  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • Elasticsearch等を使ったデータ収集/分析/可視化
  • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
世界初のElastic認定エンジニアと一緒に働きたい人Wanted! - Acroquest Technology株式会社のデータサイエンティストの採用 - Wantedlywww.wantedly.com

新しいデータ基盤アーキテクチャである「データレイクハウス」について調べてみた

最近ソーダストリームを買い、炭酸水を飲むのにはまってます。機械学習エンジニアの@yktm31です。

以前に「AWS Lake Formationでデータレイク体験!」という記事を書いてみて、データ基盤アーキテクチャに興味が湧いてきました。

データレイクハウスは、「データウェアハウス」と「データレイク」を統合したようなアーキテクチャで、 2020年にDatabricks社により提唱され、新しいデータ基盤アーキテクチャとして注目されているようです。

www.databricks.com

そこで今回、「データレイクハウス」について調べてみたことをまとめてみたいと思います。

なぜデータレイクハウスが注目されているのか?

データ基盤アーキテクチャは、以下のような変遷をたどっていて、データレイクハウスが最新のアーキテクチャという流れのようです。 それぞれの登場時期、利用用途を以下にまとめて見ました。

アーキテクチャ 登場時期 利用用途
データウェアハウス 1980年代 ビジネス意思決定のためのデータ分析 (BI)
データレイク 2000年代 非構造化データを使う機械学習 (ML)
データレイクハウス 2020年 BIとMLの両立

最新のデータ基盤アーキテクチャであるデータレイクハウスの必要性を理解するために、 まず、データウェアハウスとデータレイクそれぞれの特徴・課題を見ていきたいと思います。

データウェアハウスの特徴・課題

データウェアハウスのアーキテクチャ

データウェアハウスは、ビジネス意思決定のためにデータを一つの場所に集約・保管し、分析に利用するためのデータ基盤アーキテクチャで、1980年代に考案されたそうです。 (私が生まれたのは1997年なので、生まれる前からこうしたデータ基盤アーキテクチャが考えられていたのは驚きでした。) 例えば、「Amazon Redshift」はデータウェアハウスを提供しているサービスとして有名だと思います。

データウェアハウスの特徴として、以下のような点が挙げられていることが多かったです。

  • 複数のソースから取得したデータにETL処理を施し、構造化データを蓄積する。
  • スキーマが決まっていて、トランザクションもサポートされることでデータ一貫性が担保されている。
  • 権限管理が行え、データへのアクセスコントロールが効く。

しかし反面、ビジネスで扱うデータの多様化により、データウェアハウスだけでは新たなニーズに応えられなくなっていったそうです。 とくに、非構造化データを使う機械学習の発展に、データウェアハウスのスキーマが決まっているという点が大きな制約となったようです。

データレイクの特徴・課題

データレイク + データウェアハウスのアーキテクチャ

画像やテキストなど非構造化データの分析という新たなニーズに合わせ、データレイクというデータ基盤アーキテクチャが2000年代に入り誕生したそうです。

例えば、「Amazon S3」や「Azure Data Lake Storage」といったクラウドストレージシステムと周辺サービスを組わせるとデータレイクとなります。

データレイクの特徴として、以下のような点がよく挙げられています。

  • 未加工のデータが格納され、主にデータ分析や機械学習で利用されることが多い。
  • データの形式に制限がなく、テキストデータやセンサーデータ、画像、テキストなど、構造化データ以外のデータも格納できる。

他方、以下の2点がリスク・課題としてもよく挙がります。

  • データレイクはトランザクションはサポートされないためデータ品質が保証されない。
  • 自由にデータが配置できる反面、本当に必要なデータが見つかりづらくなる、データスワンプとなるリスクがある。

トランザクションがサポートされないというデータレイクの欠点を補う必要がある場合は、データレイクとデータウェアハウスを併用するようです。

データレイクのデータを直接扱うだけでなく、ETL処理を施しデータウェアハウスにデータを登録することで、トランザクションを実現する試みです。 しかしその方式では、データ一貫性を保つのが難しいなど、デメリットも残っている現状のようです。

ここまでで、データウェアハウス、データレイクの比較をまとめてみました。

特徴 データウェアハウス データレイク
扱えるデータ形式 構造化データのみ、加工・整形済みデータ 構造化データ、半構造化データ、非構造化データ、生データ
代表的な利用用途 BI (過去データの分析) ML (将来予測)
データ一貫性 ×
データの自由度 ×
コスト データ保持・構築ともにコストがかかる データサイズに対して比較的安価
主な利用者 ビジネスアナリスト データサイエンティスト

データレイクハウスの特徴

データレイクハウスが登場したのは、データウェアハウス・データレイクそれぞれの特徴を活かしつつ、 課題をうまく解決するためでした。

まず、データレイクハウスはどのような特徴を持ち、既存の課題をどう解決したのかをまとめてみたいと思います。

  • ACID トランザクションサポート
  • データレイクの問題点として、トランザクションサポートが無いことで、データ一貫性が保てないことがありました。 その原因の一つとして、トランザクションがサポートされていないという点がありました。 データレイクハウスでは、データレイクをメタデータを扱うレイヤーを重ねることで、 データレイク内のデータを直接扱うのではなく、メタデータを操作する事で扱っていきます。 この機構により、ACIDトランザクションを実現し、データ一貫性の欠如という課題を克服できたようです。
  • データのアクセス権限管理
  • データウェアハウスでは実現でき、データレイクでは実現できなかった、データのアクセス制御を行えます。 テーブル単位のみならず、列/行単位でのきめ細やかな権限管理をサポートします。
  • parquetなどのオープンなファイルフォーマットを利用
  • Apache Parquet, Delta Lake, Apache Hudiなどのオープンソースのデータフォーマットを利用することで、データアクセスの方法を標準化できるというメリットがあるようです。
  • 安価なストレージ
  • データレイクのメリットとして、様々なデータ形式を、安価なストレージで扱えるという点がありました。 データレイクハウスでは、データレイクのこの良い面を受け継いでいます。
  • BI・データ分析・機械学習をサポート
  • データレイクハウスでは、BIはデータウェアハウス、機械学習はデータレイク、という区別なくデータを扱うことができるようになります。

    データレイクハウスのアーキテクチャ

    ここからは、データレイクハウスのアーキテクチャについて具体的に見ていきます。 データレイクハウスのアーキテクチャは、概ね以下のようなレイヤー構造からなっているようです。

  • Data Ingestion Layer
  • Storage Layerにデータを取り込む役割を担う。
  • Storage Layer
  • 高い耐久性とスケーラビリティを持ち、コストの安いデータ保管を担う。構造化/非構造化データを格納する。 データの加工度に応じて、データ格納のレイヤーを分ける、メダリオンアーキテクチャ(※1)を採用するケースが多い。
  • Metadata Layer
  • Storage Layerに存在するデータのメタデータを扱う役割を担う。 データカタログを作成したり、トランザクション・権限管理を実現する。Catalog Layerと表現されることもある。
  • Processing Layer
  • データ分析や機械学習などで利用可能な形にするための、データの検証・変換を担う。
  • Data Consumption Layer
  • SQLやBI、機械学習など、データを利用する層。Serve Layerと表現されることもある。

    データレイクハウスアーキテクチャ

    ※1 メダリオンアーキテクチャ

    メダリオンアーキテクチャとは、データレイクのデータ配置を、データの質によって3つのレイヤーに分けるアーキテクチャということらしいです。 3つのレイヤーはそれぞれ、Bronze、Silver、Goldとなっています。

  • Bronze Layer
  • あらゆるデータソースからデータを取り込み、未加工の生データを配置。
  • Silver Layer
  • Bronze Layerのデータから、ジョイン、フィルターなどの処理を施し、マスターテーブルなどのクレンジングされた中間データを配置。
  • Gold Layer
  • レポート作成などに必要なレベルで集約されたデータを配置。プロジェクトのユースケースごとに専用のデータベースとなり、データマートとも呼ばれる。

    メダリオンアーキテクチャ

    Azure

    Azureでドキュメントで提示されているデータレイクハウスアーキテクチャを見ていきます。

    https://learn.microsoft.com/ja-jp/azure/architecture/example-scenario/data/synapse-exploratory-data-analytics

    Azureのこのレファレンスアーキテクチャのポイントを3つ挙げてみました。

    • Azure Synapse Analyticsを中心としたアーキテクチャ
    • Azure Synapse サーバーレス SQL プールとPowerBIによるデータ可視化
    • Azure Machine Learningを利用した機械学習利用

    Azure Synapse Analyticsを中心としたアーキテクチャ

    Azure Synapse Analyticsとは、データウェアハウス・ビッグデータ解析を統合したデータ分析プラットフォームです。 Azure Synapse Analyticsを利用することで、SQL / Apache Sparkによるデータ分析が用意に実現できるようになります。

    Azure Synapse サーバーレス SQL プールとPowerBIによるデータ可視化

    Azure Synapse サーバーレス SQL プールを利用し、Azure Data Lake Storageに保存されたParquetファイルに対し、 SQLベースでデータ探索を行い、Power BIに結果を可視化します。

    Power BIとSynapseを連携する際、Power BI Linked Serviceで直接連携することも可能ですが、 その方式では、ワークスペースが1つのみに制限されてしまいます。 そのため、サーバーレス SQL プールを利用することが一つのポイントになります。

    Azure Machine Learningを利用した機械学習利用

    Azure Machine Learningを利用し、推論した結果をデータレイクに保存し、PowerBIで可視化することが出来ます。 学習するデータの取得、整形のためにはAzure Synapse サーバーレス Apache Spark プールが利用できます。 Azure Machine Learningによる推論結果は、データレイク中のゴールドレイヤーに保存されることになります。

    参考

    learn.microsoft.com

    Azure Databricksを利用した代替案

    ここまで、Azure Synapse Analyticsを利用したデータレイクハウスアーキテクチャを見てきましたが、 代替アーキテクチャとして、Azure Databricksを利用する方法もあるようです。

    Azure Databricksは、SQL / ML / データ分析をするためのプラットフォームです。 Azure Databricksを利用したデータレイクハウスのアーキテクチャは以下のドキュメントが参考になりそうでした。 learn.microsoft.com

    AWS

    次に、AWSにおけるデータレイクハウスのアーキテクチャを見ていきます。ここでは、AWS Big Data Blogで提示されているアーキテクチャを参考にしていきます。

    https://aws.amazon.com/jp/blogs/big-data/build-a-lake-house-architecture-on-aws/

    AWSのこのレファレンスアーキテクチャのポイントをまとめると、以下のような点が分かりました。

    • メタデータレイヤーとして、AWS Lake Formationを利用
    • Amazon S3Amazon Redshiftのネイティブな連携
    • Amazon Athena / Amazon QuickSight / Amazon SageMakerなどのサービスから、統一されたインターフェースでデータ利用できる

    メタデータレイヤーとして、AWS Lake Formationを利用

    AWS Lake Formationは、データの行/列レベルの権限管理や、ACIDトランザクションを提供しています。 Amazon S3Amazon Redshiftに格納されているデータ本体と、メタデータをレイヤーとして分離することで、スキーマオンリードを実現します。

    データレイク単体では、データのスキーマが変更されても、変更を検知することは難しいです。 そのため、読み込むデータがどのようなスキーマか確認することが必要になり、このスキーマオンリードの考え方が重要になってくるようです。

    また、データレイクは、データのスキーマが変わることが考えられますが、 AWS Glueを通じて、中央データカタログは更新されていくので、変更に追従できるということのようです。

    Amazon S3Amazon Redshiftのネイティブな連携

    データをBIで利用する場合、整理された構造化データが分析パフォーマンスの上で重要になります。他方、機械学習では非構造化データを扱うことになります。

    例えば、大量の履歴データがデータレイク(Amazon S3)に保管され、BIでの分析対象分のデータは、データウェアハウス(Amazon Redshift)にAmazon Redshift Spectrumを通じて展開、 そして、逆にデータウェアハウス(Amazon Redshift)で不要になったデータは、データレイク(S3)にオフロードするという使い方ができるらしいです。 データウェアハウス(Amazon Redshift)からデータレイク(Amazon S3)にオフロードされたデータも同じクエリでアクセスすることができるようです。

    Amazon Athena / Amazon QuickSight / Amazon SageMakerなどのサービスから、統一されたインターフェースでデータ利用できる

    以下のようなサービスから、データレイクハウスのデータを扱うことができます。

    サービス 用途
    Amazon Athena SQL
    Amazon QuickSight BI
    Amazon SageMaker ML
    Amazon Redshift Spectrum データ分析

    まとめ

    機械学習ビッグデータ系のプロジェクトをやっているとき、データをどうやって管理する?という課題がよく出てくる実感があります。 今回見てきたデータレイクハウスは、ぜひデータ管理アーキテクチャの設計に活かしていきたいと感じました。

    ただ、データレイクハウスは、自前で0から構築するとなると、かなり大変そうな印象を受けました。 なので、クラウドサービスで簡単に構築できそうなところは、有難いなと思いました。

    Acroquest Technologyでは、キャリア採用を行っています。
    • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
    • Elasticsearch等を使ったデータ収集/分析/可視化
    • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
    • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
      少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。 www.wantedly.com

    Elastic Cloudの紹介 ~Elastic Cloud上に簡単デプロイ編~

    こんにちは、Elastic認定資格3種(※)を保持しているノムラです。
    ※Elastic社の公式認定資格(Elastic Certified Engineer / Elastic Certified Analyst / Elastic Certified Observability Engineer)

    皆さんはElastic Stackを構築するにあたってElastic Cloudを利用されたことはあるでしょうか?
    Elastic Cloudは、Elastic社が提供しているSaaSサービスです。クラウドプロバイダはGCPAWS、Azureをサポートしています。
    最新バージョンのクラスタ構築や、既存クラスタのバージョンアップを数クリックで実施できるため、導入がお手軽です。

    本記事では、Elastic Cloudの無料トライアルを利用して、Elastic Stackをデプロイする手順を紹介します。

    目次は以下です。

    前提

    本記事では、Elastic Stackである

    • Elasticsearch
    • Kibana
    • Logstash

    については、説明をいたしません。各プロダクトの詳細は公式ホームページをご参照ください。
    www.elastic.co

    アカウントを作成する

    Elastic Cloudは2週間の無料トライアルが利用可能です。
    方法はメールアドレスを登録するのみです。


    Elasticクラスタをデプロイする

    デプロイメント名、クラウドプロバイダ等を設定する

    以下を設定します

    1. デプロイメント名(任意の文字列でOK)
    2. クラウドプロバイダ(今回はAWSを選択)
    3. リージョン(今回はOhioを選択。Tokyoリージョンも選択可能です)
    4. ハードウェアタイプ(Storage Optimizedを選択。詳細はこちら
    5. Elastic Stackバージョン(最新を選びましょう)

    入力後、「Create deployment」を押下することで、Elastic Stackの構築が開始されます。

    さっそく利用!

    数分で構築が完了し、デプロイされたKibanaの画面に遷移します

    標準で付属のサンプルデータ/ダッシュボードをKibanaから1クリックでインストールしてみました。

    数クリックするだけで構築が完了し、利用可能になりました。
    ここまで5分程度です。お手軽ですね!

    まとめ

    Elastic Cloudを利用することで、非常に簡単にElastic Stackが構築可能です。

    追記

    またElasticCloudではセキュリティ設定として、ユーザ認証に加えて、Private Link接続やIPフィルタリングが可能です。
    次回以降で紹介したいと思います。お楽しみに。

    それでは。

    Acroquest Technologyでは、キャリア採用を行っています。

    • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
    • Elasticsearch等を使ったデータ収集/分析/可視化
    • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
    • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長

     
    少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

    世界初のElastic認定エンジニアと一緒に働きたい人Wanted! - Acroquest Technology株式会社のデータサイエンティストの採用 - Wantedlywww.wantedly.com

    ECCV2022参戦記〜Text/ImageでCLIPが活躍〜

    皆さんこんにちは
    当社のデータサイエンスチームYAMALEXのチームリーダの@tereka114です。
    最近までリフレッシュ休暇制度を使って箱根を満喫しながら、ECCV2022に参加していました。その学会の感想を記載します。

    ECCVとは

    ECCV(European Conference on Computer Vision)はコンピュータビジョンのトップカンファレンスの一つで、CVPR、ICCVと並んでよく紹介される学会です。
    今年はイスラエルで開催されており、Virtual/現地のハイブリッド開催となりました。(私はVirtual参加)
    Vitualのみと異なり、ビデオの途中で現地の会場が見えるだけでも、ライブ感が少し増しており、良かったです。

    ECCV2022のスケジュールは次のとおりです。

    10月23日、24日:Turtorial/Workshop
    10月25日〜27日:本会議

    学会は日本時間15時〜休みありで24時ぐらいまでです。

    今年のカンファレンス

    25日の夜にOpeningが開催されました。Openingでは、学会参加者、論文の傾向が話されます。
    まずは、参加者の国籍です。アメリカ中国はこれまでの他の学会と変わらず多いです。また、私の予想とは異なりますが、Virtual参加よりも現地参加の方が人数が多いことに驚きました。

    論文数の紹介です。こちらも数が多く、Acceptが1488件でした。
    非常に多くの論文が採択されており、相変わらず、人一人で見れる量を遥かに超えていました。


    面白かった論文の紹介

    面白かった論文の概要を紹介していきます。これらの論文は全てOralでの採択のものです。
    興味が出てきたものはぜひ、論文も読んでみてください。このあとの各論文タイトルがリンクになっています。

    Revisiting a kNN-based Image Classification System with High-capacity Storage

    画像の分類を特徴量を抽出して、kNN(K近傍法)で実施するシステムの提案です。
    推定させたいクラスの画像から事前にモデルの途中の出力である特徴量を抽出しておき、ストレージに入れます。
    新しい画像も同様に特徴量を抽出し、ストレージに含まれる画像の特徴量と比較し、クラスを推定するものです。(図参照)

    通常の分類では、学習して、推論して、どのクラスが最も良かったのかを直接推論するため、一般的にはkNNより精度が高くなります。
    しかし、画像の特徴量を用いてkNNで推論するのも異なった良さがあり、論文中で述べられています。

    1. 特徴量さえ取り出せれば良いので、新規クラスが追加された際に、新規学習が必要ない。
    2. kNN手法は再学習不要のため、再学習した際に過去学んだことを忘れる破壊的忘却(Catastrophe Forgetting)を回避できる。
    3. 解析が簡単。例えば、推論時に近しい画像がどれかがわかるので、ラベルミスなど、なぜ、判定を失敗したのかがわかりやすくなる。

    論文を読んだ感触として、特にクラス分類の対象が動的に変化するようなシステムだと、kNNによるクラス分類のほうが安定して運用ができそうです。

    The Challenges of Continuous Self-Supervised Learning

    自己教師あり学習を継続的な学習(概ねストリーム学習)で実現する試みを論文にまとめたものです。
    通常の学習だとデータが既に決まっており、そのデータを繰り返し学習して精度をあげます。
    今回の問題はストリーム的なデータの流れ方をするものを対象としており、同じデータは一度しか流れてこないといったものが制約になります。
    この問題を論文中で、学習効率(Trainining Efficiency)、データ間に関連があるもの(Non-IID)、長期間に渡る運用(Lifelong)の3つの項目で議論されています。

    それぞれの課題でBufferと呼ばれる流れてくるデータを一時的に蓄積する仕組みを利用して従来のものよりも精度を向上させています。

    1. データをBufferに蓄積する。このBufferからデータを取り出し、繰り返し学習させることでデータを有効に使い、精度を向上させた。(論文中でBuffered Replayと呼ばれている)
    2. Minimum Redundancy Bufferと呼ばれる最も類似度が小さいデータを省き、新規にデータをBufferに蓄積する方法の提案により
      Non-IID(データ間に関連のあるもの)Data/Lifelongでの精度を向上させた。

    これらの考え方は、自己教師あり学習のみならず、データの傾向が日々変わるようなもの(ECサイトなど)の機械学習の運用で利用できる考え方だと思います。

    Text2LIVE: Text-Driven Layered Image and Video Editing

    画像/動画をテキストの内容を用いて変換させるといった問題を解きます。
    従来手法では、関係のないオブジェクトがテキストにより変換されたり、変換が必要な部分にマスクをかけないといけないといった問題がありました。
    本手法では、マスク不要での入力画像の変換を高精度で成功しており、かつ、ビデオへの拡張も行っています。

    Generatorで編集する画像をRGBAで生成し、それをオリジナルの入力画像で重ね合わせることにより生成します。
    Generatorの学習は主に次の誤差が利用されます。画像/テキストの表現学習であるCLIPの事前学習済のモデルが多く活用されているのが個人的に面白いポイントです。

    • 最終的な出力結果の誤差、CLIPのText/ImageのEncoderを用いて、生成画像とテキストの特徴量を抽出し、コサイン距離を計算する。
    • 編集用の編集画像の誤差、描画する画像を緑色の背景と合成した画像とテキスト(over a green screenを記載する)の特徴量を抽出したもののコサイン距離を計算する。
    • 構造誤差、CLIPのViTを推論されて得られる出力結果(複数のToken)の距離を用いて、入力と出力で構造上類似しているかの制約をかける。
      入力と出力で似たような傾向の意味的構造であれば誤差が低くなる。

    動画においてはAtlasと呼ばれる方法で動画を2Dで表現する方式があり、このAtlas上の画像をText2LIVEで加工し、逆変換して動画を推定するといった方式を行います。
    従来手法では動画の時間軸方向での変換の一貫性が不足していました。しかし、この手法では動画間のフレームの変換の一貫性を改善できたとのことです。


    EclipSE: Efficient Long-range Video Retrieval using Sight and Sound


    動画の解析には動画から取得できるフレームだけではなく、音声も重要です。そのため動画(フレーム+音声)とテキストを用いた表現学習を提案しました。
    特に動画のEncoding部分に特徴があり、図中に「ECLIPSE」と記載されているモジュールが提案されているニューラルネットワークの構造になります。
    Image/textのEncoderの初期値には、CLIPの事前学習済モデルが使われます。「ECLIPSE」モジュールはテキストと動画の特徴を用いたContrastive Lossに基づいて重みを更新します。
    この学習によりActivityNet Captionsと呼ばれる動画のキャプション生成にてSoTAを達成しました。

    特にCLIPの実験結果を見ていると物量が非常に重要な要素の一つだったので、EclipSEもデータを増やせばさらなる精度向上が期待できるかもしれません。

    Registration Based Few-Shot Anomaly Detection


    従来の異常検知の手法(図中a,b)は一つのカテゴリ(製品)ごとに一つのモデルが必要となります。
    本提案手法(図中c)では、一つのモデルで全てのカテゴリの異常検知を実現します。
    まずは、最初の図中のFeature Registration Aggregated Trainingを実施します。次の図はFeature Registration Aggregated Trainingで学習されるモデルの概略図です。

    2つの画像を入力するSiamese Networkを入力し、途中の特徴量の出力結果を幾何的に変換させるSTNのモジュールをはさみます。これにより特徴量のロバスト性を確保します。
    その結果に、特徴量変換のEncoderを適用し、Prediction Headを片方の特徴量のみに適用します。
    Prediction Headを片方の特徴量のみ適用するのは、Negative Pairがない状態で学習するとモデルが同じような特徴を出力する事象が発生するようで、それを回避するためとのことです。
    最後に2つの特徴をコサイン類似度を符号反転させたものを誤差として、パラメータを更新します。

    次にNormal Distribution Estimationです。推定したいデータのカテゴリと同じものであるSupport Setを用いて、新しいカテゴリのパラメータの平均と分散をニューラルネットワークの出力の高さと幅ごとに求めます。
    最後に、前段で求めた平均、分散のパラメータを利用してマハラノビス距離を用いて異常であるかどうかを判定します。
    これにより新しいカテゴリが登場しても、再学習させず、Support Setを利用すれば1つのモデルで様々な異常検知ができることになります。

    最後に

    ECCVに参加してCLIPを1日1回は聞いたように思えました。
    CLIPは途中にも簡単に述べましたが、テキスト/画像の表現学習で、膨大なデータを利用することで、非常に良い表現を獲得しています。
    私自身もKaggleのGoogle Universal Image Embeddingで利用しましたがImageNetの学習済ではCLIPを超えられませんでした。
    CLIPは特にテキスト/画像の2つのドメインを利用する際には必須の技術になってきてました。

    テキスト/画像は発展が著しいので今後も楽しみです。

    Acroquest Technologyでは、キャリア採用を行っています。


    • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
    • Elasticsearch等を使ったデータ収集/分析/可視化
    • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
    • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
     
    少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。


    www.wantedly.com



    Kibana Custom Mapで地図可視化 #1 独自の地図を簡単に設定

    こんにちは、Elastic認定アナリストのshiroiです。
    業務では、Kibanaでビジネスログやセキュリティ情報の可視化を行っています。

    Kibanaで地図を活用した可視化を行う場合、
    標準で用意されている世界地図や日本地図の上にデータを描画することが一般的ですが、
    独自に作成した地図(Custom Map)を利用することも可能です。

    本ブログでは、KibanaでCustom Mapを利用する方法を紹介します。

    Custom Mapのユースケースと利用方法

    国/県/地域など、広いエリアにおける可視化を行う場合、標準で用意されている世界地図/日本地図を利用します。
    一方で「遊園地/アウトレットモールなどの施設」や「オフィス/会議室などの建物内エリア」など、
    標準地図でカバーできない限定エリアにおける可視化を行う場合には、Custom Mapを利用します。

    KibanaでCustom Mapを利用するには、2つの方法があります。

    1. 「Elasticsearch」からCustom Mapを読み込む
    2. 「GeoServer」からCustom Mapを読み込む

    1. 「Elasticsearch」からCustom Mapを読み込む

    ElasticsearchからCustom Mapを読み込む場合、Shapeファイルを利用します。

    Shapeファイルとは、「地形などの図形情報と、地名や位置などの属性情報をもった地図データ」が集まったファイルです。
    Kibana Mapsでは、Shapeファイルを登録するだけでCustom Mapを利用可能であるため、
    手軽に独自の地図で可視化をしたい場合におすすめのアプローチです。

    一方、地図の拡大縮小に合わせて、表示するラベルのサイズを変更したり、複数の地図のデザインを統一するような細かな制御ができません。

    2. 「GeoServer」からCustom Mapを読み込む

    GeoServerは、地理情報を編集し、地図情報を配信することが可能なOSSです。
    このGeoServerを使えば、CSSのような設定で複数の地図に対して、デザインを統一することや、
    拡大縮小に合わせて、表示するラベルの制御を行うことが可能です。

    注意点として、ブラウザが直接GeoServerと通信して地図情報を読み込む仕様であるため、
    不特定多数の人が利用するにはGeoServerを一般公開する必要があります。
    どちらかというとイントラネットでの利用に向いています。

    本記事では、1の「Elasticsearch」からCustom Mapを読み込む方法を解説します。
    2のGeoServerを使用したKibana Mapsでの可視化は、別の記事で紹介する予定です。

    今回書かないこと

    今回は、以下の内容は、対象外とします。

    • Shapeファイルの作り方
    • GeoJSONの作り方

    GeoJSONとは、位置情報とその位置についてのラベル情報や形状(点や矩形など)情報を持ったJSONです。

    実際に試してみる

    今回は、コワーキングスペースの可視化を想定し、1週間の平均利用時間と各部屋のWifi強度を可視化します。
    インデックスは以下です。

    インデックス名:room-availability-weekly-yyyy-MM-dd

    フィールド名 概要
    room_name ルーム名
    use_count ルームの利用回数
    use_date ルームの利用日時
    use_hours ルームの利用時間
    wifi_intensity wifiの強度

    独自の地図(Shapeファイル)の登録

    最初にコワーキングスペースの1フロアを表示するShapeファイルを作成します。
    Shapeファイルを作成する時は、IllustratorQGISなどを利用するとよいでしょう。
    Shapeファイルを作成すると、以下4種の形式のファイルが作成されます。

    • shp
    • dbf
    • prj
    • shx

    上記4種全てを、Kibana Maps画面の「Add layer」を選択し、「Upload file」から登録します。

    Shapeファイル群を登録

    4つのファイルを登録すると、以下のような画面になります。
    登録したShapeファイル群は、indexとして登録されます。

    Shapeファイル群登録後の画面

    index名を入力し、登録した後は、「Add as document layer」をクリックして、レイヤー名や表示する不透明度、登録した地図の色を設定します。
    ここでは、地図の不透明度を100%とします。

    レイヤー名と不透明度を登録

    表示したい地図が固定されるように、以下のFilteringのチェックを外しておきます。

    Filteringの設定変更

    Layer Styleで地図に塗る色を設定後、保存します。
    これで地図部分は完成です。
    レイヤー別に塗り色を変える必要があるため、今回は以下3つに分けて独自の地図を作成しました。

    • 部屋の枠と床の部分
    • 机と椅子
    • ルーム名

    各地図について、それぞれ同じ操作を繰り返して登録します。
    順番に登録していくと以下のような地図になります。

    地図の登録完了

    GeoJSONの登録

    ShapeファイルをGeoJSONファイルにコンバートしましょう。
    Kibana Maps画面の「Add layer」を選択し、「Upload file」から生成したGeoJSONを登録します。
    この時に、Shapeファイル登録時と同様に、index名を指定します。

    GeoJSONを登録

    「Add as document layer」をクリックし、レイヤー名や不透明度を設定します。
    Terms joinsで登録したGeoJSONで指定されたエリアをデータの値別に色が塗られるようにします。

    GeoJSONで指定したエリアとコワーキングスペース利用時間のデータを紐づける

    平均時間を表示するため「avg」を選択

    色設定では、「By Value」にすることで、値別に塗り色を変えることができます。

    「By Value」で値別に色を塗る

    これで設定は完了です。

    データを表示すると以下のようになります。
    この図面では、各部屋の利用時間が長ければ長いほど、濃い赤になり、利用時間が短ければ短いほど、薄い赤になります。

    独自の地図の可視化完成

    上記の地図では、GeoJSONで矩形表示をできるように設定していました。
    エリアの中心に位置情報を設定したGeoJSONを登録すれば、以下のように円の色やサイズでWifi強度を表現することも可能です。
    Wifi強度が強ければ強いほど、オレンジ色で丸の大きさが大きくなります。
    弱いところは、青色で丸の大きさも小さくなります。

    点で表示する場合の例

    まとめ

    今回は、簡単に独自の地図で可視化したいときの作り方を解説しました。
    次回は、GeoServerを使用して、より高度なCustom Mapを作成する方法を紹介します。

    FastAPI+StrawberryでGraphQLのAPIを実現する

    はじめに

    最近アクアリウムを始めました、菅野です。
    プログラムと異なり、生体を扱う都合上の想定外を楽しみながら試行錯誤しております。

    さて、皆さんはAPIサーバを構築する際に、どのAPI形式を用いていますか?
    まだまだREST形式で実装することが多いかとは思いますが、
    GraphQLを用いることも増えてきているのではないでしょうか?

    今回は、そんなGraphQLをFastAPIと各種ライブラリを用いて簡単に実装する方法を紹介していこうと思います。

    GraphQLとは

    GraphQLは、Meta社(旧Facebook社)によって開発・公開されたAPI仕様です。クエリ形式で、処理やパラメータの内容を指定します。
    RESTとの比較としては、

    • クライアント側で取得したい情報をクエリとして渡すことができるため、 利用しないデータを無駄に受け取らなくて済む。
    • 一つのエンドポイントに対し複数リソースのクエリを一度にリクエストできるので、APIコール数を削減できる

    等の利点があります。

    GraphQL | A query language for your API

    構成

    PythonAPI作成フレームワーク、FastAPIと、GraphQLライブラリStrawberryを組み合わせることで、簡単にGraphqlAPIを実現することができます。 今回は、上記ライブラリに加え、ORMライブラリSQLAlchemyを用いてSQLite3に接続するTODO管理アプリのバックエンドサーバを構築します。

    利用ライブラリ

    利用するPythonおよび主なライブラリとそのバージョンは以下になります。

    ライブラリ バージョン
    Python 3.9.6
    FastAPI 0.85.0
    Strawberry 0.131.1
    SQLAlchemy 1.4.41

    実装

    ディレクトリ構成

    ディレクトリ構成は以下のようになっています。
    クリーンアーキテクチャの構成に従いディレクトリを分けております。

    /
    │  poetry.lock
    │  pyproject.toml      
    ├─db
    └─src
        │  app.py
        │  contexts.py
        │  database.py
        │  router.py
        │  resolvers.py
        │  __init__.py
        │  
        ├─domain
        │  ├─model
        │  │      task.py
        │  │          
        │  └─service
        │          task_service.py
        │              
        ├─infra
        │  └─repository
        │          models.py
        │          task_repository.py
        │              
        └─web
            └─task
                    inputs.py
                    types.py

    クラス図

    それぞれのコードの関係性は以下になっています。

    各種コード説明

    infra/repositoryディレクト

    データベースに接続するためのデータモデル定義と、各種DB操作の実装をこちらに記載します。

    models.py

    DBテーブルで利用するカラム定義を記載します。
    一般的に、SQLAlchemyで用いるDBデータモデルクラスはDeclarative Extensionsを用いて、
    テーブルとドメインモデルクラスのマッピングを行います。
    しかし、上記を利用すると、ドメインモデルクラスがSQLAlchemyに依存する形になってしまうため、
    今回は利用せず、ドメインモデルクラスをSQLAlchemyに依存しないように分けて実装します。

    Declarative Extensions — SQLAlchemy 1.4 Documentation

    from domain.model.task import Task ,Status
    from sqlalchemy import (Column, DateTime, Enum,
                            String, Table)
    from sqlalchemy.orm import registry
    from sqlalchemy.sql import func
    from sqlalchemy_utils import UUIDType, ChoiceType
    import uuid
    import enum
    
    mapper_registry = registry()
    
    # Taskテーブルの定義
    task = Table(
        'task',
        mapper_registry.metadata,
        Column('id', UUIDType(binary=False), primary_key=True, default=uuid.uuid4),
        Column('description', String(200)),
        Column('title', String(200)),
        Column('status', Enum(Status)),
        Column('updated_at', DateTime, default=func.now())
    )
    mapper_registry.map_imperatively(Task, task)
    

    task_repository.py

    DBセッション情報を受け取り、CRUD操作を行う処理を記載します。

    from sqlalchemy.orm import Session
    
    from domain.model.task import Task
    
    
    class TaskRepository:
        def __init__(self, db: Session):
            self.__db = db
    
        def find_by_id(self, id_: str) -> Task:
            task = self.__db.query(Task).get(id_)
    
            return task
    
        def find_all(self) -> list[Task]:
            # tasks = list(cls._store.values())
            tasks = self.__db.query(Task).all()
    
            return tasks
    
        def save(self, task: Task) -> None:
            self.__db.add(task)
            self.__db.commit()
    
        def delete(self, task: Task) -> None:
            self.__db.delete(task)
            self.__db.commit()
    

    domainディレクト

    内部で行うビジネスロジック(今回ではAPI実行時の各種処理)とデータ型の定義を行います。

    service/task_service.py

    データアクセスをリポジトリクラスに委任することで接続先に依存しない実装になっています。
    例えば、DB接続ではなくオンメモリでデータを保持するようになった場合はリポジトリクラスのみの実装を変更すればよくなります。

    from datetime import datetime
    from typing import Optional
    
    from domain.model.task import Status, Task
    from infra.repository.task_repository import TaskRepository
    
    
    class TaskService:
        def __init__(self, repo: TaskRepository) -> None:
            self.__repo = repo
    
        @property
        def repo(self) -> type[TaskRepository]:
            return self.__repo
    
        def find(self, id: str) -> Task:
            task = self.repo.find_by_id(id)
    
            return task
    
        def find_all(self) -> list[Task]:
            tasks = self.repo.find_all()
    
            return tasks
    
        def create(self, *, title: str, description: Optional[str] = None) -> Task:
            task = Task(title=title, description=description)
            self.repo.save(task)
    
            return task
    
        def update(self, *, id: str, status: Status) -> Task:
            task = self.repo.find_by_id(id)
            task.status = status
            task.updated_at = datetime.utcnow()
            self.repo.save(task)
    
            return task
    
        def delete(self, id: str) -> Task:
            task = self.repo.find_by_id(id)
            self.repo.delete(task)
    
            return task
    

    model/task.py

    DB接続モデルクラスでも説明したように、SQLAlchemyに依存しないドメインモデルクラスとして、
    利用するデータのモデルクラスを別途実装します。

    import uuid
    from dataclasses import dataclass, field
    from datetime import datetime
    from enum import Enum
    from typing import Optional
    
    
    class Status(Enum):
        TODO = 'todo'
        DOING = 'doing'
        DONE = 'done'
    
    
    @dataclass
    class Task:
        title: str
        id: uuid.UUID = field(default_factory=uuid.uuid4)
        description: Optional[str] = None
        status: Status = Status.TODO
        updated_at: datetime = field(default_factory=datetime.utcnow)
    

    web/taskディレクト

    DB接続、ドメインロジックの実装が終わったので、 いよいよ本投稿の要旨であるStrawberryを用いたGraphql実装部分の肝である、webクラスを実装していきます。

    types.py

    GraphQLでは、クエリを構造を定義する必要があります。 Strawberryではstrawberry.typeアノテーションを用いてコードベースでクエリスキーマを定義できます。 Python型とGraphQLスキーマ型との対応は以下のリンクを参照してください。

    Schema basics | 🍓 Strawberry GraphQL

    from datetime import datetime
    from typing import Optional
    
    import strawberry
    
    from domain.model.task import Status, Task
    
    StatusType = strawberry.enum(Status, name='Status')
    
    
    @strawberry.type(name='Task')
    class TaskType:
        id: strawberry.ID
        title: str
        description: Optional[str]
        status: StatusType
        updated_at: datetime
    

    input.py

    GraphQLリクエストの入力も同様にコードベースで定義します。 strawberry.inputアノテーションを付けたクラスが入力スキーマとして利用できるようになります。

    from typing import Optional
    
    import strawberry
    
    
    @strawberry.input
    class AddTaskInput:
        title: str
        description: Optional[str] = None
    
    
    @strawberry.input
    class UpdateTaskInput:
        id: strawberry.ID
        status: str
    

    srcディレクトリ直下

    FastAPI起動用のコードを記載します。

    database.py DB接続情報と初回起動時のテーブル初期化処理を記載します。

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    from infra.repository.models import mapper_registry
    
    
    SQLALCHEMY_DATABASE_URI = 'sqlite:///../db/tasks.db'
    
    
    class DatabaseContext:
        def initialize(self):
    
            engine = create_engine(
                SQLALCHEMY_DATABASE_URI, connect_args={'check_same_thread': False}, echo=True
            )
    
            self.SessionLocal = sessionmaker(
                autocommit=False, autoflush=False, bind=engine)
            Base = mapper_registry.generate_base()
            Base.metadata.create_all(bind=engine)
    
    
    database_context = DatabaseContext()
    
    
    def get_db():
        """
        Get database
    
        Yields:
            SessionLocal: Local session for database connection
        """
        db = database_context.SessionLocal()
        try:
            yield db
        finally:
            db.close()
    

    context.py

    コンテキスト情報として、各種サービスとリポジトリクラスの関係性を保持するように定義します。

    from fastapi import Depends
    from strawberry.fastapi import BaseContext
    
    from database import get_db
    from domain.service.task_service import TaskService
    from infra.repository.task_repository import TaskRepository
    
    
    def init_task_repository(db=Depends(get_db)):
        return TaskRepository(db)
    
    
    def init_task_service(task_repository: TaskRepository = Depends(init_task_repository)):
        return TaskService(
            task_repository
        )
    
    
    class TaskContext(BaseContext):
        def __init__(self, task: TaskService):
            self.__task: TaskService = task
    
        def get_task(self):
            return self.__task
    
    
    class TaskServicesContext(BaseContext):
        def __init__(self, task: TaskService):
            self.__task: TaskService = task
    
        def get_task(self):
            return self.__task
    
    
    async def get_context(
            task_service: TaskService = Depends(init_task_service)
    ) -> TaskContext:
        return TaskContext(
            task=task_service
        )
    

    resolver.py

    コンテキストからサービス情報を取得し、対応する処理を呼び出す実装を記載します。

    import strawberry
    
    from web.task.inputs import AddTaskInput, UpdateTaskInput
    from web.task.types import TaskType
    from strawberry.types import Info
    
    
    def get_task(id: strawberry.ID, info: Info) -> TaskType:
        service = info.context.get_task()
        task = service.find(id)
    
        return task
    
    
    def get_tasks(info: Info) -> list[TaskType]:
    
        service = info.context.get_task()
        tasks = service.find_all()
    
        return tasks
    
    
    def add_task(task_input: AddTaskInput, info: Info) -> TaskType:
        service = info.context.get_task()
        task = service.create(**task_input.__dict__)
    
        return task
    
    
    def update_task(task_input: UpdateTaskInput, info: Info) -> TaskType:
        service = info.context.get_task()
        task = service.update(**task_input.__dict__)
    
        return task
    
    
    def delete_task(id: strawberry.ID, info: Info) -> TaskType:
        service = info.context.get_task()
        task = service.delete(id)
    
        return task
    

    router.py

    GraphQLのQuery, Mutationの形式を定義し、実行する処理をリゾルバとして渡すように記載します。

    import strawberry
    from strawberry.fastapi import GraphQLRouter
    from resolvers import add_task, delete_task, get_task, get_tasks, update_task
    
    from web.task.types import TaskType
    from contexts import get_context
    
    
    @strawberry.type
    class Query:
        task: TaskType = strawberry.field(resolver=get_task)
        tasks: list[TaskType] = strawberry.field(resolver=get_tasks)
    
    
    @strawberry.type
    class Mutation:
        task_add: TaskType = strawberry.field(resolver=add_task)
        task_update: TaskType = strawberry.field(resolver=update_task)
        task_delete: TaskType = strawberry.field(resolver=delete_task)
    
    
    schema = strawberry.Schema(query=Query, mutation=Mutation)
    task_app = GraphQLRouter(schema, context_getter=get_context)
    

    app.py

    from fastapi import FastAPI
    from router import task_app
    import uvicorn
    
    from database import database_context
    
    api = FastAPI()
    
    
    def register_controller():
        api.include_router(task_app, prefix='/task')
    
    
    if __name__ == '__main__':
        database_context.initialize()
        register_controller()
    
        uvicorn.run(app=api, host='0.0.0.0', port=8000)
    

    実装したrouter.pyの内容を登録し、FastAPIを起動する処理を記載します。

    起動、API呼び出し

    起動コマンドは以下のようになっています。

    cd ./src
    python -m app

    起動に成功するとlocalhost:8000にAPIが立ち上がります。
    /taskにGraphQLルータを追加したので、
    localhost:8000/taskにブラウザでアクセスするとStrawberryのGraphQL UIページにアクセスできます。

    中央のパネルにGraphQLクエリを入力して実行することができます。
    左側のメニューからドキュメントを確認したり、クエリの探索的作成もできる便利なUIになっています。

    データの投入クエリは以下のようになっています。

    mutation addDataSample {
      taskAdd(taskInput:{ title:"test", description: "testTask"}){
        title
      }
    }

    クエリの実行結果は右側のパネルに表示されます。

    投入したデータを一覧取得するクエリを実行してみましょう。

    query listtasks {
      tasks {
        id
        title
        status
        description
        updatedAt
      }
    }

    無事、投入したデータを確認することができました。

    さいごに

    FastAPIとGraphQLライブラリStrawberryを用いて簡単にGraphQLAPIを実装する方法を紹介しました。
    ライブラリに則った定義を記載するだけで簡単にGraphQLのAPIを実現できます。
    手軽に実現できるため、今後APIを構築する際はGraphQLも選択肢の一つに入るのではないかと思います。
    REST API、GraphQL双方の利点、欠点を踏まえながら最適な形式を選択していきたいものですね。

    それでは!

    Acroquest Technologyでは、キャリア採用を行っています。
    • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
    • Elasticsearch等を使ったデータ収集/分析/可視化
    • マイクロサービス、DevOps、最新のOSSを利用する開発プロジェクト
    • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
      少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。 www.wantedly.com