やりたいこと、目標
Amazon SESを使ってメールを送信する際、1秒あたりに送信できる数にはクォータが設定されています。上限緩和申請をすれば枠の拡張はできますが、複数のサービスでメール処理を行う場合、バッチ処理などで大量のメールを並列に送信すると簡単にクォータを超えてしまうと思われます。
今回、SQS -> lambda -> SESという構成を組むことでアカウント全体でメールを送信する処理は1箇所で行うようにし、メール送信の速度も調整できるようにしたいと思います。
SESの設定
SESサービスを使用するにはまず、送信元のアドレスを登録する必要があります
SESコンソール画面で「IDの作成」を押下、画面に沿ってドメイン登録もしくはEメールアドレス登録を行います
詳細はこちらを参照ください
CDKでリソース定義
構成管理ツールのCDKを用いて、lambda SQS IAM等の構築を一括で行います
import os
from aws_cdk import (
Duration,
aws_sqs as sqs,
aws_lambda_event_sources as lambda_event_sources,
)
from constructs import Construct
from cdk_resources.modules.lambda_module import LambdaModule
# Deployment stage, read from ENVIRONMENT; falls back to "dev" when unset.
env = os.environ.get("ENVIRONMENT", "dev")
# Branch/ref name read from COMMIT_REF_NAME; None when the variable is unset.
commit_ref_name = os.environ.get("COMMIT_REF_NAME")
class SendEmailSqsResource(Construct):
    """Provision the FIFO send-email queue and the Lambda function consuming it.

    The construct creates an SQS FIFO queue, builds the consumer function
    through ``LambdaModule``, and registers the queue as an event source on
    the function's alias. The queue is exposed as ``self.send_email_queue``.
    """

    def __init__(
        self,
        scope: Construct,
        id: str,
        backend_dirname: str,
        environment=None,
        layers=None,
        layer_hashes=None,
    ):
        """Create the queue, the consumer Lambda and the event-source mapping.

        Args:
            scope: Parent construct.
            id: Construct id (name kept for CDK convention).
            backend_dirname: Base directory joined with the function's
                relative source path.
            environment: Extra Lambda environment variables. They are merged
                over SES_REGION, SENDER_EMAIL and APPLICATION_URL, so a
                caller-supplied key wins. ``LambdaModule`` reads ``STAGE`` and
                ``COMMIT_REF_NAME`` from the merged mapping when building the
                function description, so callers are expected to supply them.
            layers: Lambda layers forwarded to ``LambdaModule``.
            layer_hashes: Layer hashes forwarded to ``LambdaModule``.

        Raises:
            ValueError: If the CDK context has no entry for the current stage.
        """
        super().__init__(scope, id)

        # Defaults are None instead of {} / [] so that no mutable object is
        # shared between separate instantiations of this construct.
        extra_environment = {} if environment is None else environment
        layers = [] if layers is None else layers
        layer_hashes = [] if layer_hashes is None else layer_hashes

        service_name = self.node.try_get_context("service_name")
        ses_region = self.node.try_get_context("ses_region")
        ctx_stage = self.node.try_get_context(env)
        if ctx_stage is None:
            # Fail with a readable message instead of
            # "'NoneType' object is not subscriptable".
            raise ValueError(
                f"CDK context has no entry for stage '{env}'"
            )
        sender_email = ctx_stage["sender_email"]
        application_url = "https://" + ctx_stage["domain_name"]

        if env == "feat":
            # COMMIT_REF_NAME may be unset; slicing None would raise TypeError.
            stage_name = "Feat" + (commit_ref_name or "")[:10]
            application_url = "http://localhost:3000"
        else:
            stage_name = env.capitalize()

        # SQS: FIFO queue without content-based deduplication.
        send_email_queue = sqs.Queue(
            self,
            "SendEmailQueue",
            content_based_deduplication=False,
            queue_name=f"{service_name}{stage_name}SendEmailQueue.fifo",
            receive_message_wait_time=Duration.seconds(20),
            fifo=True,
            visibility_timeout=Duration.minutes(15),
        )

        # Caller-supplied variables override the defaults defined here.
        lambda_environment = {
            "SES_REGION": ses_region,
            "SENDER_EMAIL": sender_email,
            "APPLICATION_URL": application_url,
        }
        lambda_environment.update(extra_environment)

        sqs_consume_lambda_module = LambdaModule(
            self,
            name="EmailSendSqs",
            path=os.path.join(
                backend_dirname, "functions/src/sqs/email_send_sqs"
            ),
            handler="app.lambda_handler",
            environment=lambda_environment,
            timeout=29,
            memory_size=512,
            # None leaves the function without reserved concurrency; set an
            # integer here to cap parallel consumers.
            reserved_concurrent_executions=None,
            layers=layers,
            layer_hashes=layer_hashes,
            metric_group_name="sqs",
            role_props={
                "consume_queues": [send_email_queue],
                "is_ses": True,
            },
        )
        sqs_consume_lambda_module.create_lambda_alias()

        # The alias (not the bare function) is what consumes the queue.
        sqs_consume_lambda_module.lambda_function_alias.add_event_source(
            lambda_event_sources.SqsEventSource(
                send_email_queue,
                batch_size=5,
            )
        )

        self.send_email_queue = send_email_queue
