こんにちは、中山( @k1nakayama ) です。
皆さんはMomentoがHTTP APIエンドポイントを提供していることをご存知ですか?当初はSDKやCLIからのみキャッシュへのアクセスが行えていましたが、2023年9月頃にサポートを開始しました。APIが提供されると、様々なことをAPI経由で行いやすくなります。例えば先日発表されたStep Functionsからの外部APIコールを行う機能を使用することで、ワークフローの中でキャッシュからデータを読み書きしたり、Topicsに対してパブリッシュすることが可能です。
今回は、そんなMomentoのHTTP APIを活用し、DynamoDBに書き込まれたデータを、自動的にキャッシュとして登録・更新する処理をEventBridgeを使用してノーコードで実装したいと思います。
- DynamoDBのデータを自動的にキャッシュして、低レイテンシーで効率的に活用したい方
- DynamoDBに書き込まれた後、HTTP APIに対するイベントを発火させる処理を実装したい方
- APIコールを行う処理をノーコードで実装することに興味がある方
アーキテクチャ

- DynamoDBのアイテムが更新されると、更新情報がDynamoDB Streamsに流れる
- EventBridge PipesによってStreamからデータを取得し、EventBridge API Destinationsに送信
- API Destinationsにて、MomentoのHTTP APIをコールし、Cache Setを行う
流れはとてもシンプルに実装できそうです。
実装してみる
DynamoDBテーブルの作成
DynamoDBの画面から「テーブルの作成」を選択し、テーブル名「user」パーティションキーを文字列型の「id」を指定し、オンデマンドモードで作成します。



次に、作成したテーブルを選択し、「エクスポートおよびストリーム」タブを選択します。DynamoDBストリームの詳細セクションで、「オンにする」を押下してください。



新旧イメージを選択し、「ストリームをオンにする」を押下してください。

Momentoの設定
ここでは細かな手順を省略させてもらいますが、AWSのap-northeast-1にキャッシュ名「user」のキャッシュが存在し、APIキーを取得してあることを前提とします。
API Destinations設定
続いて、EventBridgeのコンソールに移動し、API Destinationsの設定を行います。API Destinationsは、メニューの「APIの送信先」という名称になっています。

接続タブに切り替えて、「接続を作成」を押下します。

下記のように設定し、APIキーの値にはMomentoのAPIキーをそのまま貼り付けてください

続いて、APIの送信先タブに切り替えて、「API送信先を作成」を押下します。

下記のように設定します。
※API送信先エンドポイントはAPIキーを作成した際に表示されるエンドポイントに「/cache/*」を付加したエンドポイントになります。
https://api.cache.cell-ap-northeast-1-1.prod.a.momentohq.com/cache/*

1秒あたりの呼び出しレート制限は、Momentoの場合、データプレーンに対するレートリミットが秒間100リクエストがデフォルトとなっていますので、それを設定していますが、別途アプリケーションからの読み書きが発生することになると思いますので、50など少なめの制限に設定しておくとよいです。
EventBridge Pipesの設定
いよいよ最後のEventBridge Pipesの設定です。
EventBridgeのメニューより「パイプ」を選択し、「パイプの作成」を押下してください。

まずは、パイプ名を「ddb2momento」とつけました。(この時点で右側の作成ボタンは押さず、ソース欄の設定を行います)

ソースの設定としてDynamoDBを選択し、DynamoDBストリームをuserを選択します

続いてターゲットを選択し、下記のように設定してください。

「ターゲット入力トランスフォーマー – オプション」の欄が閉じている場合は開き、真ん中のトランスフォーマーに下記の様に入力します。
"{\"id\": \"<$.dynamodb.Keys.id.S>\", \"name\": \"<$.dynamodb.NewImage.name.S>\", \"mail\": \"<$.dynamodb.NewImage.mail.S>\", \"address\": \"<$.dynamodb.NewImage.address.S>\", \"phone\": \"<$.dynamodb.NewImage.phone.S>\", \"created_at\": \"<$.dynamodb.NewImage.created_at.S>\", \"last_purchesed_at\": \"<$.dynamodb.NewImage.last_purchesed_at.S>\"}"
この入力トランスフォーマーは、APIでBodyに渡す情報を指定します。ここではDynamoDBアイテムの更新後の値をJSON文字列としてキャッシュに格納したいため、DynamoDB Streamsからの新イメージの値をそれぞれマッピングしています。
パイプを作成したら設定終了です。
検証してみる
DynamoDBのコンソールから「user」テーブルに対し、項目を作成で下記のようにユーザー情報を登録してみます。
{
"id": "001",
"address": "東京都新宿区1-2-3",
"created_at": "2023-01-15",
"last_purchased_at": "2023-11-20",
"mail": "tanaka1@example.com",
"name": "田中一郎",
"phone": "090-1234-5678"
}

Momentoのコンソールに移動し、キャッシュのData Explorerで、キーの取得欄に「001」を指定してみましょう

見事先程登録したデータが取得できました。
今回はAPI Destinationsによって下記のcurlコマンドを実行したのと同様のリクエストが自動的に送信されました。
curl -X PUT -H 'Authorization: <Momento API Key>' -d '"{\"id\": \"<$.dynamodb.Keys.id.S>\", \"name\": \"<$.dynamodb.NewImage.name.S>\", \"mail\": \"<$.dynamodb.NewImage.mail.S>\", \"address\": \"<$.dynamodb.NewImage.address.S>\", \"phone\": \"<$.dynamodb.NewImage.phone.S>\", \"created_at\": \"<$.dynamodb.NewImage.created_at.S>\", \"last_purchesed_at\": \"<$.dynamodb.NewImage.last_purchesed_at.S>\"}"' "https://api.cache.cell-ap-northeast-1-1.prod.a.momentohq.com/cache/user?key=001&ttl_seconds=300"
まとめ
今回は、EventBridge Pipes、EventBridge API Destinationsを使いMomentoのHTTP APIをコールする形で、DynamoDBに作成されたアイテムのデータを、自動的にMomento Cacheに登録・更新する流れを作ってみました。
今回はLambda等からの呼び出しまでは触れませんでしたが、以前に私が検証した結果をもめんと会で発表した際のスライドに、DynamoDBとMomentoを組み合わせると、パフォーマンスもコストも両方向上する結果が得られておりましたので、今回のようなキャッシュ化をしたものをアプリケーションから使っていけると良いかと思います。
また、今回のMomentoのHTTP APIを別の外部サービスに差し替えるだけで、様々なイベントドリブンな処理を実装することが出来ます。
ノーコードで気軽に実装できるので、是非試してみると良いかと思います。