SQSを使ってAWSのLambda関数を実行させる方法
目的
キューが送信されたら、AWSのLambda関数を実行する。
対象
やったこと
PowerAutomationのSQSコネクタを使用してAWSをキックする。
使ったものの説明
Lambda
AWSアカウントを持っていれば使える、クラウド上のプログラム実行環境。
SQS
AWSアカウントを持っていれば使える、ソフト間のメッセージの送受信ツール。
Step1 実行したいLambda関数を用意する
今の時点で実行トリガーなどは設定しないでいいので、とりあえず関数を作ります。 AWSコンソールのサービス検索窓からLambdaのコンソールを開き、「関数の作成」を実行。
一から作成のままで、関数名+ランタイムを設定します。デフォルトの実行ロールは既存のロールでも大丈夫です。
キューされたメッセージ本文を引数にしたい場合
Lambdaで実行する関数の冒頭部分を以下の構成にすることで、payload
という変数にメッセージ本文が入ります。
def lambda_handler(event, context): for record in event['Records']: # キューされたメッセージをpayloadという変数に格納 payload=record["body"] # 以下、処理内容を記載 print(str(payload))
次に、Lambdaの実行ロールに対してSQS関連のポリシーを与えます。
ポリシーを持たないロールのままだとSQSが実行されたことをLambdaが認識できず、エラーすら表示されないので注意しましょう。
コンソールの設定からアクセス権限を選び、実行ロールをクリックします。
別タブでIAMロール(AWS上の実行権限)の設定画面が立ち上がるので、アクセス権限タブの「ポリシーをアタッチします」をクリックします。
立ち上がった画面の検索枠に「SQS」と入力すると3つポリシー候補が表示されます。 「AmazonSQSFullAccess」のチェックボックスをONにして、画面最下の「ポリシーのアタッチ」をクリックします。
※もし「AmazonSQSFullAccess」が表示されない場合は既にポリシーが付与されていないかを確認しましょう。
これで準備は完了です。
Step2 SQSを構築する
AWSコンソールのサービス検索窓で「Simple Que Service」を選択します。 右上の「キューを作成」を押下し、次の画面で名前を入力します。他の要素はデフォルトのままで問題ないです。
無事にキューが作成出来たら、Lambdaトリガーのタブを選択し、「Lambda関数トリガーを設定」のボタンを押下します。 プルダウンから先ほど作成した関数を選択すれば完了です。
テスト
Queueの画面右上のメッセージを送受信を押下し、適当にメッセージ本文を入力、右上の「メッセージを送信」を押下します。 CloudWatchのロググループで、先ほど紐づけたLambda関数の実行ログを確認し、メッセージを送信した直後のログがあれば成功です。
せっかくなのでSQSのイベント情報を確認してみましょう。 キューから実行するLambda関数で‘print('event: ' , event)‘として、キューを実行します。
CloudEventWatchで確認すると、以下のようなイベントが渡されていることが分かりました。
event: {'Records': [ { 'messageId': '5c362fce-564a-4127-aa9d-2faea4401c4e', 'receiptHandle': '◆◆◆◆◆◆', 'body': 'test_message', 'attributes': { 'ApproximateReceiveCount': '1', 'SentTimestamp': '1624683601405', 'SenderId': '◆◆◆◆◆◆', 'ApproximateFirstReceiveTimestamp': '1624683601417' }, 'messageAttributes': {}, 'md5OfBody': '◆◆◆◆◆◆', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:%your_region%:◆◆◆◆◆◆:%your_Queue_Name%', 'awsRegion': '%your_region%' } ] }
以下のように記載すると、Lambdaではキューメッセージを変数化するだけでなく、送信日時、受信日時も変数として扱えます。 逆に言えば、キューを送ったクラントまでは分からない、ということのようです。
import datetime def lambda_handler(event, context): for record in event['Records']: # キューされたメッセージをpayloadという変数に格納 payload=record["body"] # キューが送付された日時をSentTimestampという変数に格納 UNISentTimestamp=record["attributes"]["SentTimestamp"] # UNIX型日時を通常の日時型に変換 SentTimestamp = datetime.datetime.fromtimestamp(int(UNISentTimestamp) / 1000) print('SentTimestamp: ',SentTimestamp) # キューを受信された日時をRecieveTimestampという変数に格納 UNIReceiveTimestamp =record["attributes"]["ApproximateFirstReceiveTimestamp"] # UNIX型日時を通常の日時型に変換 ReceiveTimestamp=datetime.datetime.fromtimestamp(int(UNIReceiveTimestamp)/1000) print('ReceiveTimestamp:' ,ReceiveTimestamp) # 以下、処理内容を記載 print('message: ', str(payload))
まとめ
Lambdaを実行するだけならAPIゲートウェイでもいいのだが、より設定が簡単なものを使ってみたいという方にはお勧めです。
次回
次はキューメッセージを送付する方法について纏めようと思います。