AWSLambdaのエラーハンドリングパターンの実装

AWS

サーバーレスアプリケーションの開発において、エラーハンドリングは非常に重要な要素です。特にAWS Lambdaを使用する場合、適切なエラーハンドリング戦略を実装することで、アプリケーションの信頼性と回復力を大幅に向上させることができます。

Lambda関数の呼び出し方式とエラーハンドリング

Lambdaには3つの呼び出し方式があります:

  1. 同期呼び出し(Synchronous)
  2. 非同期呼び出し(Asynchronous)
  3. ポーリングベース呼び出し(Poll-based)

同期呼び出し(Synchronous)

API Gatewayなどで使用される同期呼び出しでは、組み込みのリトライ機能はありません。Lambda関数のエラーハンドリングでは、以下の3つの原則を考慮する必要があります。

  1. エラーの適切な分類
  2. 詳細なログ記録とトレーサビリティ
  3. クライアントへの明確なエラー応答

上記のアーキテクチャをAWS CDKを使用した実装例

from aws_cdk import (
    Stack,
    aws_apigateway as apigateway,
    aws_lambda as lambda_,
    aws_logs as logs,
    aws_iam as iam,
    Duration,
)
from constructs import Construct

class ApiStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Lambda関数の作成
        handler = lambda_.Function(
            self, "ApiHandler",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_asset("lambda"),
            environment={
                "STAGE": "prod",
                "LOG_LEVEL": "INFO",
            },
            timeout=Duration.seconds(29),
            memory_size=256,
            tracing=lambda_.Tracing.ACTIVE,
            log_retention=logs.RetentionDays.ONE_WEEK,
        )

        # API Gatewayの作成
        api = apigateway.RestApi(
            self, "Api",
            rest_api_name="My Service API",
            description="This is my API Gateway",
            deploy_options=apigateway.StageOptions(
                stage_name="prod",
                access_log_destination=apigateway.LogGroupLogDestination(
                    logs.LogGroup(self, "ApiGatewayAccessLogs")
                ),
                access_log_format=apigateway.AccessLogFormat.json_with_standard_fields(),
                metrics_enabled=True,
                tracing_enabled=True,
            ),
            default_cors_preflight_options=apigateway.CorsOptions(
                allow_origins=["*"],
                allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
                allow_headers=["Content-Type", "X-Amz-Date", "Authorization", "X-Api-Key"],
                max_age=Duration.days(1),
            ),
        )

        # APIリソースとメソッドの定義
        items = api.root.add_resource("items")
        
        # GET /items
        items.add_method(
            "GET",
            apigateway.LambdaIntegration(
                handler,
                proxy=False,
                integration_responses=[
                    apigateway.IntegrationResponse(
                        status_code="200",
                        response_parameters={
                            "method.response.header.Access-Control-Allow-Origin": "'*'",
                        },
                    )
                ],
            ),
            method_responses=[
                apigateway.MethodResponse(
                    status_code="200",
                    response_parameters={
                        "method.response.header.Access-Control-Allow-Origin": True,
                    },
                )
            ],
        )

        # GET /items/{id}
        item = items.add_resource("{id}")
        item.add_method("GET", apigateway.LambdaIntegration(handler))

        # POST /items
        items.add_method("POST", apigateway.LambdaIntegration(handler))


エラーハンドリングは以下のように実装します:

from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()
tracer = Tracer()
metrics = Metrics(namespace="MyLambdaFunction")

class BusinessError(Exception):
    def __init__(self, message: str, status_code: int = 400):
        self.message = message
        self.status_code = status_code
        super().__init__(self.message)

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics
def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
    try:
        # メイン処理ロジック
        result = process_request(event)
        metrics.add_metric(name="SuccessfulExecution", unit=MetricUnit.Count, value=1)
        return create_success_response(result)
        
    except BusinessError as e:
        logger.warning(f"Business error: {str(e)}")
        metrics.add_metric(name="BusinessErrors", unit=MetricUnit.Count, value=1)
        return create_error_response(e.status_code, str(e))
        
    except Exception as e:
        logger.exception("Unexpected error occurred")
        metrics.add_metric(name="SystemErrors", unit=MetricUnit.Count, value=1)
        return create_error_response(500, "Internal Server Error")

