ギークなエンジニアを目指す男

機械学習系の知識を蓄えようとするブログ

【書籍メモ】Python実践入門を読了したので機械学習PJにも使えそうなところをメモる

f:id:taxa_program:20200301183114p:plain

こんにちは。たかぱい(@takapy0210)です。

本日は【Python実践入門】を読了したので、それの備忘です。

はじめに

僕自身、Pythonは機械学習を学ぶためのいちツールとして勉強をしてきました。
そのため、Pythonの学習は機械学習に関する書籍だったり、kaggleを始めとしたデータ分析プラットフォームに公開されているkernelを写経したりすることから始めました。

学生時代にC++やobjective-cを触っていたりしたため、Python(≒機械学習)を勉強し始めた時にも、言語としての基本的な構文などには困りませんでしたが、お世辞にもPythonを体系的に学んできたか、と問われたときにYESとは言い辛く、(良くも悪くも)偏った情報源から蓄積された知識しかない状況でした。

そこで今回は、Pythonという言語そのものを体系的に取得できたら、と思い本書籍を読みました。

本の構成や対象読者に関しては、筆者の方のブログをご参照ください。

www.rhoboro.com

本記事は読了してみての所感や、Pythonを利用した機械学習プロジェクトでも使えそうな部分についての備忘録として残したものです。

全体を通して

Pythonの基本的な構文は網羅しつつ、開発環境やパッケージ周りの管理手法、Python特有の様々な機能まで網羅されており、Python歴が浅い自分にとってはとても有用な書籍でした。

個人的には、Kaggleなどでコーディングする際にはあまり意識していなかった下記について学べたのはとても大きかったです。

  • DocString
  • ジェネレータ、デコレータ、コンテキストマネージャー
  • 並行処理
  • ユニットテスト

これらについて、機械学習プロジェクトなどでも使えそうなものとして簡単にまとめていければと思います。
(ユニットテストに関しては毛色が異なるため、別記事で書こうと思います)

Docstring

Pythonの開発は、特定の企業や個人ではなく、コミュニティによって支えられおり、 このPythonコミュニティの特徴とPythonの開発を支えている、PEP(PythonEnhancementProposals)と呼ばれる仕組みがあります。

DocstringはPEP 257で定義されているドキュメントの書き方です。

Docstringは通常のコメントと同様にソースコードを読むときに役立ちますが、 それだけでなく組み込み関数help()などプログラムからも参照して活用できます。

kaggleなどではあまり意識していませんでした、業務などのプロダクションで動かすコードなどには意識して書くようにしています。

Docstringの例