ここではSQSとlambdaリソースを定義しています
必要に応じてlambdaで処理するbatch_sizeとlambdaのreserved_concurrent_executionsを調整することで
メールを処理する速度を調整します
import os
from aws_cdk import (
AssetHashType,
BundlingOptions,
Duration,
RemovalPolicy,
aws_lambda as lambda_,
aws_logs as logs,
aws_sqs as sqs,
aws_iam as iam,
)
from typing import TypedDict
# Deployment stage, read from ENVIRONMENT; falls back to "dev" when unset.
env = os.getenv("ENVIRONMENT", "dev")
# Capitalized stage used as a name component, e.g. "dev" -> "Dev".
stage_name = env.capitalize()
if env == "feat":
    # COMMIT_REF_NAME may be unset; default to "" so that str + None does not
    # raise TypeError at import time.
    stage_name += os.getenv("COMMIT_REF_NAME", "")
class RolePropsType(TypedDict, total=False):
    """Optional permission settings read by ``LambdaModule.create_role_lambda``.

    ``total=False`` makes every key optional.
    """

    # Queues the function is granted permission to send messages to.
    send_queues: list[sqs.Queue]
    # Queues the function is granted permission to consume messages from.
    consume_queues: list[sqs.Queue]
    # When truthy, ses:SendEmail / ses:SendRawEmail are added to the role.
    is_ses: bool
