1
0
forked from clan/clan-core

Compare commits

...

7 Commits

Author SHA1 Message Date
d4e57df835 Fixed pytest 2023-12-14 17:59:08 +01:00
0a9fb41785 Improved ClanURI 2023-12-12 21:31:47 +01:00
7a82f364c7 Reworked machines list, and history commands 2023-12-12 18:11:38 +01:00
9a81bc1d4e Fixed pytest 2023-12-12 14:03:10 +01:00
0358bf2d30 Fixing pytest 2023-12-12 13:56:54 +01:00
8ca573b159 Improved history command 2023-12-11 19:26:29 +01:00
21ad905285 Moved history to own subcommand 2023-12-11 19:09:34 +01:00
18 changed files with 345 additions and 256 deletions

View File

@ -6,7 +6,7 @@ from pathlib import Path
from types import ModuleType
from typing import Any
from . import backups, config, flakes, machines, secrets, vms, webui
from . import backups, config, flakes, history, machines, secrets, vms, webui
from .custom_logger import setup_logging
from .dirs import get_clan_flake_toplevel, is_clan_flake
from .ssh import cli as ssh_cli
@ -111,6 +111,9 @@ def create_parser(prog: str | None = None) -> argparse.ArgumentParser:
parser_vms = subparsers.add_parser("vms", help="manage virtual machines")
vms.register_parser(parser_vms)
parser_history = subparsers.add_parser("history", help="manage history")
history.register_parser(parser_history)
if argcomplete:
argcomplete.autocomplete(parser)

View File

@ -40,6 +40,7 @@ class ClanParameters:
class ClanURI:
# Initialize the class with a clan:// URI
def __init__(self, uri: str) -> None:
self._full_uri = uri
# Check if the URI starts with clan://
if uri.startswith("clan://"):
self._nested_uri = uri[7:]
@ -53,13 +54,13 @@ class ClanURI:
# Parse the query string into a dictionary
query = urllib.parse.parse_qs(self._components.query)
params: dict[str, str] = {}
new_params: dict[str, str] = {}
for field in dataclasses.fields(ClanParameters):
if field.name in query:
values = query[field.name]
if len(values) > 1:
raise ClanError(f"Multiple values for parameter: {field.name}")
params[field.name] = values[0]
new_params[field.name] = values[0]
# Remove the field from the query dictionary
# clan uri and nested uri share one namespace for query parameters
@ -68,7 +69,7 @@ class ClanURI:
new_query = urllib.parse.urlencode(query, doseq=True)
self._components = self._components._replace(query=new_query)
self.params = ClanParameters(**params)
self.params = ClanParameters(**new_params)
comb = (
self._components.scheme,
@ -96,10 +97,29 @@ class ClanURI:
case _:
raise ClanError(f"Unsupported uri components: {self.scheme}")
def get_full_uri(self) -> str:
return self._full_uri
@classmethod
def from_path(cls, path: Path, params: ClanParameters) -> Self: # noqa
urlparams = urllib.parse.urlencode(params.__dict__)
return cls(f"clan://{path}?{urlparams}")
def from_path(cls, path: Path, params: ClanParameters | None = None) -> Self: # noqa
return cls.from_str(str(path), params)
@classmethod
def from_str(cls, url: str, params: ClanParameters | None = None) -> Self: # noqa
prefix = "clan://"
if url.startswith(prefix):
url = url[len(prefix) :]
if params is None:
return cls(f"clan://{url}")
comp = urllib.parse.urlparse(url)
query = urllib.parse.parse_qs(comp.query)
query.update(params.__dict__)
new_query = urllib.parse.urlencode(query, doseq=True)
comp = comp._replace(query=new_query)
new_url = urllib.parse.urlunparse(comp)
return cls(f"clan://{new_url}")
def __str__(self) -> str:
return f"ClanURI({self._components.geturl()})"
return self.get_full_uri()

View File

@ -1,8 +1,6 @@
# !/usr/bin/env python3
import argparse
from clan_cli.flakes.add import register_add_parser
from clan_cli.flakes.history import register_list_parser
from clan_cli.flakes.inspect import register_inspect_parser
from .create import register_create_parser
@ -18,9 +16,5 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
)
create_parser = subparser.add_parser("create", help="Create a clan flake")
register_create_parser(create_parser)
add_parser = subparser.add_parser("add", help="Add a clan flake")
register_add_parser(add_parser)
list_parser = subparser.add_parser("list", help="List recently used flakes")
register_list_parser(list_parser)
inspect_parser = subparser.add_parser("inspect", help="Inspect a clan flake")
register_inspect_parser(inspect_parser)

