If you want to treat stdout and stderr separately (as opposed to redirecting stderr to stdout; see my simplified answer), you can spawn two threads that handle them concurrently, live, as the output is produced.
Adapted from my more detailed answer:
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen
def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    Every stream for which ``Popen`` creates a pipe is read line by line in its
    own worker thread. Each line, with its trailing newline removed, is passed
    to the matching handler as soon as it is read.

    Args:
        args: Command to run; forwarded to ``Popen`` unchanged.
        stdout_handler: Callable invoked with each line read from stdout.
        stderr_handler: Callable invoked with each line read from stderr.
        check: If true, raise ``CalledProcessError`` on a non-zero exit code.
        text: Forwarded to ``Popen``.
        stdout: Forwarded to ``Popen``. If no pipe results (``process.stdout``
            is ``None``), ``stdout_handler`` is never called.
        stderr: Forwarded to ``Popen``. If no pipe results (``process.stderr``
            is ``None``, e.g. with ``subprocess.STDOUT``), ``stderr_handler``
            is never called.
        **kwargs: Any further keyword arguments for ``Popen``.

    Returns:
        A ``CompletedProcess`` holding the command and its return code. The
        output itself is not stored; it has already gone to the handlers.

    Raises:
        CalledProcessError: If ``check`` is true and the exit code is non-zero.
        Exception: Whatever a handler raised, re-raised in the calling thread
            after both streams have been read to the end.
    """

    def drain(stream, handler):
        """Feed every line of ``stream`` to ``handler`` without its newline."""
        try:
            for line in stream:
                # Lines are str or bytes depending on how the pipe was opened.
                newline = "\n" if isinstance(line, str) else b"\n"
                # The final line may lack a terminator; a blind ``line[:-1]``
                # would then chop off a real character.
                handler(line[:-1] if line.endswith(newline) else line)
        except BaseException:
            # Keep reading (and discarding) so the child cannot block on a
            # full pipe, which would also stall the other reader forever.
            deque(stream, maxlen=0)
            raise

    with Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process:
        with ThreadPoolExecutor(2) as pool:  # one reader thread per stream
            futures = [
                pool.submit(drain, stream, handler)
                for stream, handler in (
                    (process.stdout, stdout_handler),
                    (process.stderr, stderr_handler),
                )
                if stream is not None  # no pipe was requested for this stream
            ]
        # Leaving the pool block waited for both readers to hit end-of-stream.
        for future in futures:
            future.result()  # surface a handler failure instead of hiding it
    # Popen.__exit__ has already waited for the child, so this returns at once.
    retcode = process.wait()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
Call with simple print handlers:
# Simplest usage: echo each line from either stream straight to the console.
stream_command(
    ["echo", "test"],
    stdout_handler=print,
    stderr_handler=print,
)
# test
Or with custom handlers:
# Lines collected from the child process, one list per stream.
outs = []
errs = []


def stdout_handler(line):
    """Record one stdout line in ``outs`` and echo it."""
    outs.append(line)
    print(line)


def stderr_handler(line):
    """Record one stderr line in ``errs`` and echo it."""
    errs.append(line)
    print(line)


stream_command(["echo", "test"], stdout_handler=stdout_handler, stderr_handler=stderr_handler)
# test
print(outs)
# ['test']