サーバーレスアプリケーションの開発において、エラーハンドリングは非常に重要な要素です。特にAWS Lambdaを使用する場合、適切なエラーハンドリング戦略を実装することで、アプリケーションの信頼性と回復力を大幅に向上させることができます。
Lambda関数の呼び出し方式とエラーハンドリング
Lambdaには3つの呼び出し方式があります:
- 同期呼び出し(Synchronous)
- 非同期呼び出し(Asynchronous)
- ポーリングベース呼び出し(Poll-based)
同期呼び出し(Synchronous)
API Gatewayなどで使用される同期呼び出しでは、組み込みのリトライ機能はありません。Lambda関数のエラーハンドリングでは、以下の3つの原則を考慮する必要があります。
- エラーの適切な分類
- 詳細なログ記録とトレーサビリティ
- クライアントへの明確なエラー応答

上記のアーキテクチャをAWS CDKを使用した実装例
from aws_cdk import (
Stack,
aws_apigateway as apigateway,
aws_lambda as lambda_,
aws_logs as logs,
aws_iam as iam,
Duration,
)
from constructs import Construct
class ApiStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Lambda関数の作成
handler = lambda_.Function(
self, "ApiHandler",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="index.handler",
code=lambda_.Code.from_asset("lambda"),
environment={
"STAGE": "prod",
"LOG_LEVEL": "INFO",
},
timeout=Duration.seconds(29),
memory_size=256,
tracing=lambda_.Tracing.ACTIVE,
log_retention=logs.RetentionDays.ONE_WEEK,
)
# API Gatewayの作成
api = apigateway.RestApi(
self, "Api",
rest_api_name="My Service API",
description="This is my API Gateway",
deploy_options=apigateway.StageOptions(
stage_name="prod",
access_log_destination=apigateway.LogGroupLogDestination(
logs.LogGroup(self, "ApiGatewayAccessLogs")
),
access_log_format=apigateway.AccessLogFormat.json_with_standard_fields(),
metrics_enabled=True,
tracing_enabled=True,
),
default_cors_preflight_options=apigateway.CorsOptions(
allow_origins=["*"],
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type", "X-Amz-Date", "Authorization", "X-Api-Key"],
max_age=Duration.days(1),
),
)
# APIリソースとメソッドの定義
items = api.root.add_resource("items")
# GET /items
items.add_method(
"GET",
apigateway.LambdaIntegration(
handler,
proxy=False,
integration_responses=[
apigateway.IntegrationResponse(
status_code="200",
response_parameters={
"method.response.header.Access-Control-Allow-Origin": "'*'",
},
)
],
),
method_responses=[
apigateway.MethodResponse(
status_code="200",
response_parameters={
"method.response.header.Access-Control-Allow-Origin": True,
},
)
],
)
# GET /items/{id}
item = items.add_resource("{id}")
item.add_method("GET", apigateway.LambdaIntegration(handler))
# POST /items
items.add_method("POST", apigateway.LambdaIntegration(handler))
エラーハンドリングは以下のように実装します:
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
tracer = Tracer()
metrics = Metrics(namespace="MyLambdaFunction")
class BusinessError(Exception):
def __init__(self, message: str, status_code: int = 400):
self.message = message
self.status_code = status_code
super().__init__(self.message)
@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics
def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
try:
# メイン処理ロジック
result = process_request(event)
metrics.add_metric(name="SuccessfulExecution", unit=MetricUnit.Count, value=1)
return create_success_response(result)
except BusinessError as e:
logger.warning(f"Business error: {str(e)}")
metrics.add_metric(name="BusinessErrors", unit=MetricUnit.Count, value=1)
return create_error_response(e.status_code, str(e))
except Exception as e:
logger.exception("Unexpected error occurred")
metrics.add_metric(name="SystemErrors", unit=MetricUnit.Count, value=1)
return create_error_response(500, "Internal Server Error")
エラーを適切に分類することで、より効果的なエラーハンドリングが可能になります。
非同期呼び出し(Asynchronous)
SNSやEventBridgeを使用する非同期呼び出しでは、以下の2つのエラーケースを考慮する必要があります:
- イベントソースからLambdaへの配信失敗
- Lambda関数内での処理エラー