View File

@ -1,22 +0,0 @@
# !/usr/bin/env python3
import argparse
from pathlib import Path
from clan_cli.flakes.history import push_history
from ..async_cmd import CmdOut, runforcli
async def add_flake(path: Path) -> dict[str, CmdOut]:
push_history(path)
return {}
def add_flake_command(args: argparse.Namespace) -> None:
runforcli(add_flake, args.path)
# takes a (sub)parser and configures it
def register_add_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument("path", type=Path, help="Path to the flake", default=Path("."))
parser.set_defaults(func=add_flake_command)

View File

@ -1,74 +0,0 @@
# !/usr/bin/env python3
import argparse
import dataclasses
import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
from clan_cli.dirs import user_history_file
from ..locked_open import locked_open
class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, o: Any) -> Any:
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
return super().default(o)
@dataclass
class HistoryEntry:
path: str
last_used: str
def list_history() -> list[HistoryEntry]:
logs: list[HistoryEntry] = []
if not user_history_file().exists():
return []
with locked_open(user_history_file(), "r") as f:
try:
content: str = f.read()
parsed: list[dict] = json.loads(content)
logs = [HistoryEntry(**p) for p in parsed]
except json.JSONDecodeError as ex:
print("Failed to load history. Invalid JSON.")
print(f"{user_history_file()}: {ex}")
return logs
def push_history(path: Path) -> list[HistoryEntry]:
user_history_file().parent.mkdir(parents=True, exist_ok=True)
logs = list_history()
found = False
with locked_open(user_history_file(), "w+") as f:
for entry in logs:
if entry.path == str(path):
found = True
entry.last_used = datetime.now().isoformat()
if not found:
logs.append(
HistoryEntry(path=str(path), last_used=datetime.now().isoformat())
)
f.write(json.dumps(logs, cls=EnhancedJSONEncoder))
f.truncate()
return logs
def list_history_command(args: argparse.Namespace) -> None:
for history_entry in list_history():
print(history_entry.path)
# takes a (sub)parser and configures it
def register_list_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=list_history_command)

View File

