SQS LambdaからSESでメール送信を行いたい

AWS

やりたいこと、目標

Amazon SESを使ってメールを送信する際、秒間に送信できる数にはクォータが設定されています。上限緩和申請すれば枠の拡張はできますが、複数のサービスでメール処理を行う場合バッチ処理で大量の並列処理でメールを送信すると簡単にクォータを超えてしまうと思われます。

今回、SQS -> lambda -> SESという構成を組むことでアカウント全体でメールを送信する処理は1箇所で行うようにし、メール送信の速度も調整できるようにしたいと思います。

SESの設定

SESサービスを使用するにはまず、送信元のアドレスを登録する必要があります

SESコンソール画面で「IDの作成」を押下、画面に沿ってドメイン登録もしくはEメールアドレス登録を行います

詳細はこちらを参照ください

CDKでリソース定義

構成管理ツールのCDKを用いて、lambda SQS IAM等の構築を一括で行います

import os
from aws_cdk import (
    Duration,
    aws_sqs as sqs,
    aws_lambda_event_sources as lambda_event_sources,
)
from constructs import Construct
from cdk_resources.modules.lambda_module import LambdaModule


env = os.getenv("ENVIRONMENT", "dev")
commit_ref_name = os.getenv("COMMIT_REF_NAME")


class SendEmailSqsResource(Construct):

    def __init__(
        self,
        scope: Construct,
        id: str,
        backend_dirname: str,
        environment={},
        layers=[],
        layer_hashes=[]
    ):
        super().__init__(scope, id)
        service_name = self.node.try_get_context("service_name")
        ses_region = self.node.try_get_context("ses_region")

        ctx_stage = self.node.try_get_context(env)
        sender_email = ctx_stage["sender_email"]
        application_url = "https://" + ctx_stage["domain_name"]

        if env == "feat":
            stage_name = "Feat" + commit_ref_name[:10]
            application_url = "http://localhost:3000"
        else:
            stage_name = env.capitalize()

        # SQS
        send_email_queue = sqs.Queue(
            self, "SendEmailQueue",
            content_based_deduplication=False,
            queue_name=f"{service_name}{stage_name}SendEmailQueue.fifo",
            receive_message_wait_time=Duration.seconds(20),
            fifo=True,
            visibility_timeout=Duration.minutes(15)
        )

        sqs_consume_props = {
            "name": "EmailSendSqs",
            "path": "functions/src/sqs/email_send_sqs",
            "memory_size": 512,
            "reserved_concurrent_executions": None,
            "metric_group_name": "sqs",
            "role_props": {
                    "consume_queues": [send_email_queue],
                    "is_ses": True
            },
            "environment": {
                "SES_REGION": ses_region,
                "SENDER_EMAIL": sender_email,
                "APPLICATION_URL": application_url,
            },
            "timeout": 29
        }

        sqs_consume_lambda_env = sqs_consume_props["environment"]
        sqs_consume_lambda_env.update(environment)
        sqs_consume_lambda_module = LambdaModule(
            self,
            name=sqs_consume_props["name"],
            path=os.path.join(
                backend_dirname, sqs_consume_props["path"]),
            handler="app.lambda_handler",
            environment=sqs_consume_lambda_env,
            timeout=sqs_consume_props["timeout"],
            memory_size=sqs_consume_props["memory_size"],
            reserved_concurrent_executions=sqs_consume_props["reserved_concurrent_executions"],
            layers=layers,
            layer_hashes=layer_hashes,
            metric_group_name=sqs_consume_props["metric_group_name"],
            role_props=sqs_consume_props["role_props"]
        )
        sqs_consume_lambda_module.create_lambda_alias()
        sqs_consume_lambda_function_alias = sqs_consume_lambda_module.lambda_function_alias
        sqs_consume_lambda_function_alias.add_event_source(
            lambda_event_sources.SqsEventSource(
                send_email_queue,
                batch_size=5
            )
        )

        self.send_email_queue = send_email_queue

ここではSQSとlambdaリソースを定義しています

必要に応じてlambdaで処理するbatch_sizeとlambdaのreserved_concurrent_executionsを調整することで

メールを処理する速度を調整します

import os
from aws_cdk import (
    AssetHashType,
    BundlingOptions,
    Duration,
    RemovalPolicy,
    aws_lambda as lambda_,
    aws_logs as logs,
    aws_sqs as sqs,
    aws_iam as iam,
)
from typing import TypedDict

env = os.getenv("ENVIRONMENT", "dev")
stage_name = env.capitalize()

if env == "feat":
    stage_name += os.getenv("COMMIT_REF_NAME")


