はじめに
SQSからStep Functionsを実行するためにPipesを使いました。
今回はCloudFormationで実装したので、備忘録としてコードを残しておきます。
CloudFormationテンプレート
※ 一部抜粋
DemoPipe:
Type: AWS::Pipes::Pipe
Properties:
Target: !GetAtt DemoStateMachine.Arn
RoleArn: !GetAtt AmazonEventBridgePipeExecution.Arn
Source: !GetAtt DemoQueue.Arn
SourceParameters:
SqsQueueParameters:
BatchSize: 1
TargetParameters:
StepFunctionStateMachineParameters:
InvocationType: FIRE_AND_FORGET
InputTemplate: |-
{
"hoge": "<$.body.hoge>"
}
AmazonEventBridgePipeExecution:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: pipes.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: SqsPipeSource
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:GetQueueAttributes
Resource: !GetAtt DemoQueue.Arn
- PolicyName: StepFunctionsPipeTarget
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- states:StartExecution
Resource: !GetAtt DemoStateMachine.Arn
メモ
- 適宜、BatchSizeの値は設定する
InvocationType: FIRE_AND_FORGET
はステートマシンを非同期実行にする- InputTemplateはSQSメッセージからbodyプロパティだけを抽出する
- SQSで受信したときの情報だけにする
- デバッグなどを考慮すると他プロパティを残すことをありかもしれない
- 実行ロール
- ポリシー名は適当
- 必要なアクションなどは一度コンソールで作成したときのものを模倣したはず
ターゲットの呼び出しタイプ
2つある。
- 同期(REQUEST_RESPONSE)
- 非同期(FIRE_AND_FORGET)
By default, for pipes with ordered sources, EventBridge invokes targets synchronously because a response from the target is needed before proceeding to the next event.
デフォルトは同期。
ordered sourcesとして該当するものを考えると、DynamoDB Streamsは一定の条件において該当すると考える。
For each item that is modified in a DynamoDB table, the stream records appear in the same sequence as the actual modifications to the item.
各アイテムの操作は順序が保証されるので、これをPipesでも保証したいときは同期呼び出しを選択するのがよいだろう。
If an source doesn’t enforce order, such as a standard Amazon SQS queue, EventBridge can invoke a supported target synchronously or asynchronously.
標準のSQSの場合に、同期タイプを選択をするユースケースはどのようなものがあるのか気になる。
つまり、ソース側の順序保証も気にしなくてもよい場合に同期でターゲットを実行しなければならないケースに何があるのかということ。
Pipes側で実行結果のエラーハンドリングをしたいときに同期を選択することがあるかもしれない。(非同期だとターゲットを呼び出すだけになるから)
おわりに
SQSからStep Functionsを実行したい場合にはPipesが利用できる。
Cfnテンプレートで実装したい場合、コンソールからエクスポートする機能があるのでうまく活用できるだろう。
記事にするにあたりAWSドキュメントを読んだのだが、複雑な内容もあると感じた。