エラーを適切に分類することで、より効果的なエラーハンドリングが可能になります。

非同期呼び出し(Asynchronous)

SNSやEventBridgeを使用する非同期呼び出しでは、以下の2つのエラーケースを考慮する必要があります:

  1. イベントソースからLambdaへの配信失敗
  2. Lambda関数内での処理エラー

上記のアーキテクチャをAWS CDKを使用した実装例:

from aws_cdk import (
    Duration,
    aws_lambda as _lambda,
    aws_sqs as sqs,
    aws_sns as sns,
    aws_lambda_event_sources as events,
)
from constructs import Construct

class SnsWithDlqHandlingStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        
        # Sns Topics
        sns_topic = sns.Topic(
            self, "topic",
            display_name="topic",
            topic_name="lambda_topic")
        
        # DLQ作成
        dlq = sqs.Queue(self, "DeadLetterQueue")
        
        # 成功時の送信先キュー
        success_queue = sqs.Queue(self, "SuccessQueue")
        
        # 失敗時の送信先キュー
        failure_queue = sqs.Queue(self, "FailureQueue")
        
        # Lambda関数の定義
        processor_function = _lambda.Function(
            self, "Processor",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            retry_attempts=2,
       environment={
            "POWERTOOLS_SERVICE_NAME": "message-processor",
            "POWERTOOLS_METRICS_NAMESPACE": "MessageProcessing",
            "LOG_LEVEL": "INFO",
            "POWERTOOLS_LOGGER_LOG_EVENT": "true",
            "POWERTOOLS_LOGGER_SAMPLE_RATE": "0.1",
            "POWERTOOLS_TRACE_SAMPLE_RATE": "0.1",
         },
            handler="index.handler",
            on_success=destinations.SqsDestination(success_queue),
            on_failure=destinations.SqsDestination(failure_queue),
            timeout=Duration.seconds(15)
        )

        # associate lambda with sns as event source.
        sns_source = events.SnsEventSource(
        sns_topic,
        dead_letter_queue=dlq)
        processor_function.add_event_source(sns_source)

     # X-Ray トレーシングの有効化
     processor_function.add_to_role_policy(
        iam.PolicyStatement(
           actions=['xray:PutTraceSegments', 'xray:PutTelemetryRecords'],
           resources=['*']
        )
     )
     
     

処理の例として以下のように実装することができる。

# lambda/index.py

import json
import random
import datetime
from typing import Dict, Any

from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.data_classes import SNSEvent
from aws_lambda_powertools.utilities.validation import validate_schema
from aws_lambda_powertools.utilities.validation.exceptions import SchemaValidationError

# Powertoolsの初期化
logger = Logger(service="message-processor")
metrics = Metrics(namespace="MessageProcessing")
tracer = Tracer(service="message-processor")

# 入力メッセージのJSONスキーマ
MESSAGE_SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "data": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "value": {"type": "number"}
            },
            "required": ["name", "value"]
        },
        "timestamp": {"type": "string"}
    },
    "required": ["id", "data", "timestamp"]
}

class ProcessingError(Exception):
    """カスタム処理エラー"""
    pass

