forked from clan/clan-core
modernisation for python 3.11
This commit is contained in:
parent
26e3e3872c
commit
f1b223d0a1
@ -1,9 +1,9 @@
|
||||
# This file contains type hints that can be prepended to Nix test scripts so they can be type
|
||||
# checked.
|
||||
|
||||
from typing import Callable, List
|
||||
from collections.abc import Callable
|
||||
|
||||
from test_driver import Machine
|
||||
|
||||
start_all: Callable[[], None]
|
||||
machines: List[Machine]
|
||||
machines: list[Machine]
|
||||
|
@ -3,9 +3,10 @@ import os
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Any, Callable, Dict, Optional, Tuple
|
||||
from typing import Any
|
||||
|
||||
|
||||
def prepare_machine_root(machinename: str, root: Path) -> None:
|
||||
@ -88,7 +89,7 @@ class Machine:
|
||||
except ValueError:
|
||||
raise RuntimeError(f"Failed to parse child process id {childs[0]}")
|
||||
|
||||
def get_unit_info(self, unit: str) -> Dict[str, str]:
|
||||
def get_unit_info(self, unit: str) -> dict[str, str]:
|
||||
proc = self.systemctl(f'--no-pager show "{unit}"')
|
||||
if proc.returncode != 0:
|
||||
raise Exception(
|
||||
@ -98,7 +99,7 @@ class Machine:
|
||||
|
||||
line_pattern = re.compile(r"^([^=]+)=(.*)$")
|
||||
|
||||
def tuple_from_line(line: str) -> Tuple[str, str]:
|
||||
def tuple_from_line(line: str) -> tuple[str, str]:
|
||||
match = line_pattern.match(line)
|
||||
assert match is not None
|
||||
return match[1], match[2]
|
||||
@ -114,7 +115,7 @@ class Machine:
|
||||
command: str,
|
||||
check_return: bool = True,
|
||||
check_output: bool = True,
|
||||
timeout: Optional[int] = 900,
|
||||
timeout: int | None = 900,
|
||||
) -> subprocess.CompletedProcess:
|
||||
"""
|
||||
Execute a shell command, returning a list `(status, stdout)`.
|
||||
|
@ -1,9 +1,10 @@
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
from collections.abc import Sequence
|
||||
from pathlib import Path
|
||||
from types import ModuleType
|
||||
from typing import Any, Optional, Sequence
|
||||
from typing import Any
|
||||
|
||||
from . import config, flakes, machines, secrets, vms, webui
|
||||
from .custom_logger import setup_logging
|
||||
@ -12,7 +13,7 @@ from .ssh import cli as ssh_cli
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
argcomplete: Optional[ModuleType] = None
|
||||
argcomplete: ModuleType | None = None
|
||||
try:
|
||||
import argcomplete # type: ignore[no-redef]
|
||||
except ImportError:
|
||||
@ -28,7 +29,7 @@ class AppendOptionAction(argparse.Action):
|
||||
parser: argparse.ArgumentParser,
|
||||
namespace: argparse.Namespace,
|
||||
values: str | Sequence[str] | None,
|
||||
option_string: Optional[str] = None,
|
||||
option_string: str | None = None,
|
||||
) -> None:
|
||||
lst = getattr(namespace, self.dest)
|
||||
lst.append("--option")
|
||||
@ -37,7 +38,7 @@ class AppendOptionAction(argparse.Action):
|
||||
lst.append(values[1])
|
||||
|
||||
|
||||
def create_parser(prog: Optional[str] = None) -> argparse.ArgumentParser:
|
||||
def create_parser(prog: str | None = None) -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(prog=prog, description="cLAN tool")
|
||||
|
||||
parser.add_argument(
|
||||
|
@ -1,8 +1,9 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import shlex
|
||||
from collections.abc import Callable, Coroutine
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Coroutine, Dict, NamedTuple, Optional
|
||||
from typing import Any, NamedTuple
|
||||
|
||||
from .custom_logger import get_caller
|
||||
from .errors import ClanError
|
||||
@ -13,10 +14,10 @@ log = logging.getLogger(__name__)
|
||||
class CmdOut(NamedTuple):
|
||||
stdout: str
|
||||
stderr: str
|
||||
cwd: Optional[Path] = None
|
||||
cwd: Path | None = None
|
||||
|
||||
|
||||
async def run(cmd: list[str], cwd: Optional[Path] = None) -> CmdOut:
|
||||
async def run(cmd: list[str], cwd: Path | None = None) -> CmdOut:
|
||||
cwd_res = None
|
||||
if cwd is not None:
|
||||
if not cwd.exists():
|
||||
@ -52,7 +53,7 @@ stdout:
|
||||
|
||||
|
||||
def runforcli(
|
||||
func: Callable[..., Coroutine[Any, Any, Dict[str, CmdOut]]], *args: Any
|
||||
func: Callable[..., Coroutine[Any, Any, dict[str, CmdOut]]], *args: Any
|
||||
) -> None:
|
||||
try:
|
||||
res = asyncio.run(func(*args))
|
||||
|
@ -1,14 +1,13 @@
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from clan_cli.nix import nix_eval
|
||||
|
||||
|
||||
def get_clan_module_names(
|
||||
flake_dir: Path,
|
||||
) -> tuple[list[str], Optional[str]]:
|
||||
) -> tuple[list[str], str | None]:
|
||||
"""
|
||||
Get the list of clan modules from the clan-core flake input
|
||||
"""
|
||||
|
@ -8,7 +8,7 @@ import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, Tuple, get_origin
|
||||
from typing import Any, get_origin
|
||||
|
||||
from clan_cli.dirs import machine_settings_file
|
||||
from clan_cli.errors import ClanError
|
||||
@ -34,7 +34,7 @@ def map_type(type: str) -> Any:
|
||||
return str
|
||||
elif type.startswith("null or "):
|
||||
subtype = type.removeprefix("null or ")
|
||||
return Optional[map_type(subtype)]
|
||||
return map_type(subtype) | None
|
||||
elif type.startswith("attribute set of"):
|
||||
subtype = type.removeprefix("attribute set of ")
|
||||
return dict[str, map_type(subtype)] # type: ignore
|
||||
@ -196,8 +196,8 @@ def get_or_set_option(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def find_option(
|
||||
option: str, value: Any, options: dict, option_description: Optional[str] = None
|
||||
) -> Tuple[str, Any]:
|
||||
option: str, value: Any, options: dict, option_description: str | None = None
|
||||
) -> tuple[str, Any]:
|
||||
"""
|
||||
The option path specified by the user doesn't have to match exactly to an
|
||||
entry in the options.json file. Examples
|
||||
@ -307,7 +307,7 @@ def set_option(
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_parser(
|
||||
parser: Optional[argparse.ArgumentParser],
|
||||
parser: argparse.ArgumentParser | None,
|
||||
) -> None:
|
||||
if parser is None:
|
||||
parser = argparse.ArgumentParser(
|
||||
@ -361,7 +361,7 @@ def register_parser(
|
||||
)
|
||||
|
||||
|
||||
def main(argv: Optional[list[str]] = None) -> None:
|
||||
def main(argv: list[str] | None = None) -> None:
|
||||
if argv is None:
|
||||
argv = sys.argv
|
||||
parser = argparse.ArgumentParser()
|
||||
|
@ -4,7 +4,6 @@ import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Optional
|
||||
|
||||
from clan_cli.dirs import machine_settings_file, nixpkgs_source, specific_machine_dir
|
||||
from clan_cli.errors import ClanError, ClanHttpError
|
||||
@ -15,8 +14,8 @@ from clan_cli.nix import nix_eval
|
||||
def verify_machine_config(
|
||||
flake_dir: Path,
|
||||
machine_name: str,
|
||||
config: Optional[dict] = None,
|
||||
) -> Optional[str]:
|
||||
config: dict | None = None,
|
||||
) -> str | None:
|
||||
"""
|
||||
Verify that the machine evaluates successfully
|
||||
Returns a tuple of (success, error_message)
|
||||
|
@ -1,7 +1,7 @@
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, Type, Union
|
||||
from typing import Any
|
||||
|
||||
from ..errors import ClanError
|
||||
from ..nix import nix_eval
|
||||
@ -19,7 +19,7 @@ type_map: dict[str, type] = {
|
||||
|
||||
|
||||
def schema_from_module_file(
|
||||
file: Union[str, Path] = f"{script_dir}/jsonschema/example-schema.json",
|
||||
file: str | Path = f"{script_dir}/jsonschema/example-schema.json",
|
||||
) -> dict[str, Any]:
|
||||
absolute_path = Path(file).absolute()
|
||||
# define a nix expression that loads the given module file using lib.evalModules
|
||||
@ -36,7 +36,7 @@ def schema_from_module_file(
|
||||
return json.loads(proc.stdout)
|
||||
|
||||
|
||||
def subtype_from_schema(schema: dict[str, Any]) -> Type:
|
||||
def subtype_from_schema(schema: dict[str, Any]) -> type:
|
||||
if schema["type"] == "object":
|
||||
if "additionalProperties" in schema:
|
||||
sub_type = subtype_from_schema(schema["additionalProperties"])
|
||||
@ -57,8 +57,8 @@ def subtype_from_schema(schema: dict[str, Any]) -> Type:
|
||||
def type_from_schema_path(
|
||||
schema: dict[str, Any],
|
||||
path: list[str],
|
||||
full_path: Optional[list[str]] = None,
|
||||
) -> Type:
|
||||
full_path: list[str] | None = None,
|
||||
) -> type:
|
||||
if full_path is None:
|
||||
full_path = path
|
||||
if len(path) == 0:
|
||||
@ -76,8 +76,8 @@ def type_from_schema_path(
|
||||
raise ClanError(f"Unknown type for path {path}")
|
||||
|
||||
|
||||
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, Type]:
|
||||
result: dict[str, Type] = {}
|
||||
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
|
||||
result: dict[str, type] = {}
|
||||
for name, value in schema.get("properties", {}).items():
|
||||
assert isinstance(value, dict)
|
||||
type_ = value["type"]
|
||||
|
@ -4,7 +4,6 @@ import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Optional
|
||||
|
||||
from clan_cli.dirs import nixpkgs_source
|
||||
from clan_cli.errors import ClanError, ClanHttpError
|
||||
@ -14,7 +13,7 @@ from clan_cli.nix import nix_eval
|
||||
def machine_schema(
|
||||
flake_dir: Path,
|
||||
config: dict,
|
||||
clan_imports: Optional[list[str]] = None,
|
||||
clan_imports: list[str] | None = None,
|
||||
) -> dict:
|
||||
# use nix eval to lib.evalModules .#nixosConfigurations.<machine_name>.options.clan
|
||||
with NamedTemporaryFile(mode="w", dir=flake_dir) as clan_machine_settings_file:
|
||||
|
@ -1,7 +1,8 @@
|
||||
import inspect
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable
|
||||
from typing import Any
|
||||
|
||||
grey = "\x1b[38;20m"
|
||||
yellow = "\x1b[33;20m"
|
||||
|
@ -1,5 +1,6 @@
|
||||
from collections.abc import Callable
|
||||
from types import ModuleType
|
||||
from typing import Any, Callable
|
||||
from typing import Any
|
||||
|
||||
|
||||
class FakeDeal:
|
||||
|
@ -6,15 +6,16 @@ import stat
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
from typing import Any
|
||||
|
||||
import ipdb
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def command_exec(cmd: List[str], work_dir: Path, env: Dict[str, str]) -> None:
|
||||
def command_exec(cmd: list[str], work_dir: Path, env: dict[str, str]) -> None:
|
||||
subprocess.run(cmd, check=True, env=env, cwd=work_dir.resolve())
|
||||
|
||||
|
||||
@ -32,8 +33,8 @@ def block_for_input() -> None:
|
||||
|
||||
def breakpoint_container(
|
||||
work_dir: Path,
|
||||
env: Optional[Dict[str, str]] = None,
|
||||
cmd: Optional[List[str]] = None,
|
||||
env: dict[str, str] | None = None,
|
||||
cmd: list[str] | None = None,
|
||||
) -> None:
|
||||
if env is None:
|
||||
env = os.environ.copy()
|
||||
@ -52,8 +53,8 @@ def breakpoint_container(
|
||||
|
||||
def breakpoint_shell(
|
||||
work_dir: Path = Path(os.getcwd()),
|
||||
env: Optional[Dict[str, str]] = None,
|
||||
cmd: Optional[List[str]] = None,
|
||||
env: dict[str, str] | None = None,
|
||||
cmd: list[str] | None = None,
|
||||
) -> None:
|
||||
if env is None:
|
||||
env = os.environ.copy()
|
||||
@ -91,7 +92,7 @@ def spawn_process(func: Callable, **kwargs: Any) -> mp.Process:
|
||||
return proc
|
||||
|
||||
|
||||
def dump_env(env: Dict[str, str], loc: Path) -> None:
|
||||
def dump_env(env: dict[str, str], loc: Path) -> None:
|
||||
cenv = env.copy()
|
||||
log.info("Dumping environment variables to %s", loc)
|
||||
with open(loc, "w") as f:
|
||||
|
@ -2,20 +2,19 @@ import logging
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_clan_flake_toplevel() -> Optional[Path]:
|
||||
def get_clan_flake_toplevel() -> Path | None:
|
||||
return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"])
|
||||
|
||||
|
||||
def find_git_repo_root() -> Optional[Path]:
|
||||
def find_git_repo_root() -> Path | None:
|
||||
return find_toplevel([".git"])
|
||||
|
||||
|
||||
def find_toplevel(top_level_files: list[str]) -> Optional[Path]:
|
||||
def find_toplevel(top_level_files: list[str]) -> Path | None:
|
||||
"""Returns the path to the toplevel of the clan flake"""
|
||||
for project_file in top_level_files:
|
||||
initial_path = Path(os.getcwd())
|
||||
|
@ -1,14 +1,13 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
from clan_cli.dirs import user_history_file
|
||||
|
||||
from ..async_cmd import CmdOut, runforcli
|
||||
|
||||
|
||||
async def add_flake(path: Path) -> Dict[str, CmdOut]:
|
||||
async def add_flake(path: Path) -> dict[str, CmdOut]:
|
||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||
# append line to history file
|
||||
# TODO: Make this atomic
|
||||
|
@ -1,7 +1,6 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
from ..async_cmd import CmdOut, run, runforcli
|
||||
from ..errors import ClanError
|
||||
@ -10,7 +9,7 @@ from ..nix import nix_command, nix_shell
|
||||
DEFAULT_URL: str = "git+https://git.clan.lol/clan/clan-core?new-clan"
|
||||
|
||||
|
||||
async def create_flake(directory: Path, url: str) -> Dict[str, CmdOut]:
|
||||
async def create_flake(directory: Path, url: str) -> dict[str, CmdOut]:
|
||||
if not directory.exists():
|
||||
directory.mkdir()
|
||||
else:
|
||||
|
@ -1,7 +1,6 @@
|
||||
import shlex
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# from clan_cli.dirs import find_git_repo_root
|
||||
from clan_cli.errors import ClanError
|
||||
@ -12,7 +11,7 @@ from clan_cli.nix import nix_shell
|
||||
def commit_file(
|
||||
file_path: Path,
|
||||
repo_dir: Path,
|
||||
commit_message: Optional[str] = None,
|
||||
commit_message: str | None = None,
|
||||
) -> None:
|
||||
# check that the file is in the git repository and exists
|
||||
if not Path(file_path).resolve().is_relative_to(repo_dir.resolve()):
|
||||
|
@ -3,7 +3,6 @@ import os
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from ..nix import nix_build, nix_config, nix_eval
|
||||
from ..ssh import Host, parse_deployment_address
|
||||
@ -31,7 +30,7 @@ class Machine:
|
||||
self,
|
||||
name: str,
|
||||
flake_dir: Path,
|
||||
machine_data: Optional[dict] = None,
|
||||
machine_data: dict | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Creates a Machine
|
||||
|
@ -1,7 +1,7 @@
|
||||
import os
|
||||
import shutil
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
from ..errors import ClanError
|
||||
|
||||
|
@ -2,10 +2,11 @@ import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import IO, Iterator
|
||||
from typing import IO
|
||||
|
||||
from ..dirs import user_config_dir
|
||||
from ..errors import ClanError
|
||||
|
@ -1,8 +1,8 @@
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
from ..errors import ClanError
|
||||
from .sops import get_public_key
|
||||
|
@ -10,6 +10,7 @@ import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.parse
|
||||
from collections.abc import Callable, Iterator
|
||||
from contextlib import ExitStack, contextmanager
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
@ -18,16 +19,9 @@ from threading import Thread
|
||||
from typing import (
|
||||
IO,
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Generic,
|
||||
Iterator,
|
||||
List,
|
||||
Literal,
|
||||
Optional,
|
||||
Tuple,
|
||||
TypeVar,
|
||||
Union,
|
||||
overload,
|
||||
)
|
||||
|
||||
@ -48,7 +42,7 @@ class CommandFormatter(logging.Formatter):
|
||||
super().__init__(
|
||||
"%(prefix_color)s[%(command_prefix)s]%(color_reset)s %(color)s%(message)s%(color_reset)s"
|
||||
)
|
||||
self.hostnames: List[str] = []
|
||||
self.hostnames: list[str] = []
|
||||
self.hostname_color_offset = 1 # first host shouldn't get agressive red
|
||||
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
@ -80,7 +74,7 @@ class CommandFormatter(logging.Formatter):
|
||||
return 31 + (index + self.hostname_color_offset) % 7
|
||||
|
||||
|
||||
def setup_loggers() -> Tuple[logging.Logger, logging.Logger]:
|
||||
def setup_loggers() -> tuple[logging.Logger, logging.Logger]:
|
||||
# If we use the default logger here (logging.error etc) or a logger called
|
||||
# "deploykit", then cmdlog messages are also posted on the default logger.
|
||||
# To avoid this message duplication, we set up a main and command logger
|
||||
@ -115,7 +109,7 @@ error = kitlog.error
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _pipe() -> Iterator[Tuple[IO[str], IO[str]]]:
|
||||
def _pipe() -> Iterator[tuple[IO[str], IO[str]]]:
|
||||
(pipe_r, pipe_w) = os.pipe()
|
||||
read_end = os.fdopen(pipe_r, "r")
|
||||
write_end = os.fdopen(pipe_w, "w")
|
||||
@ -130,7 +124,7 @@ def _pipe() -> Iterator[Tuple[IO[str], IO[str]]]:
|
||||
write_end.close()
|
||||
|
||||
|
||||
FILE = Union[None, int]
|
||||
FILE = None | int
|
||||
|
||||
# Seconds until a message is printed when _run produces no output.
|
||||
NO_OUTPUT_TIMEOUT = 20
|
||||
@ -149,13 +143,13 @@ class Host:
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
user: Optional[str] = None,
|
||||
port: Optional[int] = None,
|
||||
key: Optional[str] = None,
|
||||
user: str | None = None,
|
||||
port: int | None = None,
|
||||
key: str | None = None,
|
||||
forward_agent: bool = False,
|
||||
command_prefix: Optional[str] = None,
|
||||
command_prefix: str | None = None,
|
||||
host_key_check: HostKeyCheck = HostKeyCheck.STRICT,
|
||||
meta: Dict[str, Any] = {},
|
||||
meta: dict[str, Any] = {},
|
||||
verbose_ssh: bool = False,
|
||||
ssh_options: dict[str, str] = {},
|
||||
) -> None:
|
||||
@ -186,12 +180,12 @@ class Host:
|
||||
def _prefix_output(
|
||||
self,
|
||||
displayed_cmd: str,
|
||||
print_std_fd: Optional[IO[str]],
|
||||
print_err_fd: Optional[IO[str]],
|
||||
stdout: Optional[IO[str]],
|
||||
stderr: Optional[IO[str]],
|
||||
print_std_fd: IO[str] | None,
|
||||
print_err_fd: IO[str] | None,
|
||||
stdout: IO[str] | None,
|
||||
stderr: IO[str] | None,
|
||||
timeout: float = math.inf,
|
||||
) -> Tuple[str, str]:
|
||||
) -> tuple[str, str]:
|
||||
rlist = []
|
||||
if print_std_fd is not None:
|
||||
rlist.append(print_std_fd)
|
||||
@ -215,7 +209,7 @@ class Host:
|
||||
|
||||
def print_from(
|
||||
print_fd: IO[str], print_buf: str, is_err: bool = False
|
||||
) -> Tuple[float, str]:
|
||||
) -> tuple[float, str]:
|
||||
read = os.read(print_fd.fileno(), 4096)
|
||||
if len(read) == 0:
|
||||
rlist.remove(print_fd)
|
||||
@ -256,7 +250,7 @@ class Host:
|
||||
extra=dict(command_prefix=self.command_prefix),
|
||||
)
|
||||
|
||||
def handle_fd(fd: Optional[IO[Any]]) -> str:
|
||||
def handle_fd(fd: IO[Any] | None) -> str:
|
||||
if fd and fd in r:
|
||||
read = os.read(fd.fileno(), 4096)
|
||||
if len(read) == 0:
|
||||
@ -274,13 +268,13 @@ class Host:
|
||||
|
||||
def _run(
|
||||
self,
|
||||
cmd: List[str],
|
||||
cmd: list[str],
|
||||
displayed_cmd: str,
|
||||
shell: bool,
|
||||
stdout: FILE = None,
|
||||
stderr: FILE = None,
|
||||
extra_env: Dict[str, str] = {},
|
||||
cwd: Union[None, str, Path] = None,
|
||||
extra_env: dict[str, str] = {},
|
||||
cwd: None | str | Path = None,
|
||||
check: bool = True,
|
||||
timeout: float = math.inf,
|
||||
) -> subprocess.CompletedProcess[str]:
|
||||
@ -362,11 +356,11 @@ class Host:
|
||||
|
||||
def run_local(
|
||||
self,
|
||||
cmd: Union[str, List[str]],
|
||||
cmd: str | list[str],
|
||||
stdout: FILE = None,
|
||||
stderr: FILE = None,
|
||||
extra_env: Dict[str, str] = {},
|
||||
cwd: Union[None, str, Path] = None,
|
||||
extra_env: dict[str, str] = {},
|
||||
cwd: None | str | Path = None,
|
||||
check: bool = True,
|
||||
timeout: float = math.inf,
|
||||
) -> subprocess.CompletedProcess[str]:
|
||||
@ -404,12 +398,12 @@ class Host:
|
||||
|
||||
def run(
|
||||
self,
|
||||
cmd: Union[str, List[str]],
|
||||
cmd: str | list[str],
|
||||
stdout: FILE = None,
|
||||
stderr: FILE = None,
|
||||
become_root: bool = False,
|
||||
extra_env: Dict[str, str] = {},
|
||||
cwd: Union[None, str, Path] = None,
|
||||
extra_env: dict[str, str] = {},
|
||||
cwd: None | str | Path = None,
|
||||
check: bool = True,
|
||||
verbose_ssh: bool = False,
|
||||
timeout: float = math.inf,
|
||||
@ -475,7 +469,7 @@ class Host:
|
||||
def ssh_cmd(
|
||||
self,
|
||||
verbose_ssh: bool = False,
|
||||
) -> List:
|
||||
) -> list:
|
||||
if self.user is not None:
|
||||
ssh_target = f"{self.user}@{self.host}"
|
||||
else:
|
||||
@ -505,12 +499,12 @@ T = TypeVar("T")
|
||||
|
||||
|
||||
class HostResult(Generic[T]):
|
||||
def __init__(self, host: Host, result: Union[T, Exception]) -> None:
|
||||
def __init__(self, host: Host, result: T | Exception) -> None:
|
||||
self.host = host
|
||||
self._result = result
|
||||
|
||||
@property
|
||||
def error(self) -> Optional[Exception]:
|
||||
def error(self) -> Exception | None:
|
||||
"""
|
||||
Returns an error if the command failed
|
||||
"""
|
||||
@ -528,13 +522,13 @@ class HostResult(Generic[T]):
|
||||
return self._result
|
||||
|
||||
|
||||
Results = List[HostResult[subprocess.CompletedProcess[str]]]
|
||||
Results = list[HostResult[subprocess.CompletedProcess[str]]]
|
||||
|
||||
|
||||
def _worker(
|
||||
func: Callable[[Host], T],
|
||||
host: Host,
|
||||
results: List[HostResult[T]],
|
||||
results: list[HostResult[T]],
|
||||
idx: int,
|
||||
) -> None:
|
||||
try:
|
||||
@ -545,18 +539,18 @@ def _worker(
|
||||
|
||||
|
||||
class HostGroup:
|
||||
def __init__(self, hosts: List[Host]) -> None:
|
||||
def __init__(self, hosts: list[Host]) -> None:
|
||||
self.hosts = hosts
|
||||
|
||||
def _run_local(
|
||||
self,
|
||||
cmd: Union[str, List[str]],
|
||||
cmd: str | list[str],
|
||||
host: Host,
|
||||
results: Results,
|
||||
stdout: FILE = None,
|
||||
stderr: FILE = None,
|
||||
extra_env: Dict[str, str] = {},
|
||||
cwd: Union[None, str, Path] = None,
|
||||
extra_env: dict[str, str] = {},
|
||||
cwd: None | str | Path = None,
|
||||
check: bool = True,
|
||||
verbose_ssh: bool = False,
|
||||
timeout: float = math.inf,
|
||||
@ -578,13 +572,13 @@ class HostGroup:
|
||||
|
||||
def _run_remote(
|
||||
self,
|
||||
cmd: Union[str, List[str]],
|
||||
cmd: str | list[str],
|
||||
host: Host,
|
||||
results: Results,
|
||||
stdout: FILE = None,
|
||||
stderr: FILE = None,
|
||||
extra_env: Dict[str, str] = {},
|
||||
cwd: Union[None, str, Path] = None,
|
||||
extra_env: dict[str, str] = {},
|
||||
cwd: None | str | Path = None,
|
||||
check: bool = True,
|
||||
verbose_ssh: bool = False,
|
||||
timeout: float = math.inf,
|
||||
@ -605,7 +599,7 @@ class HostGroup:
|
||||
kitlog.exception(e)
|
||||
results.append(HostResult(host, e))
|
||||
|
||||
def _reraise_errors(self, results: List[HostResult[Any]]) -> None:
|
||||
def _reraise_errors(self, results: list[HostResult[Any]]) -> None:
|
||||
errors = 0
|
||||
for result in results:
|
||||
e = result.error
|
||||
@ -622,12 +616,12 @@ class HostGroup:
|
||||
|
||||
def _run(
|
||||
self,
|
||||
cmd: Union[str, List[str]],
|
||||
cmd: str | list[str],
|
||||
local: bool = False,
|
||||
stdout: FILE = None,
|
||||
stderr: FILE = None,
|
||||
extra_env: Dict[str, str] = {},
|
||||
cwd: Union[None, str, Path] = None,
|
||||
extra_env: dict[str, str] = {},
|
||||
cwd: None | str | Path = None,
|
||||
check: bool = True,
|
||||
verbose_ssh: bool = False,
|
||||
timeout: float = math.inf,
|
||||
@ -664,11 +658,11 @@ class HostGroup:
|
||||
|
||||
def run(
|
||||
self,
|
||||
cmd: Union[str, List[str]],
|
||||
cmd: str | list[str],
|
||||
stdout: FILE = None,
|
||||
stderr: FILE = None,
|
||||
extra_env: Dict[str, str] = {},
|
||||
cwd: Union[None, str, Path] = None,
|
||||
extra_env: dict[str, str] = {},
|
||||
cwd: None | str | Path = None,
|
||||
check: bool = True,
|
||||
verbose_ssh: bool = False,
|
||||
timeout: float = math.inf,
|
||||
@ -696,11 +690,11 @@ class HostGroup:
|
||||
|
||||
def run_local(
|
||||
self,
|
||||
cmd: Union[str, List[str]],
|
||||
cmd: str | list[str],
|
||||
stdout: FILE = None,
|
||||
stderr: FILE = None,
|
||||
extra_env: Dict[str, str] = {},
|
||||
cwd: Union[None, str, Path] = None,
|
||||
extra_env: dict[str, str] = {},
|
||||
cwd: None | str | Path = None,
|
||||
check: bool = True,
|
||||
timeout: float = math.inf,
|
||||
) -> Results:
|
||||
@ -728,14 +722,14 @@ class HostGroup:
|
||||
|
||||
def run_function(
|
||||
self, func: Callable[[Host], T], check: bool = True
|
||||
) -> List[HostResult[T]]:
|
||||
) -> list[HostResult[T]]:
|
||||
"""
|
||||
Function to run for each host in the group in parallel
|
||||
|
||||
@func the function to call
|
||||
"""
|
||||
threads = []
|
||||
results: List[HostResult[T]] = [
|
||||
results: list[HostResult[T]] = [
|
||||
HostResult(h, Exception(f"No result set for thread {i}"))
|
||||
for (i, h) in enumerate(self.hosts)
|
||||
]
|
||||
@ -764,14 +758,14 @@ def parse_deployment_address(
|
||||
machine_name: str, host: str, meta: dict[str, Any] = {}
|
||||
) -> Host:
|
||||
parts = host.split("@")
|
||||
user: Optional[str] = None
|
||||
user: str | None = None
|
||||
if len(parts) > 1:
|
||||
user = parts[0]
|
||||
hostname = parts[1]
|
||||
else:
|
||||
hostname = parts[0]
|
||||
maybe_options = hostname.split("?")
|
||||
options: Dict[str, str] = {}
|
||||
options: dict[str, str] = {}
|
||||
if len(maybe_options) > 1:
|
||||
hostname = maybe_options[0]
|
||||
for option in maybe_options[1].split("&"):
|
||||
@ -796,12 +790,12 @@ def parse_deployment_address(
|
||||
|
||||
@overload
|
||||
def run(
|
||||
cmd: Union[List[str], str],
|
||||
cmd: list[str] | str,
|
||||
text: Literal[True] = ...,
|
||||
stdout: FILE = ...,
|
||||
stderr: FILE = ...,
|
||||
extra_env: Dict[str, str] = ...,
|
||||
cwd: Union[None, str, Path] = ...,
|
||||
extra_env: dict[str, str] = ...,
|
||||
cwd: None | str | Path = ...,
|
||||
check: bool = ...,
|
||||
) -> subprocess.CompletedProcess[str]:
|
||||
...
|
||||
@ -809,24 +803,24 @@ def run(
|
||||
|
||||
@overload
|
||||
def run(
|
||||
cmd: Union[List[str], str],
|
||||
cmd: list[str] | str,
|
||||
text: Literal[False],
|
||||
stdout: FILE = ...,
|
||||
stderr: FILE = ...,
|
||||
extra_env: Dict[str, str] = ...,
|
||||
cwd: Union[None, str, Path] = ...,
|
||||
extra_env: dict[str, str] = ...,
|
||||
cwd: None | str | Path = ...,
|
||||
check: bool = ...,
|
||||
) -> subprocess.CompletedProcess[bytes]:
|
||||
...
|
||||
|
||||
|
||||
def run(
|
||||
cmd: Union[List[str], str],
|
||||
cmd: list[str] | str,
|
||||
text: bool = True,
|
||||
stdout: FILE = None,
|
||||
stderr: FILE = None,
|
||||
extra_env: Dict[str, str] = {},
|
||||
cwd: Union[None, str, Path] = None,
|
||||
extra_env: dict[str, str] = {},
|
||||
cwd: None | str | Path = None,
|
||||
check: bool = True,
|
||||
) -> subprocess.CompletedProcess[Any]:
|
||||
"""
|
||||
|
@ -1,7 +1,6 @@
|
||||
import argparse
|
||||
import json
|
||||
import subprocess
|
||||
from typing import Optional
|
||||
|
||||
from ..nix import nix_shell
|
||||
|
||||
@ -9,7 +8,7 @@ from ..nix import nix_shell
|
||||
def ssh(
|
||||
host: str,
|
||||
user: str = "root",
|
||||
password: Optional[str] = None,
|
||||
password: str | None = None,
|
||||
ssh_args: list[str] = [],
|
||||
) -> None:
|
||||
packages = ["tor", "openssh"]
|
||||
|
@ -7,9 +7,10 @@ import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
from collections.abc import Iterator
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterator, Optional, Type, TypeVar
|
||||
from typing import Any, TypeVar
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from .custom_logger import ThreadFormatter, get_caller
|
||||
@ -36,8 +37,8 @@ class Command:
|
||||
def run(
|
||||
self,
|
||||
cmd: list[str],
|
||||
env: Optional[dict[str, str]] = None,
|
||||
cwd: Optional[Path] = None,
|
||||
env: dict[str, str] | None = None,
|
||||
cwd: Path | None = None,
|
||||
name: str = "command",
|
||||
) -> None:
|
||||
self.running = True
|
||||
@ -188,7 +189,7 @@ T = TypeVar("T", bound="BaseTask")
|
||||
|
||||
|
||||
@deal.raises(ClanError)
|
||||
def create_task(task_type: Type[T], *args: Any) -> T:
|
||||
def create_task(task_type: type[T], *args: Any) -> T:
|
||||
global POOL
|
||||
|
||||
# check if task_type is a callable
|
||||
|
@ -1,5 +1,6 @@
|
||||
import sys
|
||||
from typing import IO, Any, Callable
|
||||
from collections.abc import Callable
|
||||
from typing import IO, Any
|
||||
|
||||
|
||||
def is_interactive() -> bool:
|
||||
|
@ -6,9 +6,9 @@ import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
from threading import Condition, Thread
|
||||
from typing import Iterator
|
||||
from uuid import UUID
|
||||
|
||||
from ..nix import nix_build, nix_config, nix_eval, nix_shell
|
||||
|
@ -1,8 +1,9 @@
|
||||
import argparse
|
||||
from typing import Callable, NoReturn, Optional
|
||||
from collections.abc import Callable
|
||||
from typing import NoReturn
|
||||
|
||||
start_server: Optional[Callable] = None
|
||||
ServerImportError: Optional[ImportError] = None
|
||||
start_server: Callable | None = None
|
||||
ServerImportError: ImportError | None = None
|
||||
try:
|
||||
from .server import start_server
|
||||
except ImportError as e:
|
||||
|
@ -1,5 +1,4 @@
|
||||
from enum import Enum
|
||||
from typing import Dict, List
|
||||
|
||||
from pydantic import BaseModel, Extra, Field
|
||||
|
||||
@ -71,9 +70,9 @@ class FlakeListResponse(BaseModel):
|
||||
|
||||
|
||||
class FlakeCreateResponse(BaseModel):
|
||||
cmd_out: Dict[str, CmdOut]
|
||||
cmd_out: dict[str, CmdOut]
|
||||
|
||||
|
||||
class FlakeResponse(BaseModel):
|
||||
content: str
|
||||
actions: List[FlakeAction]
|
||||
actions: list[FlakeAction]
|
||||
|
@ -5,10 +5,10 @@ import shutil
|
||||
import subprocess
|
||||
import time
|
||||
import urllib.request
|
||||
from collections.abc import Iterator
|
||||
from contextlib import ExitStack, contextmanager
|
||||
from pathlib import Path
|
||||
from threading import Thread
|
||||
from typing import Iterator
|
||||
|
||||
# XXX: can we dynamically load this using nix develop?
|
||||
import uvicorn
|
||||
|
@ -1,5 +1,5 @@
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List
|
||||
from typing import Any
|
||||
|
||||
|
||||
class Tags(Enum):
|
||||
@ -13,7 +13,7 @@ class Tags(Enum):
|
||||
return self.value
|
||||
|
||||
|
||||
tags_metadata: List[Dict[str, Any]] = [
|
||||
tags_metadata: list[dict[str, Any]] = [
|
||||
{
|
||||
"name": str(Tags.flake),
|
||||
"description": "Operations on a flake.",
|
||||
|
@ -1,26 +1,27 @@
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
from typing import IO, Any, Dict, Iterator, List, Optional, Union
|
||||
from typing import IO, Any
|
||||
|
||||
import pytest
|
||||
|
||||
_FILE = Union[None, int, IO[Any]]
|
||||
_FILE = None | int | IO[Any]
|
||||
|
||||
|
||||
class Command:
|
||||
def __init__(self) -> None:
|
||||
self.processes: List[subprocess.Popen[str]] = []
|
||||
self.processes: list[subprocess.Popen[str]] = []
|
||||
|
||||
def run(
|
||||
self,
|
||||
command: List[str],
|
||||
extra_env: Dict[str, str] = {},
|
||||
command: list[str],
|
||||
extra_env: dict[str, str] = {},
|
||||
stdin: _FILE = None,
|
||||
stdout: _FILE = None,
|
||||
stderr: _FILE = None,
|
||||
workdir: Optional[Path] = None,
|
||||
workdir: Path | None = None,
|
||||
) -> subprocess.Popen[str]:
|
||||
env = os.environ.copy()
|
||||
env.update(extra_env)
|
||||
|
@ -4,8 +4,9 @@ import os
|
||||
import shutil
|
||||
import subprocess as sp
|
||||
import tempfile
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
from typing import Iterator, NamedTuple
|
||||
from typing import NamedTuple
|
||||
|
||||
import pytest
|
||||
from pydantic import AnyUrl
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
import contextlib
|
||||
import socket
|
||||
from typing import Callable
|
||||
from collections.abc import Callable
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -3,10 +3,11 @@ import shutil
|
||||
import string
|
||||
import subprocess
|
||||
import time
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
from sys import platform
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import TYPE_CHECKING, Iterator
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from cli import Cli
|
||||
@ -206,18 +206,18 @@ def test_map_type() -> None:
|
||||
assert config.map_type("boolean") == bool
|
||||
assert config.map_type("attribute set of string") == dict[str, str]
|
||||
assert config.map_type("attribute set of integer") == dict[str, int]
|
||||
assert config.map_type("null or string") == Optional[str]
|
||||
assert config.map_type("null or string") == str | None
|
||||
|
||||
|
||||
# test the cast function with simple types
|
||||
def test_cast() -> None:
|
||||
assert config.cast(value=["true"], type=bool, opt_description="foo-option") is True
|
||||
assert (
|
||||
config.cast(value=["null"], type=Optional[str], opt_description="foo-option")
|
||||
config.cast(value=["null"], type=str | None, opt_description="foo-option")
|
||||
is None
|
||||
)
|
||||
assert (
|
||||
config.cast(value=["bar"], type=Optional[str], opt_description="foo-option")
|
||||
config.cast(value=["bar"], type=str | None, opt_description="foo-option")
|
||||
== "bar"
|
||||
)
|
||||
|
||||
|
@ -1,7 +1,8 @@
|
||||
import logging
|
||||
import os
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from typing import TYPE_CHECKING, Iterator
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from cli import Cli
|
||||
|
@ -1,6 +1,5 @@
|
||||
import os
|
||||
import sys
|
||||
from typing import Union
|
||||
|
||||
import pytest
|
||||
import pytest_subprocess.fake_process
|
||||
@ -28,7 +27,7 @@ def test_ssh_no_pass(
|
||||
user = "user"
|
||||
if os.environ.get("IN_NIX_SANDBOX"):
|
||||
monkeypatch.delenv("IN_NIX_SANDBOX")
|
||||
cmd: list[Union[str, utils.Any]] = [
|
||||
cmd: list[str | utils.Any] = [
|
||||
"nix",
|
||||
fp.any(),
|
||||
"shell",
|
||||
@ -58,7 +57,7 @@ def test_ssh_with_pass(
|
||||
user = "user"
|
||||
if os.environ.get("IN_NIX_SANDBOX"):
|
||||
monkeypatch.delenv("IN_NIX_SANDBOX")
|
||||
cmd: list[Union[str, utils.Any]] = [
|
||||
cmd: list[str | utils.Any] = [
|
||||
"nix",
|
||||
fp.any(),
|
||||
"shell",
|
||||
@ -79,7 +78,7 @@ def test_ssh_with_pass(
|
||||
|
||||
|
||||
def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
|
||||
cmd: list[Union[str, utils.Any]] = [fp.any()]
|
||||
cmd: list[str | utils.Any] = [fp.any()]
|
||||
fp.register(cmd, stdout="https://test.test")
|
||||
result = cli.qrcode_scan("test.png")
|
||||
assert result == "https://test.test"
|
||||
|
@ -1,7 +1,7 @@
|
||||
import argparse
|
||||
from typing import Callable, Optional
|
||||
from collections.abc import Callable
|
||||
|
||||
start_app: Optional[Callable] = None
|
||||
start_app: Callable | None = None
|
||||
|
||||
from .app import start_app
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user