Warning: Undefined global variable $stdata321 in /home/physalis/physalisgp02.com/public_html/awsblog/wp-content/themes/affinger/functions.php on line 6855

Warning: Undefined global variable $stdata322 in /home/physalis/physalisgp02.com/public_html/awsblog/wp-content/themes/affinger/functions.php on line 6855

Warning: Undefined global variable $stdata323 in /home/physalis/physalisgp02.com/public_html/awsblog/wp-content/themes/affinger/functions.php on line 6855
DynamoDB Stream + Lambda + SNS(Simple Notification Service) のサンプルソース - とあるAWSエンジニアの戯言

IT技術に関する覚書

とあるAWSエンジニアの戯言

DynamoDB Stream + Lambda + SNS(Simple Notification Service) のサンプルソース

前説

次は、どの組み合わせのサンプルにしようか、若干迷ったのですが、DynamoDBへの登録処理のサンプルソースを掲載したので、DynamoDBに絡めて DynamoDB Stream からトリガーでLambdaを実行して、AWS SNSに通知するサンプルソースを選択してみました。 せっかくなのでサンプル2の多重度指定つき並行処理を雛形に組み上げしています。

githubにて公開

DynamoDB Stream + Lambda + SNS(Simple Notification Service) のサンプルソースです。

ダウンロードは下記より
https://github.com/SyoAwsBlog/ShoLambdaSample04

・Dynamo Streamから新規登録(INSERT)のレコードのみSNSに通知する

利用可能な環境変数

変数名 変数値
LogLevel ログの出力レベルを(0~4)までの間で設定する
LogLevelForWorker ログの出力レベルを(0~4)までの間で設定する(ワーカー処理(疑似スレッド処理))
ExecutorsThreadsNum 並行処理の多重度を数字で指定
ExecutorsThreadsWait ワーカー処理(疑似スレッド処理)毎に Sleepを入れられる(ミリ秒指定)
TargetSnsTopicArn 通知先のSNS TopicのARNを指定する
autoFunctionRetry 省略したらエラー時再実行はしない。0より大きい値(数字)を設定すると、その回数、再実行を行う

概要

組み上げには、サンプル2を利用してみました。転用して実装すると、こんなイメージになります。
動作確認にも、特にコードを変更せずに、関連サービスの設定とLambda環境変数指定で動くようにしてあります。

主要な基底処理

AbstractDynamoDbStreamCommon 120行目近辺

     return new Promise(function (resolve, reject) {
        if (event && event.Records) {
          resolve(event.Records);
        } else {
          reject("event.Records Not Exists");
        }
      });

がポイントになるでしょう。DynamoDB Streamからの引数の中身は、下記のようなイメージです。

Records配下に DynamoDB Stream で連携されてきた各行の変更内容が連携されてくるので、そのデータを取得します。

{
    "Records": [
        {
            "eventID": "0123456789XXXXXXXXXXXXXXXX",
            "eventName": "INSERT",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "リージョン名",
            "dynamodb": {
                "ApproximateCreationDateTime": 1600000000,
                "Keys": {
                    "PrimaryKey": {
                        "S": "XXXXX"
                    }
                },
                "NewImage": {
                    "ColumnB": {
                        "S": "YYYYY"
                    },
                    "ColumnC": {
                        "S": "ZZZZZ"
                    },
                    "SortKey": {
                        "S": "2020-07-01T00:00:00.000+0900"
                    },
                    "PrimaryKey": {
                        "S": "XXXXX"
                    }
                },
                "SequenceNumber": "0123456789XXXXXXXXXXXXXXXX",
                "SizeBytes": 88,
                "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "arn:aws:dynamodb:リージョン:AWSアカウントID:table/テーブル名/stream/2020-07-01T00:00:00.000"
        }
    ]
}

 

AbstractWorkerSnsCallTopicCommon


  AbstractWorkerSnsCallTopicCommon.prototype.AbstractBaseCommon.getTasks = function (
    event,
    context
  ) {
    var base = AbstractWorkerSnsCallTopicCommon.prototype.AbstractBaseCommon;
    try {
      base.writeLogTrace("AbstractWorkerSnsCallTopicCommon# getTasks : start");
      return [
        this.beforeMainExecute,
        this.extractBizInfos,
        this.transformRecordInfos,
        this.businessMainExecute,
        this.afterMainExecute,
      ];
    } catch (err) {
      base.printStackTrace(err);
      throw err;
    } finally {
      base.writeLogTrace("AbstractWorkerSnsCallTopicCommon# getTasks : end");
    }
  };
  • beforeMainExecute  ・・・ DynamoDB Streamの1行データを返却
  • extractBizInfos ・・・ 新規登録行のみを連携データとする判定
  • transformRecordInfos ・・・  AWS SDKのAPI利用用にパラメータ組み立て
  • businessMainExecute ・・・ API実行
  • afterMainExecute ・・・ API実行後のログ出力

といった処理の流れになっています。

動作させる為のAWS設定手順は、別記事にて掲載予定。

  • B!