diff --git a/pkgs/clan-cli/clan_cli/deploy/__init__.py b/pkgs/clan-cli/clan_cli/deploy/__init__.py new file mode 100644 index 00000000..18996740 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/deploy/__init__.py @@ -0,0 +1,821 @@ +# Adapted from https://github.com/numtide/deploykit + +import fcntl +import logging +import math +import os +import select +import shlex +import subprocess +import sys +import time +from contextlib import ExitStack, contextmanager +from enum import Enum +from pathlib import Path +from shlex import quote +from threading import Thread +from typing import ( + IO, + Any, + Callable, + Dict, + Generic, + Iterator, + List, + Literal, + Optional, + Tuple, + TypeVar, + Union, + overload, +) + +# https://no-color.org +DISABLE_COLOR = not sys.stderr.isatty() or os.environ.get("NO_COLOR", "") != "" + + +def ansi_color(color: int) -> str: + return f"\x1b[{color}m" + + +class CommandFormatter(logging.Formatter): + """ + print errors in red and warnings in yellow + """ + + def __init__(self) -> None: + super().__init__( + "%(prefix_color)s[%(command_prefix)s]%(color_reset)s %(color)s%(message)s%(color_reset)s" + ) + self.hostnames: List[str] = [] + self.hostname_color_offset = 1 # first host shouldn't get agressive red + + def formatMessage(self, record: logging.LogRecord) -> str: + colorcode = 0 + if record.levelno == logging.ERROR: + colorcode = 31 # red + if record.levelno == logging.WARN: + colorcode = 33 # yellow + + color, prefix_color, color_reset = "", "", "" + if not DISABLE_COLOR: + command_prefix = getattr(record, "command_prefix", "") + color = ansi_color(colorcode) + prefix_color = ansi_color(self.hostname_colorcode(command_prefix)) + color_reset = "\x1b[0m" + + setattr(record, "color", color) + setattr(record, "prefix_color", prefix_color) + setattr(record, "color_reset", color_reset) + + return super().formatMessage(record) + + def hostname_colorcode(self, hostname: str) -> int: + try: + index = self.hostnames.index(hostname) + except ValueError: + self.hostnames += [hostname] + index = self.hostnames.index(hostname) + return 31 + (index + self.hostname_color_offset) % 7 + + +def setup_loggers() -> Tuple[logging.Logger, logging.Logger]: + # If we use the default logger here (logging.error etc) or a logger called + # "deploykit", then cmdlog messages are also posted on the default logger. + # To avoid this message duplication, we set up a main and command logger + # and use a "deploykit" main logger. + kitlog = logging.getLogger("deploykit.main") + kitlog.setLevel(logging.INFO) + + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + ch.setFormatter(logging.Formatter()) + + kitlog.addHandler(ch) + + # use specific logger for command outputs + cmdlog = logging.getLogger("deploykit.command") + cmdlog.setLevel(logging.INFO) + + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + ch.setFormatter(CommandFormatter()) + + cmdlog.addHandler(ch) + return (kitlog, cmdlog) + + +# loggers for: general deploykit, command output +kitlog, cmdlog = setup_loggers() + +info = kitlog.info +warn = kitlog.warning +error = kitlog.error + + +@contextmanager +def _pipe() -> Iterator[Tuple[IO[str], IO[str]]]: + (pipe_r, pipe_w) = os.pipe() + read_end = os.fdopen(pipe_r, "r") + write_end = os.fdopen(pipe_w, "w") + + try: + fl = fcntl.fcntl(read_end, fcntl.F_GETFL) + fcntl.fcntl(read_end, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + yield (read_end, write_end) + finally: + read_end.close() + write_end.close() + + +FILE = Union[None, int] + +# Seconds until a message is printed when _run produces no output. +NO_OUTPUT_TIMEOUT = 20 + + +class HostKeyCheck(Enum): + # Strictly check ssh host keys, prompt for unknown ones + STRICT = 0 + # Trust on ssh keys on first use + TOFU = 1 + # Do not check ssh host keys + NONE = 2 + + +class DeployHost: + def __init__( + self, + host: str, + user: Optional[str] = None, + port: Optional[int] = None, + key: Optional[str] = None, + forward_agent: bool = False, + command_prefix: Optional[str] = None, + host_key_check: HostKeyCheck = HostKeyCheck.STRICT, + meta: Dict[str, Any] = {}, + verbose_ssh: bool = False, + ) -> None: + """ + Creates a DeployHost + @host the hostname to connect to via ssh + @port the port to connect to via ssh + @forward_agent: wheter to forward ssh agent + @command_prefix: string to prefix each line of the command output with, defaults to host + @host_key_check: wether to check ssh host keys + @verbose_ssh: Enables verbose logging on ssh connections + @meta: meta attributes associated with the host. Those can be accessed in custom functions passed to `run_function` + """ + self.host = host + self.user = user + self.port = port + self.key = key + if command_prefix: + self.command_prefix = command_prefix + else: + self.command_prefix = host + self.forward_agent = forward_agent + self.host_key_check = host_key_check + self.meta = meta + self.verbose_ssh = verbose_ssh + + def _prefix_output( + self, + displayed_cmd: str, + print_std_fd: Optional[IO[str]], + print_err_fd: Optional[IO[str]], + stdout: Optional[IO[str]], + stderr: Optional[IO[str]], + timeout: float = math.inf, + ) -> Tuple[str, str]: + rlist = [] + if print_std_fd is not None: + rlist.append(print_std_fd) + if print_err_fd is not None: + rlist.append(print_err_fd) + if stdout is not None: + rlist.append(stdout) + + if stderr is not None: + rlist.append(stderr) + + print_std_buf = "" + print_err_buf = "" + stdout_buf = "" + stderr_buf = "" + + start = time.time() + last_output = time.time() + while len(rlist) != 0: + r, _, _ = select.select(rlist, [], [], min(timeout, NO_OUTPUT_TIMEOUT)) + + def print_from( + print_fd: IO[str], print_buf: str, is_err: bool = False + ) -> Tuple[float, str]: + read = os.read(print_fd.fileno(), 4096) + if len(read) == 0: + rlist.remove(print_fd) + print_buf += read.decode("utf-8") + if (read == b"" and len(print_buf) != 0) or "\n" in print_buf: + # print and empty the print_buf, if the stream is draining, + # but there is still something in the buffer or on newline. + lines = print_buf.rstrip("\n").split("\n") + for line in lines: + if not is_err: + cmdlog.info( + line, extra=dict(command_prefix=self.command_prefix) + ) + pass + else: + cmdlog.error( + line, extra=dict(command_prefix=self.command_prefix) + ) + print_buf = "" + last_output = time.time() + return (last_output, print_buf) + + if print_std_fd in r and print_std_fd is not None: + (last_output, print_std_buf) = print_from( + print_std_fd, print_std_buf, is_err=False + ) + if print_err_fd in r and print_err_fd is not None: + (last_output, print_err_buf) = print_from( + print_err_fd, print_err_buf, is_err=True + ) + + now = time.time() + elapsed = now - start + if now - last_output > NO_OUTPUT_TIMEOUT: + elapsed_msg = time.strftime("%H:%M:%S", time.gmtime(elapsed)) + cmdlog.warn( + f"still waiting for '{displayed_cmd}' to finish... ({elapsed_msg} elapsed)", + extra=dict(command_prefix=self.command_prefix), + ) + + def handle_fd(fd: Optional[IO[Any]]) -> str: + if fd and fd in r: + read = os.read(fd.fileno(), 4096) + if len(read) == 0: + rlist.remove(fd) + else: + return read.decode("utf-8") + return "" + + stdout_buf += handle_fd(stdout) + stderr_buf += handle_fd(stderr) + + if now - last_output >= timeout: + break + return stdout_buf, stderr_buf + + def _run( + self, + cmd: List[str], + displayed_cmd: str, + shell: bool, + stdout: FILE = None, + stderr: FILE = None, + extra_env: Dict[str, str] = {}, + cwd: Union[None, str, Path] = None, + check: bool = True, + timeout: float = math.inf, + ) -> subprocess.CompletedProcess[str]: + with ExitStack() as stack: + read_std_fd, write_std_fd = (None, None) + read_err_fd, write_err_fd = (None, None) + + if stdout is None or stderr is None: + read_std_fd, write_std_fd = stack.enter_context(_pipe()) + read_err_fd, write_err_fd = stack.enter_context(_pipe()) + + if stdout is None: + stdout_read = None + stdout_write = write_std_fd + elif stdout == subprocess.PIPE: + stdout_read, stdout_write = stack.enter_context(_pipe()) + else: + raise Exception(f"unsupported value for stdout parameter: {stdout}") + + if stderr is None: + stderr_read = None + stderr_write = write_err_fd + elif stderr == subprocess.PIPE: + stderr_read, stderr_write = stack.enter_context(_pipe()) + else: + raise Exception(f"unsupported value for stderr parameter: {stderr}") + + env = os.environ.copy() + env.update(extra_env) + + with subprocess.Popen( + cmd, + text=True, + shell=shell, + stdout=stdout_write, + stderr=stderr_write, + env=env, + cwd=cwd, + ) as p: + if write_std_fd is not None: + write_std_fd.close() + if write_err_fd is not None: + write_err_fd.close() + if stdout == subprocess.PIPE: + assert stdout_write is not None + stdout_write.close() + if stderr == subprocess.PIPE: + assert stderr_write is not None + stderr_write.close() + + start = time.time() + stdout_data, stderr_data = self._prefix_output( + displayed_cmd, + read_std_fd, + read_err_fd, + stdout_read, + stderr_read, + timeout, + ) + try: + ret = p.wait(timeout=max(0, timeout - (time.time() - start))) + except subprocess.TimeoutExpired: + p.kill() + raise + if ret != 0: + if check: + raise subprocess.CalledProcessError( + ret, cmd=cmd, output=stdout_data, stderr=stderr_data + ) + else: + cmdlog.warning( + f"[Command failed: {ret}] {displayed_cmd}", + extra=dict(command_prefix=self.command_prefix), + ) + return subprocess.CompletedProcess( + cmd, ret, stdout=stdout_data, stderr=stderr_data + ) + raise RuntimeError("unreachable") + + def run_local( + self, + cmd: Union[str, List[str]], + stdout: FILE = None, + stderr: FILE = None, + extra_env: Dict[str, str] = {}, + cwd: Union[None, str, Path] = None, + check: bool = True, + timeout: float = math.inf, + ) -> subprocess.CompletedProcess[str]: + """ + Command to run locally for the host + + @cmd the commmand to run + @stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE + @stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE + @extra_env environment variables to override whe running the command + @cwd current working directory to run the process in + @timeout: Timeout in seconds for the command to complete + + @return subprocess.CompletedProcess result of the command + """ + shell = False + if isinstance(cmd, str): + cmd = [cmd] + shell = True + displayed_cmd = " ".join(cmd) + cmdlog.info( + f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix) + ) + return self._run( + cmd, + displayed_cmd, + shell=shell, + stdout=stdout, + stderr=stderr, + extra_env=extra_env, + cwd=cwd, + check=check, + timeout=timeout, + ) + + def run( + self, + cmd: Union[str, List[str]], + stdout: FILE = None, + stderr: FILE = None, + become_root: bool = False, + extra_env: Dict[str, str] = {}, + cwd: Union[None, str, Path] = None, + check: bool = True, + verbose_ssh: bool = False, + timeout: float = math.inf, + ) -> subprocess.CompletedProcess[str]: + """ + Command to run on the host via ssh + + @cmd the commmand to run + @stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE + @stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE + @become_root if the ssh_user is not root than sudo is prepended + @extra_env environment variables to override whe running the command + @cwd current working directory to run the process in + @verbose_ssh: Enables verbose logging on ssh connections + @timeout: Timeout in seconds for the command to complete + + @return subprocess.CompletedProcess result of the ssh command + """ + sudo = "" + if become_root and self.user != "root": + sudo = "sudo -- " + vars = [] + for k, v in extra_env.items(): + vars.append(f"{shlex.quote(k)}={shlex.quote(v)}") + + displayed_cmd = "" + export_cmd = "" + if vars: + export_cmd = f"export {' '.join(vars)}; " + displayed_cmd += export_cmd + if isinstance(cmd, list): + displayed_cmd += " ".join(cmd) + else: + displayed_cmd += cmd + cmdlog.info( + f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix) + ) + + if self.user is not None: + ssh_target = f"{self.user}@{self.host}" + else: + ssh_target = self.host + + ssh_opts = ["-A"] if self.forward_agent else [] + if self.port: + ssh_opts.extend(["-p", str(self.port)]) + if self.key: + ssh_opts.extend(["-i", self.key]) + + if self.host_key_check != HostKeyCheck.STRICT: + ssh_opts.extend(["-o", "StrictHostKeyChecking=no"]) + if self.host_key_check == HostKeyCheck.NONE: + ssh_opts.extend(["-o", "UserKnownHostsFile=/dev/null"]) + if verbose_ssh or self.verbose_ssh: + ssh_opts.extend(["-v"]) + + bash_cmd = export_cmd + bash_args = [] + if isinstance(cmd, list): + bash_cmd += 'exec "$@"' + bash_args += cmd + else: + bash_cmd += cmd + # FIXME we assume bash to be present here? Should be documented... + ssh_cmd = ( + ["ssh", ssh_target] + + ssh_opts + + [ + "--", + f"{sudo}bash -c {quote(bash_cmd)} -- {' '.join(map(quote, bash_args))}", + ] + ) + return self._run( + ssh_cmd, + displayed_cmd, + shell=False, + stdout=stdout, + stderr=stderr, + cwd=cwd, + check=check, + timeout=timeout, + ) + + +T = TypeVar("T") + + +class HostResult(Generic[T]): + def __init__(self, host: DeployHost, result: Union[T, Exception]) -> None: + self.host = host + self._result = result + + @property + def error(self) -> Optional[Exception]: + """ + Returns an error if the command failed + """ + if isinstance(self._result, Exception): + return self._result + return None + + @property + def result(self) -> T: + """ + Unwrap the result + """ + if isinstance(self._result, Exception): + raise self._result + return self._result + + +DeployResults = List[HostResult[subprocess.CompletedProcess[str]]] + + +def _worker( + func: Callable[[DeployHost], T], + host: DeployHost, + results: List[HostResult[T]], + idx: int, +) -> None: + try: + results[idx] = HostResult(host, func(host)) + except Exception as e: + kitlog.exception(e) + results[idx] = HostResult(host, e) + + +class DeployGroup: + def __init__(self, hosts: List[DeployHost]) -> None: + self.hosts = hosts + + def _run_local( + self, + cmd: Union[str, List[str]], + host: DeployHost, + results: DeployResults, + stdout: FILE = None, + stderr: FILE = None, + extra_env: Dict[str, str] = {}, + cwd: Union[None, str, Path] = None, + check: bool = True, + verbose_ssh: bool = False, + timeout: float = math.inf, + ) -> None: + try: + proc = host.run_local( + cmd, + stdout=stdout, + stderr=stderr, + extra_env=extra_env, + cwd=cwd, + check=check, + timeout=timeout, + ) + results.append(HostResult(host, proc)) + except Exception as e: + kitlog.exception(e) + results.append(HostResult(host, e)) + + def _run_remote( + self, + cmd: Union[str, List[str]], + host: DeployHost, + results: DeployResults, + stdout: FILE = None, + stderr: FILE = None, + extra_env: Dict[str, str] = {}, + cwd: Union[None, str, Path] = None, + check: bool = True, + verbose_ssh: bool = False, + timeout: float = math.inf, + ) -> None: + try: + proc = host.run( + cmd, + stdout=stdout, + stderr=stderr, + extra_env=extra_env, + cwd=cwd, + check=check, + verbose_ssh=verbose_ssh, + timeout=timeout, + ) + results.append(HostResult(host, proc)) + except Exception as e: + kitlog.exception(e) + results.append(HostResult(host, e)) + + def _reraise_errors(self, results: List[HostResult[Any]]) -> None: + errors = 0 + for result in results: + e = result.error + if e: + cmdlog.error( + f"failed with: {e}", + extra=dict(command_prefix=result.host.command_prefix), + ) + errors += 1 + if errors > 0: + raise Exception( + f"{errors} hosts failed with an error. Check the logs above" + ) + + def _run( + self, + cmd: Union[str, List[str]], + local: bool = False, + stdout: FILE = None, + stderr: FILE = None, + extra_env: Dict[str, str] = {}, + cwd: Union[None, str, Path] = None, + check: bool = True, + verbose_ssh: bool = False, + timeout: float = math.inf, + ) -> DeployResults: + results: DeployResults = [] + threads = [] + for host in self.hosts: + fn = self._run_local if local else self._run_remote + thread = Thread( + target=fn, + kwargs=dict( + results=results, + cmd=cmd, + host=host, + stdout=stdout, + stderr=stderr, + extra_env=extra_env, + cwd=cwd, + check=check, + verbose_ssh=verbose_ssh, + timeout=timeout, + ), + ) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + if check: + self._reraise_errors(results) + + return results + + def run( + self, + cmd: Union[str, List[str]], + stdout: FILE = None, + stderr: FILE = None, + extra_env: Dict[str, str] = {}, + cwd: Union[None, str, Path] = None, + check: bool = True, + verbose_ssh: bool = False, + timeout: float = math.inf, + ) -> DeployResults: + """ + Command to run on the remote host via ssh + @stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE + @stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE + @cwd current working directory to run the process in + @verbose_ssh: Enables verbose logging on ssh connections + @timeout: Timeout in seconds for the command to complete + + @return a lists of tuples containing DeployNode and the result of the command for this DeployNode + """ + return self._run( + cmd, + stdout=stdout, + stderr=stderr, + extra_env=extra_env, + cwd=cwd, + check=check, + verbose_ssh=verbose_ssh, + timeout=timeout, + ) + + def run_local( + self, + cmd: Union[str, List[str]], + stdout: FILE = None, + stderr: FILE = None, + extra_env: Dict[str, str] = {}, + cwd: Union[None, str, Path] = None, + check: bool = True, + timeout: float = math.inf, + ) -> DeployResults: + """ + Command to run locally for each host in the group in parallel + @cmd the commmand to run + @stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE + @stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE + @cwd current working directory to run the process in + @extra_env environment variables to override whe running the command + @timeout: Timeout in seconds for the command to complete + + @return a lists of tuples containing DeployNode and the result of the command for this DeployNode + """ + return self._run( + cmd, + local=True, + stdout=stdout, + stderr=stderr, + extra_env=extra_env, + cwd=cwd, + check=check, + timeout=timeout, + ) + + def run_function( + self, func: Callable[[DeployHost], T], check: bool = True + ) -> List[HostResult[T]]: + """ + Function to run for each host in the group in parallel + + @func the function to call + """ + threads = [] + results: List[HostResult[T]] = [ + HostResult(h, Exception(f"No result set for thread {i}")) + for (i, h) in enumerate(self.hosts) + ] + for i, host in enumerate(self.hosts): + thread = Thread( + target=_worker, + args=(func, host, results, i), + ) + threads.append(thread) + + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + if check: + self._reraise_errors(results) + return results + + def filter(self, pred: Callable[[DeployHost], bool]) -> "DeployGroup": + """Return a new DeployGroup with the results filtered by the predicate""" + return DeployGroup(list(filter(pred, self.hosts))) + + +@overload +def run( + cmd: Union[List[str], str], + text: Literal[True] = ..., + stdout: FILE = ..., + stderr: FILE = ..., + extra_env: Dict[str, str] = ..., + cwd: Union[None, str, Path] = ..., + check: bool = ..., +) -> subprocess.CompletedProcess[str]: + ... + + +@overload +def run( + cmd: Union[List[str], str], + text: Literal[False], + stdout: FILE = ..., + stderr: FILE = ..., + extra_env: Dict[str, str] = ..., + cwd: Union[None, str, Path] = ..., + check: bool = ..., +) -> subprocess.CompletedProcess[bytes]: + ... + + +def run( + cmd: Union[List[str], str], + text: bool = True, + stdout: FILE = None, + stderr: FILE = None, + extra_env: Dict[str, str] = {}, + cwd: Union[None, str, Path] = None, + check: bool = True, +) -> subprocess.CompletedProcess[Any]: + """ + Run command locally + + @cmd if this parameter is a string the command is interpreted as a shell command, + otherwise if it is a list, than the first list element is the command + and the remaining list elements are passed as arguments to the + command. + @text when true, file objects for stdout and stderr are opened in text mode. + @stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE + @stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE + @extra_env environment variables to override whe running the command + @cwd current working directory to run the process in + @check If check is true, and the process exits with a non-zero exit code, a + CalledProcessError exception will be raised. Attributes of that exception + hold the arguments, the exit code, and stdout and stderr if they were + captured. + """ + if isinstance(cmd, list): + info("$ " + " ".join(cmd)) + else: + info(f"$ {cmd}") + env = os.environ.copy() + env.update(extra_env) + + return subprocess.run( + cmd, + stdout=stdout, + stderr=stderr, + env=env, + cwd=cwd, + check=check, + shell=not isinstance(cmd, list), + text=text, + )