前説
サンプル4、サンプル5とデータ連携シリーズを掲載してきたので、今風に言うとデータレイク(S3)へのCSVファイル出力するの為にFirehoseへ連携するLambdaをサンプルとして公開しようと思います。
- S3にファイルをputし、putトリガーでLambdaを起動
- DynamoDBに取り込み
- DynamoDBからストリームでSNSへ配信
- SQSにデータ蓄積
- SQSからデータを取り出して分析用に加工しながらFirehose(S3)へ出力
とデータ連携システムをLambdaで実現する時の主要な利用AWSサービス群と、ちょっと編集するだけで、繋がる・動くLambdaが用意できるようになります。
githubにて公開
SQS + Lambda + Firehose のサンプルソースです。 ダウンロードは下記より https://github.com/SyoAwsBlog/ShoLambdaSample06
- SQSから一度に取得できるメッセージ数はAPIで10件まで上限があるので指定件数・リトライ数で可能な限り受信
- データ連携を想定してるのでFirehoseへのputは成功するまでリトライ(落ちる時はLambdaの実行時間上限)
- Firehoseへのputに成功したらSQSキューからメッセージを削除
- SQSにメッセージが残っているようなら自身(Lambda)を再起呼び出し(上限設定あり)
などの仕様を盛り込んでいます。
利用可能な環境変数
今回は、ちょっと利用できる環境変数が多いです。(^^A
変数名 | 変数値 |
---|---|
LogLevel | ログの出力レベルを(0~4)までの間で設定する |
LogLevelForWorker | ログの出力レベルを(0~4)までの間で設定する(ワーカー処理(疑似スレッド処理)) |
ExecutorsThreadsNum | 並行処理の多重度を数字で指定 |
ExecutorsThreadsWait | ワーカー処理(疑似スレッド処理)毎に Sleepを入れられる(ミリ秒指定) |
SqsReceiveMaxSize | SQSから受信するメッセージ最大数(リトライして受信) |
SqsReceiveErrRetryMax | SQSから受信エラーが発生した時のリトライ上限 |
SqsQueueUrl | メッセージを受信するSQSのURLを指定 |
SelfReCallMax | SQSにメッセージが残っている場合に自身(Lambda)を再起呼び出しするが、その回数上限 |
FirehoseStreamName | 出力対象のストリーム名 |
概要
今回もサンプル2を雛形として採用しています。
前回同様、クラス構成・カスタマイズの容易性を考えた時に、サンプル2の方が分かりやすいと考えた為です。
どういう事が言うと、、、
「SQSからデータを読み込むだけ読み込んで連携処理を委譲する。データ残件があるなら再処理する」と「連携データを受け取ってデータ加工し出力対象(今回はFirehose)へデータ連携」が別クラスになっていた方がオーバーライドやカスタマイズが容易になるからという理由です。
具体例で言うと出力先をFirehoseではなく、外部API呼び出しにカスタマイズするといった時に、ワーカークラスを修正するだけで対応が可能となるからです。
主要な基底処理(SQSデータ読み込み~自身(Lambda)の再起呼び出し)
まずは制御側のSQSからデータを読み込んでいる処理の簡単な流れから説明します。
AbstractSqsReceiveCommon 624行目付近
AbstractSqsReceiveCommon.prototype.AbstractBaseCommon.getTasks = function (
event,
context
) {
var base = AbstractSqsReceiveCommon.prototype.AbstractBaseCommon;
try {
base.writeLogTrace("AbstractSqsReceiveCommon# getTasks :start");
return [
this.beforeMainExecute,
this.businessMainExecute,
this.afterMainExecute,
this.reCallSelfExecute,
];
} catch (err) {
base.printStackTrace(err);
throw err;
} finally {
base.writeLogTrace("AbstractSqsReceiveCommon# getTasks :end");
}
};
- beforeMainExecute ・・・ 指定件数の受信ができるか、0件受信になるまでSQSから受信
- businessMainExecute ・・・ 非同期多重実行の制御
- afterMainExecute ・・・ SQSのキュー情報(メッセージ残件)を取得
- reCallSelfExecute ・・・ SQSにメッセージが残っているようなら自身(Lambda)を再起呼び出し
この処理を実装しようとした時に、一番、驚いたことは、「SQSからメッセージ受信が1回につき10件まで」に制限されている事です。
SAA資格とかで勉強した事がある人は分かると思いますが、「SQSは耐久性があって大量データをキューイングできる。」という触れ込みの設問が多々出題されます。
Lambdaで処理しようとした時に、「APIでMAX10件しか取得できない!」って、「どーゆーことやねん!」と思った物です。
そんな訳で、「環境変数:SqsReceiveMaxSize」で指定して、ある程度の件数を捌けるようにしてます。
連携データ仕様にもよりますが、実体験で50~100件位で処理させる事が多いですかね。
それでも処理しきらない時の為に、SQSに残件があるかどうか確認して自身(Lambda)を再起呼び出しする機能も組み込んであります。
カスタマイズする時は、バグらせて無限ループさせないように注意してください。
やらかして「Lambdaを無限ループさせちまった!?」という時は、AWSコンソールから対象のLambda関数を削除してください。
主要なワーカー処理(Firehoseへのput処理&キューからメッセージ削除)
制御側から受け取ったデータをFirehoseへputして、SQSキューから処理したメッセージを削除します。
AbstractFirehosePutWorkerCommon ・・・ 492行付近
AbstractFirehosePutWorkerCommon.prototype.AbstractBaseCommon.getTasks = function (
event,
context
) {
var base = AbstractFirehosePutWorkerCommon.prototype.AbstractBaseCommon;
try {
base.writeLogTrace("AbstractFirehosePutWorkerCommon# getTasks : start");
return [
this.beforeMainExecute,
this.extractBizInfos,
this.transformRecordInfos,
this.businessMainExecute,
this.afterMainExecute,
];
} catch (err) {
base.printStackTrace(err);
throw err;
} finally {
base.writeLogTrace("AbstractFirehosePutWorkerCommon# getTasks : end");
}
};
- beforeMainExecute ・・・ 制御側から受け取ったデータを返却
- extractBizInfos ・・・ データの展開(どのサービスを経由してきたによって要オーバーライド)
- transformRecordInfos ・・・ DynamoDBのデータ構造を単純なマップ構造に変換
- businessMainExecute ・・・ FirehoseへのPut処理
- afterMainExecute ・・・ 対象データをSQSキューから削除
Firehoseへの登録に成功しない限り何度でもリトライするようにしてあるので、「環境変数:FirehoseStreamName」のストリーム名やLambdaに付与するIAMロールのポリシー付与は絶対に、絶対に間違わないでください。
Lambdaの実行時間上限まで動作する事になります。
Firehoseへのputに成功したらSQSキューから削除としたかったので、Firehoseのput処理成功時の次のPromise処理に、SQSキューからの削除を配置しています。
主要なカスタマイズ①(出力レイアウト)
AbstractFirehosePutWorkerCommon.businessMainExecuteから、getFileOutputLayoutを呼び出して出力フォーマットを変換しています。
getFileOutputLayout ・・・ 338行目付近 recordには対象の行データ
var recordData = "";
var cols = base.getFileOutputColumns();
for (var i = 0; i < cols.length; i++) { var colName = cols[i]; if (i > 0) {
recordData = recordData + ",";
}
if (
colName in record &&
record[colName] &&
record[colName].length > 0
) {
recordData = recordData + "" + record[colName];
} else {
recordData = recordData + "";
}
}
recordData = recordData + "\n";
という感じで、カンマ区切りのCSVデータに変換しています。
主要なカスタマイズ➁(出力項目)
AbstractFirehosePutWorkerCommon.getFileOutputLayoutから、getFileOutputColumnsを呼び出して出力対象とするDynamoDBの項目名を配列で決定しています。
こちらは、カスタマイズのオーバーライド例として、SampleFirehosePutWorkerModule側にオーバーライド関数:getFileOutputColumnsを実装しています。
SampleFirehosePutWorkerModule ・・・ 30行付近
SampleFirehosePutWorkerModule.prototype.AbstractBaseCommon.getFileOutputColumns = function (
record
) {
var base = SampleFirehosePutWorkerModule.prototype.AbstractBaseCommon;
try {
base.writeLogTrace(
"SampleFirehosePutWorkerModule# getFileOutputColumns : start"
);
var cols = ["PrimaryKey", "SortKey"];
return cols;
} catch (err) {
base.printStackTrace(err);
throw err;
} finally {
base.writeLogTrace(
"SampleFirehosePutWorkerModule# getFileOutputColumns : end"
);
}
}.bind(SampleFirehosePutWorkerModule.prototype.AbstractBaseCommon);
colsに代入する配列に、DynamoDBの項目を設定すると、その順序でCSVデータが並び替えられて出力されます。
データの送出元となったDynamoDBの項目名に合わせて、この関数を書き換えてください。
後は、Lambdaの環境変数を整備してIAMロール&ポリシーを整えるだけで流用できると思います。
あとがき
データ連携シリーズ(サンプル4、サンプル5、サンプル6(今回))は、いかがだったでしょうか?
Firehoseまで連携できれば、ElasticsearchやS3に置いてglue&Athenaなど分析系サービスにデータを繋げていく事ができるので、業務に応用のきく実戦的なLambdaを掲載できたのではないかと思います。