@logger.inject_lambda_context
@metrics.log_metrics
@tracer.capture_lambda_handler
def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
    """
    SNSメッセージを処理するLambda関数
    
    Args:
        event: SNSイベント
        context: Lambda context
    
    Returns:
        dict: 処理結果
    """
    try:
        # SNSイベントとしてパース
        sns_event = SNSEvent(event)
        
        # メトリクスの初期化
        metrics.add_metric(name="MessagesReceived", unit=MetricUnit.Count, value=1)
        
        for record in sns_event.records:
            # SNSメッセージの取得とパース
            try:
                message_data = json.loads(record.sns.message)
                
                # スキーマ検証
                validate_schema(event=message_data, schema=MESSAGE_SCHEMA)
                
                logger.info("Message validation successful", extra={
                    "message_id": message_data["id"],
                    "timestamp": message_data["timestamp"]
                })
                
                # メッセージの処理
                with tracer.capture_method():
                    processed_result = process_message(message_data)
                
                # 成功メトリクスの記録
                metrics.add_metric(
                    name="ProcessingSuccess",
                    unit=MetricUnit.Count,
                    value=1
                )
                
                return {
                    'statusCode': 200,
                    'body': json.dumps({
                        'message': 'Successfully processed message',
                        'processed_data': processed_result
                    })
                }
                
            except json.JSONDecodeError as e:
                logger.error("Invalid JSON in message", extra={"error": str(e)})
                metrics.add_metric(name="InvalidJSON", unit=MetricUnit.Count, value=1)
                raise
                
            except SchemaValidationError as e:
                logger.error("Schema validation failed", extra={"error": str(e)})
                metrics.add_metric(name="ValidationErrors", unit=MetricUnit.Count, value=1)
                raise
    
    except Exception as e:
        logger.exception("Error processing message")
        metrics.add_metric(name="ProcessingErrors", unit=MetricUnit.Count, value=1)
        raise

