Updated to main

This commit is contained in:
Luis Hebendanz 2023-10-24 16:54:10 +02:00
parent 711c70d1f0
commit fdcd7ad1d9
23 changed files with 296 additions and 180 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
.direnv .direnv
.coverage.*
**/qubeclan **/qubeclan
**/testdir **/testdir
democlan democlan

View File

@ -3,7 +3,7 @@ import sys
from types import ModuleType from types import ModuleType
from typing import Optional from typing import Optional
from . import config, flake, join, machines, secrets, vms, webui from . import config, flakes, join, machines, secrets, vms, webui
from .ssh import cli as ssh_cli from .ssh import cli as ssh_cli
argcomplete: Optional[ModuleType] = None argcomplete: Optional[ModuleType] = None
@ -25,9 +25,9 @@ def create_parser(prog: Optional[str] = None) -> argparse.ArgumentParser:
subparsers = parser.add_subparsers() subparsers = parser.add_subparsers()
parser_flake = subparsers.add_parser( parser_flake = subparsers.add_parser(
"flake", help="create a clan flake inside the current directory" "flakes", help="create a clan flake inside the current directory"
) )
flake.register_parser(parser_flake) flakes.register_parser(parser_flake)
parser_join = subparsers.add_parser("join", help="join a remote clan") parser_join = subparsers.add_parser("join", help="join a remote clan")
join.register_parser(parser_join) join.register_parser(parser_join)

View File

