はじめに
LambdaでETL処理を実装する際に、冪等にするために考えたことをまとめます。
今回前提とするETL処理は、S3のデータをインプットとして変換後のデータを別のS3バケットにアウトプットするというものになります。またインプットS3データは、Lambdaによって作成されるものとします。
具体的な内容に入る前に 2つ補足説明します。
- ETL とは
- Lambdaで冪等性を考慮する必要性
メインの内容としては、直近の案件で用いた冪等性の対策についての1例を紹介します。
その後、他のパターンなども紹介できればと思います。
ETL
とは
下記単語の略であり、そのデータフローのことを指します。 本記事で扱う解釈と近い説明は、wikiになると思います。
- Extract(抽出)
- Transform(変換)
- Load(格納)
Lambdaで冪等性を考慮する必要性
冪等性という単語の意味をざっくり説明すると、次のとおりです。
「ある操作を 1 回行っても複数回行っても結果が同じである」という概念。
awsの記事では下記の通り説明されています。
プログラミングにおいて、冪等性とは、イベントの 繰り返しを識別し、データの重複、矛盾、損失を防ぐアプリケーションまたはコンポーネントの能力のことをいいます。AWS Lambda 関数を冪等にするには、重複するイベントを正しく処理するように関数ロジックを設計する必要があります。
引用文で記載されている通り、Lambdaでは重複するイベントを正しく処理するロジックを実装する必要があります。したがって、冪等性を考慮しなければなりません。
重複するイベントが発生する可能性に関するソースは下記リンクが参考になるかと思います。
上記リンクからの引用文です。
同期呼び出しの場合、エラーを含む関数レスポンスの詳細が、レスポンス本文とヘッダーに含まれます。どちらの呼び出しタイプについても、詳細は、実行ログおよびトレースに記載されています。 エラーが発生すると、関数が複数回呼び出される可能性があります。再試行の動作は、エラータイプ、クライアント、イベントソース、呼び出しタイプによって異なります。たとえば、関数を非同期に呼び出してエラーが返された場合、Lambda は関数を最大 2 回実行します。詳細については、「再試行動作」を参照してください。 非同期呼び出しの場合、Lambda はイベントを、関数に送信する前にキューに追加します。関数に、キューに対応する十分な容量がない場合、イベントが失われる可能性があります。場合によっては、エラーが発生しなくても、関数が同じイベントを複数回受信することがあります。処理されなかったイベントを保持するには、デッドレターキューで関数を設定します。
Amazon S3 イベント通知は、少なくとも 1 回配信されるように設計されています。通常、イベント通知は数秒で配信されますが、1 分以上かかる場合もあります。
Amazon SQS FIFO queues ensure that the order of processing follows the message order within a message group. However, it does not guarantee only once delivery when used as a Lambda trigger. If only once delivery is important in your serverless application, it’s recommended to make your function idempotent . You could achieve this by tracking a unique attribute of the message using a scalable, low-latency control database like Amazon DynamoDB.
冪等性の対策
対策1)日付情報などを含めてアウトプットファイルを常に上書きする
重要なことなのではじめに記載しますが、本対策はAWSが提案している冪等性のための対策には沿っていません。AWSが提案している対策はこの後で紹介します。
本対策シンプルに説明すると、S3イベント通知によりLambdaが複数回コールされた場合でも、
各Lambdaで全く同じ処理を実行し、アウトプットもS3の同じバケット/キーに出力する(すなわち同一データで上書きする)方法です。
このとき同じアウトプットにするために、インプットの日時情報などをアウトプットの名前に利用します。インプットの日時情報の候補としては、S3から取得できるLastModified
の値やインプットファイル名に付与した情報などがあるかと思います。
インプットの日付情報を含める理由としては、今回の案件独自の要件が起因します。
- アウトプットのファイルは、インプットファイルの作成毎に1:1で作成する
- ①を満たすためにアウトプットのsuffixに日時情報をいれる必要あり
- 当初、lambdaのプログラム実行時刻をsuffixにする仕様だったのですが、S3イベント通知により複数回Lambdaが実行されると同じ処理結果のアウトプットが複数作成されてしまう
本ケースでは下記ポイントが制約になります。
- 同じ日時で複数のインプットファイルが作成されることはない
本ケースの入出力イメージは下記のとおりです。
【S3インプット】
s3://input-bucket/input_object_20220406123045.csv
【S3アウトプット】
s3://output-bucket/output_object_20220406123045.json
また今回は、日時情報をsuffixにしていますが他の情報を参照するほうが好ましいかもしれません。
versionId
: S3バージョニングを利用している場合- 固有属性の値
- ETag(要件によっては利用できるかもしれません)
対策2) 入力イベントを検証し、そのイベントが処理済かどうかを識別する
AWSが公開しているLambda 関数を冪等にするにはどうすればよいですか?では、「解決方法」セクションで下記のように説明されています。
関数を冪等にするには、関数のコードによって、入力イベントを適切に検証し、そのイベントが以前に処理されたことがあるかどうかを識別する必要があります。アプリケーションの動作によって、最善のコードの書き方が決まります。 ガイダンスとして、以下の関数ロジックの例やベストプラクティスをご使用ください。
「冪等 Lambda 関数ロジックの例」セクションで説明されているロジック例が今回のETLユースケースでも参考にできるかと思います。
- 入力イベントの固有属性の値を抽出します。(※ 今回の要件であれば、日時情報を含むインプットファイル名)
- 属性値がコントロールデータベース(DynamoDB)に存在するかどうかを確認します。 次に、結果に応じて以下を実行します。
- 固有の値が存在する場合は、エラーを生成せずにアクションを終了します。
- または、固有の値が存在しない場合は、元々設計してあるアクション(ETL処理)に進みます。
3. 関数の動作が終了したら、コントロールデータベースにレコードを含めます。
4. アクションを終了します。
上記ロジックを組み込むことでLambda関数を冪等にできます。
また、より厳密に制御したい場合は下記記事のようにロック/アンロックのロジックにすると良いかと思います。
上記記事では、条件付きの書き込みを用いて実装されています。
dynamodb.put_item(
TableName = 'test-lambda',
Item = {'filename':{'S':key},'status':{'S':'complete!'}},
Expected = {'filename':{'Exists':False}}
)
Expected
はレガシーパラメータなので、下記のように実装するとよいかもしれません。
ConditionExpression='attribute_not_exists(deviceId)'
dynamodb.put_item(
TableName = 'test-lambda',
Item = {'filename':{'S':key},'status':{'S':'complete!'}},
ConditionExpression='attribute_not_exists(filename)'
)
対策3)AWS Lambda Powertools (Python)で冪等にする
考え方は対策2と同じになります。
詳細は、下記記事が参考になります。
2つ目の記事では色々と検証されています。
その他
今回のシナリオとは異なりますが、下記リンクはサーバーレスの冪等性についてかなり丁寧に説明されています。ETLに対する冪等性についても説明されています。
まとめ
LambdaでETLを行う際の冪等性について、直近の案件で行った対策や参考になる事例を紹介しました。全てのユースケースに適応できる1つの冪等性対策はないと思ったので、都度最適な対策を選択できるようにしていきたいです。