Lambdaサンプルソース

S3 putトリガー + Lambda + DynamoDB のサンプルソース

前説

Webアプリやデータ連携バッチで、S3のputトリガーを起点にしてLambdaを起動して何らかの処理をする。Lambdaの使い方としてはアイデアとして思いつきやすい用法ですが、実は考慮しておいた方が良い点があるので、実体験で得た情報と共に説明していきます。 今回も出力側は、DynamoDBを選択しましたが、batchWriteItemのAPIで複数件データを登録&リトライ処理する辺りについても触れていきます。

だんだん、サンプルと言いながらソース記述量が増えていってるのは、突っ込まないで!

具体的なAWSサービス同士の連携を考えると、考慮しておきたい機構や仕組みなんかが増えていく。。。

githubにて公開

S3 putトリガー + Lambda + DynamoDB のサンプルソースです。 ダウンロードは下記より https://github.com/SyoAwsBlog/ShoLambdaSample05

  • S3でputトリガーからのLambda起動を使う際の注意点
  • DynamoDBへの複数件一括登録の再起呼び出しリトライループ

などの小技を織り込んでみました。

利用可能な環境変数

変数名変数値
LogLevelログの出力レベルを(0~4)までの間で設定する
LogLevelForWorkerログの出力レベルを(0~4)までの間で設定する(ワーカー処理(疑似スレッド処理))
ExecutorsThreadsNum並行処理の多重度を数字で指定
ExecutorsThreadsWaitワーカー処理(疑似スレッド処理)毎に Sleepを入れられる(ミリ秒指定)
BatchPutSizebatchWriteItemを処理する件数、リンク先のドキュメントに記載があるが最大25件未満なので、1~24までの間で設定すること
ObjectWaitForRetryMaxS3オブジェクトのアクセス可否を何回待機するかを設定する。平常時なら2~3回で十分、運用に耐えるハズである。極まれにあるS3の障害時などは、回数を増やすのもアリ。
TableName読み込んだデータを登録するDynamoDBのテーブル名
PrimaryKeyDynamoDBのキー名
SortKeyDynamoDBのソートキー名
DynamoPutForRetryMax batchWriteItemでデータ登録する際に、一度に処理しきれない事があるので、再起呼び出しでリトライする処理が組み込んである。リトライ上限を設定する。
DynamoDBのテーブルのキャパシティにもよるが、回数がかかっても3~4回の間位で大体処理できる(と思われる)
「BatchPutSize」と「テーブルのキャパシティ」の兼ね合いになる。

概要

今回も組み上げには、サンプル2を利用しました。
サンプル集として、「S3からファイルを読み込んで正常に処理できたらオブジェクトを削除する」「読み込んだデータを受け取って処理する(今回はDynamoDBへ登録する)」別クラスになっていた方が別用途での使い回しが効きやすいし、コードが読める人には読みやすいという理由からの選択です。

DynamoDBの書き込みキャパシティは大量に増やすとAWS利用料の課金が増えるので注意してくださいね。

書き込みキャパシティにデカい数字を割り当てられる程、お財布事情に余裕があるのであれば、存分に多重非同期の効果を確認してみてください。

主要な基底処理(S3のデータ読み込み~オブジェクト削除)

まずはS3からのトリガーを処理している方の簡単な流れから説明します。

AbstractS3PutTriggerCommon  540行目近辺

  AbstractS3PutTriggerCommon.prototype.AbstractBaseCommon.getTasks = function (
    event,
    context
  ) {
    var base = AbstractS3PutTriggerCommon.prototype.AbstractBaseCommon;
    try {
      base.writeLogTrace("AbstractS3PutTriggerCommon# getTasks :start");

      return [
        this.initS3PutTriggerParameter,
        this.waitExistsS3Object,
        this.beforeMainExecute,
        this.businessMainExecute,
        this.afterUnProcessedItemCheck,
        this.afterMainExecute,
      ];
    } catch (err) {
      base.printStackTrace(err);
      throw err;
    } finally {
      base.writeLogTrace("AbstractS3PutTriggerCommon# getTasks :end");
    }
  };
  • initS3PutTriggerParameter ・・・ AWS SDK で S3 の各APIで使う汎用引数を生成
  • waitExistsS3Object ・・・ putトリガーで対象となったS3オブジェクトのアクセス待機
  • beforeMainExecute ・・・ S3オブジェクトからファイル読み込み、処理データ整形
  • businessMainExecute ・・・ 非同期多重実行の制御
  • afterUnProcessedItemCheck ・・・ DynamoDBへの登録残件の有無判定
  • afterMainExecute ・・・ S3のオブジェクト削除