上記のアーキテクチャをAWS CDKを使用した実装例:
from aws_cdk import (
Duration,
aws_lambda as _lambda,
aws_sqs as sqs,
aws_sns as sns,
aws_lambda_event_sources as events,
)
from constructs import Construct
class SnsWithDlqHandlingStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs) -> None:
super().__init__(scope, id, **kwargs)
# Sns Topics
sns_topic = sns.Topic(
self, "topic",
display_name="topic",
topic_name="lambda_topic")
# DLQ作成
dlq = sqs.Queue(self, "DeadLetterQueue")
# 成功時の送信先キュー
success_queue = sqs.Queue(self, "SuccessQueue")
# 失敗時の送信先キュー
failure_queue = sqs.Queue(self, "FailureQueue")
# Lambda関数の定義
processor_function = _lambda.Function(
self, "Processor",
runtime=_lambda.Runtime.PYTHON_3_9,
code=_lambda.Code.from_asset("lambda"),
retry_attempts=2,
environment={
"POWERTOOLS_SERVICE_NAME": "message-processor",
"POWERTOOLS_METRICS_NAMESPACE": "MessageProcessing",
"LOG_LEVEL": "INFO",
"POWERTOOLS_LOGGER_LOG_EVENT": "true",
"POWERTOOLS_LOGGER_SAMPLE_RATE": "0.1",
"POWERTOOLS_TRACE_SAMPLE_RATE": "0.1",
},
handler="index.handler",
on_success=destinations.SqsDestination(success_queue),
on_failure=destinations.SqsDestination(failure_queue),
timeout=Duration.seconds(15)
)
# associate lambda with sns as event source.
sns_source = events.SnsEventSource(
sns_topic,
dead_letter_queue=dlq)
processor_function.add_event_source(sns_source)
# X-Ray トレーシングの有効化
processor_function.add_to_role_policy(
iam.PolicyStatement(
actions=['xray:PutTraceSegments', 'xray:PutTelemetryRecords'],
resources=['*']
)
)
処理の例として以下のように実装することができる。
# lambda/index.py
import json
import random
import datetime
from typing import Dict, Any
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.data_classes import SNSEvent
from aws_lambda_powertools.utilities.validation import validate_schema
from aws_lambda_powertools.utilities.validation.exceptions import SchemaValidationError
# Powertoolsの初期化
logger = Logger(service="message-processor")
metrics = Metrics(namespace="MessageProcessing")
tracer = Tracer(service="message-processor")
# 入力メッセージのJSONスキーマ
MESSAGE_SCHEMA = {
"type": "object",
"properties": {
"id": {"type": "string"},
"data": {
"type": "object",
"properties": {
"name": {"type": "string"},
"value": {"type": "number"}
},
"required": ["name", "value"]
},
"timestamp": {"type": "string"}
},
"required": ["id", "data", "timestamp"]
}
class ProcessingError(Exception):
"""カスタム処理エラー"""
pass
@logger.inject_lambda_context
@metrics.log_metrics
@tracer.capture_lambda_handler
def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
"""
SNSメッセージを処理するLambda関数
Args:
event: SNSイベント
context: Lambda context
Returns:
dict: 処理結果
"""
try:
# SNSイベントとしてパース
sns_event = SNSEvent(event)
# メトリクスの初期化
metrics.add_metric(name="MessagesReceived", unit=MetricUnit.Count, value=1)
for record in sns_event.records:
# SNSメッセージの取得とパース
try:
message_data = json.loads(record.sns.message)
# スキーマ検証
validate_schema(event=message_data, schema=MESSAGE_SCHEMA)
logger.info("Message validation successful", extra={
"message_id": message_data["id"],
"timestamp": message_data["timestamp"]
})
# メッセージの処理
with tracer.capture_method():
processed_result = process_message(message_data)
# 成功メトリクスの記録
metrics.add_metric(
name="ProcessingSuccess",
unit=MetricUnit.Count,
value=1
)
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Successfully processed message',
'processed_data': processed_result
})
}
except json.JSONDecodeError as e:
logger.error("Invalid JSON in message", extra={"error": str(e)})
metrics.add_metric(name="InvalidJSON", unit=MetricUnit.Count, value=1)
raise
except SchemaValidationError as e:
logger.error("Schema validation failed", extra={"error": str(e)})
metrics.add_metric(name="ValidationErrors", unit=MetricUnit.Count, value=1)
raise
except Exception as e:
logger.exception("Error processing message")
metrics.add_metric(name="ProcessingErrors", unit=MetricUnit.Count, value=1)
raise
@tracer.capture_method
def process_message(message_data: Dict[str, Any]) -> Dict[str, Any]:
"""
メッセージを処理する関数
Args:
message_data: 処理対象のメッセージデータ
Returns:
dict: 処理結果
"""
start_time = datetime.datetime.now()
try:
# 処理時間のシミュレーション
processing_time = random.uniform(0.1, 0.5)
tracer.put_annotation(key="processing_time", value=str(processing_time))
# ランダムエラーの発生(テスト用)
if random.random() < 0.3:
raise ProcessingError("Random processing error occurred")
# メッセージの処理ロジック
result = {
'original_data': message_data,
'processed': True,
'processed_timestamp': str(datetime.datetime.now())
}
# 処理時間のメトリクス記録
end_time = datetime.datetime.now()
processing_duration = (end_time - start_time).total_seconds()
metrics.add_metric(
name="ProcessingDuration",
unit=MetricUnit.Milliseconds,
value=processing_duration * 1000
)
logger.info(
"Message processing completed",
extra={
"message_id": message_data["id"],
"processing_duration": processing_duration
}
)
return result
except Exception as e:
logger.error(
"Error in message processing",
extra={
"message_id": message_data["id"],
"error": str(e)
}
)
raise
ポーリングベース呼び出し(Poll-based)
SQSを使用するポーリングベース呼び出しでは、メッセージの可視性タイムアウトと保持期間を考慮したエラーハンドリングが必要です。
LambdaとSQSを使用する際、Lambdaがメッセージを処理できず、メッセージの保持期間が切れた場合、SQSはそのメッセージを破棄します。メッセージの処理失敗は、タイムアウトや不正なペイロードなどの関数処理の失敗が原因となることがあります。また、送信先の関数が存在しない場合や、適切な権限がない場合にも処理失敗が発生する可能性があります。
破棄されたメッセージを保持するために、ソースキューに別のデッドレターキュー(DLQ)を設定することができます。DLQは元のメッセージを保持し、根本原因の分析、エラー状態の適切な処理、手動介入が必要な通知の送信に役立ちます。ポールベースの呼び出しシナリオでは、Lambda関数自体はDLQを維持せず、SQSで設定された外部DLQに依存します。
以下は、LambdaがSQSキューからイベントをポーリングしてLambda関数を呼び出すように設定する際の設計パターンを示しています。

