forked from clan/clan-core
cmd: rework redirecting stdout/stderr
This commit is contained in:
parent
a5137efd48
commit
fa00bb522b
@ -1,10 +1,12 @@
|
||||
import logging
|
||||
import os
|
||||
import select
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any, NamedTuple
|
||||
from typing import IO, Any, NamedTuple
|
||||
|
||||
from .errors import ClanError
|
||||
|
||||
@ -17,47 +19,56 @@ class CmdOut(NamedTuple):
|
||||
cwd: Path | None = None
|
||||
|
||||
|
||||
def handle_output(process: subprocess.Popen) -> tuple[str, str]:
|
||||
rlist = [process.stdout, process.stderr]
|
||||
stdout_buf = b""
|
||||
stderr_buf = b""
|
||||
|
||||
while len(rlist) != 0:
|
||||
r, _, _ = select.select(rlist, [], [], 0)
|
||||
|
||||
def handle_fd(fd: IO[Any] | None) -> bytes:
|
||||
if fd and fd in r:
|
||||
read = os.read(fd.fileno(), 4096)
|
||||
if len(read) != 0:
|
||||
return read
|
||||
rlist.remove(fd)
|
||||
return b""
|
||||
|
||||
ret = handle_fd(process.stdout)
|
||||
sys.stdout.buffer.write(ret)
|
||||
stdout_buf += ret
|
||||
ret = handle_fd(process.stderr)
|
||||
sys.stderr.buffer.write(ret)
|
||||
stderr_buf += ret
|
||||
return stdout_buf.decode("utf-8"), stderr_buf.decode("utf-8")
|
||||
|
||||
|
||||
def run(cmd: list[str], cwd: Path = Path.cwd()) -> CmdOut:
|
||||
# Start the subprocess
|
||||
process = subprocess.Popen(
|
||||
cmd, cwd=str(cwd), stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
cmd, cwd=str(cwd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
|
||||
)
|
||||
|
||||
# Initialize empty strings for output and error
|
||||
output = b""
|
||||
error = b""
|
||||
stdout_buf, stderr_buf = handle_output(process)
|
||||
|
||||
# Iterate over the stdout stream
|
||||
for c in iter(lambda: process.stdout.read(1), b""): # type: ignore
|
||||
# Convert bytes to string and append to output
|
||||
output += c
|
||||
# Write to terminal
|
||||
sys.stdout.buffer.write(c)
|
||||
# Iterate over the stderr stream
|
||||
for c in iter(lambda: process.stderr.read(1), b""): # type: ignore
|
||||
# Convert bytes to string and append to error
|
||||
error += c
|
||||
# Write to terminal
|
||||
sys.stderr.buffer.write(c)
|
||||
# Wait for the subprocess to finish
|
||||
process.wait()
|
||||
rc = process.wait()
|
||||
|
||||
output_str = output.decode("utf-8")
|
||||
error_str = error.decode("utf-8")
|
||||
|
||||
if process.returncode != 0:
|
||||
if rc != 0:
|
||||
raise ClanError(
|
||||
f"""
|
||||
command: {shlex.join(cmd)}
|
||||
working directory: {cwd}
|
||||
exit code: {process.returncode}
|
||||
exit code: {rc}
|
||||
stderr:
|
||||
{error_str}
|
||||
{stderr_buf}
|
||||
stdout:
|
||||
{output_str}
|
||||
{stdout_buf}
|
||||
"""
|
||||
)
|
||||
return CmdOut(output_str, error_str, cwd=cwd)
|
||||
|
||||
return CmdOut(stdout_buf, stderr_buf, cwd=cwd)
|
||||
|
||||
|
||||
def runforcli(func: Callable[..., dict[str, CmdOut]], *args: Any) -> None:
|
||||
|
Loading…
Reference in New Issue
Block a user