こんにちはイワツカです。
今年の夏は、特に猛暑日が続いていたので、例年にも増して素麺を食べてました。
さて今回は、AWS Lambda(Python)でLambda Web Adapterを用いてレスポンスストリーミングする方法を試してみたので紹介します。
1. 概要
1.1 レスポンスストリーミングとは?
レスポンスストリーミングとは、HTTPリクエストに対してサーバーがレスポンスを一度にまとめて送るのではなく、データを分割して順次クライアントに送信する手法のことです。
AWS Lambdaでは、Lambda関数URLを設定してLambdaからのレスポンスをレスポンスストリーミングにすることができます。
レスポンスストリーミングにすることで、従来に比べてユーザーにレスポンスが届くまでの時間を軽減できるので、Webアプリケーションにおけるユーザー体験を向上させることができます。
用途としては、リアルタイムなチャットのやり取りや、処理の進行状況などを表示するのに便利です。
ただ、現在レスポンスストリーミングに直接対応しているのは、ランタイムがNode.jsの場合のみです。 docs.aws.amazon.com
Lambdaで使用するランタイムがPythonの場合は、Lambda Web Adapterを使用することでレスポンスストリーミングを実現可能です。
1.2 Lambda Web Adapterとは?
Lambda Web Adapterとは、ウェブアプリケーションをAWS Lambda上で実行するためのツールです。
Lambda Web Adapterを使用するとFastAPIなどのフレームワークを利用できるようになります。
github.com
そしてFastAPIには、StreamingResponseという、レスポンスストリーミングを実現するための機能があります。
fastapi.tiangolo.com
そのため、Lambda Web AdapterとFastAPIを組み合わせることによって、Pythonで実装をするLambdaでも、レスポンスストリーミングを実現できるようになります。
2. アプリ作成
この記事では、Lambda Web Adapterを使ってFastAPIをLambda上で動かし、Amazon Bedrockを使ったLLM(大規模言語モデル)とのチャットアプリをレスポンスストリーミングで作成します。
構成や実装はAWSの公式の例を参考に実施しました。
github.com
全体像はこちらです。StreamlitというPythonフレームワークで画面を作成し、LLMとのチャットをできるようにしています。
2.1 実行環境
本アプリを動かすには、FastAPIを動かすためのコンテナをビルドするためのDocker環境とStreamlitを動かすためのPython環境が必要です。
Python環境は、Python 3.12.4、Streamlit 1.38.0を使用しました。
2.2 ディレクトリ構成
ディレクトリ構成は、AWS公式の例に載っているファイルに加えて、Streamlitのスクリプトのみを追加しています。
Dockerfile、requirements.txt、template.yamlは本アプリでも同じ内容でそのまま利用しました。
./ |--app/ | |--Dockerfile | |--main.py | |--requirements.txt | |--streamlit_app.py |--template.yaml
2.3 FastAPIの実装
FastAPIの実装(main.py)では、APIのレスポンスをStreamingResponseにすることがポイントです。
また、StreamingResponseに渡すbedrock_chat_stream()の処理では、画面から受け取ったユーザーのメッセージに対して、Bedrockからの応答をreturnでなくyieldで都度、返しているところも重要です。
基本的な実装は公式の例に載っているので、比較的簡単に実装することができました。
import boto3 import json import os import uvicorn from fastapi import FastAPI from fastapi.responses import StreamingResponse from pydantic import BaseModel from typing import Optional # FastAPIのインスタンスを作成 app = FastAPI() # チャットメッセージのモデル class ChatMessage(BaseModel): message: Optional[str] = None # APIエンドポイントを定義 @app.post("/api/chat") def api_chat(chat_message: ChatMessage): if chat_message.message is None or chat_message.message.strip() == "": return {"error": "Message cannot be empty"} return StreamingResponse(bedrock_chat_stream(chat_message.message), media_type="text/event-stream") # Bedrockクライアントを初期化 bedrock = boto3.client('bedrock-runtime') async def bedrock_chat_stream(user_message: str): """ユーザーのメッセージを受け取り、LLMからの返答をストリーミング形式で返す""" instruction = f"ユーザーメッセージに対する会話を続けてください: {user_message}" body = json.dumps({ "anthropic_version": "bedrock-2023-05-31", "max_tokens": 1024, "messages": [ { "role": "user", "content": instruction, } ], }) # Bedrockからレスポンスストリーミングを取得 response = bedrock.invoke_model_with_response_stream( modelId='anthropic.claude-3-haiku-20240307-v1:0', body=body ) # ストリームが存在する場合、逐次データを処理 stream = response.get('body') if stream: for event in stream: chunk = event.get('chunk') if chunk: message = json.loads(chunk.get("bytes").decode()) if message['type'] == "content_block_delta": yield message['delta']['text'] or "" elif message['type'] == "message_stop": yield "\n" if __name__ == "__main__": # Uvicornを使用してサーバーを起動 uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "8080")))
2.4 Streamlitの実装
Streamlitの公式ページに載っているLLMとのチャット画面を参考に実装しました。 docs.streamlit.io
Streamlitでストリーミング出力させる場合はst.write_stream()を利用します。
本アプリでは、LLMからの応答をrequestライブラリを通して取得していて、st.write_stream()が対応していない型になっているため、対応させるためにラッパー関数(stream_wrapper())を追加しています。
import requests import streamlit as st # FastAPIのエンドポイントURL API_URL = "<Lambda関数URL>" # 画面にタイトルを追加 st.title("AI Chat with Bedrock") # チャット履歴のsession_stateを初期化 if "messages" not in st.session_state: st.session_state.messages = [] # チャット履歴を表示 for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) def stream_wrapper(response): """レスポンスをStreamlitで互換性のある形式に変換する""" for chunk in response.iter_content(chunk_size=128): if chunk: yield chunk.decode('utf-8') # チャット入力 if prompt := st.chat_input("What would you like to discuss?"): with st.chat_message("user"): st.markdown(prompt) # チャット履歴にユーザー入力を追加 st.session_state.messages.append({"role": "user", "content": prompt}) # APIにリクエストを送信し、レスポンスストリーミングを受け取る with st.chat_message("assistant"): response = requests.post(API_URL, json={"message": prompt}, stream=True) response_text = st.write_stream(stream_wrapper(response)) # チャット履歴にAPIのレスポンスを追加 st.session_state.messages.append({"role": "assistant", "content": response_text})
3. アプリを動かして見る
本アプリは以下の2ステップで動かせます。
1. チャットアプリをデプロイ
2. Streamlitを起動する
3.1 チャットアプリをデプロイ
AWS公式の例に載っている手順でFastAPIを動かすLambdaのビルドとデプロイを行います。
そして、デプロイ後にLambda関数URLが分かります。
sam build --use-container sam deploy --guided
以下は利用したtemplate.yamlです。AWS公式の例に載っているtemplate.yamlと同じ内容です。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > Streaming Chat with FastAPI on AWS Lambda # More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst Globals: Function: Timeout: 300 Resources: FastAPIFunction: Type: AWS::Serverless::Function Properties: PackageType: Image MemorySize: 512 Environment: Variables: AWS_LWA_INVOKE_MODE: RESPONSE_STREAM FunctionUrlConfig: AuthType: NONE InvokeMode: RESPONSE_STREAM Policies: - Statement: - Sid: BedrockInvokePolicy Effect: Allow Action: - bedrock:InvokeModelWithResponseStream Resource: '*' Tracing: Active Metadata: Dockerfile: Dockerfile DockerContext: ./app DockerTag: v1 Outputs: FastAPIFunctionUrl: Description: "Function URL for FastAPI function" Value: !GetAtt FastAPIFunctionUrl.FunctionUrl FastAPIFunction: Description: "FastAPI Lambda Function ARN" Value: !GetAtt FastAPIFunction.Arn
3.2 Streamlitを起動する
上記のデプロイで得られたLambda関数URLをstreamlit_app.pyのAPI_URLに代入します。
その後、streamlit_app.pyがあるディレクトリで以下のコマンドを実行します。
streamlit run streamlit_app.py
3.3 チャットを試してみる
それでは実際にチャットを試してみます。
LLMからの応答が逐次表示されているのが分かります。
※なお、本アプリはレスポンスストリーミングを試すために作成したので、会話履歴はLLMに入力しておらず単発の会話のみ可能です。
4. まとめ
今回はLambda Web Adapterを使ってLambda関数URLの出力をレスポンスストリーミングにする方法を試してみました。
レスポンスストリーミングがAWS Lambdaでも可能になるので、チャットアプリの実現等、Webアプリケーションの幅が広がりますね。
AWSの公式のサンプルやドキュメントが充実していて、比較的簡単に実装することができるので、ぜひ試してみてください。
Acroquest Technologyでは、キャリア採用を行っています。少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。 www.wantedly.com
- Azure OpenAI/Amazon Bedrock等を使った生成AIソリューションの開発
- ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
- マイクロサービス、DevOps、最新のOSSやクラウドサービスを利用する開発プロジェクト
- 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長