StepFunctionsのDistributedMapとSESのConfigurationSetを利用したメール配信検証

AWS

今回は、サーバーレスアーキテクチャに基づいたバックエンド中心のメール配信機能を構築しました。この実装では、Amazon Simple Email Service (SES) が提供する Configuration Set を活用し、以下の機能を実現しました:

  1. バウンスメール処理
  2. メール開封率の追跡
  3. メール内のリンクのクリック数の測定

さらに、大規模なメール配信を効率的に処理するため、AWS Step Functions の Distributed Map を利用した並列処理の検証を行いました。これにより、以下の利点が得られました:

  • 高いスケーラビリティ:多数の受信者へのメール送信を並列で処理
  • 処理の効率化:大量のメールデータを分散して処理することによる時間短縮
  • リソースの最適化:必要に応じて動的にリソースを割り当て

この アプローチ により、高性能で柔軟性の高いメール配信システムを実現し、大規模なメールキャンペーンや通知システムにも対応可能な基盤を構築しました。

アーキテクチャの概要

以下の図は、今回構築したアーキテクチャの全体像です。

StepFunctionsとSESを含む使用したAWSサービス

  1. AWS CDK: 各Resourcesを作成
  2. Amazon S3: 対象となるメーリングリストを静的ファイルとして登録している。
  3. AWS Step Functions: DistributedMapを利用して並列処理でメールを送信している。
  4. AWS Lambda: メール送信・配信的・バウンス処理を実行
  5. Amazon SES (Simple Email Service): メールの送信。
  6. Amazon DynamoDB: 送信リストを管理し、重複送信防止、バウンス・苦情の管理をしている。
  7. Amazon SNS: SESのConfigurartionSetのEventを受け取ってLambdaからSubscribeする。


CDKでDistributedMapを作成する

     # Create a Step Functions task to get an item from DynamoDB
        get_item_task = tasks.DynamoGetItem(
            self, "GetDynamoDBItem",
            table=DYNAMODB_TABLE_NAME,
            key={
                "mail_id": tasks.DynamoAttributeValue.from_string(sfn.JsonPath.string_at("$.mail_id"))
            },
            result_path="$.dynamo_result"
        )

        # Create the DistributedMap state
        distributed_map = sfn.DistributedMap(
            self, "DistributedMap",
            max_concurrency=10, #並列処理を10に制限
            item_reader=sfn.S3CsvItemReader(
                bucket=bucket,
                key=sfn.JsonPath.string_at("$.file_name")
            ),
            item_selector={
                "mail_data.$": "$.dynamo_result.Item",
                "record.$": "$$.Map.Item.Value"
            },
            result_path="$.results"
        )
        # Create a Wait state
        wait_state = sfn.Wait(
            self, "WaitState",
            time=sfn.WaitTime.duration(Duration.seconds(1))
        )

        distributed_map.item_processor(
            tasks.LambdaInvoke(
                self, "ProcessItemTask",
                lambda_function=process_mailing_list_csv_function_alias,
                payload=sfn.TaskInput.from_object({
                    "row_item.$": "$.record",
                    "mail_data.$": "$.mail_data"
                    }),
                output_path="$.Payload"
            ).next(wait_state),
            mode=sfn.ProcessorMode.DISTRIBUTED,
            execution_type=sfn.ProcessorType.STANDARD
        )
        
        # Create the state machine
        definition = sfn.DefinitionBody.from_chainable(get_item_task.next(distributed_map))
        state_machine = sfn.StateMachine(
            self, "DataProcessingStateMachine",
            definition_body=definition,
            timeout=Duration.minutes(5)
        )
        
        bucket.grant_read(state_machine)
        EmailsTable.grant_read_data(state_machine)
今回はStepfunctionsを以下のようなeventを持って実行されること想定します。
{
    "file_name": "s3_key_path",
    "mail_id": "mail_id_on_dynamo_db"
}

SESのConfigurationSetについて

Amazon Simple Email Service (SES) の Configuration Set を活用して、送信したメールの詳細な追跡と分析を実現しました。具体的には、以下の情報を収集・処理しています:

  1. バウンスメール (Bounce)、苦情 (Complaint)、拒否(Reject)
  2. 送信済み(Delivery)
  3. メールの開封率

これらのイベントデータは、SES の Configuration Set を通じて Amazon Simple Notification Service (SNS) の Topics に送信されます。その後、AWS Lambda 関数が SNS Topic をサブスクライブし、これらのイベントを処理します。

この構成をAWS Cloud Development Kit (CDK) を使用してインフラストラクチャをコードとして定義しました。以下は、リソースを作成するCDKコードの概要です:

