EventBridge PipesをCloudFormationで実装してみた

AWS

はじめに

SQSからStep Functionsを実行するためにPipesを使いました。

今回はCloudFormationで実装したので、備忘録としてコードを残しておきます。

CloudFormationテンプレート

※ 一部抜粋

  DemoPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Target: !GetAtt DemoStateMachine.Arn
      RoleArn: !GetAtt AmazonEventBridgePipeExecution.Arn
      Source: !GetAtt DemoQueue.Arn
      SourceParameters:
        SqsQueueParameters:
          BatchSize: 1
      TargetParameters:
        StepFunctionStateMachineParameters:
          InvocationType: FIRE_AND_FORGET
        InputTemplate: |-
          {
          "hoge": "<$.body.hoge>"
          }
  AmazonEventBridgePipeExecution:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: pipes.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: SqsPipeSource
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                Resource: !GetAtt DemoQueue.Arn
        - PolicyName: StepFunctionsPipeTarget
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - states:StartExecution
                Resource: !GetAtt DemoStateMachine.Arn

メモ

  • 適宜、BatchSizeの値は設定する
  • InvocationType: FIRE_AND_FORGET はステートマシンを非同期実行にする
  • InputTemplateはSQSメッセージからbodyプロパティだけを抽出する
    • SQSで受信したときの情報だけにする
    • デバッグなどを考慮すると他プロパティを残すことをありかもしれない
  • 実行ロール
    • ポリシー名は適当
    • 必要なアクションなどは一度コンソールで作成したときのものを模倣したはず

ターゲットの呼び出しタイプ

2つある。

  • 同期(REQUEST_RESPONSE)
  • 非同期(FIRE_AND_FORGET)

By default, for pipes with ordered sources, EventBridge invokes targets synchronously because a response from the target is needed before proceeding to the next event.

デフォルトは同期。

ordered sourcesとして該当するものを考えると、DynamoDB Streamsは一定の条件において該当すると考える。

For each item that is modified in a DynamoDB table, the stream records appear in the same sequence as the actual modifications to the item.

各アイテムの操作は順序が保証されるので、これをPipesでも保証したいときは同期呼び出しを選択するのがよいだろう。

If an source doesn’t enforce order, such as a standard Amazon SQS queue, EventBridge can invoke a supported target synchronously or asynchronously.

標準のSQSの場合に、同期タイプを選択をするユースケースはどのようなものがあるのか気になる。

つまり、ソース側の順序保証も気にしなくてもよい場合に同期でターゲットを実行しなければならないケースに何があるのかということ。

Pipes側で実行結果のエラーハンドリングをしたいときに同期を選択することがあるかもしれない。(非同期だとターゲットを呼び出すだけになるから)

おわりに

SQSからStep Functionsを実行したい場合にはPipesが利用できる。

Cfnテンプレートで実装したい場合、コンソールからエクスポートする機能があるのでうまく活用できるだろう。

記事にするにあたりAWSドキュメントを読んだのだが、複雑な内容もあると感じた。

無料相談実施中
AWSを使用したサーバーレスアプリケーションの構築
サーバーレス開発内製化、AWSへの移行等
様々な課題について無料でご相談お受けいたします。
最新情報をチェックしよう!