上記のアーキテクチャをAWS CDKを使用した実装例:
from aws_cdk import (
aws_sqs as sqs,
aws_lambda as _lambda,
Stack,
)
from constructs import Construct
class SqsProcessingStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs) -> None:
super().__init__(scope, id, **kwargs)
# DLQ
dlq = sqs.Queue(
self, "DeadLetterQueue",
retention_period=Duration.days(14)
)
# メインキュー
main_queue = sqs.Queue(
self, "MainQueue",
visibility_timeout=Duration.seconds(30),
retention_period=Duration.days(14),
dead_letter_queue=sqs.DeadLetterQueue(
max_receive_count=10,
queue=dlq)
)
# Pooling Lambda を作成
pooling_function = _lambda.Function(
self, "Pooling",
runtime=_lambda.Runtime.PYTHON_3_9,
code=_lambda.Code.from_asset("lambda"),
handler="index.handler",
timeout=Duration.seconds(15),
environment={
"POWERTOOLS_SERVICE_NAME": "sqs-processing",
"POWERTOOLS_METRICS_NAMESPACE": "SQSProcessing",
"LOG_LEVEL": "INFO"
},
)
# Lambda関数にSQSの権限を付与
main_queue.grant_consume_messages(pooling_function)
# X-Ray トレーシングの有効化
pooling_function.add_to_role_policy(
iam.PolicyStatement(
actions=['xray:PutTraceSegments', 'xray:PutTelemetryRecords'],
resources=['*']
)
)
# associate lambda with sqs as event source.
sqs_source = events.SnsEventSource(
main_queue,
batch_size=1)
pooling_function.add_event_source(sns_source)
# SNSトピックの作成(アラート通知用)
alert_topic = sns.Topic(
self, "DLQAlertTopic",
display_name="DLQ Alert Topic",
topic_name="dlq-alert-topic"
)
# メール通知の設定
alert_topic.add_subscription(
sns_subscriptions.EmailSubscription('myemailaddress@example.com')
)
# DLQのメトリクスアラームを作成
dlq_alarm = cloudwatch.Alarm(
self, "DLQMessagesAlarm",
metric=dlq.metric_approximate_number_of_messages_visible(),
evaluation_periods=1,
threshold=1, # DLQに1つ以上メッセージが存在する場合
comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
alarm_description="Messages detected in DLQ"
)
# アラームをSNSトピックに関連付け
dlq_alarm.add_alarm_action(
cloudwatch_actions.SnsAction(alert_topic)
)
実際のLambdaの実装例として以下のようになる。
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext
from typing import Any, Dict
# Powertools の初期化
logger = Logger()
metrics = Metrics()
tracer = Tracer()
@logger.inject_lambda_context
@metrics.log_metrics
@tracer.capture_lambda_handler
def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
try:
# SQSメッセージの処理
for record in event.get('Records', []):
message = record.get('body', '')
message_id = record.get('messageId', '')
logger.info(f"Processing message",
extra={
"message_id": message_id,
"message": message
})
# メッセージ処理のメトリクス
metrics.add_metric(name="ProcessedMessages", unit=MetricUnit.Count, value=1)
# ここにメッセージ処理のビジネスロジックを実装
process_message(message)
# 成功メトリクスの記録
metrics.add_metric(name="SuccessfulProcessing", unit=MetricUnit.Count, value=1)
return {
'statusCode': 200,
'body': 'Successfully processed messages'
}
except Exception as e:
# エラーログの出力
logger.exception("Error processing messages")
# エラーメトリクスの記録
metrics.add_metric(name="ProcessingErrors", unit=MetricUnit.Count, value=1)
raise e
@tracer.capture_method
def process_message(message: str) -> None:
"""
メッセージ処理のビジネスロジック
"""
# メッセージ処理のロジックをここに実装
logger.info(f"Processing business logic for message")
# 処理の実装...
参考
https://aws.amazon.com/blogs/compute/implementing-aws-lambda-error-handling-patterns/