class LambdaModule:
    """Build a Lambda function, its permissions and a ``Live`` alias.

    Typical use is to instantiate the module and call
    ``create_lambda_alias()``, which creates the function, applies the
    permissions described by ``role_props`` and publishes the alias as
    ``self.lambda_function_alias``.
    """

    def __init__(
        self,
        scope,
        name: str,
        path: str,
        handler: str,
        environment: dict,
        timeout: int,
        memory_size: int,
        reserved_concurrent_executions: "int | None",
        layers: list,
        layer_hashes=None,
        role_props: "RolePropsType | None" = None,
        metric_group_name: "str | None" = None,
    ):
        """Store the settings used later by the ``create_*`` methods.

        Args:
            scope: Construct the resources are created in; its node is also
                used to read the ``service_name`` context value.
            name: Logical name; the function id becomes ``f"{name}Function"``.
            path: Directory of the function source used as the code asset.
            handler: Handler string passed to the Lambda function.
            environment: Environment variables. Must contain ``STAGE`` and
                ``COMMIT_REF_NAME``, which are read for the description.
            timeout: Function timeout in seconds.
            memory_size: Memory size passed to the Lambda function.
            reserved_concurrent_executions: Reserved concurrency, or None.
            layers: Lambda layers attached to the function.
            layer_hashes: Optional list of layer hash strings.
            role_props: Optional permission settings (see ``RolePropsType``).
            metric_group_name: Optional metrics group label. Accepted because
                callers in this project pass it; stored but not otherwise
                used by the code in this class.
        """
        self.scope = scope
        self.name = name
        self.path = path
        self.handler = handler
        self.environment = environment
        self.timeout = timeout
        self.memory_size = memory_size
        self.reserved_concurrent_executions = reserved_concurrent_executions
        self.layers = layers
        self.layer_hashes = layer_hashes
        # A fresh dict per instance; a ``{}`` default in the signature would
        # be shared by every LambdaModule created without role_props.
        self.role_props = {} if role_props is None else role_props
        self.metric_group_name = metric_group_name
        self.service_name = scope.node.try_get_context("service_name")

    def joined_hash(self):
        """Return the layer hashes joined with ", ", or "" when none were given."""
        if self.layer_hashes is None:
            return ""
        return ", ".join(self.layer_hashes)

    def create_lambda_function(self):
        """Create the Python 3.10 Lambda function and keep it on ``self``.

        The code asset is bundled by installing ``requirements.txt`` into the
        asset output and copying the source next to it.

        Raises:
            KeyError: If ``environment`` lacks ``STAGE`` or ``COMMIT_REF_NAME``.
        """
        bundling_command = " && ".join(
            [
                "pip install -r requirements.txt -t /asset-output",
                "cp -au . /asset-output",
            ]
        )
        lambda_function = lambda_.Function(
            self.scope,
            f"{self.name}Function",
            code=lambda_.Code.from_asset(
                self.path,
                asset_hash_type=AssetHashType.SOURCE,
                bundling=BundlingOptions(
                    image=lambda_.Runtime.PYTHON_3_10.bundling_image,
                    command=["bash", "-c", bundling_command],
                    user="root:root",
                ),
            ),
            handler=self.handler,
            runtime=lambda_.Runtime.PYTHON_3_10,
            environment=self.environment,
            # The description makes the function easy to spot in the console.
            description=f"stage: {self.environment['STAGE']}, branch: {self.environment['COMMIT_REF_NAME']}, name: {self.name}",
            timeout=Duration.seconds(self.timeout),
            memory_size=self.memory_size,
            reserved_concurrent_executions=self.reserved_concurrent_executions,
            current_version_options={
                "removal_policy": RemovalPolicy.RETAIN
            },
            layers=self.layers,
            log_retention=logs.RetentionDays.TWO_MONTHS,
            architecture=lambda_.Architecture.X86_64,
        )
        self.lambda_function = lambda_function

    def create_role_lambda(self):
        """Grant the function the permissions described by ``role_props``.

        Must be called after ``create_lambda_function``.
        """
        role_props = self.role_props
        lambda_function = self.lambda_function

        for send_queue in role_props.get("send_queues") or []:
            send_queue.grant_send_messages(lambda_function)

        for consume_queue in role_props.get("consume_queues") or []:
            consume_queue.grant_consume_messages(lambda_function)

        if role_props.get("is_ses"):
            lambda_function.add_to_role_policy(
                statement=iam.PolicyStatement(
                    actions=["ses:SendEmail", "ses:SendRawEmail"],
                    resources=["*"],
                )
            )

    def assign_metrics(self):
        """Hook for attaching metrics to the function; does nothing here.

        ``create_lambda_alias`` calls this method, but no implementation is
        present in this class, so the call would raise AttributeError.
        NOTE(review): replace this no-op with the project's real metrics
        wiring (presumably keyed on ``metric_group_name``) if one exists.
        """

    def create_lambda_alias(self):
        """Create the function, apply permissions and publish the "Live" alias."""
        self.create_lambda_function()
        self.create_role_lambda()
        self.assign_metrics()
        self.lambda_function_alias: lambda_.Alias = self.lambda_function.add_alias(
            "Live")