それぞれの関数の詳細は、実際の関数を見てもらうとして、、、「やってみた系」では、よく省略されている、「~S3.waitFor("objectExists"~」の必要性について訴えていきたいと思います。

S3のputトリガーはLambdaで、すぐにアクセスすると「オブジェクトが存在しない!」というエラーでErrorをスローする事があります。

Errorをスローする事があります。(大事な事なので2回書きます)

もう1つ、紛らわしい事があってS3の障害時や、ちょっとだけ調子が悪くなった時(ネットワーク遅延)などに、1回のS3オブジェクト保存で、putトリガーが同じ内容で2~3回起動される事があります。

後続の処理では、リトライ&再実行が行われても問題ないような連携仕様にしておくと幸せになれる。と留意が必要です。

その点、DynamoDBへの連携は上書きされるだけなので、選択肢としては申し分なしですね。

主要なワーカー処理(DynamoDBへの一括登録)

制御側から受け取ったデータをDynamoDBへ一括登録する処理になります。

AbstractWorkerDynamoDbBatchPutCommon ・・・490行付近

 AbstractWorkerDynamoDbBatchPutCommon.prototype.AbstractBaseCommon.getTasks = function (
    event,
    context
  ) {
    var base =
      AbstractWorkerDynamoDbBatchPutCommon.prototype.AbstractBaseCommon;
    try {
      base.writeLogTrace(
        "AbstractWorkerDynamoDbBatchPutCommon# getTasks :start"
      );

      return [
        this.beforeMainExecute,
        this.transformRecordInfos,
        this.businessMainExecute,
      ];
    } catch (err) {
      base.printStackTrace(err);
      throw err;
    } finally {
      base.writeLogTrace("AbstractWorkerDynamoDbBatchPutCommon# getTasks :end");
    }
  };
  • beforeMainExecute ・・・ 制御側から受け取ったデータを返却
  • transformRecordInfos ・・・ オーバライド用関数の呼び出しとデータ重複排除処理
  • businessMainExecute ・・・ DynamoDBへの登録実行&リトライ

DynamoDBには、batchWriteItemという複数データを一度に登録するAPIがありますが、データを渡しても必ずしも登録される訳ではない。というのが注意点になりますかね。
APIのレスポンス情報の、UnprocessedItemsに登録されなかった情報が返却されるので、登録されなかった分は使い手側でハンドリングしろよ。という仕様です。
サンプルはリトライ上限(環境変数で設定)を設けて、処理して貰えるまで何度かリトライ(1秒待機)するという感じにしてあります。

主要なカスタマイズ(読み込み行データからテーブル項目への変換)

SampleWorkerDynamoDbBatchPutModule ・・・ 30行目付近

      var argLine = String(record);
      base.writeLogTrace(
        "SampleWorkerDynamoDbBatchPutModule# argLine" + argLine
      );
      var values = argLine.split(",");

      var primaryKey = base.getPrimaryKey();
      var sortKey = base.getSortKey();

      var rtnObj = {};
      rtnObj[primaryKey] = { S: values[0] };
      rtnObj[sortKey] = { S: values[1] };
      rtnObj.ColumnB = { S: values[2] };

      base.writeLogTrace(
        "SampleWorkerDynamoDbBatchPutModule# JSON :" + JSON.stringify(rtnObj)
      );

      return rtnObj;


ファイルから読み込んだ1行のデータを引数で受けて、DynamoDBに登録する為の項目マッピングする部分は、末端のクラスに関数を切り出してあるので、DynamoDBのカラム&設計に合わせて書き直すだけで流用できるようにしてあります。

SortKeyをテーブルに定義していない場合は、基底側(AbstractBaseCommon.transformRecordInfos)の一括登録データ内の重複データを排除する処理に修正が必要です。

スポンサーリンク

-Lambdaサンプルソース
-, , , , ,