こんにちは。ブログは書くのは久しぶりの上田です。
今日は、AWS Lamdaを使ったCloudWatchのログのキーワード監視ツールを作成したので、
その紹介をしたいと思います。
GitHub - acroquest/aws-cloudwatch-logwatcher
モチベーション
唐突ですが、皆さんAWS Lambdaがメインロジックのアプリ監視はどうしているでしょうか?
私はLambdaアプリログの監視をしたいと考えたときに、
ちょうど良いサービス/ツールがなくて悩みました。
私の場合、以下のようなツールがほしいと考えていました。
- 予期せぬエラーログが出ているかどうか、定期的にメール通知したい。
- 指定するエラーログは正規表現で書きたい。
- 複数のロググループをまとめて1つの設定で監視したい。
- メール通知には、エラーログの内容を含めたい。
自作LambdaのアプリログはCloudWatchに出力され、
CloudWatchにはメトリクスフィルタやエラーを拾ってサブスクリプションで通知する機能があるのですが、
以下条件が目的に合いませんでした。
サブスクリプションでLambdaに通知した場合
- 通知するエラーログを正規表現で指定するには、ログ出力ごとに通知を処理する必要がある。
- 各ロググループ単位でしかエラー通知の設定ができない。
- 手間がかかる上に料金がかさむ(正確には計算していませんが、CloudWatch,Lambdaの値段と頻度を考えると)。
(もしかしたら私の勉強不足で、方法はあるかもしれません。)
結局、自作ツールでLambda上にPythonロジックを書き、
下記構成のツールを作成することで、
自分のほしかった監視機能を実現しました。

構成のためのServerless定義はこちらを参照。
※Serverless定義にメール通知先のSNSは記載されていないため、ツールを使うためには事前にSNSを手動で設定しておく必要があります。
ツールの動き
ツールはこんな流れで動くようにしました。
- Cronから呼び出されたParent Lambdaが、S3から前回実行時刻を取得する。
- Parent Lambdaが指定したprefixごとに子Lambdaを呼びだす。
- 呼び出された子Lambdaが指定prefixグループのログを収集し、前回実行時刻以降のログに絞り込む。
- ログ内容を整形し、SNSへ通知。
今回作製してみて、Lambdaのバッチ起動~他AWS機能の連携は、
非常に作りやすいと感じました。
今回はLambdaからS3、Lambda、CloudWatch、SNSのAPIを呼び出して実現しています。
参考までにそれぞれ連携に使用したPythonのAPIコール例を載せます。
LambdaからS3
// S3バケット読み込み
s3 = boto3.session.Session().resource('s3')
bucket_name = os.environ['ALERT_LOG_BUCKET']
bucket = s3.Bucket(bucket_name)
key = os.environ['ALERT_LOG_KEY']
object = bucket.Object(key)
last_updated_time_str = object.get()['Body'].read().decode('utf-8')
result = int(last_updated_time_str)
...
// S3バケット書き込み
object = bucket.Object(key)
object.put(
Body=str(last_exec_time).encode('utf-8'),
ContentEncoding='utf-8',
ContentType='text/plain'
)Dynamoに入れようか迷いましたが、
S3の方が安そうなので。。。
'text/plain'でepoc形式の時刻を素のテキストとして読み書きしています。
普段は'application/json'でjsonで扱うほうが多いかもしれません。
LambdaからLambda
function_name = os.environ['FILTERED_ALERT_LOGS']
lambda_client = boto3.session.Session().client('lambda')
lambda_client.invoke(
FunctionName=function_name,
InvocationType='Event',
LogType='Tail',
Payload=json.dumps(request)
)InvocationTypeを'RequestResponse'にすることで同期、'Event'にすることで非同期処理になります。
今回は非同期並列処理がしたかったので'Event'に設定しました。
LambdaからCloudWatch
// ロググループの取得
client = boto3.session.Session().client('logs')
response = client.describe_log_groups(
logGroupNamePrefix=prefix
)
...
// ログストリームの取得
response = client.describe_log_streams(
logGroupName=group_name,
orderBy='LastEventTime',
descending=True,
limit=limit
)
...
// ログイベントの取得
logs = client.get_log_events(
logGroupName=group_name,
logStreamName=stream_name,
startTime=start_time,
endTime=end_time,
startFromHead=True
)CloudWatchは実行Lambdaごとにロググループを、
ロググループの中にコンテナのライフサイクルごとにログストリームを、
ログ出力ごとにイベント情報を階層的に保持しています。
'logGroupNamePrefix'を指定してロググループを取得することで取得対称の分類を、
ログストリーム取得後にPython側で時刻のフィルタリングを、
イベント取得後にPython側で時刻のフィルタリングと正規表現での内容絞込みを行っています。
LambdaからSNS
message = '整形したCloudWatchログ'
topic_arn = os.environ['ALERT_LOG_TOPIC_ARN']
subject = os.environ['ALERT_LOG_SUBJECT']
sns_request_params = {
'TopicArn': topic_arn,
'Message': message,
'Subject': subject
}
sns_client = boto3.session.Session().client('sns')
sns_client.publish(**sns_request_params)環境変数から取得したタイトルをつけて通知しています。
以上、自分で欲しかったツールを作成し、
目的とするLambdaのアプリログ監視をできるようになりました。
ここで作製したツールは、GitHubで公開しています。
もしかしたらもっと良い方法/サービスがあるかもしれません、
こうしたほうがいい、とかあればぜひコメントください。
ではでは
Acroquest Technologyでは、キャリア採用を行っています。
- ビッグデータ(Hadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
- マイクロサービス、DevOpsなどの技術を使ったり、データ分析、機械学習などのスキルを活かしたい。
- 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
- 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
モノリシックなアプリケーションをマイクロサービス化したいエンジニア募集! - Acroquest Technology株式会社のWeb エンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com