asyncio / async / await
| 対応: | Python 3.4(2014) |
|---|
非同期処理の基本概念(Promise的な仕組み)を使います。JavaScriptの『async/await』に慣れている方はイメージしやすいでしょう。
『asyncio』はPython標準の非同期I/Oフレームワークです。『async def』で非同期関数(コルーチン)を定義し、『await』で別のコルーチンの完了を待ちます。スレッドを使わずに単一スレッドでI/O待ち処理を効率よく並行実行でき、多数のHTTPリクエストやファイルI/Oが発生するアプリケーションで特に効果的です。
構文
import asyncio
# 非同期関数(コルーチン)の定義
async def my_coroutine():
await asyncio.sleep(1) # 非同期で待機
return result
# コルーチンの実行
asyncio.run(my_coroutine())
# 複数コルーチンの並行実行
results = await asyncio.gather(coro1(), coro2(), coro3())
関数・クラス一覧
| 関数・クラス | 概要 |
|---|---|
| async def func() | 非同期関数(コルーチン関数)を定義する。コルーチンとは、処理の途中で一時停止・再開できる特殊な関数のことです。 |
| await 式 | コルーチン・タスク・Futureの完了を待つ。async def内でのみ使える。 |
| asyncio.run(coro) | イベントループを起動してコルーチンを実行する。Python 3.7+。 |
| asyncio.gather(*coros) | 複数のコルーチンを並行実行して結果のリストを返す。 |
| asyncio.sleep(seconds) | 非同期で指定秒数待機する(他のコルーチンの実行を妨げない)。 |
| asyncio.create_task(coro) | コルーチンをタスクとしてスケジューリングする。 |
| asyncio.wait_for(coro, timeout) | タイムアウト付きでコルーチンを実行する。 |
| asyncio.Queue() | 非同期処理用のFIFOキュー。FIFO(First In, First Out)は先に入れたデータを先に取り出す方式です。 |
サンプルコード
asyncio.py
import asyncio
import time
# 基本的な非同期関数
async def say_hello(name, delay):
await asyncio.sleep(delay) # この間、他のコルーチンが実行できる
print(f"こんにちは、{name}!")
async def main():
# 順次実行(合計3秒)
await say_hello('桐生一馬', 1)
await say_hello('真島吾朗', 2)
asyncio.run(main())
# gather: 並行実行(合計2秒、最長のもの)
async def main_parallel():
start = time.time()
await asyncio.gather(
say_hello('桐生一馬', 1),
say_hello('真島吾朗', 2),
say_hello('秋山駿', 1.5),
)
print(f"合計時間: {time.time() - start:.1f}秒") # 約2.0秒
asyncio.run(main_parallel())
# create_task: タスクとして即座にスケジューリング
async def fetch_data(url, delay):
await asyncio.sleep(delay)
return f"{url} のデータ"
async def main_tasks():
# タスクとして登録(すぐに開始)
task1 = asyncio.create_task(fetch_data('https://api1.example.com', 1))
task2 = asyncio.create_task(fetch_data('https://api2.example.com', 2))
result1 = await task1
result2 = await task2
print(result1)
print(result2)
asyncio.run(main_tasks())
# asyncio.Queue: プロデューサー・コンシューマーパターン
async def producer(q, items):
for item in items:
await q.put(item)
await asyncio.sleep(0.5)
await q.put(None) # 終了シグナル
async def consumer(q):
while True:
item = await q.get()
if item is None:
break
print(f"処理中: {item}")
q.task_done()
async def main_queue():
q = asyncio.Queue()
await asyncio.gather(
producer(q, [1, 2, 3, 4, 5]),
consumer(q),
)
asyncio.run(main_queue())
python3 asyncio.py こんにちは、桐生一馬! こんにちは、真島吾朗! こんにちは、桐生一馬! こんにちは、秋山駿! こんにちは、真島吾朗! 合計時間: 2.0秒 https://api1.example.com のデータ https://api2.example.com のデータ 処理中: 1 処理中: 2 処理中: 3 処理中: 4 処理中: 5
実践パターン: 複数URLへの並行リクエスト
fetch_parallel.py
import asyncio
# httpx(非同期HTTPクライアント)を使った並行リクエストのパターン
# インストール: pip install httpx
async def fetch(client, url):
try:
response = await client.get(url, timeout=5.0)
return {"url": url, "status": response.status_code}
except Exception as e:
return {"url": url, "error": str(e)}
async def fetch_all(urls):
import httpx
async with httpx.AsyncClient() as client:
tasks = [fetch(client, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/200",
]
# asyncio.run() で非同期エントリーポイントを呼び出す
results = asyncio.run(fetch_all(urls))
for r in results:
print(r)
pip install httpx
python3 fetch_parallel.py
{'url': 'https://httpbin.org/delay/1', 'status': 200}
{'url': 'https://httpbin.org/delay/2', 'status': 200}
{'url': 'https://httpbin.org/status/200', 'status': 200}
3つのURLに並行リクエストを送信します。すべてのリクエストが完了するまで待機し、各URLのステータスコードを出力します。接続できない場合は『error』キーにエラーメッセージが入ります。
よくあるミス1: time.sleep()の使用
async def 内で time.sleep() を使うとイベントループ全体がブロックされます。必ず asyncio.sleep() を使ってください。
import asyncio
# NG: async def 内で time.sleep() を使うとイベントループ全体がブロックされる
async def bad_wait():
import time
time.sleep(2) # ブロッキング!他のコルーチンが動けなくなる
import asyncio
# OK: 必ず asyncio.sleep() を使う
async def good_wait():
await asyncio.sleep(2) # 他のコルーチンに制御を渡せる
よくあるミス2: awaitの省略
コルーチンを await せずに呼び出してもすぐには実行されません(RuntimeWarning)。
import asyncio
# NG: コルーチンを await せずに呼び出してもすぐには実行されない
async def main():
asyncio.sleep(1) # 警告: コルーチンが実行されない(RuntimeWarning)
import asyncio
# OK: コルーチンは必ず await する
async def main():
await asyncio.sleep(1)
よくあるミス3: asyncio.run()のネスト
asyncio.run() の中で再度 asyncio.run() を呼ぶとエラーになります。
asyncio_run_ng.py
import asyncio
# NG: asyncio.run() の中で再度 asyncio.run() を呼ぶとエラーになる
async def outer():
asyncio.run(inner()) # RuntimeError: This event loop is already running.
python3 asyncio_run_ng.py Traceback (most recent call last): ... RuntimeError: This event loop is already running.
import asyncio
# OK: async def の中では await を使う
async def outer():
await inner()
asyncio を使う関数はすべてコルーチン(async def)にする必要があります。通常の関数(def)の中で await は使えません。
概要
asyncioはシングルスレッドで動作するイベントループを使って、複数のコルーチンを切り替えながら並行実行します。『await』に到達するとイベントループが別のコルーチンに制御を渡すため、I/O待ち時間を有効活用できます。
『asyncio.gather()』は全コルーチンを並行実行して、すべての結果をリストで返します。いずれかのコルーチンで例外が発生した場合はその例外を送出します。例外を無視したい場合はreturn_exceptions=Trueを指定します。
asyncioはI/O待ちには強力ですが、CPU密集処理はイベントループをブロックするため効果がありません。CPU密集処理と組み合わせたい場合はasyncio.to_thread()(Python 3.9+)を使ってスレッドに処理を委譲します。また、httpxやaiohttpなど非同期対応のHTTPクライアントライブラリと組み合わせて使うことが多いです。
記事の間違いや著作権の侵害等ございましたらお手数ですがこちらまでご連絡頂ければ幸いです。