@ -5,6 +5,7 @@ from dataclasses import dataclass
from pathlib import Path
from ..errors import ClanError
from ..machines.list import list_machines
from ..nix import nix_config, nix_eval, nix_metadata
@ -13,6 +14,7 @@ class FlakeConfig:
flake_url: str | Path
flake_attr: str
nar_hash: str
icon: str | None
description: str | None
last_updated: str
@ -23,13 +25,19 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
config = nix_config()
system = config["system"]
machines = list_machines(flake_url)
if flake_attr not in machines:
raise ClanError(
f"Machine {flake_attr} not found in {flake_url}. Available machines: {', '.join(machines)}"
)
cmd = nix_eval(
[
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.clanIcon'
]
)
proc = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE)
proc = subprocess.run(cmd, text=True, capture_output=True)
assert proc.stdout is not None
if proc.returncode != 0:
raise ClanError(
@ -38,6 +46,8 @@ command: {shlex.join(cmd)}
exit code: {proc.returncode}
stdout:
{proc.stdout}
stderr:
{proc.stderr}
"""
)
res = proc.stdout.strip()
@ -51,6 +61,7 @@ stdout:
return FlakeConfig(
flake_url=flake_url,
flake_attr=flake_attr,
nar_hash=meta["locked"]["narHash"],
icon=icon_path,
description=meta.get("description"),
last_updated=meta["lastModified"],

View File

@ -0,0 +1,22 @@
# !/usr/bin/env python3
import argparse
from .add import register_add_parser
from .list import register_list_parser
from .update import register_update_parser
# takes a (sub)parser and configures it
def register_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
add_parser = subparser.add_parser("add", help="Add a clan flake")
register_add_parser(add_parser)
list_parser = subparser.add_parser("list", help="List recently used flakes")
register_list_parser(list_parser)
update_parser = subparser.add_parser("update", help="Update a clan flake")
register_update_parser(update_parser)

View File

@ -0,0 +1,88 @@
# !/usr/bin/env python3
import argparse
import dataclasses
import datetime
import json
from typing import Any
from clan_cli.flakes.inspect import FlakeConfig, inspect_flake
from ..clan_uri import ClanURI
from ..dirs import user_history_file
from ..locked_open import locked_open
class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, o: Any) -> Any:
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
return super().default(o)
@dataclasses.dataclass
class HistoryEntry:
last_used: str
flake: FlakeConfig
def __post_init__(self) -> None:
if isinstance(self.flake, dict):
self.flake = FlakeConfig(**self.flake)
def list_history() -> list[HistoryEntry]:
logs: list[HistoryEntry] = []
if not user_history_file().exists():
return []
with locked_open(user_history_file(), "r") as f:
try:
content: str = f.read()
parsed: list[dict] = json.loads(content)
logs = [HistoryEntry(**p) for p in parsed]
except json.JSONDecodeError as ex:
print("Failed to load history. Invalid JSON.")
print(f"{user_history_file()}: {ex}")
return logs
def add_history(uri: ClanURI) -> list[HistoryEntry]:
user_history_file().parent.mkdir(parents=True, exist_ok=True)
logs = list_history()
found = False
path = uri.get_internal()
machine = uri.params.flake_attr
for entry in logs:
if entry.flake.flake_url == str(path):
found = True
entry.last_used = datetime.datetime.now().isoformat()
if found:
break
flake = inspect_flake(path, machine)
flake.flake_url = str(flake.flake_url)
history = HistoryEntry(
flake=flake,
last_used=datetime.datetime.now().isoformat(),
)
logs.append(history)
with locked_open(user_history_file(), "w+") as f:
f.write(json.dumps(logs, cls=EnhancedJSONEncoder, indent=4))
f.truncate()
return logs
def add_history_command(args: argparse.Namespace) -> None:
add_history(args.uri)
# takes a (sub)parser and configures it
def register_add_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"uri", type=ClanURI.from_str, help="Path to the flake", default="."
)
parser.set_defaults(func=add_history_command)

View File

@ -0,0 +1,14 @@
# !/usr/bin/env python3
import argparse
from .add import list_history
def list_history_command(args: argparse.Namespace) -> None:
for history_entry in list_history():
print(history_entry.flake.flake_url)
# takes a (sub)parser and configures it
def register_list_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=list_history_command)

View File

@ -0,0 +1,44 @@
# !/usr/bin/env python3
import argparse
import copy
import datetime
import json
from ..dirs import user_history_file
from ..locked_open import locked_open
from ..nix import nix_metadata
from .add import EnhancedJSONEncoder, HistoryEntry, list_history
def update_history() -> list[HistoryEntry]:
logs = list_history()
new_logs = []
for entry in logs:
new_entry = copy.deepcopy(entry)
meta = nix_metadata(entry.flake.flake_url)
new_hash = meta["locked"]["narHash"]
if new_hash != entry.flake.nar_hash:
print(
f"Updating {entry.flake.flake_url} from {entry.flake.nar_hash} to {new_hash}"
)
new_entry.last_used = datetime.datetime.now().isoformat()
new_entry.flake.nar_hash = new_hash
# TODO: Delete stale entries
new_logs.append(new_entry)
with locked_open(user_history_file(), "w+") as f:
f.write(json.dumps(new_logs, cls=EnhancedJSONEncoder, indent=4))
f.truncate()
return new_logs
def add_update_command(args: argparse.Namespace) -> None:
update_history()
# takes a (sub)parser and configures it
def register_update_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=add_update_command)

View File

@ -1,24 +1,42 @@
import argparse
import json
import logging
import os
import shlex
import subprocess
from pathlib import Path
from ..dirs import machines_dir
from .types import validate_hostname
from ..errors import ClanError
from ..nix import nix_config, nix_eval
log = logging.getLogger(__name__)
def list_machines(flake_dir: Path) -> list[str]:
path = machines_dir(flake_dir)
log.debug(f"Listing machines in {path}")
if not path.exists():
return []
objs: list[str] = []
for f in os.listdir(path):
if validate_hostname(f):
objs.append(f)
return objs
def list_machines(flake_url: Path | str) -> list[str]:
config = nix_config()
system = config["system"]
cmd = nix_eval(
[
f"{flake_url}#clanInternals.machines.{system}",
"--apply",
"builtins.attrNames",
"--json",
]
)
proc = subprocess.run(cmd, text=True, capture_output=True)
assert proc.stdout is not None
if proc.returncode != 0:
raise ClanError(
f"""
command: {shlex.join(cmd)}
exit code: {proc.returncode}
stdout:
{proc.stdout}
stderr:
{proc.stderr}
"""
)
res = proc.stdout.strip()
return json.loads(res)
def list_command(args: argparse.Namespace) -> None:

View File

@ -17,7 +17,7 @@ from clan_cli.webui.api_outputs import (
)
from ...async_cmd import run
from ...flakes import add, create
from ...flakes import create
from ...nix import nix_command, nix_flake_show
from ..tags import Tags
@ -45,16 +45,6 @@ async def get_attrs(url: AnyUrl | Path) -> list[str]:
return flake_attrs
@router.post("/api/flake/history", tags=[Tags.flake])
async def flake_history_append(flake_dir: Path) -> None:
await add.add_flake(flake_dir)
@router.get("/api/flake/history", tags=[Tags.flake])
async def flake_history_list() -> list[Path]:
return []
# TODO: Check for directory traversal
@router.get("/api/flake/attrs", tags=[Tags.flake])
async def inspect_flake_attrs(url: AnyUrl | Path) -> FlakeAttrResponse:

View File

@ -101,6 +101,36 @@ def test_from_path_with_default() -> None:
assert False
def test_from_str() -> None:
# Create a ClanURI object from a remote URI with parameters
uri_str = "https://example.com?password=asdasd&test=1234"
params = ClanParameters(flake_attr="myVM")
uri = ClanURI.from_str(url=uri_str, params=params)
assert uri.params.flake_attr == "myVM"
match uri.scheme:
case ClanScheme.HTTP.value(url):
assert url == "https://example.com?password=asdasd&test=1234" # type: ignore
case _:
assert False
uri_str = "~/Downloads/democlan"
params = ClanParameters(flake_attr="myVM")
uri = ClanURI.from_str(url=uri_str, params=params)
assert uri.params.flake_attr == "myVM"
assert uri.get_internal() == "~/Downloads/democlan"
uri_str = "~/Downloads/democlan"
uri = ClanURI.from_str(url=uri_str)
assert uri.params.flake_attr == "defaultVM"
assert uri.get_internal() == "~/Downloads/democlan"
uri_str = "clan://~/Downloads/democlan"
uri = ClanURI.from_str(url=uri_str)
assert uri.params.flake_attr == "defaultVM"
assert uri.get_internal() == "~/Downloads/democlan"
def test_remote_with_all_params() -> None:
# Create a ClanURI object from a remote URI with parameters
uri = ClanURI("clan://https://example.com?flake_attr=myVM&password=1234")

View File

@ -4,48 +4,10 @@ import logging
import pytest
from api import TestClient
from fixtures_flakes import FlakeForTest
from path import Path
from clan_cli.dirs import user_history_file
log = logging.getLogger(__name__)
def test_flake_history_append(
api: TestClient, test_flake: FlakeForTest, temporary_home: Path
) -> None:
response = api.post(
f"/api/flake/history?flake_dir={test_flake.path!s}",
json={},
)
assert response.status_code == 200, response.json()
assert user_history_file().exists()
# def test_flake_history_list(
# api: TestClient, test_flake: FlakeForTest, temporary_home: Path
# ) -> None:
# response = api.get(
# "/api/flake/history",
# )
# assert response.status_code == 200, response.text
# assert response.json() == []
# # add the test_flake
# response = api.post(
# f"/api/flake/history?flake_dir={test_flake.path!s}",
# json={},
# )
# assert response.status_code == 200, response.text
# # list the flakes again
# response = api.get(
# "/api/flake/history",
# )
# assert response.status_code == 200, response.text
# assert response.json() == [str(test_flake.path)]
@pytest.mark.impure
def test_inspect_ok(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
params = {"url": str(test_flake_with_core.path)}

View File

@ -1,54 +1,13 @@
import json
from typing import TYPE_CHECKING
import pytest
from cli import Cli
from fixtures_flakes import FlakeForTest
from pytest import CaptureFixture
from clan_cli.dirs import user_history_file
from clan_cli.flakes.history import HistoryEntry
if TYPE_CHECKING:
pass
def test_flakes_add(
test_flake: FlakeForTest,
) -> None:
cli = Cli()
cmd = [
"flakes",
"add",
str(test_flake.path),
]
cli.run(cmd)
history_file = user_history_file()
assert history_file.exists()
history = [HistoryEntry(**entry) for entry in json.loads(open(history_file).read())]
assert history[0].path == str(test_flake.path)
def test_flakes_list(
capsys: CaptureFixture,
test_flake: FlakeForTest,
) -> None:
cli = Cli()
cmd = [
"flakes",
"list",
]
cli.run(cmd)
assert str(test_flake.path) not in capsys.readouterr().out
cli.run(["flakes", "add", str(test_flake.path)])
cli.run(cmd)
assert str(test_flake.path) in capsys.readouterr().out
@pytest.mark.impure
def test_flakes_inspect(
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture

View File

@ -0,0 +1,53 @@
import json
from typing import TYPE_CHECKING
from cli import Cli
from fixtures_flakes import FlakeForTest
from pytest import CaptureFixture
from clan_cli.clan_uri import ClanParameters, ClanURI
from clan_cli.dirs import user_history_file
from clan_cli.history.add import HistoryEntry
if TYPE_CHECKING:
pass
def test_history_add(
test_flake_with_core: FlakeForTest,
) -> None:
cli = Cli()
params = ClanParameters(flake_attr="vm1")
uri = ClanURI.from_path(test_flake_with_core.path, params=params)
cmd = [
"history",
"add",
str(uri),
]
cli.run(cmd)
history_file = user_history_file()
assert history_file.exists()
history = [HistoryEntry(**entry) for entry in json.loads(open(history_file).read())]
assert history[0].flake.flake_url == str(test_flake_with_core.path)
def test_history_list(
capsys: CaptureFixture,
test_flake_with_core: FlakeForTest,
) -> None:
cli = Cli()
params = ClanParameters(flake_attr="vm1")
uri = ClanURI.from_path(test_flake_with_core.path, params=params)
cmd = [
"history",
"list",
]
cli.run(cmd)
assert str(test_flake_with_core.path) not in capsys.readouterr().out
cli.run(["history", "add", str(uri)])
cli.run(cmd)
assert str(test_flake_with_core.path) in capsys.readouterr().out

View File

@ -11,6 +11,22 @@
}
],
"settings": {
"python.linting.mypyEnabled": true
"python.linting.mypyEnabled": true,
"files.exclude": {
"**/.direnv": true,
"**/.mypy_cache": true,
"**/.ruff_cache": true,
"**/.hypothesis": true,
"**/__pycache__": true,
"**/.reports": true
},
"search.exclude": {
"**/.direnv": true,
"**/.mypy_cache": true,
"**/.ruff_cache": true,
"**/.hypothesis": true,
"**/__pycache__": true,
"**/.reports": true
}
}
}

View File

@ -4,7 +4,7 @@ from pathlib import Path
from typing import Any
import gi
from clan_cli import flakes, vms
from clan_cli import flakes, vms, history
gi.require_version("GdkPixbuf", "2.0")
from gi.repository import GdkPixbuf
@ -73,50 +73,11 @@ class VM:
# start/end indexes can be used optionally for pagination
def get_initial_vms(start: int = 0, end: int | None = None) -> list[VM]:
# vms = [
# VM(
# base=VMBase(
# icon=assets.loc / "cybernet.jpeg",
# name="Cybernet Clan",
# url="clan://cybernet.lol",
# _path=Path(__file__).parent.parent / "test_democlan",
# status=False,
# ),
# ),
# VM(
# base=VMBase(
# icon=assets.loc / "zenith.jpeg",
# name="Zenith Clan",
# url="clan://zenith.lol",
# _path=Path(__file__).parent.parent / "test_democlan",
# status=False,
# )
# ),
# VM(
# base=VMBase(
# icon=assets.loc / "firestorm.jpeg",
# name="Firestorm Clan",
# url="clan://firestorm.lol",
# _path=Path(__file__).parent.parent / "test_democlan",
# status=False,
# ),
# ),
# VM(
# base=VMBase(
# icon=assets.loc / "placeholder.jpeg",
# name="Placeholder Clan",
# url="clan://demo.lol",
# _path=Path(__file__).parent.parent / "test_democlan",
# status=True,
# ),
# ),
# ]
vm_list = []
# TODO: list_history() should return a list of dicts, not a list of paths
# Execute `clan flakes add <path>` to democlan for this to work
for entry in flakes.history.list_history():
for entry in history.list.list_history():
flake_config = flakes.inspect.inspect_flake(entry.path, "defaultVM")
vm_config = vms.inspect.inspect_vm(entry.path, "defaultVM")