前説
次は、どの組み合わせのサンプルにしようか、若干迷ったのですが、DynamoDBへの登録処理のサンプルソースを掲載したので、DynamoDBに絡めて DynamoDB Stream からトリガーでLambdaを実行して、AWS SNSに通知するサンプルソースを選択してみました。 せっかくなのでサンプル2の多重度指定つき並行処理を雛形に組み上げしています。
githubにて公開
DynamoDB Stream + Lambda + SNS(Simple Notification Service) のサンプルソースです。
ダウンロードは下記より
https://github.com/SyoAwsBlog/ShoLambdaSample04
・Dynamo Streamから新規登録(INSERT)のレコードのみをSNSに通知する
利用可能な環境変数
変数名 | 変数値 |
---|---|
LogLevel | ログの出力レベルを(0~4)までの間で設定する |
LogLevelForWorker | ログの出力レベルを(0~4)までの間で設定する(ワーカー処理(疑似スレッド処理)) |
ExecutorsThreadsNum | 並行処理の多重度を数字で指定 |
ExecutorsThreadsWait | ワーカー処理(疑似スレッド処理)毎に Sleepを入れられる(ミリ秒指定) |
TargetSnsTopicArn | 通知先のSNS TopicのARNを指定する |
autoFunctionRetry | 省略したらエラー時再実行はしない。0より大きい値(数字)を設定すると、その回数、再実行を行う |
概要
組み上げには、サンプル2を利用してみました。転用して実装すると、こんなイメージになります。
動作確認にも、特にコードを変更せずに、関連サービスの設定とLambda環境変数指定で動くようにしてあります。
主要な基底処理
AbstractDynamoDbStreamCommon 120行目近辺
return new Promise(function (resolve, reject) {
if (event && event.Records) {
resolve(event.Records);
} else {
reject("event.Records Not Exists");
}
});
がポイントになるでしょう。DynamoDB Streamからの引数の中身は、下記のようなイメージです。
Records配下に DynamoDB Stream で連携されてきた各行の変更内容が連携されてくるので、そのデータを取得します。
{
"Records": [
{
"eventID": "0123456789XXXXXXXXXXXXXXXX",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "リージョン名",
"dynamodb": {
"ApproximateCreationDateTime": 1600000000,
"Keys": {
"PrimaryKey": {
"S": "XXXXX"
}
},
"NewImage": {
"ColumnB": {
"S": "YYYYY"
},
"ColumnC": {
"S": "ZZZZZ"
},
"SortKey": {
"S": "2020-07-01T00:00:00.000+0900"
},
"PrimaryKey": {
"S": "XXXXX"
}
},
"SequenceNumber": "0123456789XXXXXXXXXXXXXXXX",
"SizeBytes": 88,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:リージョン:AWSアカウントID:table/テーブル名/stream/2020-07-01T00:00:00.000"
}
]
}
AbstractWorkerSnsCallTopicCommon
AbstractWorkerSnsCallTopicCommon.prototype.AbstractBaseCommon.getTasks = function (
event,
context
) {
var base = AbstractWorkerSnsCallTopicCommon.prototype.AbstractBaseCommon;
try {
base.writeLogTrace("AbstractWorkerSnsCallTopicCommon# getTasks : start");
return [
this.beforeMainExecute,
this.extractBizInfos,
this.transformRecordInfos,
this.businessMainExecute,
this.afterMainExecute,
];
} catch (err) {
base.printStackTrace(err);
throw err;
} finally {
base.writeLogTrace("AbstractWorkerSnsCallTopicCommon# getTasks : end");
}
};
- beforeMainExecute ・・・ DynamoDB Streamの1行データを返却
- extractBizInfos ・・・ 新規登録行のみを連携データとする判定
- transformRecordInfos ・・・ AWS SDKのAPI利用用にパラメータ組み立て
- businessMainExecute ・・・ API実行
- afterMainExecute ・・・ API実行後のログ出力
といった処理の流れになっています。
動作させる為のAWS設定手順は、別記事にて掲載予定。