disallow variable shadowing
This commit is contained in:
parent
780ffb9c8f
commit
4fd84d1c48
|
@ -19,8 +19,8 @@ test_driver = ["py.typed"]
|
|||
target-version = "py311"
|
||||
line-length = 88
|
||||
|
||||
select = ["E", "F", "I", "U", "N", "RUF", "ANN"]
|
||||
ignore = ["E501", "ANN101", "ANN401"]
|
||||
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||
ignore = ["E501", "ANN101", "ANN401", "A003"]
|
||||
|
||||
[tool.mypy]
|
||||
python_version = "3.11"
|
||||
|
|
|
@ -93,8 +93,8 @@ class ZerotierController:
|
|||
data=data,
|
||||
)
|
||||
|
||||
def get_network(self, id: str) -> dict[str, Any]:
|
||||
return self._http_request(f"/controller/network/{id}")
|
||||
def get_network(self, network_id: str) -> dict[str, Any]:
|
||||
return self._http_request(f"/controller/network/{network_id}")
|
||||
|
||||
|
||||
@contextmanager
|
||||
|
|
|
@ -21,28 +21,28 @@ log = logging.getLogger(__name__)
|
|||
|
||||
|
||||
# nixos option type description to python type
|
||||
def map_type(type: str) -> Any:
|
||||
if type == "boolean":
|
||||
def map_type(nix_type: str) -> Any:
|
||||
if nix_type == "boolean":
|
||||
return bool
|
||||
elif type in [
|
||||
elif nix_type in [
|
||||
"integer",
|
||||
"signed integer",
|
||||
"16 bit unsigned integer; between 0 and 65535 (both inclusive)",
|
||||
]:
|
||||
return int
|
||||
elif type.startswith("string"):
|
||||
elif nix_type.startswith("string"):
|
||||
return str
|
||||
elif type.startswith("null or "):
|
||||
subtype = type.removeprefix("null or ")
|
||||
elif nix_type.startswith("null or "):
|
||||
subtype = nix_type.removeprefix("null or ")
|
||||
return map_type(subtype) | None
|
||||
elif type.startswith("attribute set of"):
|
||||
subtype = type.removeprefix("attribute set of ")
|
||||
elif nix_type.startswith("attribute set of"):
|
||||
subtype = nix_type.removeprefix("attribute set of ")
|
||||
return dict[str, map_type(subtype)] # type: ignore
|
||||
elif type.startswith("list of"):
|
||||
subtype = type.removeprefix("list of ")
|
||||
elif nix_type.startswith("list of"):
|
||||
subtype = nix_type.removeprefix("list of ")
|
||||
return list[map_type(subtype)] # type: ignore
|
||||
else:
|
||||
raise ClanError(f"Unknown type {type}")
|
||||
raise ClanError(f"Unknown type {nix_type}")
|
||||
|
||||
|
||||
# merge two dicts recursively
|
||||
|
@ -70,10 +70,10 @@ class AllContainer(list):
|
|||
|
||||
# value is always a list, as the arg parser cannot know the type upfront
|
||||
# and therefore always allows multiple arguments.
|
||||
def cast(value: Any, type: Any, opt_description: str) -> Any:
|
||||
def cast(value: Any, input_type: Any, opt_description: str) -> Any:
|
||||
try:
|
||||
# handle bools
|
||||
if isinstance(type, bool):
|
||||
if isinstance(input_type, bool):
|
||||
if value[0] in ["true", "True", "yes", "y", "1"]:
|
||||
return True
|
||||
elif value[0] in ["false", "False", "no", "n", "0"]:
|
||||
|
@ -81,28 +81,28 @@ def cast(value: Any, type: Any, opt_description: str) -> Any:
|
|||
else:
|
||||
raise ClanError(f"Invalid value {value} for boolean")
|
||||
# handle lists
|
||||
elif get_origin(type) == list:
|
||||
subtype = type.__args__[0]
|
||||
elif get_origin(input_type) == list:
|
||||
subtype = input_type.__args__[0]
|
||||
return [cast([x], subtype, opt_description) for x in value]
|
||||
# handle dicts
|
||||
elif get_origin(type) == dict:
|
||||
elif get_origin(input_type) == dict:
|
||||
if not isinstance(value, dict):
|
||||
raise ClanError(
|
||||
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
|
||||
)
|
||||
subtype = type.__args__[1]
|
||||
subtype = input_type.__args__[1]
|
||||
return {k: cast(v, subtype, opt_description) for k, v in value.items()}
|
||||
elif str(type) == "typing.Optional[str]":
|
||||
elif str(input_type) == "str | None":
|
||||
if value[0] in ["null", "None"]:
|
||||
return None
|
||||
return value[0]
|
||||
else:
|
||||
if len(value) > 1:
|
||||
raise ClanError(f"Too many values for {opt_description}")
|
||||
return type(value[0])
|
||||
return input_type(value[0])
|
||||
except ValueError:
|
||||
raise ClanError(
|
||||
f"Invalid type for option {opt_description} (expected {type.__name__})"
|
||||
f"Invalid type for option {opt_description} (expected {input_type.__name__})"
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -1,14 +1,19 @@
|
|||
from collections.abc import Callable
|
||||
from types import ModuleType
|
||||
from typing import Any
|
||||
from typing import Any, Protocol
|
||||
|
||||
|
||||
class AnyCall(Protocol):
|
||||
def __call__(self, *args: Any, **kwargs: Any) -> Any:
|
||||
...
|
||||
|
||||
|
||||
class FakeDeal:
|
||||
def __getattr__(self, name: str) -> "Callable":
|
||||
def __getattr__(self, name: str) -> AnyCall:
|
||||
return self.mock_call
|
||||
|
||||
def mock_call(self, *args: Any, **kwargs: Any) -> Callable:
|
||||
def wrapper(func: Callable) -> Callable:
|
||||
def mock_call(self, *args: Any, **kwargs: Any) -> Callable[[AnyCall], AnyCall]:
|
||||
def wrapper(func: AnyCall) -> AnyCall:
|
||||
return func
|
||||
|
||||
return wrapper
|
||||
|
|
|
@ -1,24 +1,17 @@
|
|||
import logging
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import shlex
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import ipdb
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def command_exec(cmd: list[str], work_dir: Path, env: dict[str, str]) -> None:
|
||||
subprocess.run(cmd, check=True, env=env, cwd=work_dir.resolve())
|
||||
|
||||
|
||||
def block_for_input() -> None:
|
||||
log = logging.getLogger(__name__)
|
||||
procid = os.getpid()
|
||||
|
@ -66,12 +59,13 @@ def breakpoint_shell(
|
|||
if cmd is not None:
|
||||
mycommand = shlex.join(cmd)
|
||||
write_command(mycommand, work_dir / "cmd.sh")
|
||||
proc = spawn_process(func=command_exec, cmd=args, work_dir=work_dir, env=env)
|
||||
proc = subprocess.Popen(args, env=env, cwd=work_dir.resolve())
|
||||
|
||||
try:
|
||||
ipdb.set_trace()
|
||||
finally:
|
||||
proc.terminate()
|
||||
with proc:
|
||||
try:
|
||||
ipdb.set_trace()
|
||||
finally:
|
||||
proc.terminate()
|
||||
|
||||
|
||||
def write_command(command: str, loc: Path) -> None:
|
||||
|
@ -83,15 +77,6 @@ def write_command(command: str, loc: Path) -> None:
|
|||
os.chmod(loc, st.st_mode | stat.S_IEXEC)
|
||||
|
||||
|
||||
def spawn_process(func: Callable, **kwargs: Any) -> mp.Process:
|
||||
if mp.get_start_method(allow_none=True) is None:
|
||||
mp.set_start_method(method="spawn")
|
||||
|
||||
proc = mp.Process(target=func, name="python-debug-process", kwargs=kwargs)
|
||||
proc.start()
|
||||
return proc
|
||||
|
||||
|
||||
def dump_env(env: dict[str, str], loc: Path) -> None:
|
||||
cenv = env.copy()
|
||||
log.info("Dumping environment variables to %s", loc)
|
||||
|
|
|
@ -11,7 +11,7 @@ async def add_flake(path: Path) -> dict[str, CmdOut]:
|
|||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||
# append line to history file
|
||||
# TODO: Make this atomic
|
||||
lines: set = set()
|
||||
lines: set[str] = set()
|
||||
if user_history_file().exists():
|
||||
with open(user_history_file()) as f:
|
||||
lines = set(f.readlines())
|
||||
|
|
|
@ -425,14 +425,14 @@ class Host:
|
|||
sudo = ""
|
||||
if become_root and self.user != "root":
|
||||
sudo = "sudo -- "
|
||||
vars = []
|
||||
env_vars = []
|
||||
for k, v in extra_env.items():
|
||||
vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
|
||||
env_vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
|
||||
|
||||
displayed_cmd = ""
|
||||
export_cmd = ""
|
||||
if vars:
|
||||
export_cmd = f"export {' '.join(vars)}; "
|
||||
if env_vars:
|
||||
export_cmd = f"export {' '.join(env_vars)}; "
|
||||
displayed_cmd += export_cmd
|
||||
if isinstance(cmd, list):
|
||||
displayed_cmd += " ".join(cmd)
|
||||
|
@ -469,7 +469,7 @@ class Host:
|
|||
def ssh_cmd(
|
||||
self,
|
||||
verbose_ssh: bool = False,
|
||||
) -> list:
|
||||
) -> list[str]:
|
||||
if self.user is not None:
|
||||
ssh_target = f"{self.user}@{self.host}"
|
||||
else:
|
||||
|
|
|
@ -21,8 +21,8 @@ from .errors import ClanError
|
|||
class Command:
|
||||
def __init__(self, log: logging.Logger) -> None:
|
||||
self.log: logging.Logger = log
|
||||
self.p: subprocess.Popen | None = None
|
||||
self._output: queue.SimpleQueue = queue.SimpleQueue()
|
||||
self.p: subprocess.Popen[str] | None = None
|
||||
self._output: queue.SimpleQueue[str | None] = queue.SimpleQueue()
|
||||
self.returncode: int | None = None
|
||||
self.done: bool = False
|
||||
self.stdout: list[str] = []
|
||||
|
@ -148,8 +148,8 @@ class BaseTask:
|
|||
for line in proc.stderr:
|
||||
yield line
|
||||
else:
|
||||
while line := proc._output.get():
|
||||
yield line
|
||||
while maybe_line := proc._output.get():
|
||||
yield maybe_line
|
||||
|
||||
def commands(self) -> Iterator[Command]:
|
||||
yield from self.procs
|
||||
|
|
|
@ -55,5 +55,5 @@ ignore_missing_imports = true
|
|||
[tool.ruff]
|
||||
target-version = "py311"
|
||||
line-length = 88
|
||||
select = ["E", "F", "I", "U", "N", "RUF", "ANN"]
|
||||
ignore = ["E501", "E402", "ANN101", "ANN401"]
|
||||
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||
ignore = ["E501", "E402", "ANN101", "ANN401", "A003"]
|
||||
|
|
|
@ -38,14 +38,14 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
|
|||
# FIXME, if any parent of the sshd directory is world-writable than sshd will refuse it.
|
||||
# we use .direnv instead since it's already in .gitignore
|
||||
with TemporaryDirectory() as _dir:
|
||||
dir = Path(_dir)
|
||||
tmpdir = Path(_dir)
|
||||
host_key = test_root / "data" / "ssh_host_ed25519_key"
|
||||
host_key.chmod(0o600)
|
||||
template = (test_root / "data" / "sshd_config").read_text()
|
||||
content = string.Template(template).substitute(dict(host_key=host_key))
|
||||
config = dir / "sshd_config"
|
||||
config = tmpdir / "sshd_config"
|
||||
config.write_text(content)
|
||||
login_shell = dir / "shell"
|
||||
login_shell = tmpdir / "shell"
|
||||
|
||||
bash = shutil.which("bash")
|
||||
path = os.environ["PATH"]
|
||||
|
@ -72,7 +72,7 @@ exec {bash} -l "${{@}}"
|
|||
), "we do not support the ld_preload trick on non-linux just now"
|
||||
|
||||
# This enforces a login shell by overriding the login shell of `getpwnam(3)`
|
||||
lib_path = dir / "libgetpwnam-preload.so"
|
||||
lib_path = tmpdir / "libgetpwnam-preload.so"
|
||||
subprocess.run(
|
||||
[
|
||||
os.environ.get("CC", "cc"),
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from cli import Cli
|
||||
|
@ -211,13 +211,16 @@ def test_map_type() -> None:
|
|||
|
||||
# test the cast function with simple types
|
||||
def test_cast() -> None:
|
||||
assert config.cast(value=["true"], type=bool, opt_description="foo-option") is True
|
||||
assert (
|
||||
config.cast(value=["null"], type=Optional[str], opt_description="foo-option") # noqa: UP007
|
||||
config.cast(value=["true"], input_type=bool, opt_description="foo-option")
|
||||
is True
|
||||
)
|
||||
assert (
|
||||
config.cast(value=["null"], input_type=str | None, opt_description="foo-option")
|
||||
is None
|
||||
)
|
||||
assert (
|
||||
config.cast(value=["bar"], type=Optional[str], opt_description="foo-option") # noqa: UP007
|
||||
config.cast(value=["bar"], input_type=str | None, opt_description="foo-option")
|
||||
== "bar"
|
||||
)
|
||||
|
||||
|
|
|
@ -24,5 +24,5 @@ ignore_missing_imports = true
|
|||
[tool.ruff]
|
||||
target-version = "py311"
|
||||
line-length = 88
|
||||
select = [ "E", "F", "I", "U", "N", "RUF", "ANN" ]
|
||||
ignore = ["E501", "E402", "N802", "ANN101", "ANN401"]
|
||||
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||
ignore = ["E501", "E402", "N802", "ANN101", "ANN401", "A003"]
|
||||
|
|
|
@ -10,5 +10,5 @@ exclude = "clan_cli.nixpkgs"
|
|||
[tool.ruff]
|
||||
line-length = 88
|
||||
target-version = "py311"
|
||||
select = [ "E", "F", "I", "U", "N", "RUF", "ANN" ]
|
||||
ignore = [ "E501", "ANN101", "ANN401"]
|
||||
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||
ignore = [ "E501", "ANN101", "ANN401", "A003"]
|
||||
|
|
Loading…
Reference in New Issue
Block a user