clan-cli: add deploykit

This commit is contained in:
Jörg Thalheim 2023-08-09 13:05:26 +02:00
parent daf1058312
commit 119d68bdcd

View File

@ -0,0 +1,821 @@
# Adapted from https://github.com/numtide/deploykit
import fcntl
import logging
import math
import os
import select
import shlex
import subprocess
import sys
import time
from contextlib import ExitStack, contextmanager
from enum import Enum
from pathlib import Path
from shlex import quote
from threading import Thread
from typing import (
IO,
Any,
Callable,
Dict,
Generic,
Iterator,
List,
Literal,
Optional,
Tuple,
TypeVar,
Union,
overload,
)
# https://no-color.org
DISABLE_COLOR = not sys.stderr.isatty() or os.environ.get("NO_COLOR", "") != ""
def ansi_color(color: int) -> str:
return f"\x1b[{color}m"
class CommandFormatter(logging.Formatter):
"""
print errors in red and warnings in yellow
"""
def __init__(self) -> None:
super().__init__(
"%(prefix_color)s[%(command_prefix)s]%(color_reset)s %(color)s%(message)s%(color_reset)s"
)
self.hostnames: List[str] = []
self.hostname_color_offset = 1 # first host shouldn't get agressive red
def formatMessage(self, record: logging.LogRecord) -> str:
colorcode = 0
if record.levelno == logging.ERROR:
colorcode = 31 # red
if record.levelno == logging.WARN:
colorcode = 33 # yellow
color, prefix_color, color_reset = "", "", ""
if not DISABLE_COLOR:
command_prefix = getattr(record, "command_prefix", "")
color = ansi_color(colorcode)
prefix_color = ansi_color(self.hostname_colorcode(command_prefix))
color_reset = "\x1b[0m"
setattr(record, "color", color)
setattr(record, "prefix_color", prefix_color)
setattr(record, "color_reset", color_reset)
return super().formatMessage(record)
def hostname_colorcode(self, hostname: str) -> int:
try:
index = self.hostnames.index(hostname)
except ValueError:
self.hostnames += [hostname]
index = self.hostnames.index(hostname)
return 31 + (index + self.hostname_color_offset) % 7
def setup_loggers() -> Tuple[logging.Logger, logging.Logger]:
# If we use the default logger here (logging.error etc) or a logger called
# "deploykit", then cmdlog messages are also posted on the default logger.
# To avoid this message duplication, we set up a main and command logger
# and use a "deploykit" main logger.
kitlog = logging.getLogger("deploykit.main")
kitlog.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(logging.Formatter())
kitlog.addHandler(ch)
# use specific logger for command outputs
cmdlog = logging.getLogger("deploykit.command")
cmdlog.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(CommandFormatter())
cmdlog.addHandler(ch)
return (kitlog, cmdlog)
# loggers for: general deploykit, command output
kitlog, cmdlog = setup_loggers()
info = kitlog.info
warn = kitlog.warning
error = kitlog.error
@contextmanager
def _pipe() -> Iterator[Tuple[IO[str], IO[str]]]:
(pipe_r, pipe_w) = os.pipe()
read_end = os.fdopen(pipe_r, "r")
write_end = os.fdopen(pipe_w, "w")
try:
fl = fcntl.fcntl(read_end, fcntl.F_GETFL)
fcntl.fcntl(read_end, fcntl.F_SETFL, fl | os.O_NONBLOCK)
yield (read_end, write_end)
finally:
read_end.close()
write_end.close()
FILE = Union[None, int]
# Seconds until a message is printed when _run produces no output.
NO_OUTPUT_TIMEOUT = 20
class HostKeyCheck(Enum):
# Strictly check ssh host keys, prompt for unknown ones
STRICT = 0
# Trust on ssh keys on first use
TOFU = 1
# Do not check ssh host keys
NONE = 2
class DeployHost:
def __init__(
self,
host: str,
user: Optional[str] = None,
port: Optional[int] = None,
key: Optional[str] = None,
forward_agent: bool = False,
command_prefix: Optional[str] = None,
host_key_check: HostKeyCheck = HostKeyCheck.STRICT,
meta: Dict[str, Any] = {},
verbose_ssh: bool = False,
) -> None:
"""
Creates a DeployHost
@host the hostname to connect to via ssh
@port the port to connect to via ssh
@forward_agent: wheter to forward ssh agent
@command_prefix: string to prefix each line of the command output with, defaults to host
@host_key_check: wether to check ssh host keys
@verbose_ssh: Enables verbose logging on ssh connections
@meta: meta attributes associated with the host. Those can be accessed in custom functions passed to `run_function`
"""
self.host = host
self.user = user
self.port = port
self.key = key
if command_prefix:
self.command_prefix = command_prefix
else:
self.command_prefix = host
self.forward_agent = forward_agent
self.host_key_check = host_key_check
self.meta = meta
self.verbose_ssh = verbose_ssh
def _prefix_output(
self,
displayed_cmd: str,
print_std_fd: Optional[IO[str]],
print_err_fd: Optional[IO[str]],
stdout: Optional[IO[str]],
stderr: Optional[IO[str]],
timeout: float = math.inf,
) -> Tuple[str, str]:
rlist = []
if print_std_fd is not None:
rlist.append(print_std_fd)
if print_err_fd is not None:
rlist.append(print_err_fd)
if stdout is not None:
rlist.append(stdout)
if stderr is not None:
rlist.append(stderr)
print_std_buf = ""
print_err_buf = ""
stdout_buf = ""
stderr_buf = ""
start = time.time()
last_output = time.time()
while len(rlist) != 0:
r, _, _ = select.select(rlist, [], [], min(timeout, NO_OUTPUT_TIMEOUT))
def print_from(
print_fd: IO[str], print_buf: str, is_err: bool = False
) -> Tuple[float, str]:
read = os.read(print_fd.fileno(), 4096)
if len(read) == 0:
rlist.remove(print_fd)
print_buf += read.decode("utf-8")
if (read == b"" and len(print_buf) != 0) or "\n" in print_buf:
# print and empty the print_buf, if the stream is draining,
# but there is still something in the buffer or on newline.
lines = print_buf.rstrip("\n").split("\n")
for line in lines:
if not is_err:
cmdlog.info(
line, extra=dict(command_prefix=self.command_prefix)
)
pass
else:
cmdlog.error(
line, extra=dict(command_prefix=self.command_prefix)
)
print_buf = ""
last_output = time.time()
return (last_output, print_buf)
if print_std_fd in r and print_std_fd is not None:
(last_output, print_std_buf) = print_from(
print_std_fd, print_std_buf, is_err=False
)
if print_err_fd in r and print_err_fd is not None:
(last_output, print_err_buf) = print_from(
print_err_fd, print_err_buf, is_err=True
)
now = time.time()
elapsed = now - start
if now - last_output > NO_OUTPUT_TIMEOUT:
elapsed_msg = time.strftime("%H:%M:%S", time.gmtime(elapsed))
cmdlog.warn(
f"still waiting for '{displayed_cmd}' to finish... ({elapsed_msg} elapsed)",
extra=dict(command_prefix=self.command_prefix),
)
def handle_fd(fd: Optional[IO[Any]]) -> str:
if fd and fd in r:
read = os.read(fd.fileno(), 4096)
if len(read) == 0:
rlist.remove(fd)
else:
return read.decode("utf-8")
return ""
stdout_buf += handle_fd(stdout)
stderr_buf += handle_fd(stderr)
if now - last_output >= timeout:
break
return stdout_buf, stderr_buf
def _run(
self,
cmd: List[str],
displayed_cmd: str,
shell: bool,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
timeout: float = math.inf,
) -> subprocess.CompletedProcess[str]:
with ExitStack() as stack:
read_std_fd, write_std_fd = (None, None)
read_err_fd, write_err_fd = (None, None)
if stdout is None or stderr is None:
read_std_fd, write_std_fd = stack.enter_context(_pipe())
read_err_fd, write_err_fd = stack.enter_context(_pipe())
if stdout is None:
stdout_read = None
stdout_write = write_std_fd
elif stdout == subprocess.PIPE:
stdout_read, stdout_write = stack.enter_context(_pipe())
else:
raise Exception(f"unsupported value for stdout parameter: {stdout}")
if stderr is None:
stderr_read = None
stderr_write = write_err_fd
elif stderr == subprocess.PIPE:
stderr_read, stderr_write = stack.enter_context(_pipe())
else:
raise Exception(f"unsupported value for stderr parameter: {stderr}")
env = os.environ.copy()
env.update(extra_env)
with subprocess.Popen(
cmd,
text=True,
shell=shell,
stdout=stdout_write,
stderr=stderr_write,
env=env,
cwd=cwd,
) as p:
if write_std_fd is not None:
write_std_fd.close()
if write_err_fd is not None:
write_err_fd.close()
if stdout == subprocess.PIPE:
assert stdout_write is not None
stdout_write.close()
if stderr == subprocess.PIPE:
assert stderr_write is not None
stderr_write.close()
start = time.time()
stdout_data, stderr_data = self._prefix_output(
displayed_cmd,
read_std_fd,
read_err_fd,
stdout_read,
stderr_read,
timeout,
)
try:
ret = p.wait(timeout=max(0, timeout - (time.time() - start)))
except subprocess.TimeoutExpired:
p.kill()
raise
if ret != 0:
if check:
raise subprocess.CalledProcessError(
ret, cmd=cmd, output=stdout_data, stderr=stderr_data
)
else:
cmdlog.warning(
f"[Command failed: {ret}] {displayed_cmd}",
extra=dict(command_prefix=self.command_prefix),
)
return subprocess.CompletedProcess(
cmd, ret, stdout=stdout_data, stderr=stderr_data
)
raise RuntimeError("unreachable")
def run_local(
self,
cmd: Union[str, List[str]],
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
timeout: float = math.inf,
) -> subprocess.CompletedProcess[str]:
"""
Command to run locally for the host
@cmd the commmand to run
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@extra_env environment variables to override whe running the command
@cwd current working directory to run the process in
@timeout: Timeout in seconds for the command to complete
@return subprocess.CompletedProcess result of the command
"""
shell = False
if isinstance(cmd, str):
cmd = [cmd]
shell = True
displayed_cmd = " ".join(cmd)
cmdlog.info(
f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix)
)
return self._run(
cmd,
displayed_cmd,
shell=shell,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
timeout=timeout,
)
def run(
self,
cmd: Union[str, List[str]],
stdout: FILE = None,
stderr: FILE = None,
become_root: bool = False,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> subprocess.CompletedProcess[str]:
"""
Command to run on the host via ssh
@cmd the commmand to run
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@become_root if the ssh_user is not root than sudo is prepended
@extra_env environment variables to override whe running the command
@cwd current working directory to run the process in
@verbose_ssh: Enables verbose logging on ssh connections
@timeout: Timeout in seconds for the command to complete
@return subprocess.CompletedProcess result of the ssh command
"""
sudo = ""
if become_root and self.user != "root":
sudo = "sudo -- "
vars = []
for k, v in extra_env.items():
vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
displayed_cmd = ""
export_cmd = ""
if vars:
export_cmd = f"export {' '.join(vars)}; "
displayed_cmd += export_cmd
if isinstance(cmd, list):
displayed_cmd += " ".join(cmd)
else:
displayed_cmd += cmd
cmdlog.info(
f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix)
)
if self.user is not None:
ssh_target = f"{self.user}@{self.host}"
else:
ssh_target = self.host
ssh_opts = ["-A"] if self.forward_agent else []
if self.port:
ssh_opts.extend(["-p", str(self.port)])
if self.key:
ssh_opts.extend(["-i", self.key])
if self.host_key_check != HostKeyCheck.STRICT:
ssh_opts.extend(["-o", "StrictHostKeyChecking=no"])
if self.host_key_check == HostKeyCheck.NONE:
ssh_opts.extend(["-o", "UserKnownHostsFile=/dev/null"])
if verbose_ssh or self.verbose_ssh:
ssh_opts.extend(["-v"])
bash_cmd = export_cmd
bash_args = []
if isinstance(cmd, list):
bash_cmd += 'exec "$@"'
bash_args += cmd
else:
bash_cmd += cmd
# FIXME we assume bash to be present here? Should be documented...
ssh_cmd = (
["ssh", ssh_target]
+ ssh_opts
+ [
"--",
f"{sudo}bash -c {quote(bash_cmd)} -- {' '.join(map(quote, bash_args))}",
]
)
return self._run(
ssh_cmd,
displayed_cmd,
shell=False,
stdout=stdout,
stderr=stderr,
cwd=cwd,
check=check,
timeout=timeout,
)
T = TypeVar("T")
class HostResult(Generic[T]):
def __init__(self, host: DeployHost, result: Union[T, Exception]) -> None:
self.host = host
self._result = result
@property
def error(self) -> Optional[Exception]:
"""
Returns an error if the command failed
"""
if isinstance(self._result, Exception):
return self._result
return None
@property
def result(self) -> T:
"""
Unwrap the result
"""
if isinstance(self._result, Exception):
raise self._result
return self._result
DeployResults = List[HostResult[subprocess.CompletedProcess[str]]]
def _worker(
func: Callable[[DeployHost], T],
host: DeployHost,
results: List[HostResult[T]],
idx: int,
) -> None:
try:
results[idx] = HostResult(host, func(host))
except Exception as e:
kitlog.exception(e)
results[idx] = HostResult(host, e)
class DeployGroup:
def __init__(self, hosts: List[DeployHost]) -> None:
self.hosts = hosts
def _run_local(
self,
cmd: Union[str, List[str]],
host: DeployHost,
results: DeployResults,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> None:
try:
proc = host.run_local(
cmd,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
timeout=timeout,
)
results.append(HostResult(host, proc))
except Exception as e:
kitlog.exception(e)
results.append(HostResult(host, e))
def _run_remote(
self,
cmd: Union[str, List[str]],
host: DeployHost,
results: DeployResults,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> None:
try:
proc = host.run(
cmd,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
verbose_ssh=verbose_ssh,
timeout=timeout,
)
results.append(HostResult(host, proc))
except Exception as e:
kitlog.exception(e)
results.append(HostResult(host, e))
def _reraise_errors(self, results: List[HostResult[Any]]) -> None:
errors = 0
for result in results:
e = result.error
if e:
cmdlog.error(
f"failed with: {e}",
extra=dict(command_prefix=result.host.command_prefix),
)
errors += 1
if errors > 0:
raise Exception(
f"{errors} hosts failed with an error. Check the logs above"
)
def _run(
self,
cmd: Union[str, List[str]],
local: bool = False,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> DeployResults:
results: DeployResults = []
threads = []
for host in self.hosts:
fn = self._run_local if local else self._run_remote
thread = Thread(
target=fn,
kwargs=dict(
results=results,
cmd=cmd,
host=host,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
verbose_ssh=verbose_ssh,
timeout=timeout,
),
)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
if check:
self._reraise_errors(results)
return results
def run(
self,
cmd: Union[str, List[str]],
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> DeployResults:
"""
Command to run on the remote host via ssh
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@cwd current working directory to run the process in
@verbose_ssh: Enables verbose logging on ssh connections
@timeout: Timeout in seconds for the command to complete
@return a lists of tuples containing DeployNode and the result of the command for this DeployNode
"""
return self._run(
cmd,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
verbose_ssh=verbose_ssh,
timeout=timeout,
)
def run_local(
self,
cmd: Union[str, List[str]],
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
timeout: float = math.inf,
) -> DeployResults:
"""
Command to run locally for each host in the group in parallel
@cmd the commmand to run
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@cwd current working directory to run the process in
@extra_env environment variables to override whe running the command
@timeout: Timeout in seconds for the command to complete
@return a lists of tuples containing DeployNode and the result of the command for this DeployNode
"""
return self._run(
cmd,
local=True,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
timeout=timeout,
)
def run_function(
self, func: Callable[[DeployHost], T], check: bool = True
) -> List[HostResult[T]]:
"""
Function to run for each host in the group in parallel
@func the function to call
"""
threads = []
results: List[HostResult[T]] = [
HostResult(h, Exception(f"No result set for thread {i}"))
for (i, h) in enumerate(self.hosts)
]
for i, host in enumerate(self.hosts):
thread = Thread(
target=_worker,
args=(func, host, results, i),
)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if check:
self._reraise_errors(results)
return results
def filter(self, pred: Callable[[DeployHost], bool]) -> "DeployGroup":
"""Return a new DeployGroup with the results filtered by the predicate"""
return DeployGroup(list(filter(pred, self.hosts)))
@overload
def run(
cmd: Union[List[str], str],
text: Literal[True] = ...,
stdout: FILE = ...,
stderr: FILE = ...,
extra_env: Dict[str, str] = ...,
cwd: Union[None, str, Path] = ...,
check: bool = ...,
) -> subprocess.CompletedProcess[str]:
...
@overload
def run(
cmd: Union[List[str], str],
text: Literal[False],
stdout: FILE = ...,
stderr: FILE = ...,
extra_env: Dict[str, str] = ...,
cwd: Union[None, str, Path] = ...,
check: bool = ...,
) -> subprocess.CompletedProcess[bytes]:
...
def run(
cmd: Union[List[str], str],
text: bool = True,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
) -> subprocess.CompletedProcess[Any]:
"""
Run command locally
@cmd if this parameter is a string the command is interpreted as a shell command,
otherwise if it is a list, than the first list element is the command
and the remaining list elements are passed as arguments to the
command.
@text when true, file objects for stdout and stderr are opened in text mode.
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@extra_env environment variables to override whe running the command
@cwd current working directory to run the process in
@check If check is true, and the process exits with a non-zero exit code, a
CalledProcessError exception will be raised. Attributes of that exception
hold the arguments, the exit code, and stdout and stderr if they were
captured.
"""
if isinstance(cmd, list):
info("$ " + " ".join(cmd))
else:
info(f"$ {cmd}")
env = os.environ.copy()
env.update(extra_env)
return subprocess.run(
cmd,
stdout=stdout,
stderr=stderr,
env=env,
cwd=cwd,
check=check,
shell=not isinstance(cmd, list),
text=text,
)