Python で外部コマンドを実行してその出力(標準出力・標準エラー)をリアルタイムにキャプチャする方法についてです。
「 Python で外部コマンド実行」といえばまっさきに思いつくのは標準ライブラリの subprocess
ですが、(私の理解が正しければ) subprocess
を使うとコマンドが終了するまでその出力を Python でキャプチャできません。
外部コマンドの出力をある程度リアルタイムに Python でキャプチャしたい場合は asyncio
の subprocess
系の機能が有用です subprocess 。
具体的には asyncio.create_subprocess_exec()
または asyncio.create_subprocess_shell()
を使用します。
追記 2022/01/24
この方法で stdout.readline()
や stderr.readline()
を使用すると、まれに while
ループを抜けられないことがあります。 Python 公式ドキュメントにも次のような警告が書かれています。
Warning: Use thecommunicate()
method rather thanprocess.stdin.write()
,await process.stdout.read()
orawait process.stderr.read
. This avoids deadlocks due to streams pausing reading or writing and blocking the child process.
参考にされる際はご注意ください。
次のコードは Python 公式の asyncio.create_subprocess_exec()
のサンプルコード(こちら)を少し変更したものです。
asyncio_run.py
:
import asyncio
import sys
async def run(program: str, args: list[str]) -> None:
"""外部コマンドを実行する"""
proc = await asyncio.create_subprocess_exec(
program,
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
while True:
if proc.stdout.at_eof() and proc.stderr.at_eof():
break
stdout = (await proc.stdout.readline()).decode()
if stdout:
print(f'[stdout] {stdout}', end='', flush=True)
stderr = (await proc.stderr.readline()).decode()
if stderr:
print(f'[sdterr] {stderr}', end='', flush=True, file=sys.stderr)
await asyncio.sleep(1)
await proc.communicate()
print(f'{program} {" ".join(args)} exited with {proc.returncode}')
asyncio.run(run('sh', ['./never-ending-script.sh']))
このスクリプトを実行すると、カレントディレクトリにある never-ending-script.sh
を実行し、その出力を 1 秒間隔で Python でキャプチャしてから出力します。
たとえば never-ending-script.sh
に次の内容を書き込んで実行すると挙動を確認することができます。
echo 'started.'
sleep 5
echo 'in progress.'
sleep 5
echo 'finished.'
ポイントは、 await asyncio.create_subprocess_exec()
で生成した proc
に対して proc.communicate()
をすぐに実行して完了まで待機するのではなく、 proc.stdout
proc.stderr
をチェックするループを回すことです。
proc.stdout.at_eof()
と proc.stderr.at_eof()
の両方が True
であれば出力がなくなった(≒処理が終了した)とみなすことができます。
このようにすることで、実行に時間がかかる処理を実行した場合でも途中経過を Python 側でウォッチできます。
ちなみに、もし標準出力と標準エラーを Python 側でキャプチャする必要がなければ(=目視確認でよければ)書くべきコードは非常にシンプルです。
asyncio
を使う場合は次のような感じで書けます(し、わざわざ asyncio
を使わずとも subprocess
で十分です)。
asyncio_run_2.py
:
import asyncio
import sys
async def run2(program: str, args: list[str]) -> None:
"""外部コマンドを実行する(出力のキャプチャは行わない)"""
proc = await asyncio.create_subprocess_exec(program, *args)
await proc.communicate()
print(f'{program} {" ".join(args)} exited with {proc.returncode}')
asyncio.run(run2('sh', ['./never-ending-script.sh']))
ちなみに、コマンドの出力をキャプチャしてさらに終了ステータスも取得したい場合は、もう少し長いコードを書く必要があります。 一連のロジックをラップしたクラスを書くと、たとえば次のような感じになるでしょうか。
asyncio_run_3.py
:
import asyncio
class Runner:
"""外部コマンドを実行する"""
def __init__(self, program, args, interval=1):
self.program = program
self.args = args
self.interval = interval
self.proc = None
async def start(self):
"""コマンドの実行を開始する"""
self.proc = await asyncio.create_subprocess_exec(
self.program,
*self.args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
async def stream(self):
"""実行中のコマンドの出力(標準出力・標準エラー)を返す async generator"""
while True:
if self.proc.stdout.at_eof() and self.proc.stderr.at_eof():
break
stdout = await self.proc.stdout.readline()
stderr = await self.proc.stderr.readline()
yield stdout.decode(), stderr.decode()
await asyncio.sleep(self.interval)
async def wait(self):
"""コマンドが終了するまで待機してリターンコードを返す"""
if self.proc is None:
return None
await self.proc.communicate()
return self.proc.returncode
この Runner
クラスは次のようにして使用します。
import sys
async def main():
runner = Runner('sh', ['./never-ending-script.sh'])
# コマンドを実行する
await runner.start()
# コマンドの出力を取得して標準出力・標準エラーに流す
async for stdout, stderr in runner.stream():
if stdout:
print(f'[stdout] {stdout}', end='', flush=True)
if stderr:
print(f'[sdterr] {stderr}', end='', flush=True, file=sys.stderr)
# コマンドの終了を待つ
returncode = await runner.wait()
print(f'return code is {returncode}')
asyncio.run(main())
この場合 Runner.stream()
が async generator なので async for
を使って中身を取り出すことができます。
Runner
インスタンスの利用には async
キーワードを使う必要があるため、 async
関数 main()
を定義して asyncio.run(main())
で実行しています。
ということで、Python で外部コマンドを実行してその出力をリアルタイムに取得する方法についてでした。
動くサンプルを GitHub Gist に置いたので興味のある方はそちらもご覧ください。
Python: Stream output of asyncio.create_subprocess_exec()
· GitHub
- 標準出力・標準エラーを Python でキャプチャせずにそのまま流せばよい場合は
subprocess
が使えます。↩