# Create an SNS topic for bounce and complaint notifications
bounce_complaint_topic = sns.Topic(
    self, 
    "BounceComplaintTopic",
    display_name="SES Bounce and Complaint Notifications Topic"
)
mail_delivery_topic = sns.Topic(
    self, 
    "MailDeliverysTopic",
    display_name="SES notice for delivery  Notifications Topic"
)
mail_actions_topic = sns.Topic(
    self, 
    "MaiilActionsTopic",
    display_name="SES notice for open or clicked Notifications Topic"
)
# Grant SES permission to publish to SNS topic
bounce_complaint_topic.grant_publish(
    iam.ServicePrincipal("ses.amazonaws.com")
)
mail_delivery_topic.grant_publish(
    iam.ServicePrincipal("ses.amazonaws.com")
)
mail_actions_topic.grant_publish(
    iam.ServicePrincipal("ses.amazonaws.com")
)

# Configure SES to send notifications to SNS
ses_configratuon_set = ses.CfnConfigurationSet(self, "ConfigurationSet",
    name="MarketingMailsConfigurationSet"
)

ses.CfnConfigurationSetEventDestination(self, "BounceEventDestination",
    configuration_set_name=ses_configratuon_set.name,
    event_destination={
        "name": "BounceComplaintDestination",
        "enabled": True,
        "matchingEventTypes": ["bounce", "complaint", "reject"],
        "snsDestination": {
            "topicArn": bounce_complaint_topic.topic_arn
        }
    }
)
ses.CfnConfigurationSetEventDestination(self, "EmailActionsEventDestination",
    configuration_set_name=ses_configratuon_set.name,
    event_destination={
        "name": "EmailActionDestination",
        "enabled": True,
        "matchingEventTypes": ["open", "click"],
        "snsDestination": {
            "topicArn": mail_actions_topic.topic_arn
        }
    }
)
ses.CfnConfigurationSetEventDestination(self, "EmailDeliveryEventDestination",
    configuration_set_name=ses_configratuon_set.name,
    event_destination={
        "name": "EmailDeliveryDestination",
        "enabled": True,
        "matchingEventTypes": ["delivery"],
        "snsDestination": {
            "topicArn": mail_delivery_topic.topic_arn
        }
    }
)

上のコードに含まれてないが、必要に応じて多数のEventを受け取ることができる。詳細は以下にて確認できる。SES送信アクティビティモニタリング

想定処理内容

Aws S3に送信するメーリングリストのcsvをアップロードして。s3のkeyを抑えます。DynamoDB送信するメールをmail_id、subject、bodyを登録してmail_idを作成します。作成したmail_idとs3_keyをファイル名にしたeventをCdkで作成したStepFuctionsのState machinesに手動で実行してます。作成されたState MachinesのDefinition以下の図面通りになる・

Eventに指定したmail_idでDynamoDBから送信内容を取得します。DistribitedMapで並列処理でメーリングリストから各行ごとを取得してProcessItemTaskに指定してLambdaでメール送信するようにしてます。
ProcessItemTaskの処理ですでに必要に応じてバウンスになっているかどうか、同じメールIdですでに配信済みかどうかなどの確認を行いつつSESを使ってメール配信してます。

SESは実績がつくるまでは毎秒送れる件数少ない。そのためレートリミットに引っかからないため各ループの間に1秒待機するようにしてます。

まとめ

今回は、サーバーレスアーキテクチャを活用した高性能かつ柔軟なメール配信システムの構築について詳述しました。Amazon SESのConfiguration Setを利用することで、バウンスメール処理、開封率追跡、リンククリック数測定などの機能を実現しました。さらに、AWS Step FunctionsのDistributed Mapを用いた並列処理により、大規模なメール配信の効率化を図りました。

アーキテクチャの核心部分では、AWS CDK、S3、Lambda、DynamoDB、SNSなどのAWSサービスを統合し、スケーラブルで管理しやすいシステムを構築しました。特に、StepFunctionsのDistributed Mapの実装により、並列処理の最適化と処理効率の向上を達成しました。

また、SESのConfiguration Setを活用したイベント追跡システムにより、メール配信の詳細な分析が可能となり、バウンスや苦情、開封率などの重要指標をリアルタイムで把握できるようになりました。

このアプローチにより、大規模なメールキャンペーンや通知システムにも対応可能な、高性能で柔軟性の高いメール配信基盤を実現しました。サーバーレスアーキテクチャの利点を最大限に活かしつつ、効率的なリソース管理と拡張性の高いシステム設計になっているかと思います。

無料相談実施中
AWSを使用したサーバーレスアプリケーションの構築
サーバーレス開発内製化、AWSへの移行等
様々な課題について無料でご相談お受けいたします。
最新情報をチェックしよう!