Docstringを書くときは、次のように3つのクオート("""または''')で囲んで記述します。

def increment(n):
    """引数に1を加えて返す関数
    
    :paramn:数値
    """
    return n + 1

PEP 257は文法を規定しているものではないので「引数や戻り値の情報をどう書くか」などはプロジェクト毎に決めて良さそうです。

よく使われる手法には下記があると紹介されていました。
(個人的にはGoogle Python StyleGuideが見易く好みです)

ジェネレータ、デコレータ、コンテキストマネージャー

Python特有の機能でジェネレータ、デコレータ、コンテキストマネージャーというものがあります。
使わなくてもコーディングはできますが、上手く活用することで、コード量の削減、パフォーマンスの向上や可読性の向上などにつながりそうだな、と感じました。

以下で簡単にまとめます。

ジェネレータ

ジェネレータはリストやタプルのように、for文で利用できるイテラブルなオブジェクトです。
リストやタプルは全ての要素をメモリ上に保持するため、要素数が増えれば増えるほどメモリ使用量も増える欠点があります。
これに対しジェネレータは、次の要素が求められるたびに新たな要素を生成して返せます。

つまり、要素数にかかわらずメモリ使用量を小さく保つことのできる、メモリ効率の良いイテラブルなオブジェクトです。

具体的な使用例

ファイルの内容を変換するジェネレータの実例として、ファイルの中身を大文字に変換するプログラムを書いてみます。

このプログラムではファイルを1行ずつ読み込むジェネレータ関数reader()を作成し、その戻り値をwriter()関数に渡します。
writer()関数は、受け取ったイテレータを利用してファイルを1行ずつ読み込み、convert()関数で変換しながら、結果を新しいファイルに1行ずつ書き込んでいきます。

読み込み→変換→書き込みの一連の流れを1行ずつ行うため、元のファイルのサイズが大きくてもメモリを圧迫することなく処理させることが可能になります。

ここでは、reader()関数の内部にあるyield式がジェネレータの目印です。

def convert(line):
    return line.upper()

def writer(dest, reader):
    with open(dest, 'w') as f:
        for line in reader:
            f.write(convert(line))
            
def reader(src):
    with open(src) as f:
        for line in f:
            yield line  # ジェネレータ

# 存在する任意のファイルのパスを渡す
r = reader('src.txt')
writer('dest.txt', r)

ジェネレータは値を無限に返したい時や大きなデータを扱いたいときに効果を発揮しそうです。

データ分析の文脈でも、大量のテキストや画像データを扱うシーンが少なくないと思うので、そのようなときにジェネレータを使うことでパフォーマンスの向上が期待できそうです。

また、本書では実装中のコードでリストを返却している箇所がある場合は積極的にジェネレータに置き換えることを推奨しています。
たしかにメモリ効率の面で受ける恩恵を大きそうなので、この辺りも意識して実装していこうと思いました。

また、ジェネレータ式というものもあります。
ジェネレータ式はリストやタプルなどのイテラブルがあるときは、内包表記を使ってイテラブルからジェネレータを作成できるというものです。
生成方法はリスト内包表記と同じ構文で、[]の代わりに()を使います。

x = [1,2,3,4,5]

# これはリスト内包表記
listcomp = [i**2 for i in x]
listcomp  # すべての要素がメモリ上にすぐ展開される
>> [1,4,9,16,25]

# これはジェネレータ式
gen = (i**2 for i in x)
gen  # 各要素は必要になるまで計算されない
>> '<generatorobject<genexpr>at0x10bc10408>'

# リストにしたときに初めて最後の要素まで計算される
list(gen)
>> [1,4,9,16,25]

デコレータ

デコレータは、関数やクラスの前後に任意の処理を追加できるシンプルな機能です。
用途は多岐に渡り、例えば次の用途でよく利用されるようです。

  • 関数の引数チェック
  • 関数の呼び出し結果のキャッシュ
  • 関数の実行時間の計測
  • WebAPIでのハンドラの登録、ログイン状態による制限

デコレータは関数やクラスの定義の前に@で始まる文字列を記述するだけで使用できます。

業務ではFlaskでML APIの開発をしているので、@app.route('/')を付けてWebAPIのハンドラを指定する、みたいな事はやっていたのですが、本書の中には実際に知らない使い方もありとても勉強になりました。

複数のデコレータを使ってすごい事(語彙力)もできそうでしたが、沼にハマりそうなのでここではサラッと触れます。

具体的な使用例(関数の結果をキャッシュ)

ここではキャッシュの例を紹介します。 下記のようにfunctools.lru_cache()を利用することで、関数の結果をキャッシュすることができます。
実際に2回目に呼び出した際には、キャッシュがヒットし、すぐに結果を得ることができました。

from functools import lru_cache
from time import sleep

# 最近の呼び出し最大16回分までキャッシュ
@lru_cache(maxsize=16)
def heavy_funcion(n):
    sleep(3)  # 重い処理をシミュレート
    return n + 1

# 初回は時間がかかる
heavy_funcion(2)

# キャッシュにヒットするのですぐに結果を得られる
heavy_funcion(2)

具体的な使用例(関数の処理時間を計測する自作デコレータ)

デコレータの2つ目の実例として、関数の処理時間を計測するデコレータを自作してみます。
デコレータelapsed_time()を次のように定義します。

こうすることで、処理時間を計測したい関数に@elapsed_timeをつけることで計測することができます。

from functools import wraps
import time
def elapsed_time(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        start = time.time()
        v = f(*args, **kwargs)
        print('{} done in {:.2f} s'.format(f.__name__, (time.time() - start)))
        return v
    return wrapper

# 0からn-1までの総和を計算する関数
@elapsed_time
def func_hoge(n):
    return sum(i for i in range(n))

print(func_hoge(1000000))

>> func_hoge done in 0.06 s

実際に関数の処理時間を計測したい場面は多々あると思います。
そんなときに上記のようなデコレータを1つ定義しておくことで、手軽に計測できるのはとても便利だと思いました。

コンテキストマネージャー

with文に対応したオブジェクトをコンテキストマネージャーと呼びます。
コンテキストマネージャーを利用している代表例にopen()があり、これは多くの人が使ったことのある使い方だと思います。

with文はよくtry: finally:の置き換えで利用されますが、その本質はサンドイッチのように、ある処理の前後の処理をまとめて再利用可能にしてくれる点にあります。

open()関数での利用例

with open('some.txt', 'w') as f:
    f.write('python')

具体的な使用例

例えば下記のようなコンテキストマネージャーを設定することで、特定箇所のデバッグレベルを変更することができます。

最初にINFOレベルを指定しているため、DEBUGレベルのログを出力するlogger.debug()のログは無視されます。
しかし、withブロック内に限っては、一時的にログレベルをDEBUGレベルまで引き下げているため、withブロック内で実行したlogger.debug()のログは特別に出力されます。

import logging
from contextlib import contextmanager

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())

# デフォルトをINFOレベルとし、DEBUGレベルのログは無視する
logger.setLevel(logging.INFO)

@contextmanager
def debug_context():
    level = logger.level
    try:
        # ログレベルを変更する
        logger.setLevel(logging.DEBUG)
        yield
    finally:
        # もとのログレベルに戻す
        logger.setLevel(level)

def main():
    logger.info('before: info log')
    logger.debug('before: debug log')

    # DEBUGログを見たい処理をwithブロック内で実行する
    with debug_context():
        logger.info('inside the block: info log')
        logger.debug('inside the block: debug log')

    logger.info('after: info log')
    logger.debug('after: debug log')

if __name__ == '__main__':
    main()

コンテキストマネージャーは、ある処理の前後の処理をまとめて再利用可能にしてくれます。
この視点で見ると、活用できるシーンは非常に多そうです。

たとえば、次の処理はコンテキストマネージャーを使って実現できると本書では述べています。

  • 開始/終了のステータス変更や通知
  • ネットワークやDBの接続/切断処理

デコレータと被ってしまいますが、たとえば下記のようなコンテキストマネージャーを定義することで、処理の開始/終了通知と処理時間を出力させることができます。

@contextmanager
def timer(name):
    try:
        t0 = time.time()
        logging.info('{} start'.format(name))
        yield
    finally:
        logging.info('{} done in {:.2f} s'.format(name, (time.time() - t0)))
        
with timer('前処理'):
    train = preprosessing(train)

>> 前処理 start
>> 前処理 done in 0.05s

並行処理

並行処理は、プログラム実行時のパフォーマンスを向上させるために必要不可欠です。
kaggleなどの分析コンペにおいても、実行時間に制限のあるコンペもあり学んでおいて損はない分野だと思います。

この章では

  • マルチスレッドを使う方法
  • マルチプロセスを使う方法
  • イベントループを使う方法

の3つの方法が説明されています。

並行処理を導入する際、どの方法が適しているかはその処理が 「CPUバウンドな処理か」 「I/Oバウンドな処理か」 で分かれます。

それぞれの特徴は次のとおりです。

  • CPUバウンドな処理(暗号化など)
    数値計算などCPUのリソースを使って計算を行う処理複数のコアを同時に使って並列処理を行えるマルチプロセスが有効です。 PythonにはGILがあるため、マルチスレッド、イベントループによる処理高速化は期待できないようです。

  • I/Oバウンドな処理(データベースへの接続など)
    WebAPIの利用など通信による待ち時間が発生する処理に対して、マルチプロセス、マルチスレッド、イベントループいずれも有効です。 どの方法を選択するかはオーバーヘッドや実装しやすさを考慮して決めると良いようです。 一般的に、実行時のオーバーヘッドは大きいものから順にマルチプロセス、マルチスレッド、イベントループとなります。

ここでもそれぞれについて簡単にまとめます。

マルチスレッドベースの非同期実行が効果的なケース(ThreadPoolExecutorクラスを使用)

前述したようにI/Oバウンドな処理では、マルチスレッド化は有効な選択肢になります。
I/Oを伴う処理は、その処理にかかる時間がハードウェアやネットワークなどの外部に依存します。
そのため、プログラムを書き換えても個々の処理の高速化は期待できません。

しかし、複数の処理がある場合は非同期実行で並行化すると、通信中の待ち時間を有効活用できて合計時間を短縮できます。

具体的な使用例

複数のサイトのトップページをダウンロードする処理を考えます。
ダウンロード処理は待ち時間が発生するI/O処理の典型的な例です。
比較のためまずは逐次処理で実装し、その後マルチスレッド処理に変更します。

from hashlib import md5
from pathlib import Path
from urllib import request
import time
from functools import wraps
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)

# 対象ページのURL一覧
urls = [
    'https://twitter.com',
    'https://facebook.com',
    'https://instagram.com',
]

# 処理時間を計測するデコレータ
def elapsed_time(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        start = time.time()
        v = f(*args, **kwargs)
        print('{} done in {:.2f} s'.format(f.__name__, (time.time() - start)))
        return v
    return wrapper

def download(url):
    req = request.Request(url)
    # ファイル名に/などが含まれないようにする
    name = md5(url.encode('utf-8')).hexdigest()
    file_path = './' + name
    with request.urlopen(req) as res:
        Path(file_path).write_bytes(res.read())
        return url, file_path

# 逐次処理
@elapsed_time
def get_sequential():
    for url in urls:
        print(download(url))

# 並行処理
@elapsed_time
def get_multi_thread():
    # max_workersのデフォルトはコア数x5
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(download, url)
                   for url in urls]
        for future in as_completed(futures):
            # 完了したものから取得できる
            print(future.result())

# 逐次処理
get_sequential()
>> ('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
>> ('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
>> ('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
>> get_sequential: 2.929287910461426

# 並行処理
get_multi_thread()
>> ('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
>> ('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
>> ('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
>> get_multi_thread: 1.8627910614013672

実際に実行してみると、並行処理の方が1sほど時間を短縮することができました。

マルチプロセスベースの非同期実行が効果的なケース(ProcessPoolExecutorクラスを使用)

マルチプロセスは、I/Oバウンドな処理だけでなく数値計算などのCPUバウンドな処理の高速化にも有効です。
これは、マルチプロセスであればGILの制約を受けずに、複数コアを同時に使って並列処理を行えるためだそうです。

具体的な使用例

CPUバウンドな処理には暗号化(復号)や次元数の大きい行列演算などがありますが、今回は本書の例に従いフィボナッチ数列を計算します。
(マルチプロセスは対話モードでの実行は適しません。そのため、ここで実行するマルチプロセス処理はスクリプトとして実行する必要があります)

import os
import time
import sys
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)

def fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, b + a
    else:
        return a

def elapsed_time(f):
    def wrapper(*args, **kwargs):
        st = time.time()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.time() - st}")
        return v
    return wrapper

@elapsed_time
def get_sequential(nums):
    for num in nums:
        fibonacci(num)

@elapsed_time
def get_multi_thread(nums):
    with ThreadPoolExecutor() as e:
        futures = [e.submit(fibonacci, num)
                   for num in nums]
        for future in as_completed(futures):
            future.result()

def main():
    n = int(sys.argv[1])
    nums = [n] * os.cpu_count()
    get_sequential(nums)
    get_multi_thread(nums)

if __name__ == '__main__':
    main()

私の環境で実行してみたところ、マルチプロセスの方が若干時間がかかる、という結果になりました。
物理コア数:6, 論理コア数:12 なので結構速くなることを期待しておりましたが、これはなぜか分かりません。。。
(引数の値をもう少し大きくすれば、また違った結果が得られたのかもしれません。気が向いたら調査してみようと思います)

python fib.py 500000
>> get_sequential: 27.83519196510315
>> get_multi_thread: 28.488017320632935

イベントループを利用した並行処理(asyncioモジュール)

asyncioモジュールを用いる事で、途中で処理を中断させ、別の処理を実行させることができます。
処理を中断させるポイントは、I/O処理による待ち時間が発生する箇所とその処理を呼び出している箇所です。

asyncioモジュールを使ううえで、欠かせない要素がコルーチンです。
コルーチンとはサブルーチンと同じく一連の処理をまとめたものです。

サブルーチンはPythonでは関数にあたり、一度呼び出されると先頭から最後まで(または途中で何かが返されるまで)一気に実行されます。
一方、コルーチンは処理の途中で中断、再開ができる性質を持ちます。この性質を利用すると、複数の処理を並行して動作させられます。

具体的な使用例

例としてここではWebAPIの利用を想定します。 まずWebAPIを利用する処理をコルーチンとして定義します。 そして、そのコルーチンを呼び出す処理もまたコルーチンとして定義します。

このように実装していくと必然的にコルーチンの中でコルーチンを呼び出す箇所が出てきます。
そこが処理を中断できるポイントで、コード上ではawaitキーワードを記述します。awaitキーワードがあっても、戻り値は通常の関数呼び出しと同様に扱えます。

下記例では、普通にcall_web_api()を定義し呼び出すと、レスポンスが返ってくるまでは次の処理に進めません。
しかし、コルーチンを定義して呼び出すことで、レスポンスは先に返ってきたものから順に処理されます。

import asyncio
import random

async def call_web_api(url):
    # Web APIの処理をここではスリープで代用
    print(f'send a request: {url}')
    await asyncio.sleep(random.random())
    print(f'got a response: {url}')
    return url

async def async_download(url):
    # awaitを使ってコルーチンを呼び出す
    response = await call_web_api(url)
    return response

async def main():
    task = asyncio.gather(
        async_download('https://twitter.com/'),
        async_download('https://facebook.com'),
        async_download('https://instagram.com'),
    )
    return await task

# スクリプトから実行する場合はasyncioを使用する
result = asyncio.run(main())

# notebookで実行する場合はawaitを使用する
result = await main()

ここでの処理の流れをみてみます。

  1. gather()関数の引数に渡した最初のコルーチンが実行されます。その処理中に、asyncio.sleep()まで進むと処理が中断され、2番目のコルーチンが動き始めます。asyncio.sleep()で処理が中断される理由は、I/O処理による待ち時間が発生するためです。
  2. 同様に2番目のコルーチンもasyncio.sleep()まで進むと処理が中断され、3番目のコルーチンが動き始めます。
  3. そして、3番目のコルーチンがasyncio.sleep()まで進み処理が中断されたあとは、レスポンスが返ってくるまで待機されます。
  4. そのあと、レスポンスが返ってきて再開可能になったコルーチンから順次処理が再開されています。最後のレスポンスの処理まで終わると、gather()関数は戻り値となるリストを返します。

resultの中身をみてみると、リストの順番は処理順に関わらず、gather()関数に渡したコルーチンの順番と一致します。

result

>> ['https://twitter.com/','https://facebook.com','https://instagram.com'] 

コルーチンを動かすために必要なイベントループとタスク

コルーチンを動かすために必要なものがイベントループとタスクです。
コルーチンはその実行がスケジューリングされるとタスクになります。 そして、イベントループがI/Oイベントに応じてタスクの実行を制御します。

イベントループ

acyncio.run()関数を呼び出すと新しいイベントループが作成され、このイベントループがコルーチンの実行を制御します。
コルーチンの内部では、現在実行中のイベントループをasyncio.get_running_loop()関数で取得できます。

import asyncio
async def main():
    loop = asyncio.get_running_loop()
    print(loop)
    
await main()
>> <_UnixSelectorEventLoop running=True closed=False debug=False>

イベントループが実行されていることが分かります。
このイベントループがさまざまなI/Oイベントに応じてスケジューリングされた処理を実行してくれます。

タスク

コルーチンを実行する方法は3つ用意されています。

  • 1つ目はasyncio.run()に渡す方法
  • 2つ目はコルーチンの内部でawaitコルーチンとする方法
  • 3つ目は後述するタスクを作成して実行する方法

タスクとは、実行がスケジューリングされたコルーチンをカプセル化したものです。
タスクの作成は、次のようにasyncio.create_task()関数を使います。この呼び出しの裏では、先ほどのasyncio.get_running_loop()関数で取得されるループを使ってタスクが作成されます。

async def coro(n):
    await asyncio.sleep(n)
    return n

async def main():
    task = asyncio.create_task(coro(1))
    print(task)
    return await task

# print()時点ではまだPending状態
# awaitで初めて実行されていることが分かる
await main()
>> <Task pending coro=<coro() running at <ipython-input-49-adc0461ab5af>:1>>
>> 1

タスクを作成すると、次のように並行して実行できます。
コルーチンのまま呼び出した場合と比較するとその違いを感じられます。(コルーチンのまま呼び出しても並行処理はされない)

どちらもawaitを使ってコルーチンを呼び出しているので、main処理自体は一瞬で終了していることが分かります。

タスクを作成して実行

# 3秒で完了する
@elapsed_time
async def main():
    task1 = asyncio.create_task(coro(1))
    task2 = asyncio.create_task(coro(2))
    task3 = asyncio.create_task(coro(3))
    print(await task1)
    print(await task2)
    print(await task3)
    
await main()
>> main: 9.5367431640625e-07
>> 1
>> 2
>> 3

コルーチンのまま実行

# 並行処理できないため6秒かかる
@elapsed_time
async def main():
    print(await coro(1))
    print(await coro(2))
    print(await coro(3))
    
await main()
>> main: 1.9073486328125e-06
>> 1
>> 2
>> 3

最後に

冒頭でも述べましたが、Pythonを体系的には学んできていない人にとっては一読する価値のある本だと思いました。

並行処理など、まだ詳細を把握仕切れていない部分はありつつも、パフォーマンスを考慮する際には避けては通れない箇所だと思うので実践を通して身につけていこう思います。