Compare commits
7 Commits
main
...
Luis-Heben
Author | SHA1 | Date | |
---|---|---|---|
d4e57df835 | |||
0a9fb41785 | |||
7a82f364c7 | |||
9a81bc1d4e | |||
0358bf2d30 | |||
8ca573b159 | |||
21ad905285 |
@ -6,7 +6,7 @@ from pathlib import Path
|
||||
from types import ModuleType
|
||||
from typing import Any
|
||||
|
||||
from . import backups, config, flakes, machines, secrets, vms, webui
|
||||
from . import backups, config, flakes, history, machines, secrets, vms, webui
|
||||
from .custom_logger import setup_logging
|
||||
from .dirs import get_clan_flake_toplevel, is_clan_flake
|
||||
from .ssh import cli as ssh_cli
|
||||
@ -111,6 +111,9 @@ def create_parser(prog: str | None = None) -> argparse.ArgumentParser:
|
||||
parser_vms = subparsers.add_parser("vms", help="manage virtual machines")
|
||||
vms.register_parser(parser_vms)
|
||||
|
||||
parser_history = subparsers.add_parser("history", help="manage history")
|
||||
history.register_parser(parser_history)
|
||||
|
||||
if argcomplete:
|
||||
argcomplete.autocomplete(parser)
|
||||
|
||||
|
@ -40,6 +40,7 @@ class ClanParameters:
|
||||
class ClanURI:
|
||||
# Initialize the class with a clan:// URI
|
||||
def __init__(self, uri: str) -> None:
|
||||
self._full_uri = uri
|
||||
# Check if the URI starts with clan://
|
||||
if uri.startswith("clan://"):
|
||||
self._nested_uri = uri[7:]
|
||||
@ -53,13 +54,13 @@ class ClanURI:
|
||||
# Parse the query string into a dictionary
|
||||
query = urllib.parse.parse_qs(self._components.query)
|
||||
|
||||
params: dict[str, str] = {}
|
||||
new_params: dict[str, str] = {}
|
||||
for field in dataclasses.fields(ClanParameters):
|
||||
if field.name in query:
|
||||
values = query[field.name]
|
||||
if len(values) > 1:
|
||||
raise ClanError(f"Multiple values for parameter: {field.name}")
|
||||
params[field.name] = values[0]
|
||||
new_params[field.name] = values[0]
|
||||
|
||||
# Remove the field from the query dictionary
|
||||
# clan uri and nested uri share one namespace for query parameters
|
||||
@ -68,7 +69,7 @@ class ClanURI:
|
||||
|
||||
new_query = urllib.parse.urlencode(query, doseq=True)
|
||||
self._components = self._components._replace(query=new_query)
|
||||
self.params = ClanParameters(**params)
|
||||
self.params = ClanParameters(**new_params)
|
||||
|
||||
comb = (
|
||||
self._components.scheme,
|
||||
@ -96,10 +97,29 @@ class ClanURI:
|
||||
case _:
|
||||
raise ClanError(f"Unsupported uri components: {self.scheme}")
|
||||
|
||||
def get_full_uri(self) -> str:
|
||||
return self._full_uri
|
||||
|
||||
@classmethod
|
||||
def from_path(cls, path: Path, params: ClanParameters) -> Self: # noqa
|
||||
urlparams = urllib.parse.urlencode(params.__dict__)
|
||||
return cls(f"clan://{path}?{urlparams}")
|
||||
def from_path(cls, path: Path, params: ClanParameters | None = None) -> Self: # noqa
|
||||
return cls.from_str(str(path), params)
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, url: str, params: ClanParameters | None = None) -> Self: # noqa
|
||||
prefix = "clan://"
|
||||
if url.startswith(prefix):
|
||||
url = url[len(prefix) :]
|
||||
|
||||
if params is None:
|
||||
return cls(f"clan://{url}")
|
||||
|
||||
comp = urllib.parse.urlparse(url)
|
||||
query = urllib.parse.parse_qs(comp.query)
|
||||
query.update(params.__dict__)
|
||||
new_query = urllib.parse.urlencode(query, doseq=True)
|
||||
comp = comp._replace(query=new_query)
|
||||
new_url = urllib.parse.urlunparse(comp)
|
||||
return cls(f"clan://{new_url}")
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"ClanURI({self._components.geturl()})"
|
||||
return self.get_full_uri()
|
||||
|
@ -1,8 +1,6 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
|
||||
from clan_cli.flakes.add import register_add_parser
|
||||
from clan_cli.flakes.history import register_list_parser
|
||||
from clan_cli.flakes.inspect import register_inspect_parser
|
||||
|
||||
from .create import register_create_parser
|
||||
@ -18,9 +16,5 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
)
|
||||
create_parser = subparser.add_parser("create", help="Create a clan flake")
|
||||
register_create_parser(create_parser)
|
||||
add_parser = subparser.add_parser("add", help="Add a clan flake")
|
||||
register_add_parser(add_parser)
|
||||
list_parser = subparser.add_parser("list", help="List recently used flakes")
|
||||
register_list_parser(list_parser)
|
||||
inspect_parser = subparser.add_parser("inspect", help="Inspect a clan flake")
|
||||
register_inspect_parser(inspect_parser)
|
||||
|
@ -1,22 +0,0 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.flakes.history import push_history
|
||||
|
||||
from ..async_cmd import CmdOut, runforcli
|
||||
|
||||
|
||||
async def add_flake(path: Path) -> dict[str, CmdOut]:
|
||||
push_history(path)
|
||||
return {}
|
||||
|
||||
|
||||
def add_flake_command(args: argparse.Namespace) -> None:
|
||||
runforcli(add_flake, args.path)
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_add_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument("path", type=Path, help="Path to the flake", default=Path("."))
|
||||
parser.set_defaults(func=add_flake_command)
|
@ -1,74 +0,0 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
import dataclasses
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.dirs import user_history_file
|
||||
|
||||
from ..locked_open import locked_open
|
||||
|
||||
|
||||
class EnhancedJSONEncoder(json.JSONEncoder):
|
||||
def default(self, o: Any) -> Any:
|
||||
if dataclasses.is_dataclass(o):
|
||||
return dataclasses.asdict(o)
|
||||
return super().default(o)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HistoryEntry:
|
||||
path: str
|
||||
last_used: str
|
||||
|
||||
|
||||
def list_history() -> list[HistoryEntry]:
|
||||
logs: list[HistoryEntry] = []
|
||||
if not user_history_file().exists():
|
||||
return []
|
||||
|
||||
with locked_open(user_history_file(), "r") as f:
|
||||
try:
|
||||
content: str = f.read()
|
||||
parsed: list[dict] = json.loads(content)
|
||||
logs = [HistoryEntry(**p) for p in parsed]
|
||||
except json.JSONDecodeError as ex:
|
||||
print("Failed to load history. Invalid JSON.")
|
||||
print(f"{user_history_file()}: {ex}")
|
||||
|
||||
return logs
|
||||
|
||||
|
||||
def push_history(path: Path) -> list[HistoryEntry]:
|
||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||
logs = list_history()
|
||||
|
||||
found = False
|
||||
with locked_open(user_history_file(), "w+") as f:
|
||||
for entry in logs:
|
||||
if entry.path == str(path):
|
||||
found = True
|
||||
entry.last_used = datetime.now().isoformat()
|
||||
|
||||
if not found:
|
||||
logs.append(
|
||||
HistoryEntry(path=str(path), last_used=datetime.now().isoformat())
|
||||
)
|
||||
|
||||
f.write(json.dumps(logs, cls=EnhancedJSONEncoder))
|
||||
f.truncate()
|
||||
|
||||
return logs
|
||||
|
||||
|
||||
def list_history_command(args: argparse.Namespace) -> None:
|
||||
for history_entry in list_history():
|
||||
print(history_entry.path)
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_list_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.set_defaults(func=list_history_command)
|
@ -5,6 +5,7 @@ from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from ..errors import ClanError
|
||||
from ..machines.list import list_machines
|
||||
from ..nix import nix_config, nix_eval, nix_metadata
|
||||
|
||||
|
||||
@ -13,6 +14,7 @@ class FlakeConfig:
|
||||
flake_url: str | Path
|
||||
flake_attr: str
|
||||
|
||||
nar_hash: str
|
||||
icon: str | None
|
||||
description: str | None
|
||||
last_updated: str
|
||||
@ -23,13 +25,19 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
|
||||
config = nix_config()
|
||||
system = config["system"]
|
||||
|
||||
machines = list_machines(flake_url)
|
||||
if flake_attr not in machines:
|
||||
raise ClanError(
|
||||
f"Machine {flake_attr} not found in {flake_url}. Available machines: {', '.join(machines)}"
|
||||
)
|
||||
|
||||
cmd = nix_eval(
|
||||
[
|
||||
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.clanIcon'
|
||||
]
|
||||
)
|
||||
|
||||
proc = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE)
|
||||
proc = subprocess.run(cmd, text=True, capture_output=True)
|
||||
assert proc.stdout is not None
|
||||
if proc.returncode != 0:
|
||||
raise ClanError(
|
||||
@ -38,6 +46,8 @@ command: {shlex.join(cmd)}
|
||||
exit code: {proc.returncode}
|
||||
stdout:
|
||||
{proc.stdout}
|
||||
stderr:
|
||||
{proc.stderr}
|
||||
"""
|
||||
)
|
||||
res = proc.stdout.strip()
|
||||
@ -51,6 +61,7 @@ stdout:
|
||||
return FlakeConfig(
|
||||
flake_url=flake_url,
|
||||
flake_attr=flake_attr,
|
||||
nar_hash=meta["locked"]["narHash"],
|
||||
icon=icon_path,
|
||||
description=meta.get("description"),
|
||||
last_updated=meta["lastModified"],
|
||||
|
22
pkgs/clan-cli/clan_cli/history/__init__.py
Normal file
22
pkgs/clan-cli/clan_cli/history/__init__.py
Normal file
@ -0,0 +1,22 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
|
||||
from .add import register_add_parser
|
||||
from .list import register_list_parser
|
||||
from .update import register_update_parser
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
subparser = parser.add_subparsers(
|
||||
title="command",
|
||||
description="the command to run",
|
||||
help="the command to run",
|
||||
required=True,
|
||||
)
|
||||
add_parser = subparser.add_parser("add", help="Add a clan flake")
|
||||
register_add_parser(add_parser)
|
||||
list_parser = subparser.add_parser("list", help="List recently used flakes")
|
||||
register_list_parser(list_parser)
|
||||
update_parser = subparser.add_parser("update", help="Update a clan flake")
|
||||
register_update_parser(update_parser)
|
88
pkgs/clan-cli/clan_cli/history/add.py
Normal file
88
pkgs/clan-cli/clan_cli/history/add.py
Normal file
@ -0,0 +1,88 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
import dataclasses
|
||||
import datetime
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.flakes.inspect import FlakeConfig, inspect_flake
|
||||
|
||||
from ..clan_uri import ClanURI
|
||||
from ..dirs import user_history_file
|
||||
from ..locked_open import locked_open
|
||||
|
||||
|
||||
class EnhancedJSONEncoder(json.JSONEncoder):
|
||||
def default(self, o: Any) -> Any:
|
||||
if dataclasses.is_dataclass(o):
|
||||
return dataclasses.asdict(o)
|
||||
return super().default(o)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class HistoryEntry:
|
||||
last_used: str
|
||||
flake: FlakeConfig
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if isinstance(self.flake, dict):
|
||||
self.flake = FlakeConfig(**self.flake)
|
||||
|
||||
|
||||
def list_history() -> list[HistoryEntry]:
|
||||
logs: list[HistoryEntry] = []
|
||||
if not user_history_file().exists():
|
||||
return []
|
||||
|
||||
with locked_open(user_history_file(), "r") as f:
|
||||
try:
|
||||
content: str = f.read()
|
||||
parsed: list[dict] = json.loads(content)
|
||||
logs = [HistoryEntry(**p) for p in parsed]
|
||||
except json.JSONDecodeError as ex:
|
||||
print("Failed to load history. Invalid JSON.")
|
||||
print(f"{user_history_file()}: {ex}")
|
||||
|
||||
return logs
|
||||
|
||||
|
||||
def add_history(uri: ClanURI) -> list[HistoryEntry]:
|
||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||
logs = list_history()
|
||||
found = False
|
||||
path = uri.get_internal()
|
||||
machine = uri.params.flake_attr
|
||||
|
||||
for entry in logs:
|
||||
if entry.flake.flake_url == str(path):
|
||||
found = True
|
||||
entry.last_used = datetime.datetime.now().isoformat()
|
||||
|
||||
if found:
|
||||
break
|
||||
|
||||
flake = inspect_flake(path, machine)
|
||||
flake.flake_url = str(flake.flake_url)
|
||||
history = HistoryEntry(
|
||||
flake=flake,
|
||||
last_used=datetime.datetime.now().isoformat(),
|
||||
)
|
||||
logs.append(history)
|
||||
|
||||
with locked_open(user_history_file(), "w+") as f:
|
||||
f.write(json.dumps(logs, cls=EnhancedJSONEncoder, indent=4))
|
||||
f.truncate()
|
||||
|
||||
return logs
|
||||
|
||||
|
||||
def add_history_command(args: argparse.Namespace) -> None:
|
||||
add_history(args.uri)
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_add_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument(
|
||||
"uri", type=ClanURI.from_str, help="Path to the flake", default="."
|
||||
)
|
||||
parser.set_defaults(func=add_history_command)
|
14
pkgs/clan-cli/clan_cli/history/list.py
Normal file
14
pkgs/clan-cli/clan_cli/history/list.py
Normal file
@ -0,0 +1,14 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
|
||||
from .add import list_history
|
||||
|
||||
|
||||
def list_history_command(args: argparse.Namespace) -> None:
|
||||
for history_entry in list_history():
|
||||
print(history_entry.flake.flake_url)
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_list_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.set_defaults(func=list_history_command)
|
44
pkgs/clan-cli/clan_cli/history/update.py
Normal file
44
pkgs/clan-cli/clan_cli/history/update.py
Normal file
@ -0,0 +1,44 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
import copy
|
||||
import datetime
|
||||
import json
|
||||
|
||||
from ..dirs import user_history_file
|
||||
from ..locked_open import locked_open
|
||||
from ..nix import nix_metadata
|
||||
from .add import EnhancedJSONEncoder, HistoryEntry, list_history
|
||||
|
||||
|
||||
def update_history() -> list[HistoryEntry]:
|
||||
logs = list_history()
|
||||
|
||||
new_logs = []
|
||||
for entry in logs:
|
||||
new_entry = copy.deepcopy(entry)
|
||||
|
||||
meta = nix_metadata(entry.flake.flake_url)
|
||||
new_hash = meta["locked"]["narHash"]
|
||||
if new_hash != entry.flake.nar_hash:
|
||||
print(
|
||||
f"Updating {entry.flake.flake_url} from {entry.flake.nar_hash} to {new_hash}"
|
||||
)
|
||||
new_entry.last_used = datetime.datetime.now().isoformat()
|
||||
new_entry.flake.nar_hash = new_hash
|
||||
|
||||
# TODO: Delete stale entries
|
||||
new_logs.append(new_entry)
|
||||
|
||||
with locked_open(user_history_file(), "w+") as f:
|
||||
f.write(json.dumps(new_logs, cls=EnhancedJSONEncoder, indent=4))
|
||||
f.truncate()
|
||||
return new_logs
|
||||
|
||||
|
||||
def add_update_command(args: argparse.Namespace) -> None:
|
||||
update_history()
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_update_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.set_defaults(func=add_update_command)
|
@ -1,24 +1,42 @@
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from ..dirs import machines_dir
|
||||
from .types import validate_hostname
|
||||
from ..errors import ClanError
|
||||
from ..nix import nix_config, nix_eval
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def list_machines(flake_dir: Path) -> list[str]:
|
||||
path = machines_dir(flake_dir)
|
||||
log.debug(f"Listing machines in {path}")
|
||||
if not path.exists():
|
||||
return []
|
||||
objs: list[str] = []
|
||||
for f in os.listdir(path):
|
||||
if validate_hostname(f):
|
||||
objs.append(f)
|
||||
return objs
|
||||
def list_machines(flake_url: Path | str) -> list[str]:
|
||||
config = nix_config()
|
||||
system = config["system"]
|
||||
cmd = nix_eval(
|
||||
[
|
||||
f"{flake_url}#clanInternals.machines.{system}",
|
||||
"--apply",
|
||||
"builtins.attrNames",
|
||||
"--json",
|
||||
]
|
||||
)
|
||||
proc = subprocess.run(cmd, text=True, capture_output=True)
|
||||
assert proc.stdout is not None
|
||||
if proc.returncode != 0:
|
||||
raise ClanError(
|
||||
f"""
|
||||
command: {shlex.join(cmd)}
|
||||
exit code: {proc.returncode}
|
||||
stdout:
|
||||
{proc.stdout}
|
||||
stderr:
|
||||
{proc.stderr}
|
||||
"""
|
||||
)
|
||||
res = proc.stdout.strip()
|
||||
return json.loads(res)
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
|
@ -17,7 +17,7 @@ from clan_cli.webui.api_outputs import (
|
||||
)
|
||||
|
||||
from ...async_cmd import run
|
||||
from ...flakes import add, create
|
||||
from ...flakes import create
|
||||
from ...nix import nix_command, nix_flake_show
|
||||
from ..tags import Tags
|
||||
|
||||
@ -45,16 +45,6 @@ async def get_attrs(url: AnyUrl | Path) -> list[str]:
|
||||
return flake_attrs
|
||||
|
||||
|
||||
@router.post("/api/flake/history", tags=[Tags.flake])
|
||||
async def flake_history_append(flake_dir: Path) -> None:
|
||||
await add.add_flake(flake_dir)
|
||||
|
||||
|
||||
@router.get("/api/flake/history", tags=[Tags.flake])
|
||||
async def flake_history_list() -> list[Path]:
|
||||
return []
|
||||
|
||||
|
||||
# TODO: Check for directory traversal
|
||||
@router.get("/api/flake/attrs", tags=[Tags.flake])
|
||||
async def inspect_flake_attrs(url: AnyUrl | Path) -> FlakeAttrResponse:
|
||||
|
@ -101,6 +101,36 @@ def test_from_path_with_default() -> None:
|
||||
assert False
|
||||
|
||||
|
||||
def test_from_str() -> None:
|
||||
# Create a ClanURI object from a remote URI with parameters
|
||||
uri_str = "https://example.com?password=asdasd&test=1234"
|
||||
params = ClanParameters(flake_attr="myVM")
|
||||
uri = ClanURI.from_str(url=uri_str, params=params)
|
||||
assert uri.params.flake_attr == "myVM"
|
||||
|
||||
match uri.scheme:
|
||||
case ClanScheme.HTTP.value(url):
|
||||
assert url == "https://example.com?password=asdasd&test=1234" # type: ignore
|
||||
case _:
|
||||
assert False
|
||||
|
||||
uri_str = "~/Downloads/democlan"
|
||||
params = ClanParameters(flake_attr="myVM")
|
||||
uri = ClanURI.from_str(url=uri_str, params=params)
|
||||
assert uri.params.flake_attr == "myVM"
|
||||
assert uri.get_internal() == "~/Downloads/democlan"
|
||||
|
||||
uri_str = "~/Downloads/democlan"
|
||||
uri = ClanURI.from_str(url=uri_str)
|
||||
assert uri.params.flake_attr == "defaultVM"
|
||||
assert uri.get_internal() == "~/Downloads/democlan"
|
||||
|
||||
uri_str = "clan://~/Downloads/democlan"
|
||||
uri = ClanURI.from_str(url=uri_str)
|
||||
assert uri.params.flake_attr == "defaultVM"
|
||||
assert uri.get_internal() == "~/Downloads/democlan"
|
||||
|
||||
|
||||
def test_remote_with_all_params() -> None:
|
||||
# Create a ClanURI object from a remote URI with parameters
|
||||
uri = ClanURI("clan://https://example.com?flake_attr=myVM&password=1234")
|
||||
|
@ -4,48 +4,10 @@ import logging
|
||||
import pytest
|
||||
from api import TestClient
|
||||
from fixtures_flakes import FlakeForTest
|
||||
from path import Path
|
||||
|
||||
from clan_cli.dirs import user_history_file
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_flake_history_append(
|
||||
api: TestClient, test_flake: FlakeForTest, temporary_home: Path
|
||||
) -> None:
|
||||
response = api.post(
|
||||
f"/api/flake/history?flake_dir={test_flake.path!s}",
|
||||
json={},
|
||||
)
|
||||
assert response.status_code == 200, response.json()
|
||||
assert user_history_file().exists()
|
||||
|
||||
|
||||
# def test_flake_history_list(
|
||||
# api: TestClient, test_flake: FlakeForTest, temporary_home: Path
|
||||
# ) -> None:
|
||||
# response = api.get(
|
||||
# "/api/flake/history",
|
||||
# )
|
||||
# assert response.status_code == 200, response.text
|
||||
# assert response.json() == []
|
||||
|
||||
# # add the test_flake
|
||||
# response = api.post(
|
||||
# f"/api/flake/history?flake_dir={test_flake.path!s}",
|
||||
# json={},
|
||||
# )
|
||||
# assert response.status_code == 200, response.text
|
||||
|
||||
# # list the flakes again
|
||||
# response = api.get(
|
||||
# "/api/flake/history",
|
||||
# )
|
||||
# assert response.status_code == 200, response.text
|
||||
# assert response.json() == [str(test_flake.path)]
|
||||
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_inspect_ok(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
|
||||
params = {"url": str(test_flake_with_core.path)}
|
||||
|
@ -1,54 +1,13 @@
|
||||
import json
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from cli import Cli
|
||||
from fixtures_flakes import FlakeForTest
|
||||
from pytest import CaptureFixture
|
||||
|
||||
from clan_cli.dirs import user_history_file
|
||||
from clan_cli.flakes.history import HistoryEntry
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
|
||||
def test_flakes_add(
|
||||
test_flake: FlakeForTest,
|
||||
) -> None:
|
||||
cli = Cli()
|
||||
cmd = [
|
||||
"flakes",
|
||||
"add",
|
||||
str(test_flake.path),
|
||||
]
|
||||
|
||||
cli.run(cmd)
|
||||
|
||||
history_file = user_history_file()
|
||||
assert history_file.exists()
|
||||
history = [HistoryEntry(**entry) for entry in json.loads(open(history_file).read())]
|
||||
assert history[0].path == str(test_flake.path)
|
||||
|
||||
|
||||
def test_flakes_list(
|
||||
capsys: CaptureFixture,
|
||||
test_flake: FlakeForTest,
|
||||
) -> None:
|
||||
cli = Cli()
|
||||
cmd = [
|
||||
"flakes",
|
||||
"list",
|
||||
]
|
||||
|
||||
cli.run(cmd)
|
||||
assert str(test_flake.path) not in capsys.readouterr().out
|
||||
|
||||
cli.run(["flakes", "add", str(test_flake.path)])
|
||||
cli.run(cmd)
|
||||
assert str(test_flake.path) in capsys.readouterr().out
|
||||
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_flakes_inspect(
|
||||
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture
|
||||
|
53
pkgs/clan-cli/tests/test_history_cli.py
Normal file
53
pkgs/clan-cli/tests/test_history_cli.py
Normal file
@ -0,0 +1,53 @@
|
||||
import json
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from cli import Cli
|
||||
from fixtures_flakes import FlakeForTest
|
||||
from pytest import CaptureFixture
|
||||
|
||||
from clan_cli.clan_uri import ClanParameters, ClanURI
|
||||
from clan_cli.dirs import user_history_file
|
||||
from clan_cli.history.add import HistoryEntry
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
|
||||
def test_history_add(
|
||||
test_flake_with_core: FlakeForTest,
|
||||
) -> None:
|
||||
cli = Cli()
|
||||
params = ClanParameters(flake_attr="vm1")
|
||||
uri = ClanURI.from_path(test_flake_with_core.path, params=params)
|
||||
cmd = [
|
||||
"history",
|
||||
"add",
|
||||
str(uri),
|
||||
]
|
||||
|
||||
cli.run(cmd)
|
||||
|
||||
history_file = user_history_file()
|
||||
assert history_file.exists()
|
||||
history = [HistoryEntry(**entry) for entry in json.loads(open(history_file).read())]
|
||||
assert history[0].flake.flake_url == str(test_flake_with_core.path)
|
||||
|
||||
|
||||
def test_history_list(
|
||||
capsys: CaptureFixture,
|
||||
test_flake_with_core: FlakeForTest,
|
||||
) -> None:
|
||||
cli = Cli()
|
||||
params = ClanParameters(flake_attr="vm1")
|
||||
uri = ClanURI.from_path(test_flake_with_core.path, params=params)
|
||||
cmd = [
|
||||
"history",
|
||||
"list",
|
||||
]
|
||||
|
||||
cli.run(cmd)
|
||||
assert str(test_flake_with_core.path) not in capsys.readouterr().out
|
||||
|
||||
cli.run(["history", "add", str(uri)])
|
||||
cli.run(cmd)
|
||||
assert str(test_flake_with_core.path) in capsys.readouterr().out
|
@ -11,6 +11,22 @@
|
||||
}
|
||||
],
|
||||
"settings": {
|
||||
"python.linting.mypyEnabled": true
|
||||
"python.linting.mypyEnabled": true,
|
||||
"files.exclude": {
|
||||
"**/.direnv": true,
|
||||
"**/.mypy_cache": true,
|
||||
"**/.ruff_cache": true,
|
||||
"**/.hypothesis": true,
|
||||
"**/__pycache__": true,
|
||||
"**/.reports": true
|
||||
},
|
||||
"search.exclude": {
|
||||
"**/.direnv": true,
|
||||
"**/.mypy_cache": true,
|
||||
"**/.ruff_cache": true,
|
||||
"**/.hypothesis": true,
|
||||
"**/__pycache__": true,
|
||||
"**/.reports": true
|
||||
}
|
||||
}
|
||||
}
|
@ -4,7 +4,7 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import gi
|
||||
from clan_cli import flakes, vms
|
||||
from clan_cli import flakes, vms, history
|
||||
|
||||
gi.require_version("GdkPixbuf", "2.0")
|
||||
from gi.repository import GdkPixbuf
|
||||
@ -73,50 +73,11 @@ class VM:
|
||||
|
||||
# start/end indexes can be used optionally for pagination
|
||||
def get_initial_vms(start: int = 0, end: int | None = None) -> list[VM]:
|
||||
# vms = [
|
||||
# VM(
|
||||
# base=VMBase(
|
||||
# icon=assets.loc / "cybernet.jpeg",
|
||||
# name="Cybernet Clan",
|
||||
# url="clan://cybernet.lol",
|
||||
# _path=Path(__file__).parent.parent / "test_democlan",
|
||||
# status=False,
|
||||
# ),
|
||||
# ),
|
||||
# VM(
|
||||
# base=VMBase(
|
||||
# icon=assets.loc / "zenith.jpeg",
|
||||
# name="Zenith Clan",
|
||||
# url="clan://zenith.lol",
|
||||
# _path=Path(__file__).parent.parent / "test_democlan",
|
||||
# status=False,
|
||||
# )
|
||||
# ),
|
||||
# VM(
|
||||
# base=VMBase(
|
||||
# icon=assets.loc / "firestorm.jpeg",
|
||||
# name="Firestorm Clan",
|
||||
# url="clan://firestorm.lol",
|
||||
# _path=Path(__file__).parent.parent / "test_democlan",
|
||||
# status=False,
|
||||
# ),
|
||||
# ),
|
||||
# VM(
|
||||
# base=VMBase(
|
||||
# icon=assets.loc / "placeholder.jpeg",
|
||||
# name="Placeholder Clan",
|
||||
# url="clan://demo.lol",
|
||||
# _path=Path(__file__).parent.parent / "test_democlan",
|
||||
# status=True,
|
||||
# ),
|
||||
# ),
|
||||
# ]
|
||||
|
||||
vm_list = []
|
||||
|
||||
# TODO: list_history() should return a list of dicts, not a list of paths
|
||||
# Execute `clan flakes add <path>` to democlan for this to work
|
||||
for entry in flakes.history.list_history():
|
||||
for entry in history.list.list_history():
|
||||
flake_config = flakes.inspect.inspect_flake(entry.path, "defaultVM")
|
||||
vm_config = vms.inspect.inspect_vm(entry.path, "defaultVM")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user