ギークなエンジニアを目指す男

機械学習系の知識を蓄えようとするブログ

pythonを使ってDynamoDBの複数テーブルから非同期でデータ取得してみる

こんにちは。たかぱい(@takapy0210)です。

DynamoDBの複数テーブルからなるべく高速にデータを取得するために、非同期でデータ取得することはできるのか?を少し調べてみたのですが、あまり事例が無かったのでメモ程度に残しておきます。

ユースケースとしては、例えば user_id をkeyとしたテーブルが複数あり、それぞれからデータを取得し、最終的なレスポンスを生成したい場合などに使えるかと思います。

一般的なデータ取得方法

まず初めに、boto3を使ってDynamoDBからデータを取得する方法をみていきます。

後述する非同期処理の恩恵が分かりやすいように、sleepを入れて意図的に時間がかかるようにしています。

import time
import boto3

dynamodb = boto3.resource("dynamodb")

def get_item_from_table_sync(table_name, key, delay):
    """DynamoDBからデータを取得する"""
    time.sleep(delay)  # 意図的にsleepを入れている
    table = dynamodb.Table(table_name)
    response = table.get_item(Key=key)
    return response.get('Item')

def main_sync():
    user_id = xxxx
    results = []
    start_time = time.time()

    # 1つ目のテーブルから取得
    dynamo_response = get_item_from_table_sync(
        table_name="dynamo_tableA",
        key={"user_id": user_id},
        delay=10
    )
    results.append(dynamo_response)

    # 2つ目のテーブルから取得
    dynamo_response = get_item_from_table_sync(
        table_name="dynamo_tableB",
        key={"user_id": user_id},
        delay=7
    )
    results.append(dynamo_response)
    
    print("Total time:", time.time() - start_time)
    return results

result = main_sync()

# >>> Total time: 17.16487693786621

上記処理は各テーブルからデータ取得する際に10s、7sのディレイを入れているので、合計の処理時間は17秒ほどかかります。

次に、これを非同期処理に書き換えてみます。

boto3のAPIを非同期で使えるaioboto3ライブラリ

探してみると、aiboto3というboto3 と aiobotocore を組み合わせたaioboto3というラッパーがあったので、これを利用していきます。

github.com

今回はDynamoDBしか使用していませんが、READMEを読む限りS3やAthenaなども非同期で扱うことができるようです。

非同期でDynamoDBからデータ取得する

以下ではpythonのasyncioという標準ライブラリを使って非同期処理(並行処理)をしていきますが、asyncioそのものについては触れませんので、気になる方は公式ドキュメントなどを参照してみてください。

docs.python.org

import time

import asyncio
import aioboto3


async def get_item_from_table_async(dynamo_resource, table_name, key, delay):
    """DynamoDBからデータを取得する"""
    await asyncio.sleep(delay)
    table = await dynamo_resource.Table(table_name)
    response = await table.get_item(Key=key)
    return response.get('Item')

async def main_async():
    user_id = xxxx
    start_time = time.time()
    
    session = aioboto3.Session()
    async with session.resource('dynamodb', region_name='ap-northeast-1') as dynamo_resource:
        # 同時に実行する非同期タスクのリスト
        tasks = [
            get_item_from_table_async(dynamo_resource, 'dynamo_tableA', {'user_id': user_id}, 10),
            get_item_from_table_async(dynamo_resource, 'dynamo_tableB', {'user_id': user_id}, 7),
        ]
 
        # asyncio.gatherを使用して複数のタスクを同時に実行し、結果を取得
        results = await asyncio.gather(*tasks)

    print("Total time:", time.time() - start_time)
    return results
    
# jupyterなどから実行する場合は関数に直接 await をつけて実行する必要があります
# .pyで実行する場合は. asyncio.run(main_async())のように記述します
result = await main_async()

# >>> Total time: 10.13998532295227

上記処理を実行すると、処理時間は10秒ほどになり、うまく非同期処理ができていることが分かります。