@tracer.capture_method
def process_message(message_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    メッセージを処理する関数
    
    Args:
        message_data: 処理対象のメッセージデータ
    
    Returns:
        dict: 処理結果
    """
    start_time = datetime.datetime.now()
    
    try:
        # 処理時間のシミュレーション
        processing_time = random.uniform(0.1, 0.5)
        tracer.put_annotation(key="processing_time", value=str(processing_time))
        
        # ランダムエラーの発生(テスト用)
        if random.random() < 0.3:
            raise ProcessingError("Random processing error occurred")
        
        # メッセージの処理ロジック
        result = {
            'original_data': message_data,
            'processed': True,
            'processed_timestamp': str(datetime.datetime.now())
        }
        
        # 処理時間のメトリクス記録
        end_time = datetime.datetime.now()
        processing_duration = (end_time - start_time).total_seconds()
        metrics.add_metric(
            name="ProcessingDuration",
            unit=MetricUnit.Milliseconds,
            value=processing_duration * 1000
        )
        
        logger.info(
            "Message processing completed",
            extra={
                "message_id": message_data["id"],
                "processing_duration": processing_duration
            }
        )
        
        return result
        
    except Exception as e:
        logger.error(
            "Error in message processing",
            extra={
                "message_id": message_data["id"],
                "error": str(e)
            }
        )
        raise

ポーリングベース呼び出し(Poll-based)

SQSを使用するポーリングベース呼び出しでは、メッセージの可視性タイムアウトと保持期間を考慮したエラーハンドリングが必要です。

LambdaとSQSを使用する際、Lambdaがメッセージを処理できず、メッセージの保持期間が切れた場合、SQSはそのメッセージを破棄します。メッセージの処理失敗は、タイムアウトや不正なペイロードなどの関数処理の失敗が原因となることがあります。また、送信先の関数が存在しない場合や、適切な権限がない場合にも処理失敗が発生する可能性があります。

破棄されたメッセージを保持するために、ソースキューに別のデッドレターキュー(DLQ)を設定することができます。DLQは元のメッセージを保持し、根本原因の分析、エラー状態の適切な処理、手動介入が必要な通知の送信に役立ちます。ポールベースの呼び出しシナリオでは、Lambda関数自体はDLQを維持せず、SQSで設定された外部DLQに依存します。

以下は、LambdaがSQSキューからイベントをポーリングしてLambda関数を呼び出すように設定する際の設計パターンを示しています。


上記のアーキテクチャをAWS CDKを使用した実装例:

from aws_cdk import (
    aws_sqs as sqs,
    aws_lambda as _lambda,
    Stack,
)
from constructs import Construct
class SqsProcessingStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        
        # DLQ
        dlq = sqs.Queue(
            self, "DeadLetterQueue",
            retention_period=Duration.days(14)
        )

     # メインキュー
        main_queue = sqs.Queue(
            self, "MainQueue",
            visibility_timeout=Duration.seconds(30),
            retention_period=Duration.days(14),
            dead_letter_queue=sqs.DeadLetterQueue(
                 max_receive_count=10,
                 queue=dlq)
        
        )
        
        # Pooling Lambda を作成
     pooling_function = _lambda.Function(
            self, "Pooling",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="index.handler",
            timeout=Duration.seconds(15),
            environment={
              "POWERTOOLS_SERVICE_NAME": "sqs-processing",
              "POWERTOOLS_METRICS_NAMESPACE": "SQSProcessing",
              "LOG_LEVEL": "INFO"
           },
        )
        
        # Lambda関数にSQSの権限を付与
        main_queue.grant_consume_messages(pooling_function)

     # X-Ray トレーシングの有効化
     pooling_function.add_to_role_policy(
        iam.PolicyStatement(
           actions=['xray:PutTraceSegments', 'xray:PutTelemetryRecords'],
           resources=['*']
        )
     )
        
        # associate lambda with sqs as event source.
        sqs_source = events.SnsEventSource(
        main_queue,
        batch_size=1)
        pooling_function.add_event_source(sns_source)
        

     # SNSトピックの作成(アラート通知用)
        alert_topic = sns.Topic(
            self, "DLQAlertTopic",
            display_name="DLQ Alert Topic",
            topic_name="dlq-alert-topic"
        )
        
        # メール通知の設定
        alert_topic.add_subscription(
            sns_subscriptions.EmailSubscription('myemailaddress@example.com')
        )

     # DLQのメトリクスアラームを作成
        dlq_alarm = cloudwatch.Alarm(
            self, "DLQMessagesAlarm",
            metric=dlq.metric_approximate_number_of_messages_visible(),
            evaluation_periods=1,
            threshold=1,  # DLQに1つ以上メッセージが存在する場合
            comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
            alarm_description="Messages detected in DLQ"
        )
        
        # アラームをSNSトピックに関連付け
        dlq_alarm.add_alarm_action(
            cloudwatch_actions.SnsAction(alert_topic)
        )

実際のLambdaの実装例として以下のようになる。

from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext
from typing import Any, Dict

# Powertools の初期化
logger = Logger()
metrics = Metrics()
tracer = Tracer()

@logger.inject_lambda_context
@metrics.log_metrics
@tracer.capture_lambda_handler
def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
    try:
        # SQSメッセージの処理
        for record in event.get('Records', []):
            message = record.get('body', '')
            message_id = record.get('messageId', '')
            
            logger.info(f"Processing message", 
                extra={
                    "message_id": message_id,
                    "message": message
                })
            
            # メッセージ処理のメトリクス
            metrics.add_metric(name="ProcessedMessages", unit=MetricUnit.Count, value=1)
            
            # ここにメッセージ処理のビジネスロジックを実装
            process_message(message)
            
        # 成功メトリクスの記録
        metrics.add_metric(name="SuccessfulProcessing", unit=MetricUnit.Count, value=1)
        
        return {
            'statusCode': 200,
            'body': 'Successfully processed messages'
        }
        
    except Exception as e:
        # エラーログの出力
        logger.exception("Error processing messages")
        
        # エラーメトリクスの記録
        metrics.add_metric(name="ProcessingErrors", unit=MetricUnit.Count, value=1)
        
        raise e

@tracer.capture_method
def process_message(message: str) -> None:
    """
    メッセージ処理のビジネスロジック
    """
    # メッセージ処理のロジックをここに実装
    logger.info(f"Processing business logic for message")
    # 処理の実装...

参考

https://aws.amazon.com/blogs/compute/implementing-aws-lambda-error-handling-patterns/

無料相談実施中
AWSを使用したサーバーレスアプリケーションの構築
サーバーレス開発内製化、AWSへの移行等
様々な課題について無料でご相談お受けいたします。
最新情報をチェックしよう!