こんにちは。たかぱい(@takapy0210)です。
DynamoDBの複数テーブルからなるべく高速にデータを取得するために、非同期でデータ取得することはできるのか?を少し調べてみたのですが、あまり事例が無かったのでメモ程度に残しておきます。
ユースケースとしては、例えば user_id
をkeyとしたテーブルが複数あり、それぞれからデータを取得し、最終的なレスポンスを生成したい場合などに使えるかと思います。
一般的なデータ取得方法
まず初めに、boto3を使ってDynamoDBからデータを取得する方法をみていきます。
後述する非同期処理の恩恵が分かりやすいように、sleepを入れて意図的に時間がかかるようにしています。
import time import boto3 dynamodb = boto3.resource("dynamodb") def get_item_from_table_sync(table_name, key, delay): """DynamoDBからデータを取得する""" time.sleep(delay) # 意図的にsleepを入れている table = dynamodb.Table(table_name) response = table.get_item(Key=key) return response.get('Item') def main_sync(): user_id = xxxx results = [] start_time = time.time() # 1つ目のテーブルから取得 dynamo_response = get_item_from_table_sync( table_name="dynamo_tableA", key={"user_id": user_id}, delay=10 ) results.append(dynamo_response) # 2つ目のテーブルから取得 dynamo_response = get_item_from_table_sync( table_name="dynamo_tableB", key={"user_id": user_id}, delay=7 ) results.append(dynamo_response) print("Total time:", time.time() - start_time) return results result = main_sync() # >>> Total time: 17.16487693786621
上記処理は各テーブルからデータ取得する際に10s、7sのディレイを入れているので、合計の処理時間は17秒ほどかかります。
次に、これを非同期処理に書き換えてみます。
boto3のAPIを非同期で使えるaioboto3ライブラリ
探してみると、aiboto3というboto3 と aiobotocore を組み合わせたaioboto3というラッパーがあったので、これを利用していきます。
今回はDynamoDBしか使用していませんが、READMEを読む限りS3やAthenaなども非同期で扱うことができるようです。
非同期でDynamoDBからデータ取得する
以下ではpythonのasyncioという標準ライブラリを使って非同期処理(並行処理)をしていきますが、asyncioそのものについては触れませんので、気になる方は公式ドキュメントなどを参照してみてください。
import time import asyncio import aioboto3 async def get_item_from_table_async(dynamo_resource, table_name, key, delay): """DynamoDBからデータを取得する""" await asyncio.sleep(delay) table = await dynamo_resource.Table(table_name) response = await table.get_item(Key=key) return response.get('Item') async def main_async(): user_id = xxxx start_time = time.time() session = aioboto3.Session() async with session.resource('dynamodb', region_name='ap-northeast-1') as dynamo_resource: # 同時に実行する非同期タスクのリスト tasks = [ get_item_from_table_async(dynamo_resource, 'dynamo_tableA', {'user_id': user_id}, 10), get_item_from_table_async(dynamo_resource, 'dynamo_tableB', {'user_id': user_id}, 7), ] # asyncio.gatherを使用して複数のタスクを同時に実行し、結果を取得 results = await asyncio.gather(*tasks) print("Total time:", time.time() - start_time) return results # jupyterなどから実行する場合は関数に直接 await をつけて実行する必要があります # .pyで実行する場合は. asyncio.run(main_async())のように記述します result = await main_async() # >>> Total time: 10.13998532295227
上記処理を実行すると、処理時間は10秒ほどになり、うまく非同期処理ができていることが分かります。