こんにちは。takapy(@takapy0210)です。
表題の件で少し困ったので、備忘がてら記事に残しておこうと思います。
やろうとしていたこと
BigQueryのPython SDKを用いて、Pandasで読み込んだデータをBigQueryのテーブルに保存する処理で、後述するエラーが発生しました。
ちなみに下記のBigQueryクライアントライブラリ(google-cloud-bigquery
)を使っています。
エラー内容
以下のようなエラーが発生しました。
400 Resources exceeded during query execution: UDF out of memory.; Failed to read Parquet file /bigstore/hogehoge. This might happen if the file contains a row that is too large, or if the total size of the pages loaded for the queried columns is too large.
ファイルに大きすぎる行が含まれている場合などに発生するエラーのようです。
今回のデータ全体のサイズが大き過ぎるため発生したと思われます。(詳細は不明...)
ちなみに今回BigQueryに保存しようとしていたデータは、CSVファイルのサイズが3GBくらい、データ件数が300万件ほどのテキストデータを含むDataFrameになっています。
該当箇所のコード
基本的には公式サンプル*1通りの実装です。
下記コードの job.result()
の処理で今回のエラーが発生していました。
# データをimportする関数例 def import_to_bigquery(df, schema, schema_definition): """BQにデータを保存する""" client = bigquery.Client() table_id = f'{schema.project}.{schema.dataset_id}.{schema.table_id}' job_config = bigquery.LoadJobConfig( write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, # 上書き schema=schema_definition, ) job = client.load_table_from_dataframe( df, table_id, location='US', job_config=job_config, ) job.result() # ここでエラー発生 table = client.get_table(table_id) LOGGER.info(f'Imported data: {table.num_rows}rows and {len(table.schema)}columns to {schema.table_id}')
work around
試しにデータ件数を10000サンプルで実行してみると問題なく動いたので、DataFrameをchunk化しながらBigQueryに保存すればいけるのでは?ということで下記のような修正して実行したところ、期待する動作になりました。
def import_to_bigquery(df, schema, schema_definition): """BQにデータを保存する""" client = bigquery.Client() table_id = f'{schema.project}.{schema.dataset_id}.{schema.table_id}' for index, df_chunk in enumerate(np.array_split(df, 10)): # 10分割しながらBigQueryにimportする. 最初の1回は上書きをし、残りは追加を行う. if index == 0: job_config = bigquery.LoadJobConfig( write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, # 上書き schema=schema_definition, ) else: job_config = bigquery.LoadJobConfig( write_disposition=bigquery.WriteDisposition.WRITE_APPEND, # 追加 schema=schema_definition, ) job = client.load_table_from_dataframe( df_chunk, table_id, location='US', job_config=job_config, ) job.result() table = client.get_table(table_id) LOGGER.info(f'Imported data: {table.num_rows}rows and {len(table.schema)}columns to {schema.table_id}')
コードを見てわかるように、chunk化にはnumpy.array_split
*2 を使用しました。
read_csv()
のchunksize
引数を指定しても良かったのですが、前処理を行っている都合もあり今回は不採用としました。
最後に
公式リポジトリのISSUE*3 などを漁っても解決方法が見つからなかったので、現状はこのようにデータを分割しながら回避するしかなさそうです。