class RolePropsType(TypedDict, total=False):
    send_queues: list[sqs.Queue]
    consume_queues: list[sqs.Queue]
    is_ses: bool


class LambdaModule:
    def __init__(
        self,
        scope,
        name: str,
        path: str,
        handler: str,
        environment: dict,
        timeout: int,
        memory_size: int,
        reserved_concurrent_executions: int,
        layers: list,
        layer_hashes=None,
        role_props: RolePropsType = {}
    ):
        self.scope = scope
        self.name = name
        self.path = path
        self.handler = handler
        self.environment = environment
        self.timeout = timeout
        self.memory_size = memory_size
        self.reserved_concurrent_executions = reserved_concurrent_executions
        self.layers = layers
        self.layer_hashes = layer_hashes
        self.role_props = role_props

        self.service_name = scope.node.try_get_context("service_name")

    def joined_hash(self):
        if self.layer_hashes is None:
            return ""
        return (", ").join(self.layer_hashes)

    def create_lambda_function(self):
        lambda_function = lambda_.Function(
            self.scope, f"{self.name}Function",
            code=lambda_.Code.from_asset(
                self.path,
                asset_hash_type=AssetHashType.SOURCE,
                bundling=BundlingOptions(
                    image=lambda_.Runtime.PYTHON_3_10.bundling_image,
                    command=[
                        "bash",
                        "-c",
                        " && ".join(
                            [
                                "pip install -r requirements.txt -t /asset-output",
                                "cp -au . /asset-output",
                            ]
                        ),
                    ],
                    user="root:root"
                ),
            ),
            handler=self.handler,
            runtime=lambda_.Runtime.PYTHON_3_10,
            environment=self.environment,
            description=f"stage: {self.environment['STAGE']}, branch: {self.environment['COMMIT_REF_NAME']}, name: {self.name}",
            timeout=Duration.seconds(
                self.timeout),
            memory_size=self.memory_size,
            reserved_concurrent_executions=self.reserved_concurrent_executions,
            current_version_options={
                "removal_policy": RemovalPolicy.RETAIN
            },
            layers=self.layers,
            log_retention=logs.RetentionDays.TWO_MONTHS,
            architecture=lambda_.Architecture.X86_64
        )
        self.lambda_function = lambda_function

    def create_role_lambda(self):
        role_props = self.role_props
        lambda_function = self.lambda_function

        if role_props.get("send_queues"):
            for send_queue in role_props["send_queues"]:
                send_queue.grant_send_messages(lambda_function)

        if role_props.get("consume_queues"):
            for consume_queue in role_props["consume_queues"]:
                consume_queue.grant_consume_messages(lambda_function)

        if role_props.get("is_ses"):
            lambda_function.add_to_role_policy(statement=iam.PolicyStatement(
                actions=["ses:SendEmail", "ses:SendRawEmail"], resources=["*"]))


    def create_lambda_alias(self):
        self.create_lambda_function()
        self.create_role_lambda()
        self.assign_metrics()

        self.lambda_function_alias: lambda_.Alias = self.lambda_function.add_alias(
            "Live")

lambdaの定義はモジュール化して1箇所にまとめると個人的には便利です

grant_xxメゾットと使用すると簡単にIAM権限をlambdaへ付与することができます

descriptionを設定することでAWSコンソールからlambdaを探す際に楽になります

{
  "service_name": "XXXXXXXXXXXXXXXXXXXXX",
  "ses_region": "ap-northeast-1",
  "feat": {
    "hosted_zone_id": "XXXXXXXXXXXXXX",
    "domain_name": "feat.XXXXXXXXXXXXXX.dev.XXXXXXX.com",
    "log_level": "DEBUG",
    "sender_email": "XXXXXXX@dev.XXXXXXX.com"
  },
  "dev": {
    "hosted_zone_id": "XXXXXXXXXXXXXX",
    "domain_name": "XXXXXXXXXXXXXX.dev.XXXXXXX.com",
    "log_level": "INFO",
    "sender_email": "XXXXXXX@dev.XXXXXXX.com"
  },
  "prod": {
    "hosted_zone_id": "XXXXXXX",
    "domain_name": "XXXXXXXXXXXXXX.XXXXXXX.com",
    "log_level": "WARNING",
    "sender_email": "XXXXXXX@XXXXXXX.com"
  }
}

ステージごとの変数はcdk.context.jsonに定義します

lambda内のPythonからSESへリクエストする

import boto3
import os
from aws_lambda_powertools import Logger
from email_send_sqs_modules.email_send_sqs_processor import EmaiSendSqsProcessor

