forked from clan/clan-core
CLI: History supports multiple attrs from the same url now. Errors when executing the cli are formatted better
This commit is contained in:
parent
abfa2f218c
commit
d20f47ad5b
@ -1,6 +1,7 @@
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
import traceback
|
||||
from collections.abc import Sequence
|
||||
from pathlib import Path
|
||||
from types import ModuleType
|
||||
@ -9,6 +10,7 @@ from typing import Any
|
||||
from . import backups, config, flakes, history, machines, secrets, vms
|
||||
from .custom_logger import setup_logging
|
||||
from .dirs import get_clan_flake_toplevel
|
||||
from .errors import ClanCmdError, ClanError
|
||||
from .ssh import cli as ssh_cli
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -130,7 +132,18 @@ def main() -> None:
|
||||
if not hasattr(args, "func"):
|
||||
return
|
||||
|
||||
args.func(args)
|
||||
try:
|
||||
args.func(args)
|
||||
except ClanError as e:
|
||||
if args.debug:
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
if isinstance(e, ClanCmdError):
|
||||
if e.cmd.msg:
|
||||
print(e.cmd.msg, file=sys.stderr)
|
||||
else:
|
||||
print(e, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -1,7 +1,21 @@
|
||||
import os
|
||||
from math import floor
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple
|
||||
|
||||
|
||||
def get_term_filler(name: str) -> int:
|
||||
width, height = os.get_terminal_size()
|
||||
|
||||
filler = floor((width - len(name)) / 2)
|
||||
return filler - 1
|
||||
|
||||
|
||||
def text_heading(heading: str) -> str:
|
||||
filler = get_term_filler(heading)
|
||||
return f"{'=' * filler} {heading} {'=' * filler}"
|
||||
|
||||
|
||||
class CmdOut(NamedTuple):
|
||||
stdout: str
|
||||
stderr: str
|
||||
@ -12,15 +26,16 @@ class CmdOut(NamedTuple):
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"""
|
||||
{text_heading(heading="Command")}
|
||||
{self.command}
|
||||
{text_heading(heading="Stderr")}
|
||||
{self.stderr}
|
||||
{text_heading(heading="Stdout")}
|
||||
{self.stdout}
|
||||
{text_heading(heading="Metadata")}
|
||||
Message: {self.msg}
|
||||
Working Directory: '{self.cwd}'
|
||||
Return Code: {self.returncode}
|
||||
=================== Command ===================
|
||||
{self.command}
|
||||
=================== STDERR ===================
|
||||
{self.stderr}
|
||||
=================== STDOUT ===================
|
||||
{self.stdout}
|
||||
"""
|
||||
|
||||
|
||||
|
@ -63,26 +63,32 @@ def list_history() -> list[HistoryEntry]:
|
||||
return logs
|
||||
|
||||
|
||||
# TODO: Add all vm entries to history
|
||||
def new_history_entry(uri: ClanURI) -> HistoryEntry:
|
||||
flake = inspect_flake(uri.get_internal(), uri.params.flake_attr)
|
||||
flake.flake_url = str(flake.flake_url)
|
||||
return HistoryEntry(
|
||||
flake=flake,
|
||||
last_used=datetime.datetime.now().isoformat(),
|
||||
)
|
||||
|
||||
|
||||
def add_history(uri: ClanURI) -> list[HistoryEntry]:
|
||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||
logs = list_history()
|
||||
found = False
|
||||
path = uri.get_internal()
|
||||
machine = uri.params.flake_attr
|
||||
uri_path = uri.get_internal()
|
||||
uri_machine = uri.params.flake_attr
|
||||
|
||||
for entry in logs:
|
||||
if entry.flake.flake_url == str(path):
|
||||
if (
|
||||
entry.flake.flake_url == str(uri_path)
|
||||
and entry.flake.flake_attr == uri_machine
|
||||
):
|
||||
found = True
|
||||
entry.last_used = datetime.datetime.now().isoformat()
|
||||
|
||||
if not found:
|
||||
flake = inspect_flake(path, machine)
|
||||
flake.flake_url = str(flake.flake_url)
|
||||
history = HistoryEntry(
|
||||
flake=flake,
|
||||
last_used=datetime.datetime.now().isoformat(),
|
||||
)
|
||||
history = new_history_entry(uri)
|
||||
logs.append(history)
|
||||
|
||||
write_history_file(logs)
|
||||
|
@ -1,34 +1,36 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
from ..clan_uri import ClanParameters, ClanURI
|
||||
from ..errors import ClanCmdError
|
||||
from ..locked_open import write_history_file
|
||||
from ..nix import nix_metadata
|
||||
from .add import HistoryEntry, list_history
|
||||
from .add import HistoryEntry, list_history, new_history_entry
|
||||
|
||||
|
||||
def update_history() -> list[HistoryEntry]:
|
||||
logs = list_history()
|
||||
|
||||
new_logs = []
|
||||
for entry in logs:
|
||||
new_entry = copy.deepcopy(entry)
|
||||
try:
|
||||
meta = nix_metadata(entry.flake.flake_url)
|
||||
except ClanCmdError as e:
|
||||
print(f"Failed to update {entry.flake.flake_url}: {e}")
|
||||
continue
|
||||
|
||||
meta = nix_metadata(entry.flake.flake_url)
|
||||
new_hash = meta["locked"]["narHash"]
|
||||
if new_hash != entry.flake.nar_hash:
|
||||
print(
|
||||
f"Updating {entry.flake.flake_url} from {entry.flake.nar_hash} to {new_hash}"
|
||||
)
|
||||
new_entry.last_used = datetime.datetime.now().isoformat()
|
||||
new_entry.flake.nar_hash = new_hash
|
||||
uri = ClanURI.from_str(
|
||||
url=str(entry.flake.flake_url),
|
||||
params=ClanParameters(entry.flake.flake_attr),
|
||||
)
|
||||
entry = new_history_entry(uri)
|
||||
|
||||
# TODO: Delete stale entries
|
||||
new_logs.append(new_entry)
|
||||
|
||||
write_history_file(new_logs)
|
||||
return new_logs
|
||||
write_history_file(logs)
|
||||
return logs
|
||||
|
||||
|
||||
def add_update_command(args: argparse.Namespace) -> None:
|
||||
|
@ -25,7 +25,8 @@ def inspect_vm(flake_url: str | Path, flake_attr: str) -> VmConfig:
|
||||
|
||||
cmd = nix_eval(
|
||||
[
|
||||
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.vm.inspect'
|
||||
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.vm.inspect',
|
||||
"--refresh",
|
||||
]
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user