lambdaの定義はモジュール化して1箇所にまとめると個人的には便利です
grant_xxメソッドを使用すると簡単にIAM権限をlambdaへ付与することができます
descriptionを設定することでAWSコンソールからlambdaを探す際に楽になります
{
"service_name": "XXXXXXXXXXXXXXXXXXXXX",
"ses_region": "ap-northeast-1",
"feat": {
"hosted_zone_id": "XXXXXXXXXXXXXX",
"domain_name": "feat.XXXXXXXXXXXXXX.dev.XXXXXXX.com",
"log_level": "DEBUG",
"sender_email": "XXXXXXX@dev.XXXXXXX.com"
},
"dev": {
"hosted_zone_id": "XXXXXXXXXXXXXX",
"domain_name": "XXXXXXXXXXXXXX.dev.XXXXXXX.com",
"log_level": "INFO",
"sender_email": "XXXXXXX@dev.XXXXXXX.com"
},
"prod": {
"hosted_zone_id": "XXXXXXX",
"domain_name": "XXXXXXXXXXXXXX.XXXXXXX.com",
"log_level": "WARNING",
"sender_email": "XXXXXXX@XXXXXXX.com"
}
}
ステージごとの変数はcdk.context.jsonに定義します
lambda内のPythonからSESへリクエストする
import boto3
import os
from aws_lambda_powertools import Logger
from email_send_sqs_modules.email_send_sqs_processor import EmaiSendSqsProcessor
# These objects are built once at module import and reused by every call to
# the handler defined in this module.
logger = Logger()
# SES client bound to the region given by the SES_REGION environment variable.
ses_client = boto3.client("ses", region_name=os.environ["SES_REGION"])
email_send_sqs_processor = EmaiSendSqsProcessor(ses_client=ses_client)
@logger.inject_lambda_context(log_event=True)
def lambda_handler(event, context):
    """Process every SQS record of the event, one record at a time.

    An exception raised while handling a record is logged together with that
    record's body and is not re-raised, so the remaining records are still
    processed and the handler returns normally.
    """
    for sqs_record in event["Records"]:
        try:
            email_send_sqs_processor.email_send_sqs_processor(record=sqs_record)
        except Exception:
            # Log and continue: one bad record must not abort the batch.
            logger.exception(sqs_record["body"])
SQSのeventはRecordsにリスト型で入ってくるのでループで一件ずつ処理を行います
import os
from aws_lambda_powertools import Logger
from common_modules.ses.ses_module import SesModule
from email_send_sqs_modules.type_1_sender import Type1Sender
from email_send_sqs_modules.type_2_sender import Type2Sender
# Directory that contains this module file.
dirname = os.path.dirname(__file__)
# Module-level Powertools logger used by the processor below.
logger = Logger()
class EmaiSendSqsProcessor:
    """Route an SQS record to the sender class matching its message group."""

    def __init__(self, ses_client) -> None:
        """Wrap ``ses_client`` in a ``SesModule`` shared by all senders."""
        self.ses_module = SesModule(ses_client=ses_client)

    def email_send_sqs_processor(self, record):
        """Dispatch ``record`` according to its ``MessageGroupId`` attribute.

        Records whose group id is not handled are logged with a warning and
        otherwise ignored.
        """
        message_group_id = record["attributes"]["MessageGroupId"]

        if message_group_id == "message_type_1":
            sender_class = Type1Sender
        elif message_group_id == "message_type_2":
            sender_class = Type2Sender
        else:
            logger.warning(
                f"message_group_id: {message_group_id} is not process target")
            return

        # A new sender instance is created for each record.
        sender_class(ses_module=self.ses_module).sender_processor(record=record)
