Reworked machines list, and history commands
This commit is contained in:
parent
9d952ba534
commit
c90053834a
@ -40,6 +40,7 @@ class ClanParameters:
|
||||
class ClanURI:
|
||||
# Initialize the class with a clan:// URI
|
||||
def __init__(self, uri: str) -> None:
|
||||
|
||||
# Check if the URI starts with clan://
|
||||
if uri.startswith("clan://"):
|
||||
self._nested_uri = uri[7:]
|
||||
|
@ -13,6 +13,7 @@ class FlakeConfig:
|
||||
flake_url: str | Path
|
||||
flake_attr: str
|
||||
|
||||
nar_hash: str
|
||||
icon: str | None
|
||||
description: str | None
|
||||
last_updated: str
|
||||
@ -29,7 +30,7 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
|
||||
]
|
||||
)
|
||||
|
||||
proc = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE)
|
||||
proc = subprocess.run(cmd, text=True, capture_output=True)
|
||||
assert proc.stdout is not None
|
||||
if proc.returncode != 0:
|
||||
raise ClanError(
|
||||
@ -38,6 +39,8 @@ command: {shlex.join(cmd)}
|
||||
exit code: {proc.returncode}
|
||||
stdout:
|
||||
{proc.stdout}
|
||||
stderr:
|
||||
{proc.stderr}
|
||||
"""
|
||||
)
|
||||
res = proc.stdout.strip()
|
||||
@ -51,6 +54,7 @@ stdout:
|
||||
return FlakeConfig(
|
||||
flake_url=flake_url,
|
||||
flake_attr=flake_attr,
|
||||
nar_hash=meta["locked"]["narHash"],
|
||||
icon=icon_path,
|
||||
description=meta.get("description"),
|
||||
last_updated=meta["lastModified"],
|
||||
|
@ -3,7 +3,6 @@ import argparse
|
||||
import dataclasses
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@ -22,11 +21,13 @@ class EnhancedJSONEncoder(json.JSONEncoder):
|
||||
|
||||
@dataclasses.dataclass
|
||||
class HistoryEntry:
|
||||
path: str
|
||||
last_used: str
|
||||
dir_datetime: str
|
||||
flake: FlakeConfig
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if isinstance(self.flake, dict):
|
||||
self.flake = FlakeConfig(**self.flake)
|
||||
|
||||
|
||||
def list_history() -> list[HistoryEntry]:
|
||||
logs: list[HistoryEntry] = []
|
||||
@ -45,35 +46,26 @@ def list_history() -> list[HistoryEntry]:
|
||||
return logs
|
||||
|
||||
|
||||
def get_dir_time(path: Path) -> str:
|
||||
# Get the last modified dir time in seconds
|
||||
dir_mtime = os.path.getmtime(path)
|
||||
dir_datetime = datetime.datetime.fromtimestamp(dir_mtime).isoformat()
|
||||
return dir_datetime
|
||||
|
||||
|
||||
def add_history(path: Path) -> list[HistoryEntry]:
|
||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||
logs = list_history()
|
||||
found = False
|
||||
|
||||
for entry in logs:
|
||||
if entry.path == str(path):
|
||||
if entry.flake.flake_url == str(path):
|
||||
found = True
|
||||
entry.last_used = datetime.datetime.now().isoformat()
|
||||
|
||||
if found:
|
||||
break
|
||||
|
||||
flake = inspect_flake(path, "defaultVM")
|
||||
flake.flake_url = str(flake.flake_url)
|
||||
dir_datetime = get_dir_time(path)
|
||||
|
||||
history = HistoryEntry(
|
||||
flake=flake,
|
||||
dir_datetime=dir_datetime,
|
||||
path=str(path),
|
||||
last_used=datetime.datetime.now().isoformat(),
|
||||
)
|
||||
if not found:
|
||||
logs.append(history)
|
||||
logs.append(history)
|
||||
|
||||
with locked_open(user_history_file(), "w+") as f:
|
||||
f.write(json.dumps(logs, cls=EnhancedJSONEncoder, indent=4))
|
||||
|
@ -6,7 +6,7 @@ from .add import list_history
|
||||
|
||||
def list_history_command(args: argparse.Namespace) -> None:
|
||||
for history_entry in list_history():
|
||||
print(history_entry.path)
|
||||
print(history_entry.flake.flake_url)
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
|
@ -3,11 +3,11 @@ import argparse
|
||||
import copy
|
||||
import datetime
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from ..dirs import user_history_file
|
||||
from ..locked_open import locked_open
|
||||
from .add import EnhancedJSONEncoder, HistoryEntry, get_dir_time, list_history
|
||||
from ..nix import nix_metadata
|
||||
from .add import EnhancedJSONEncoder, HistoryEntry, list_history
|
||||
|
||||
|
||||
def update_history() -> list[HistoryEntry]:
|
||||
@ -16,11 +16,17 @@ def update_history() -> list[HistoryEntry]:
|
||||
new_logs = []
|
||||
for entry in logs:
|
||||
new_entry = copy.deepcopy(entry)
|
||||
new_time = get_dir_time(Path(entry.path))
|
||||
if new_time != entry.dir_datetime:
|
||||
print(f"Updating {entry.path} from {entry.dir_datetime} to {new_time}")
|
||||
new_entry.dir_datetime = new_time
|
||||
|
||||
meta = nix_metadata(entry.flake.flake_url)
|
||||
new_hash = meta["locked"]["narHash"]
|
||||
if new_hash != entry.flake.nar_hash:
|
||||
print(
|
||||
f"Updating {entry.flake.flake_url} from {entry.flake.nar_hash} to {new_hash}"
|
||||
)
|
||||
new_entry.last_used = datetime.datetime.now().isoformat()
|
||||
new_entry.flake.nar_hash = new_hash
|
||||
|
||||
# TODO: Delete stale entries
|
||||
new_logs.append(new_entry)
|
||||
|
||||
with locked_open(user_history_file(), "w+") as f:
|
||||
|
@ -1,24 +1,42 @@
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from ..dirs import machines_dir
|
||||
from .types import validate_hostname
|
||||
from ..errors import ClanError
|
||||
from ..nix import nix_config, nix_eval
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def list_machines(flake_dir: Path) -> list[str]:
|
||||
path = machines_dir(flake_dir)
|
||||
log.debug(f"Listing machines in {path}")
|
||||
if not path.exists():
|
||||
return []
|
||||
objs: list[str] = []
|
||||
for f in os.listdir(path):
|
||||
if validate_hostname(f):
|
||||
objs.append(f)
|
||||
return objs
|
||||
config = nix_config()
|
||||
system = config["system"]
|
||||
cmd = nix_eval(
|
||||
[
|
||||
f"{flake_dir}#clanInternals.machines.{system}",
|
||||
"--apply",
|
||||
"builtins.attrNames",
|
||||
"--json",
|
||||
]
|
||||
)
|
||||
proc = subprocess.run(cmd, text=True, capture_output=True)
|
||||
assert proc.stdout is not None
|
||||
if proc.returncode != 0:
|
||||
raise ClanError(
|
||||
f"""
|
||||
command: {shlex.join(cmd)}
|
||||
exit code: {proc.returncode}
|
||||
stdout:
|
||||
{proc.stdout}
|
||||
stderr:
|
||||
{proc.stderr}
|
||||
"""
|
||||
)
|
||||
res = proc.stdout.strip()
|
||||
return json.loads(res)
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
|
@ -21,13 +21,13 @@ def test_history_add(
|
||||
"add",
|
||||
str(test_flake.path),
|
||||
]
|
||||
|
||||
breakpoint()
|
||||
cli.run(cmd)
|
||||
|
||||
history_file = user_history_file()
|
||||
assert history_file.exists()
|
||||
history = [HistoryEntry(**entry) for entry in json.loads(open(history_file).read())]
|
||||
assert history[0].path == str(test_flake.path)
|
||||
assert history[0].flake.flake_url == str(test_flake.path)
|
||||
|
||||
|
||||
def test_history_list(
|
||||
|
@ -11,6 +11,22 @@
|
||||
}
|
||||
],
|
||||
"settings": {
|
||||
"python.linting.mypyEnabled": true
|
||||
"python.linting.mypyEnabled": true,
|
||||
"files.exclude": {
|
||||
"**/.direnv": true,
|
||||
"**/.mypy_cache": true,
|
||||
"**/.ruff_cache": true,
|
||||
"**/.hypothesis": true,
|
||||
"**/__pycache__": true,
|
||||
"**/.reports": true
|
||||
},
|
||||
"search.exclude": {
|
||||
"**/.direnv": true,
|
||||
"**/.mypy_cache": true,
|
||||
"**/.ruff_cache": true,
|
||||
"**/.hypothesis": true,
|
||||
"**/__pycache__": true,
|
||||
"**/.reports": true
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user