logger = Logger()

ses_client = boto3.client("ses", region_name=os.environ["SES_REGION"])

email_send_sqs_processor = EmaiSendSqsProcessor(
    ses_client=ses_client
)


@logger.inject_lambda_context(log_event=True)
def lambda_handler(event, context):
    records = event["Records"]
    for record in records:
        try:
            email_send_sqs_processor.email_send_sqs_processor(
                record=record
            )
        except Exception:
            logger.exception(record["body"])

SQSのeventはRecordsにリスト型で入ってくるのでループで一件ずつ処理を行います

import os
from aws_lambda_powertools import Logger
from common_modules.ses.ses_module import SesModule
from email_send_sqs_modules.type_1_sender import Type1Sender
from email_send_sqs_modules.type_2_sender import Type2Sender

dirname = os.path.dirname(__file__)

logger = Logger()


class EmaiSendSqsProcessor:
    def __init__(
            self,
            ses_client
    ) -> None:
        self.ses_module = SesModule(
            ses_client=ses_client
        )

    def email_send_sqs_processor(self, record):
        message_group_id = record["attributes"]["MessageGroupId"]

        if message_group_id == "message_type_1":
            Type1Sender(
                ses_module=self.ses_module
            ).sender_processor(
                record=record
            )
        elif message_group_id == "message_type_2":
            Type2Sender(
                ses_module=self.ses_module
            ).sender_processor(
                record=record
            )
        else:
            logger.warning(
                f"message_group_id: {message_group_id} is not process target")

message_group_idによって、送信内容のカテゴリーを分けて処理します

import os
import json
from aws_lambda_powertools import Logger
from common_modules.common.util import Common


dirname = os.path.dirname(__file__)

logger = Logger()


class Type1Sender:
    def __init__(
        self,
        ses_module
    ) -> None:
        self.ses_module = ses_module

    def sender_processor(self, record):
        body = json.loads(record["body"])

        recipient = body.get("email")
        sender = os.environ["SENDER_EMAIL"]
        body_contents = self.get_body_contents(body)

        logger.info(
            f"recipient: {recipient}, sender: {sender}, body_contents: {body_contents}"
        )
        self.ses_module.send_email(
            recipient=recipient,
            sender=sender,
            charset="UTF-8",
            body_text=body_contents["BODY_TEXT"],
            body_html=body_contents["BODY_HTML"],
            subject=body_contents["SUBJECT"]
        )

    @classmethod
    def get_body_contents(cls, body):
        organization_name = body["organization_name"]
        user_name = body["user_name"]
        test_label = "" if os.environ["STAGE"] == "prod" else "【テスト】"
        body_content = {}
        body_content["SUBJECT"] = test_label + \
            "【XXXサービス】お知らせ"

        body_content["BODY_TEXT"] = (
            f"{organization_name} {user_name}様\r\n\r\n"
            "XXXXXのお知らせします\r\n"
        )
        body_content["BODY_HTML"] = (
            "<html>"
            "<head></head>"
            "<body>"
            f"{organization_name} {user_name}様<br>"
            "XXXXXのお知らせします<br>"
            "</body>"
            "</html>"
        )

        return body_content

ここでHTML用とTEXT用で二種類メールの内容を作っています

recipientに送信先emailアドレス、senderに送信元emailアドレスを設定します

class SesModule:

    def __init__(
        self,
        ses_client
    ) -> None:
        self.ses_client = ses_client

    def send_email(
            self,
            recipient,
            sender,
            charset,
            body_text,
            subject,
            body_html=None,
    ):
        message_content = {
            "Body": {
                "Text": {
                    "Charset": charset,
                    "Data": body_text,
                },
            },
            "Subject": {
                "Charset": charset,
                "Data": subject,
            },
        }
        if body_html is not None:
            message_content["Body"]["Html"] = {
                "Charset": charset,
                "Data": body_html,
            }

        response = self.ses_client.send_email(
            Destination={
                "ToAddresses": [
                    recipient,
                ],
            },
            Message=message_content,
            Source=sender
        )
        return response

SES処理の共通部分はモジュール化しています

ここでは、boto3.clientから取得したses_clientを使い、send_emailでメールを送信します

まとめ

CDKはモジュール化すると使い回しに便利だと思います

コンソールでSESの初期設定を行うところだけCDKで一括立ち上げできないので、そこが手間でした

無料相談実施中
AWSを使用したサーバーレスアプリケーションの構築
サーバーレス開発内製化、AWSへの移行等
様々な課題について無料でご相談お受けいたします。
最新情報をチェックしよう!