@ -14,6 +14,7 @@ class CmdOut(NamedTuple):
stderr: str stderr: str
cwd: Optional[Path] = None cwd: Optional[Path] = None
async def run(cmd: list[str], cwd: Optional[Path] = None) -> CmdOut: async def run(cmd: list[str], cwd: Optional[Path] = None) -> CmdOut:
log.debug(f"$: {shlex.join(cmd)}") log.debug(f"$: {shlex.join(cmd)}")
cwd_res = None cwd_res = None
@ -48,7 +49,9 @@ stdout:
return CmdOut(stdout.decode("utf-8"), stderr.decode("utf-8"), cwd=cwd) return CmdOut(stdout.decode("utf-8"), stderr.decode("utf-8"), cwd=cwd)
def runforcli(func: Callable[..., Coroutine[Any, Any, Dict[str, CmdOut]]], *args: Any) -> None: def runforcli(
func: Callable[..., Coroutine[Any, Any, Dict[str, CmdOut]]], *args: Any
) -> None:
try: try:
res = asyncio.run(func(*args)) res = asyncio.run(func(*args))
@ -60,4 +63,4 @@ def runforcli(func: Callable[..., Coroutine[Any, Any, Dict[str, CmdOut]]], *args
print(f"{name}: {out.stdout}", end="") print(f"{name}: {out.stdout}", end="")
except ClanError as e: except ClanError as e:
print(e) print(e)
exit(1) exit(1)

View File

@ -9,10 +9,9 @@ import sys
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Tuple, get_origin from typing import Any, Optional, Tuple, get_origin
from clan_cli.dirs import get_clan_flake_toplevel from clan_cli.dirs import get_clan_flake_toplevel, machine_settings_file
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.git import commit_file from clan_cli.git import commit_file
from clan_cli.machines.folders import machine_settings_file
from clan_cli.nix import nix_eval from clan_cli.nix import nix_eval
script_dir = Path(__file__).parent script_dir = Path(__file__).parent
@ -154,6 +153,39 @@ def read_machine_option_value(
return out return out
def get_or_set_option(args: argparse.Namespace) -> None:
if args.value == []:
print(read_machine_option_value(args.machine, args.option, args.show_trace))
else:
# load options
if args.options_file is None:
options = options_for_machine(
machine_name=args.machine, show_trace=args.show_trace
)
else:
with open(args.options_file) as f:
options = json.load(f)
# compute settings json file location
if args.settings_file is None:
get_clan_flake_toplevel()
settings_file = machine_settings_file(args.flake, args.machine)
else:
settings_file = args.settings_file
# set the option with the given value
set_option(
option=args.option,
value=args.value,
options=options,
settings_file=settings_file,
option_description=args.option,
show_trace=args.show_trace,
)
if not args.quiet:
new_value = read_machine_option_value(args.machine, args.option)
print(f"New Value for {args.option}:")
print(new_value)
def find_option( def find_option(
option: str, value: Any, options: dict, option_description: Optional[str] = None option: str, value: Any, options: dict, option_description: Optional[str] = None
) -> Tuple[str, Any]: ) -> Tuple[str, Any]:
@ -258,38 +290,6 @@ def set_option(
commit_file(settings_file, commit_message=f"Set option {option_description}") commit_file(settings_file, commit_message=f"Set option {option_description}")
def get_or_set_option(args: argparse.Namespace) -> None:
if args.value == []:
print(read_machine_option_value(args.machine, args.option, args.show_trace))
else:
# load options
if args.options_file is None:
options = options_for_machine(
machine_name=args.machine, show_trace=args.show_trace
)
else:
with open(args.options_file) as f:
options = json.load(f)
# compute settings json file location
if args.settings_file is None:
get_clan_flake_toplevel()
settings_file = machine_settings_file(args.machine)
else:
settings_file = args.settings_file
# set the option with the given value
set_option(
option=args.option,
value=args.value,
options=options,
settings_file=settings_file,
option_description=args.option,
show_trace=args.show_trace,
)
if not args.quiet:
new_value = read_machine_option_value(args.machine, args.option)
print(f"New Value for {args.option}:")
print(new_value)
# takes a (sub)parser and configures it # takes a (sub)parser and configures it
def register_parser( def register_parser(
@ -302,7 +302,11 @@ def register_parser(
# inject callback function to process the input later # inject callback function to process the input later
parser.set_defaults(func=get_or_set_option) parser.set_defaults(func=get_or_set_option)
parser.add_argument(
"flake",
type=str,
help="name of the flake to set machine options for",
)
parser.add_argument( parser.add_argument(
"--machine", "--machine",
"-m", "-m",

View File

@ -3,14 +3,16 @@ import os
import subprocess import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Optional
from fastapi import HTTPException from fastapi import HTTPException
from clan_cli.dirs import get_clan_flake_toplevel, nixpkgs_source from clan_cli.dirs import (
get_flake_path,
machine_settings_file,
nixpkgs_source,
specific_machine_dir,
)
from clan_cli.git import commit_file, find_git_repo_root from clan_cli.git import commit_file, find_git_repo_root
from clan_cli.machines.folders import machine_folder, machine_settings_file
from clan_cli.nix import nix_eval from clan_cli.nix import nix_eval
@ -52,26 +54,26 @@ def verify_machine_config(
def config_for_machine(machine_name: str) -> dict: def config_for_machine(machine_name: str) -> dict:
# read the config from a json file located at {flake}/machines/{machine_name}/settings.json # read the config from a json file located at {flake}/machines/{machine_name}/settings.json
if not machine_folder(machine_name).exists(): if not specific_machine_dir(flake_name, machine_name).exists():
raise HTTPException( raise HTTPException(
status_code=404, status_code=404,
detail=f"Machine {machine_name} not found. Create the machine first`", detail=f"Machine {machine_name} not found. Create the machine first`",
) )
settings_path = machine_settings_file(machine_name) settings_path = machine_settings_file(flake_name, machine_name)
if not settings_path.exists(): if not settings_path.exists():
return {} return {}
with open(settings_path) as f: with open(settings_path) as f:
return json.load(f) return json.load(f)
def set_config_for_machine(machine_name: str, config: dict) -> None: def set_config_for_machine(flake_name: str, machine_name: str, config: dict) -> None:
# write the config to a json file located at {flake}/machines/{machine_name}/settings.json # write the config to a json file located at {flake}/machines/{machine_name}/settings.json
if not machine_folder(machine_name).exists(): if not specific_machine_dir(flake_name, machine_name).exists():
raise HTTPException( raise HTTPException(
status_code=404, status_code=404,
detail=f"Machine {machine_name} not found. Create the machine first`", detail=f"Machine {machine_name} not found. Create the machine first`",
) )
settings_path = machine_settings_file(machine_name) settings_path = machine_settings_file(flake_name, machine_name)
settings_path.parent.mkdir(parents=True, exist_ok=True) settings_path.parent.mkdir(parents=True, exist_ok=True)
with open(settings_path, "w") as f: with open(settings_path, "w") as f:
json.dump(config, f) json.dump(config, f)
@ -81,50 +83,76 @@ def set_config_for_machine(machine_name: str, config: dict) -> None:
commit_file(settings_path, repo_dir) commit_file(settings_path, repo_dir)
def schema_for_machine( def schema_for_machine(flake_name: str, machine_name: str) -> dict:
machine_name: str, config: Optional[dict] = None, flake: Optional[Path] = None flake = get_flake_path(flake_name)
) -> dict:
if flake is None: # use nix eval to lib.evalModules .#nixosModules.machine-{machine_name}
flake = get_clan_flake_toplevel() proc = subprocess.run(
# use nix eval to lib.evalModules .#nixosConfigurations.<machine_name>.options.clan nix_eval(
with NamedTemporaryFile(mode="w") as clan_machine_settings_file: flags=[
env = os.environ.copy() "--impure",
inject_config_flags = [] "--show-trace",
if config is not None: "--expr",
json.dump(config, clan_machine_settings_file, indent=2) f"""
clan_machine_settings_file.seek(0) let
env["CLAN_MACHINE_SETTINGS_FILE"] = clan_machine_settings_file.name flake = builtins.getFlake (toString {flake});
inject_config_flags = [ lib = import {nixpkgs_source()}/lib;
"--impure", # needed to access CLAN_MACHINE_SETTINGS_FILE options = flake.nixosConfigurations.{machine_name}.options;
] clanOptions = options.clan;
proc = subprocess.run( jsonschemaLib = import {Path(__file__).parent / "jsonschema"} {{ inherit lib; }};
nix_eval( jsonschema = jsonschemaLib.parseOptions clanOptions;
flags=inject_config_flags in
+ [ jsonschema
"--impure", """,
"--show-trace", ],
"--expr", ),
f""" capture_output=True,
let text=True,
flake = builtins.getFlake (toString {flake}); )
lib = import {nixpkgs_source()}/lib; # def schema_for_machine(
options = flake.nixosConfigurations.{machine_name}.options; # machine_name: str, config: Optional[dict] = None, flake: Optional[Path] = None
clanOptions = options.clan; # ) -> dict:
jsonschemaLib = import {Path(__file__).parent / "jsonschema"} {{ inherit lib; }}; # if flake is None:
jsonschema = jsonschemaLib.parseOptions clanOptions; # flake = get_clan_flake_toplevel()
in # # use nix eval to lib.evalModules .#nixosConfigurations.<machine_name>.options.clan
jsonschema # with NamedTemporaryFile(mode="w") as clan_machine_settings_file:
""", # env = os.environ.copy()
], # inject_config_flags = []
), # if config is not None:
capture_output=True, # json.dump(config, clan_machine_settings_file, indent=2)
text=True, # clan_machine_settings_file.seek(0)
cwd=flake, # env["CLAN_MACHINE_SETTINGS_FILE"] = clan_machine_settings_file.name
env=env, # inject_config_flags = [
) # "--impure", # needed to access CLAN_MACHINE_SETTINGS_FILE
if proc.returncode != 0: # ]
print(proc.stderr, file=sys.stderr) # proc = subprocess.run(
raise Exception( # nix_eval(
f"Failed to read schema for machine {machine_name}:\n{proc.stderr}" # flags=inject_config_flags
) # + [
return json.loads(proc.stdout) # "--impure",
# "--show-trace",
# "--expr",
# f"""
# let
# flake = builtins.getFlake (toString {flake});
# lib = import {nixpkgs_source()}/lib;
# options = flake.nixosConfigurations.{machine_name}.options;
# clanOptions = options.clan;
# jsonschemaLib = import {Path(__file__).parent / "jsonschema"} {{ inherit lib; }};
# jsonschema = jsonschemaLib.parseOptions clanOptions;
# in
# jsonschema
# """,
# ],
# ),
# capture_output=True,
# text=True,
# cwd=flake,
# env=env,
# )
# if proc.returncode != 0:
# print(proc.stderr, file=sys.stderr)
# raise Exception(
# f"Failed to read schema for machine {machine_name}:\n{proc.stderr}"
# )
# return json.loads(proc.stdout)

View File

@ -68,6 +68,25 @@ def clan_flake_dir() -> Path:
return path.resolve() return path.resolve()
def get_flake_path(name: str) -> Path:
flake_dir = clan_flake_dir() / name
if not flake_dir.exists():
raise ClanError(f"Flake {name} does not exist")
return flake_dir
def machines_dir(flake_name: str) -> Path:
return get_flake_path(flake_name) / "machines"
def specific_machine_dir(flake_name: str, machine: str) -> Path:
return machines_dir(flake_name) / machine
def machine_settings_file(flake_name: str, machine: str) -> Path:
return specific_machine_dir(flake_name, machine) / "settings.json"
def module_root() -> Path: def module_root() -> Path:
return Path(__file__).parent return Path(__file__).parent

View File

@ -2,6 +2,7 @@
import argparse import argparse
from .create import register_create_parser from .create import register_create_parser
from .list import register_list_parser
# takes a (sub)parser and configures it # takes a (sub)parser and configures it
@ -12,5 +13,8 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
help="the command to run", help="the command to run",
required=True, required=True,
) )
update_parser = subparser.add_parser("create", help="Create a clan flake") create_parser = subparser.add_parser("create", help="Create a clan flake")
register_create_parser(update_parser) register_create_parser(create_parser)
list_parser = subparser.add_parser("list", help="List clan flakes")
register_list_parser(list_parser)

View File

@ -7,9 +7,12 @@ from pydantic import AnyUrl
from pydantic.tools import parse_obj_as from pydantic.tools import parse_obj_as
from ..async_cmd import CmdOut, run, runforcli from ..async_cmd import CmdOut, run, runforcli
from ..dirs import clan_flake_dir
from ..nix import nix_command, nix_shell from ..nix import nix_command, nix_shell
DEFAULT_URL: AnyUrl = parse_obj_as(AnyUrl, "git+https://git.clan.lol/clan/clan-core#new-clan") DEFAULT_URL: AnyUrl = parse_obj_as(
AnyUrl, "git+https://git.clan.lol/clan/clan-core#new-clan"
)
async def create_flake(directory: Path, url: AnyUrl) -> Dict[str, CmdOut]: async def create_flake(directory: Path, url: AnyUrl) -> Dict[str, CmdOut]:
@ -51,16 +54,16 @@ async def create_flake(directory: Path, url: AnyUrl) -> Dict[str, CmdOut]:
def create_flake_command(args: argparse.Namespace) -> None: def create_flake_command(args: argparse.Namespace) -> None:
runforcli(create_flake, args.directory, DEFAULT_URL) flake_dir = clan_flake_dir() / args.name
runforcli(create_flake, flake_dir, DEFAULT_URL)
# takes a (sub)parser and configures it # takes a (sub)parser and configures it
def register_create_parser(parser: argparse.ArgumentParser) -> None: def register_create_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument( parser.add_argument(
"directory", "name",
type=Path, type=str,
help="output directory for the flake", help="name for the flake",
) )
# parser.add_argument("name", type=str, help="name of the flake") # parser.add_argument("name", type=str, help="name of the flake")
parser.set_defaults(func=create_flake_command) parser.set_defaults(func=create_flake_command)

View File

@ -0,0 +1,27 @@
import argparse
import logging
import os
from ..dirs import clan_flake_dir
log = logging.getLogger(__name__)
def list_flakes() -> list[str]:
path = clan_flake_dir()
log.debug(f"Listing machines in {path}")
if not path.exists():
return []
objs: list[str] = []
for f in os.listdir(path):
objs.append(f)
return objs
def list_command(args: argparse.Namespace) -> None:
for flake in list_flakes():
print(flake)
def register_list_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=list_command)

View File

@ -23,8 +23,8 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
create_parser = subparser.add_parser("create", help="Create a machine") create_parser = subparser.add_parser("create", help="Create a machine")
register_create_parser(create_parser) register_create_parser(create_parser)
remove_parser = subparser.add_parser("remove", help="Remove a machine") delete_parser = subparser.add_parser("delete", help="Delete a machine")
register_delete_parser(remove_parser) register_delete_parser(delete_parser)
list_parser = subparser.add_parser("list", help="List machines") list_parser = subparser.add_parser("list", help="List machines")
register_list_parser(list_parser) register_list_parser(list_parser)

View File

@ -3,31 +3,49 @@ import logging
from typing import Dict from typing import Dict
from ..async_cmd import CmdOut, run, runforcli from ..async_cmd import CmdOut, run, runforcli
from ..dirs import get_flake_path, specific_machine_dir
from ..errors import ClanError
from ..nix import nix_shell from ..nix import nix_shell
from .folders import machine_folder
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
async def create_machine(name: str) -> Dict[str, CmdOut]:
folder = machine_folder(name) async def create_machine(flake_name: str, machine_name: str) -> Dict[str, CmdOut]:
folder = specific_machine_dir(flake_name, machine_name)
folder.mkdir(parents=True, exist_ok=True) folder.mkdir(parents=True, exist_ok=True)
# create empty settings.json file inside the folder # create empty settings.json file inside the folder
with open(folder / "settings.json", "w") as f: with open(folder / "settings.json", "w") as f:
f.write("{}") f.write("{}")
response = {} response = {}
out = await run(nix_shell(["git"], ["git", "add", str(folder)])) out = await run(nix_shell(["git"], ["git", "add", str(folder)]), cwd=folder)
response["git add"] = out response["git add"] = out
out = await run(nix_shell(["git"], ["git", "commit", "-m", f"Added machine {name}", str(folder)])) out = await run(
nix_shell(
["git"],
["git", "commit", "-m", f"Added machine {machine_name}", str(folder)],
),
cwd=folder,
)
response["git commit"] = out response["git commit"] = out
return response return response
def create_command(args: argparse.Namespace) -> None: def create_command(args: argparse.Namespace) -> None:
runforcli(create_machine, args.host) try:
flake_dir = get_flake_path(args.flake)
runforcli(create_machine, flake_dir, args.machine)
except ClanError as e:
print(e)
def register_create_parser(parser: argparse.ArgumentParser) -> None: def register_create_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument("host", type=str) parser.add_argument("machine", type=str)
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.set_defaults(func=create_command) parser.set_defaults(func=create_command)

View File

@ -1,12 +1,12 @@
import argparse import argparse
import shutil import shutil
from ..dirs import specific_machine_dir
from ..errors import ClanError from ..errors import ClanError
from .folders import machine_folder
def delete_command(args: argparse.Namespace) -> None: def delete_command(args: argparse.Namespace) -> None:
folder = machine_folder(args.host) folder = specific_machine_dir(args.flake, args.host)
if folder.exists(): if folder.exists():
shutil.rmtree(folder) shutil.rmtree(folder)
else: else:
@ -15,4 +15,9 @@ def delete_command(args: argparse.Namespace) -> None:
def register_delete_parser(parser: argparse.ArgumentParser) -> None: def register_delete_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument("host", type=str) parser.add_argument("host", type=str)
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.set_defaults(func=delete_command) parser.set_defaults(func=delete_command)

View File

@ -1,9 +1,9 @@
from .folders import machine_folder from ..dirs import specific_machine_dir
def machine_has_fact(machine: str, fact: str) -> bool: def machine_has_fact(flake_name: str, machine: str, fact: str) -> bool:
return (machine_folder(machine) / "facts" / fact).exists() return (specific_machine_dir(flake_name, machine) / "facts" / fact).exists()
def machine_get_fact(machine: str, fact: str) -> str: def machine_get_fact(flake_name: str, machine: str, fact: str) -> str:
return (machine_folder(machine) / "facts" / fact).read_text() return (specific_machine_dir(flake_name, machine) / "facts" / fact).read_text()

View File

@ -1,15 +0,0 @@
from pathlib import Path
from ..dirs import get_clan_flake_toplevel
def machines_folder() -> Path:
return get_clan_flake_toplevel() / "machines"
def machine_folder(machine: str) -> Path:
return machines_folder() / machine
def machine_settings_file(machine: str) -> Path:
return machine_folder(machine) / "settings.json"

View File

@ -3,6 +3,7 @@ import subprocess
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from ..dirs import get_flake_path
from ..machines.machines import Machine from ..machines.machines import Machine
from ..nix import nix_shell from ..nix import nix_shell
from ..secrets.generate import generate_secrets from ..secrets.generate import generate_secrets
@ -26,7 +27,7 @@ def install_nixos(machine: Machine) -> None:
[ [
"nixos-anywhere", "nixos-anywhere",
"-f", "-f",
f"{machine.clan_dir}#{flake_attr}", f"{machine.flake_dir}#{flake_attr}",
"-t", "-t",
"--no-reboot", "--no-reboot",
"--extra-files", "--extra-files",
@ -39,7 +40,7 @@ def install_nixos(machine: Machine) -> None:
def install_command(args: argparse.Namespace) -> None: def install_command(args: argparse.Namespace) -> None:
machine = Machine(args.machine) machine = Machine(args.machine, flake_dir=get_flake_path(args.flake))
machine.deployment_address = args.target_host machine.deployment_address = args.target_host
install_nixos(machine) install_nixos(machine)
@ -56,5 +57,9 @@ def register_install_parser(parser: argparse.ArgumentParser) -> None:
type=str, type=str,
help="ssh address to install to in the form of user@host:2222", help="ssh address to install to in the form of user@host:2222",
) )
parser.add_argument(
"flake",
type=str,
help="name of the flake to install machine from",
)
parser.set_defaults(func=install_command) parser.set_defaults(func=install_command)

View File

@ -2,14 +2,14 @@ import argparse
import logging import logging
import os import os
from .folders import machines_folder from ..dirs import machines_dir
from .types import validate_hostname from .types import validate_hostname
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def list_machines() -> list[str]: def list_machines(flake_name: str) -> list[str]:
path = machines_folder() path = machines_dir(flake_name)
log.debug(f"Listing machines in {path}") log.debug(f"Listing machines in {path}")
if not path.exists(): if not path.exists():
return [] return []
@ -21,9 +21,14 @@ def list_machines() -> list[str]:
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
for machine in list_machines(): for machine in list_machines(args.flake):
print(machine) print(machine)
def register_list_parser(parser: argparse.ArgumentParser) -> None: def register_list_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.set_defaults(func=list_command) parser.set_defaults(func=list_command)

View File

@ -31,7 +31,7 @@ class Machine:
def __init__( def __init__(
self, self,
name: str, name: str,
clan_dir: Optional[Path] = None, flake_dir: Optional[Path] = None,
machine_data: Optional[dict] = None, machine_data: Optional[dict] = None,
) -> None: ) -> None:
""" """
@ -41,13 +41,13 @@ class Machine:
@machine_json: can be optionally used to skip evaluation of the machine, location of the json file with machine data @machine_json: can be optionally used to skip evaluation of the machine, location of the json file with machine data
""" """
self.name = name self.name = name
if clan_dir is None: if flake_dir is None:
self.clan_dir = get_clan_flake_toplevel() self.flake_dir = get_clan_flake_toplevel()
else: else:
self.clan_dir = clan_dir self.flake_dir = flake_dir
if machine_data is None: if machine_data is None:
self.machine_data = build_machine_data(name, self.clan_dir) self.machine_data = build_machine_data(name, self.flake_dir)
else: else:
self.machine_data = machine_data self.machine_data = machine_data
@ -68,7 +68,7 @@ class Machine:
@secrets_dir: the directory to store the secrets in @secrets_dir: the directory to store the secrets in
""" """
env = os.environ.copy() env = os.environ.copy()
env["CLAN_DIR"] = str(self.clan_dir) env["CLAN_DIR"] = str(self.flake_dir)
env["PYTHONPATH"] = str( env["PYTHONPATH"] = str(
":".join(sys.path) ":".join(sys.path)
) # TODO do this in the clanCore module ) # TODO do this in the clanCore module
@ -95,7 +95,7 @@ class Machine:
@attr: the attribute to get @attr: the attribute to get
""" """
output = subprocess.run( output = subprocess.run(
nix_eval([f"path:{self.clan_dir}#{attr}"]), nix_eval([f"path:{self.flake_dir}#{attr}"]),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
check=True, check=True,
text=True, text=True,
@ -108,7 +108,7 @@ class Machine:
@attr: the attribute to get @attr: the attribute to get
""" """
outpath = subprocess.run( outpath = subprocess.run(
nix_build([f"path:{self.clan_dir}#{attr}"]), nix_build([f"path:{self.flake_dir}#{attr}"]),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
check=True, check=True,
text=True, text=True,

View File

@ -4,7 +4,7 @@ import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from ..dirs import get_clan_flake_toplevel from ..dirs import get_flake_path
from ..machines.machines import Machine from ..machines.machines import Machine
from ..nix import nix_build, nix_command, nix_config from ..nix import nix_build, nix_command, nix_config
from ..secrets.generate import generate_secrets from ..secrets.generate import generate_secrets
@ -101,19 +101,19 @@ def get_all_machines(clan_dir: Path) -> HostGroup:
return HostGroup(hosts) return HostGroup(hosts)
def get_selected_machines(machine_names: list[str], clan_dir: Path) -> HostGroup: def get_selected_machines(machine_names: list[str], flake_dir: Path) -> HostGroup:
hosts = [] hosts = []
for name in machine_names: for name in machine_names:
machine = Machine(name=name, clan_dir=clan_dir) machine = Machine(name=name, flake_dir=flake_dir)
hosts.append(machine.host) hosts.append(machine.host)
return HostGroup(hosts) return HostGroup(hosts)
# FIXME: we want some kind of inventory here. # FIXME: we want some kind of inventory here.
def update(args: argparse.Namespace) -> None: def update(args: argparse.Namespace) -> None:
clan_dir = get_clan_flake_toplevel() flake_dir = get_flake_path(args.flake)
if len(args.machines) == 1 and args.target_host is not None: if len(args.machines) == 1 and args.target_host is not None:
machine = Machine(name=args.machines[0], clan_dir=clan_dir) machine = Machine(name=args.machines[0], flake_dir=flake_dir)
machine.deployment_address = args.target_host machine.deployment_address = args.target_host
host = parse_deployment_address( host = parse_deployment_address(
args.machines[0], args.machines[0],
@ -127,11 +127,11 @@ def update(args: argparse.Namespace) -> None:
exit(1) exit(1)
else: else:
if len(args.machines) == 0: if len(args.machines) == 0:
machines = get_all_machines(clan_dir) machines = get_all_machines(flake_dir)
else: else:
machines = get_selected_machines(args.machines, clan_dir) machines = get_selected_machines(args.machines, flake_dir)
deploy_nixos(machines, clan_dir) deploy_nixos(machines, flake_dir)
def register_update_parser(parser: argparse.ArgumentParser) -> None: def register_update_parser(parser: argparse.ArgumentParser) -> None:
@ -142,6 +142,11 @@ def register_update_parser(parser: argparse.ArgumentParser) -> None:
nargs="*", nargs="*",
default=[], default=[],
) )
parser.add_argument(
"flake",
type=str,
help="name of the flake to update machine for",
)
parser.add_argument( parser.add_argument(
"--target-host", "--target-host",
type=str, type=str,

View File

@ -13,7 +13,7 @@ log = logging.getLogger(__name__)
def generate_secrets(machine: Machine) -> None: def generate_secrets(machine: Machine) -> None:
env = os.environ.copy() env = os.environ.copy()
env["CLAN_DIR"] = str(machine.clan_dir) env["CLAN_DIR"] = str(machine.flake_dir)
env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module
print(f"generating secrets... {machine.generate_secrets}") print(f"generating secrets... {machine.generate_secrets}")

View File

@ -1,4 +1,3 @@
# mypy: ignore-errors
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -6,7 +5,7 @@ from typing import Any
from pydantic import AnyUrl, BaseModel, validator from pydantic import AnyUrl, BaseModel, validator
from ..dirs import clan_data_dir, clan_flake_dir from ..dirs import clan_data_dir, clan_flake_dir
from ..flake.create import DEFAULT_URL from ..flakes.create import DEFAULT_URL
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -30,7 +29,7 @@ class ClanDataPath(BaseModel):
dest: Path dest: Path
@validator("dest") @validator("dest")
def check_data_path(cls: Any, v: Path) -> Path: # type: ignore def check_data_path(cls: Any, v: Path) -> Path: # noqa
return validate_path(clan_data_dir(), v) return validate_path(clan_data_dir(), v)
@ -38,7 +37,7 @@ class ClanFlakePath(BaseModel):
dest: Path dest: Path
@validator("dest") @validator("dest")
def check_dest(cls: Any, v: Path) -> Path: # type: ignore def check_dest(cls: Any, v: Path) -> Path: # noqa
return validate_path(clan_flake_dir(), v) return validate_path(clan_flake_dir(), v)

View File

@ -17,11 +17,12 @@ from clan_cli.webui.api_outputs import (
) )
from ...async_cmd import run from ...async_cmd import run
from ...flake import create from ...flakes import create
from ...nix import nix_command, nix_flake_show from ...nix import nix_command, nix_flake_show
router = APIRouter() router = APIRouter()
# TODO: Check for directory traversal # TODO: Check for directory traversal
async def get_attrs(url: AnyUrl | Path) -> list[str]: async def get_attrs(url: AnyUrl | Path) -> list[str]:
cmd = nix_flake_show(url) cmd = nix_flake_show(url)
@ -42,6 +43,7 @@ async def get_attrs(url: AnyUrl | Path) -> list[str]:
) )
return flake_attrs return flake_attrs
# TODO: Check for directory traversal # TODO: Check for directory traversal
@router.get("/api/flake/attrs") @router.get("/api/flake/attrs")
async def inspect_flake_attrs(url: AnyUrl | Path) -> FlakeAttrResponse: async def inspect_flake_attrs(url: AnyUrl | Path) -> FlakeAttrResponse:
@ -74,7 +76,6 @@ async def inspect_flake(
return FlakeResponse(content=content, actions=actions) return FlakeResponse(content=content, actions=actions)
@router.post("/api/flake/create", status_code=status.HTTP_201_CREATED) @router.post("/api/flake/create", status_code=status.HTTP_201_CREATED)
async def create_flake( async def create_flake(
args: Annotated[FlakeCreateInput, Body()], args: Annotated[FlakeCreateInput, Body()],

View File

@ -27,17 +27,19 @@ log = logging.getLogger(__name__)
router = APIRouter() router = APIRouter()
@router.get("/api/machines") @router.get("/api/{flake_name}/machines")
async def list_machines() -> MachinesResponse: async def list_machines(flake_name: str) -> MachinesResponse:
machines = [] machines = []
for m in _list_machines(): for m in _list_machines(flake_name):
machines.append(Machine(name=m, status=Status.UNKNOWN)) machines.append(Machine(name=m, status=Status.UNKNOWN))
return MachinesResponse(machines=machines) return MachinesResponse(machines=machines)
@router.post("/api/machines", status_code=201) @router.post("/api/{flake_name}/machines", status_code=201)
async def create_machine(machine: Annotated[MachineCreate, Body()]) -> MachineResponse: async def create_machine(
out = await _create_machine(machine.name) flake_name: str, machine: Annotated[MachineCreate, Body()]
) -> MachineResponse:
out = await _create_machine(flake_name, machine.name)
log.debug(out) log.debug(out)
return MachineResponse(machine=Machine(name=machine.name, status=Status.UNKNOWN)) return MachineResponse(machine=Machine(name=machine.name, status=Status.UNKNOWN))
@ -48,23 +50,23 @@ async def get_machine(name: str) -> MachineResponse:
return MachineResponse(machine=Machine(name=name, status=Status.UNKNOWN)) return MachineResponse(machine=Machine(name=name, status=Status.UNKNOWN))
@router.get("/api/machines/{name}/config") @router.get("/api/{flake_name}/machines/{name}/config")
async def get_machine_config(name: str) -> ConfigResponse: async def get_machine_config(flake_name: str, name: str) -> ConfigResponse:
config = config_for_machine(name) config = config_for_machine(flake_name, name)
return ConfigResponse(config=config) return ConfigResponse(config=config)
@router.put("/api/machines/{name}/config") @router.put("/api/{flake_name}/machines/{name}/config")
async def set_machine_config( async def set_machine_config(
name: str, config: Annotated[dict, Body()] flake_name: str, name: str, config: Annotated[dict, Body()]
) -> ConfigResponse: ) -> ConfigResponse:
set_config_for_machine(name, config) set_config_for_machine(flake_name, name, config)
return ConfigResponse(config=config) return ConfigResponse(config=config)
@router.get("/api/machines/{name}/schema") @router.get("/api/{flake_name}/machines/{name}/schema")
async def get_machine_schema(name: str) -> SchemaResponse: async def get_machine_schema(flake_name: str, name: str) -> SchemaResponse:
schema = schema_for_machine(name) schema = schema_for_machine(flake_name, name)
return SchemaResponse(schema=schema) return SchemaResponse(schema=schema)

View File

@ -22,6 +22,7 @@ from ..api_outputs import (
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
router = APIRouter() router = APIRouter()
# TODO: Check for directory traversal # TODO: Check for directory traversal
@router.post("/api/vms/inspect") @router.post("/api/vms/inspect")
async def inspect_vm( async def inspect_vm(
@ -52,6 +53,7 @@ async def get_vm_logs(uuid: UUID) -> StreamingResponse:
media_type="text/plain", media_type="text/plain",
) )
# TODO: Check for directory traversal # TODO: Check for directory traversal
@router.post("/api/vms/create") @router.post("/api/vms/create")
async def create_vm(vm: Annotated[VmConfig, Body()]) -> VmCreateResponse: async def create_vm(vm: Annotated[VmConfig, Body()]) -> VmCreateResponse: