ギークなエンジニアを目指す男

機械学習系の知識を蓄えようとするブログ

PandasからBigQueryにデータを保存する際に「Resources exceeded during query execution: UDF out of memory. ..... columns is too large」エラーが出た時の対処方法

こんにちは。takapy(@takapy0210)です。

表題の件で少し困ったので、備忘がてら記事に残しておこうと思います。

やろうとしていたこと

BigQueryのPython SDKを用いて、Pandasで読み込んだデータをBigQueryのテーブルに保存する処理で、後述するエラーが発生しました。

ちなみに下記のBigQueryクライアントライブラリ(google-cloud-bigquery)を使っています。

cloud.google.com

エラー内容

以下のようなエラーが発生しました。

400 Resources exceeded during query execution: UDF out of memory.; Failed to read Parquet file /bigstore/hogehoge. This might happen if the file contains a row that is too large, or if the total size of the pages loaded for the queried columns is too large.

ファイルに大きすぎる行が含まれている場合などに発生するエラーのようです。
今回のデータ全体のサイズが大き過ぎるため発生したと思われます。(詳細は不明...)

ちなみに今回BigQueryに保存しようとしていたデータは、CSVファイルのサイズが3GBくらい、データ件数が300万件ほどのテキストデータを含むDataFrameになっています。

該当箇所のコード

基本的には公式サンプル*1通りの実装です。

下記コードの job.result() の処理で今回のエラーが発生していました。

# データをimportする関数例
def import_to_bigquery(df, schema, schema_definition):
    """BQにデータを保存する"""

    client = bigquery.Client()
    table_id = f'{schema.project}.{schema.dataset_id}.{schema.table_id}'

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # 上書き
        schema=schema_definition, 
    )
    job = client.load_table_from_dataframe(
        df,
        table_id,
        location='US',
        job_config=job_config,
    )
    job.result()  # ここでエラー発生
    table = client.get_table(table_id)
    LOGGER.info(f'Imported data: {table.num_rows}rows and {len(table.schema)}columns to {schema.table_id}')

work around

試しにデータ件数を10000サンプルで実行してみると問題なく動いたので、DataFrameをchunk化しながらBigQueryに保存すればいけるのでは?ということで下記のような修正して実行したところ、期待する動作になりました。

def import_to_bigquery(df, schema, schema_definition):
    """BQにデータを保存する"""

    client = bigquery.Client()
    table_id = f'{schema.project}.{schema.dataset_id}.{schema.table_id}'

    for index, df_chunk in enumerate(np.array_split(df, 10)):
        # 10分割しながらBigQueryにimportする. 最初の1回は上書きをし、残りは追加を行う.
        if index == 0:
            job_config = bigquery.LoadJobConfig(
                write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # 上書き
                schema=schema_definition,
            )
        else:
            job_config = bigquery.LoadJobConfig(
                write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # 追加
                schema=schema_definition,
            )
        job = client.load_table_from_dataframe(
            df_chunk,
            table_id,
            location='US',
            job_config=job_config,
        )
        job.result()

    table = client.get_table(table_id)
    LOGGER.info(f'Imported data: {table.num_rows}rows and {len(table.schema)}columns to {schema.table_id}')

コードを見てわかるように、chunk化にはnumpy.array_split*2 を使用しました。
read_csv()chunksize 引数を指定しても良かったのですが、前処理を行っている都合もあり今回は不採用としました。

最後に

公式リポジトリのISSUE*3 などを漁っても解決方法が見つからなかったので、現状はこのようにデータを分割しながら回避するしかなさそうです。