apply TRY lint
This commit is contained in:
parent
68d777166a
commit
403b9cf2cc
@ -28,9 +28,10 @@ def try_bind_port(port: int) -> bool:
|
||||
try:
|
||||
tcp.bind(("127.0.0.1", port))
|
||||
udp.bind(("127.0.0.1", port))
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def try_connect_port(port: int) -> bool:
|
||||
|
@ -54,8 +54,8 @@ class ImplFunc(GObject.Object, Generic[P, B]):
|
||||
result = GLib.SOURCE_REMOVE
|
||||
try:
|
||||
result = self.async_run(**data)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
except Exception:
|
||||
log.exception("Error in async_run")
|
||||
# TODO: send error to js
|
||||
return result
|
||||
|
||||
@ -78,8 +78,8 @@ class MethodExecutor(threading.Thread):
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.result = self.function(*self.args, **self.kwargs)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
except Exception:
|
||||
log.exception("Error in MethodExecutor")
|
||||
finally:
|
||||
self.finished = True
|
||||
|
||||
|
@ -402,15 +402,12 @@ def main() -> None:
|
||||
try:
|
||||
args.func(args)
|
||||
except ClanError as e:
|
||||
if args.debug:
|
||||
log.exception(e)
|
||||
sys.exit(1)
|
||||
if isinstance(e, ClanCmdError):
|
||||
if e.cmd.msg:
|
||||
log.error(e.cmd.msg)
|
||||
log.exception(e.cmd.msg)
|
||||
sys.exit(1)
|
||||
|
||||
log.error(e)
|
||||
log.exception("An error occurred")
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt:
|
||||
log.warning("Interrupted by user")
|
||||
|
@ -1,3 +1,4 @@
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.inventory import (
|
||||
AdminConfig,
|
||||
ServiceAdmin,
|
||||
@ -39,7 +40,7 @@ def set_admin_service(
|
||||
|
||||
if not allowed_keys:
|
||||
msg = "At least one key must be provided to ensure access"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
instance = ServiceAdmin(
|
||||
meta=ServiceMeta(name=instance_name),
|
||||
|
@ -203,8 +203,8 @@ def generate_facts(
|
||||
was_regenerated |= _generate_facts_for_machine(
|
||||
machine, service, regenerate, tmpdir, prompt
|
||||
)
|
||||
except (OSError, ClanError) as exc:
|
||||
log.error(f"Failed to generate facts for {machine.name}: {exc}")
|
||||
except (OSError, ClanError):
|
||||
log.exception(f"Failed to generate facts for {machine.name}")
|
||||
errors += 1
|
||||
if errors > 0:
|
||||
msg = (
|
||||
|
@ -125,10 +125,11 @@ def load_inventory_eval(flake_dir: str | Path) -> Inventory:
|
||||
res = proc.stdout.strip()
|
||||
data = json.loads(res)
|
||||
inventory = from_dict(Inventory, data)
|
||||
return inventory
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Error decoding inventory from flake: {e}"
|
||||
raise ClanError(msg) from e
|
||||
else:
|
||||
return inventory
|
||||
|
||||
|
||||
def load_inventory_json(
|
||||
|
@ -200,7 +200,7 @@ def generate_machine_hardware_info(
|
||||
try:
|
||||
show_machine_hardware_platform(clan_dir, machine_name)
|
||||
except ClanCmdError as e:
|
||||
log.error(e)
|
||||
log.exception("Failed to evaluate hardware-configuration.nix")
|
||||
# Restore the backup file
|
||||
print(f"Restoring backup file {backup_file}")
|
||||
if backup_file:
|
||||
|
@ -72,10 +72,11 @@ def list_nixos_machines(flake_url: str | Path) -> list[str]:
|
||||
try:
|
||||
res = proc.stdout.strip()
|
||||
data = json.loads(res)
|
||||
return data
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Error decoding machines from flake: {e}"
|
||||
raise ClanError(msg) from e
|
||||
else:
|
||||
return data
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -123,10 +124,10 @@ def check_machine_online(
|
||||
proc = run_no_stdout(cmd)
|
||||
if proc.returncode != 0:
|
||||
return "Offline"
|
||||
|
||||
return "Online"
|
||||
except ClanCmdError:
|
||||
return "Offline"
|
||||
else:
|
||||
return "Online"
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
|
@ -6,6 +6,7 @@ from typing import Any
|
||||
|
||||
from clan_cli.cmd import run, run_no_stdout
|
||||
from clan_cli.dirs import nixpkgs_flake, nixpkgs_source
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
|
||||
def nix_command(flags: list[str]) -> list[str]:
|
||||
|
@ -108,9 +108,9 @@ def profile(func: Callable) -> Callable:
|
||||
profiler.enable()
|
||||
res = func(*args, **kwargs)
|
||||
profiler.disable()
|
||||
except Exception as ex:
|
||||
except Exception:
|
||||
profiler.disable()
|
||||
raise ex
|
||||
raise
|
||||
return res
|
||||
|
||||
if os.getenv("PERF", "0") == "1":
|
||||
|
@ -4,6 +4,8 @@ import socket
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
|
||||
# qga is almost like qmp, but not quite, because:
|
||||
# - server doesn't send initial message
|
||||
@ -16,9 +18,10 @@ class QgaSession:
|
||||
for _ in range(100):
|
||||
try:
|
||||
self.sock.connect(str(socket_file))
|
||||
return
|
||||
except ConnectionRefusedError:
|
||||
sleep(0.1)
|
||||
else:
|
||||
return
|
||||
self.sock.connect(str(socket_file))
|
||||
|
||||
def get_response(self) -> dict:
|
||||
@ -59,7 +62,7 @@ class QgaSession:
|
||||
result = self.get_response()
|
||||
if "error" in result and result["error"]["desc"].startswith("PID"):
|
||||
msg = "PID could not be found"
|
||||
raise Exception(msg)
|
||||
raise ClanError(msg)
|
||||
if result["return"]["exited"]:
|
||||
break
|
||||
sleep(0.1)
|
||||
@ -77,5 +80,5 @@ class QgaSession:
|
||||
)
|
||||
if check and exitcode != 0:
|
||||
msg = f"Command on guest failed\nCommand: {cmd}\nExitcode {exitcode}\nStdout: {stdout}\nStderr: {stderr}"
|
||||
raise Exception(msg)
|
||||
raise ClanError(msg)
|
||||
return exitcode, stdout, stderr
|
||||
|
@ -16,6 +16,8 @@ import socket
|
||||
import types
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
|
||||
class QMPError(Exception):
|
||||
"""
|
||||
@ -159,7 +161,6 @@ class QEMUMonitorProtocol:
|
||||
) -> None:
|
||||
# Implement context manager exit function.
|
||||
self.close()
|
||||
return False
|
||||
|
||||
def connect(self, negotiate: bool = True) -> dict[str, Any] | None:
|
||||
"""
|
||||
@ -211,7 +212,7 @@ class QEMUMonitorProtocol:
|
||||
except OSError as err:
|
||||
if err.errno == errno.EPIPE:
|
||||
return None
|
||||
raise err
|
||||
raise
|
||||
resp = self.__json_read()
|
||||
self.logger.debug("<<< %s", resp)
|
||||
return resp
|
||||
@ -242,7 +243,7 @@ class QEMUMonitorProtocol:
|
||||
"""
|
||||
ret = self.cmd(cmd, kwds)
|
||||
if "error" in ret:
|
||||
raise Exception(ret["error"]["desc"])
|
||||
raise ClanError(ret["error"]["desc"])
|
||||
return ret["return"]
|
||||
|
||||
def pull_event(self, wait: bool | float = False) -> dict[str, Any] | None:
|
||||
|
@ -56,10 +56,11 @@ def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:
|
||||
if out_file:
|
||||
out_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
out_file.write_text(res)
|
||||
return private_key, pubkey
|
||||
except subprocess.CalledProcessError as e:
|
||||
msg = "Failed to generate private sops key"
|
||||
raise ClanError(msg) from e
|
||||
else:
|
||||
return private_key, pubkey
|
||||
|
||||
|
||||
def get_user_name(flake_dir: Path, user: str) -> str:
|
||||
|
@ -751,7 +751,7 @@ class HostGroup:
|
||||
"""
|
||||
threads = []
|
||||
results: list[HostResult[T]] = [
|
||||
HostResult(h, Exception(f"No result set for thread {i}"))
|
||||
HostResult(h, ClanError(f"No result set for thread {i}"))
|
||||
for (i, h) in enumerate(self.hosts)
|
||||
]
|
||||
for i, host in enumerate(self.hosts):
|
||||
@ -800,7 +800,7 @@ def parse_deployment_address(
|
||||
result = urllib.parse.urlsplit("//" + hostname)
|
||||
if not result.hostname:
|
||||
msg = f"Invalid hostname: {hostname}"
|
||||
raise Exception(msg)
|
||||
raise ClanError(msg)
|
||||
hostname = result.hostname
|
||||
port = result.port
|
||||
meta = meta.copy()
|
||||
|
@ -82,9 +82,10 @@ def is_reachable(host: str) -> bool:
|
||||
try:
|
||||
sock.connect((host, 22))
|
||||
sock.close()
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def connect_ssh_from_json(ssh_data: dict[str, str]) -> None:
|
||||
|
@ -274,7 +274,7 @@ def generate_vars(
|
||||
machine, generator_name, regenerate
|
||||
)
|
||||
except Exception as exc:
|
||||
log.error(f"Failed to generate facts for {machine.name}: {exc}")
|
||||
log.exception(f"Failed to generate facts for {machine.name}")
|
||||
errors += [exc]
|
||||
if len(errors) > 0:
|
||||
msg = f"Failed to generate facts for {len(errors)} hosts. Check the logs above"
|
||||
|
@ -58,10 +58,11 @@ def build_vm(
|
||||
try:
|
||||
vm_data = json.loads(Path(nixos_config_file).read_text())
|
||||
vm_data["secrets_dir"] = str(secrets_dir)
|
||||
return vm_data
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Failed to parse vm config: {e}"
|
||||
raise ClanError(msg) from e
|
||||
else:
|
||||
return vm_data
|
||||
|
||||
|
||||
def get_secrets(
|
||||
|
@ -15,9 +15,10 @@ def test_vsock_port(port: int) -> bool:
|
||||
s = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
|
||||
s.connect((VMADDR_CID_HYPERVISOR, port))
|
||||
s.close()
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
2
pkgs/clan-cli/tests/fixture_error.py
Normal file
2
pkgs/clan-cli/tests/fixture_error.py
Normal file
@ -0,0 +1,2 @@
|
||||
class FixtureError(Exception):
|
||||
pass
|
@ -10,6 +10,7 @@ from typing import NamedTuple
|
||||
|
||||
import pytest
|
||||
from clan_cli.dirs import nixpkgs_source
|
||||
from fixture_error import FixtureError
|
||||
from root import CLAN_CORE
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -226,7 +227,7 @@ def test_flake(
|
||||
if git_proc.returncode != 0:
|
||||
log.error(git_proc.stderr.decode())
|
||||
msg = "git diff on ./sops is not empty. This should not happen as all changes should be committed"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -235,7 +236,7 @@ def test_flake_with_core(
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
msg = "clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
yield from create_flake(
|
||||
temporary_home,
|
||||
"test_flake_with_core",
|
||||
@ -252,11 +253,11 @@ def test_local_democlan(
|
||||
msg = (
|
||||
"DEMOCLAN_ROOT not set. This test requires the democlan flake to be present"
|
||||
)
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
democlan_p = Path(democlan).resolve()
|
||||
if not democlan_p.is_dir():
|
||||
msg = f"DEMOCLAN_ROOT ({democlan_p}) is not a directory. This test requires the democlan directory to be present"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
|
||||
return FlakeForTest(democlan_p)
|
||||
|
||||
@ -267,7 +268,7 @@ def test_flake_with_core_and_pass(
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
msg = "clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
yield from create_flake(
|
||||
temporary_home,
|
||||
"test_flake_with_core_and_pass",
|
||||
@ -281,7 +282,7 @@ def test_flake_minimal(
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
msg = "clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
yield from create_flake(
|
||||
temporary_home,
|
||||
CLAN_CORE / "templates" / "minimal",
|
||||
|
@ -10,6 +10,7 @@ from tempfile import TemporaryDirectory
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from fixture_error import FixtureError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from command import Command
|
||||
@ -134,5 +135,5 @@ def sshd(
|
||||
rc = proc.poll()
|
||||
if rc is not None:
|
||||
msg = f"sshd processes was terminated with {rc}"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
time.sleep(0.1)
|
||||
|
@ -60,10 +60,11 @@ class EmptySplash(Gtk.Box):
|
||||
def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None:
|
||||
try:
|
||||
pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path)
|
||||
return pixbuf
|
||||
except Exception as e:
|
||||
log.error(f"Failed to load image: {e}")
|
||||
except Exception:
|
||||
log.exception("Failed to load image")
|
||||
return None
|
||||
else:
|
||||
return pixbuf
|
||||
|
||||
def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None:
|
||||
"""
|
||||
|
@ -280,8 +280,8 @@ class VMObject(GObject.Object):
|
||||
if not self._log_file:
|
||||
try:
|
||||
self._log_file = Path(proc.out_file).open() # noqa: SIM115
|
||||
except Exception as ex:
|
||||
log.exception(ex)
|
||||
except Exception:
|
||||
log.exception(f"Failed to open log file {proc.out_file}")
|
||||
self._log_file = None
|
||||
return GLib.SOURCE_REMOVE
|
||||
|
||||
|
@ -48,11 +48,12 @@ def send_join_request_api(host: str, port: int) -> bool:
|
||||
body = response.split("\n")[-1]
|
||||
print(body)
|
||||
moonlight.terminate()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
moonlight.terminate()
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def send_join_request_native(host: str, port: int, cert: str) -> bool:
|
||||
@ -78,9 +79,10 @@ def send_join_request_native(host: str, port: int, cert: str) -> bool:
|
||||
lines = response.split("\n")
|
||||
body = "\n".join(lines[2:])[2:]
|
||||
print(body)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
else:
|
||||
return True
|
||||
# TODO: fix
|
||||
try:
|
||||
print(f"response: {response}")
|
||||
|
@ -23,21 +23,23 @@ class MoonlightPairing:
|
||||
)
|
||||
thread.start()
|
||||
print("Thread started")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(
|
||||
"Error occurred while starting the process: ", str(e), file=sys.stderr
|
||||
)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def check(self, host: str) -> bool:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["moonlight", "list", "localhost", host], check=True
|
||||
)
|
||||
return result.returncode == 0
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
else:
|
||||
return result.returncode == 0
|
||||
|
||||
def terminate(self) -> None:
|
||||
if self.process:
|
||||
|
@ -39,6 +39,7 @@ lint.select = [
|
||||
"SLOT",
|
||||
"T10",
|
||||
"TID",
|
||||
"TRY",
|
||||
"U",
|
||||
"YTT",
|
||||
]
|
||||
@ -55,4 +56,5 @@ lint.ignore = [
|
||||
"SIM102",
|
||||
"SIM108",
|
||||
"SIM112",
|
||||
"ISC001",
|
||||
]
|
||||
|
Loading…
Reference in New Issue
Block a user