message_group_idによって、送信内容のカテゴリーを分けて処理します
import os
import json
from aws_lambda_powertools import Logger
from common_modules.common.util import Common
# Directory that contains this module file.
dirname = os.path.dirname(__file__)
# Module-level Powertools logger used by the sender below.
logger = Logger()
class Type1Sender:
    """Build and send the "type 1" notification e-mail for one SQS record."""

    def __init__(self, ses_module) -> None:
        """Keep the ``ses_module`` whose ``send_email`` is used for delivery."""
        self.ses_module = ses_module

    def sender_processor(self, record):
        """Parse the record body as JSON and send the resulting e-mail.

        The recipient is the body's ``email`` value and the sender address is
        read from the SENDER_EMAIL environment variable.
        """
        body = json.loads(record["body"])
        recipient = body.get("email")
        sender = os.environ["SENDER_EMAIL"]
        body_contents = self.get_body_contents(body)
        logger.info(
            f"recipient: {recipient}, sender: {sender}, body_contents: {body_contents}"
        )
        self.ses_module.send_email(
            recipient=recipient,
            sender=sender,
            charset="UTF-8",
            body_text=body_contents["BODY_TEXT"],
            body_html=body_contents["BODY_HTML"],
            subject=body_contents["SUBJECT"]
        )

    @classmethod
    def get_body_contents(cls, body):
        """Return the subject, plain-text body and HTML body for ``body``.

        Args:
            body: Mapping with ``organization_name`` and ``user_name``.

        Returns:
            dict with the keys ``SUBJECT``, ``BODY_TEXT`` and ``BODY_HTML``.
            The subject gets a test label unless the STAGE environment
            variable is ``prod``.

        Raises:
            KeyError: If a required body key or the STAGE variable is missing.
        """
        import html

        organization_name = body["organization_name"]
        user_name = body["user_name"]
        # The names come from the queue message, so they are escaped before
        # being placed into markup; the plain-text part keeps them verbatim.
        html_organization_name = html.escape(str(organization_name))
        html_user_name = html.escape(str(user_name))

        test_label = "" if os.environ["STAGE"] == "prod" else "【テスト】"
        body_content = {}
        body_content["SUBJECT"] = test_label + \
            "【XXXサービス】お知らせ"
        body_content["BODY_TEXT"] = (
            f"{organization_name} {user_name}様\r\n\r\n"
            "XXXXXのお知らせします\r\n"
        )
        body_content["BODY_HTML"] = (
            "<html>"
            "<head></head>"
            "<body>"
            f"{html_organization_name} {html_user_name}様<br>"
            "XXXXXのお知らせします<br>"
            "</body>"
            "</html>"
        )
        return body_content
ここでHTML用とTEXT用で二種類メールの内容を作っています
recipientに送信先emailアドレス、senderに送信元emailアドレスを設定します
class SesModule:
    """Thin wrapper around the ``send_email`` call of an SES client."""

    def __init__(self, ses_client) -> None:
        """Keep the client whose ``send_email`` method performs the delivery."""
        self.ses_client = ses_client

    def send_email(
        self,
        recipient,
        sender,
        charset,
        body_text,
        subject,
        body_html=None,
    ):
        """Send one e-mail to a single recipient and return the client response.

        Args:
            recipient: Address placed in ``ToAddresses``.
            sender: Address passed as ``Source``.
            charset: Charset applied to the subject and every body part.
            body_text: Plain-text body.
            subject: Subject line.
            body_html: Optional HTML body; omitted from the request when None.
        """
        mail_body = {"Text": {"Charset": charset, "Data": body_text}}
        if body_html is not None:
            mail_body["Html"] = {"Charset": charset, "Data": body_html}

        mail_subject = {"Charset": charset, "Data": subject}

        return self.ses_client.send_email(
            Destination={"ToAddresses": [recipient]},
            Message={"Body": mail_body, "Subject": mail_subject},
            Source=sender,
        )
SES処理の共通部分はモジュール化しています
ここでは、boto3.clientから取得したses_clientを使い、send_emailでメールを送信します
まとめ
CDKはモジュール化すると使い回しに便利だと思います
コンソールでSESの初期設定を行うところだけCDKで一括立ち上げできないので、そこが手間でした