Merge pull request 'add machine subcommand' (#167) from Mic92-main into main
All checks were successful
build / test (push) Successful in 15s

This commit is contained in:
clan-bot 2023-08-24 16:29:15 +00:00
commit 8cab7ec44c
18 changed files with 220 additions and 149 deletions

View File

@ -1,11 +1,10 @@
import argparse
import os
import sys
from types import ModuleType
from typing import Optional
from . import admin, machines, secrets, webui
# from . import admin, config, secrets, update, webui
from . import admin, config, machines, secrets, webui
from .errors import ClanError
from .ssh import cli as ssh_cli
@ -16,17 +15,17 @@ except ImportError:
pass
# this will be the entrypoint under /bin/clan (see pyproject.toml config)
def main() -> None:
parser = argparse.ArgumentParser(description="cLAN tool")
def create_parser(prog: Optional[str] = None) -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog=prog, description="cLAN tool")
subparsers = parser.add_subparsers()
parser_admin = subparsers.add_parser("admin", help="administrate a clan")
admin.register_parser(parser_admin)
# DISABLED: this currently crashes if a flake does not define .#clanOptions
# parser_config = subparsers.add_parser("config", help="set nixos configuration")
# config.register_parser(parser_config)
if os.environ.get("CLAN_OPTIONS_FILE") is not None:
parser_config = subparsers.add_parser("config", help="set nixos configuration")
config.register_parser(parser_config)
parser_ssh = subparsers.add_parser("ssh", help="ssh to a remote machine")
ssh_cli.register_parser(parser_ssh)
@ -47,14 +46,20 @@ def main() -> None:
if len(sys.argv) == 1:
parser.print_help()
return parser
# this will be the entrypoint under /bin/clan (see pyproject.toml config)
def main() -> None:
parser = create_parser()
args = parser.parse_args()
if hasattr(args, "func"):
try:
args.func(args)
except ClanError as e:
print(f"{sys.argv[0]}: {e}")
sys.exit(1)
if not hasattr(args, "func"):
return
try:
args.func(args)
except ClanError as e:
print(f"{sys.argv[0]}: {e}")
sys.exit(1)
if __name__ == "__main__":

View File

@ -1,6 +1,9 @@
# !/usr/bin/env python3
import argparse
from .create import register_create_parser
from .delete import register_delete_parser
from .list import register_list_parser
from .update import register_update_parser
@ -13,5 +16,14 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
required=True,
)
groups_parser = subparser.add_parser("update", help="Update a machine")
register_update_parser(groups_parser)
update_parser = subparser.add_parser("update", help="Update a machine")
register_update_parser(update_parser)
create_parser = subparser.add_parser("create", help="Create a machine")
register_create_parser(create_parser)
remove_parser = subparser.add_parser("remove", help="Remove a machine")
register_delete_parser(remove_parser)
list_parser = subparser.add_parser("list", help="List machines")
register_list_parser(list_parser)

View File

@ -0,0 +1,13 @@
import argparse
from .folders import machine_folder
def create_command(args: argparse.Namespace) -> None:
folder = machine_folder(args.host)
folder.mkdir(parents=True, exist_ok=True)
def register_create_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument("host", type=str)
parser.set_defaults(func=create_command)

View File

@ -0,0 +1,17 @@
import argparse
from ..errors import ClanError
from .folders import machine_folder
def delete_command(args: argparse.Namespace) -> None:
folder = machine_folder(args.host)
if folder.exists():
folder.rmdir()
else:
raise ClanError(f"Machine {args.host} does not exist")
def register_delete_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument("host", type=str)
parser.set_defaults(func=delete_command)

View File

@ -0,0 +1,11 @@
from pathlib import Path
from ..dirs import get_clan_flake_toplevel
def machines_folder() -> Path:
return get_clan_flake_toplevel() / "machines"
def machine_folder(machine: str) -> Path:
return machines_folder() / machine

View File

@ -0,0 +1,25 @@
import argparse
import os
from .folders import machines_folder
from .types import validate_hostname
def list_machines() -> list[str]:
path = machines_folder()
if not path.exists():
return []
objs: list[str] = []
for f in os.listdir(path):
if validate_hostname(f):
objs.append(f)
return objs
def list_command(args: argparse.Namespace) -> None:
for machine in list_machines():
print(machine)
def register_list_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=list_command)

View File

@ -0,0 +1,22 @@
import argparse
import re
VALID_HOSTNAME = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", re.IGNORECASE)
def validate_hostname(hostname: str) -> bool:
if len(hostname) > 63:
return False
return VALID_HOSTNAME.match(hostname) is not None
def machine_name_type(arg_value: str) -> str:
if len(arg_value) > 63:
raise argparse.ArgumentTypeError(
"Machine name must be less than 63 characters long"
)
if not VALID_HOSTNAME.match(arg_value):
raise argparse.ArgumentTypeError(
"Invalid character in machine name. Allowed characters are a-z, 0-9, ., -, and _. Must not start with a number"
)
return arg_value

View File

@ -3,15 +3,14 @@ import os
from pathlib import Path
from ..errors import ClanError
from ..machines.types import machine_name_type, validate_hostname
from . import secrets
from .folders import sops_groups_folder, sops_machines_folder, sops_users_folder
from .types import (
VALID_USER_NAME,
group_name_type,
machine_name_type,
secret_name_type,
user_name_type,
validate_hostname,
)

View File

@ -1,13 +1,12 @@
import argparse
from ..machines.types import machine_name_type, validate_hostname
from . import secrets
from .folders import list_objects, remove_object, sops_machines_folder
from .sops import write_key
from .types import (
machine_name_type,
public_or_private_age_key_type,
secret_name_type,
validate_hostname,
)

View File

@ -9,13 +9,6 @@ from .sops import get_public_key
VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$")
VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$")
VALID_HOSTNAME = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", re.IGNORECASE)
def validate_hostname(hostname: str) -> bool:
if len(hostname) > 63:
return False
return VALID_HOSTNAME.match(hostname) is not None
def secret_name_type(arg_value: str) -> str:
@ -26,18 +19,6 @@ def secret_name_type(arg_value: str) -> str:
return arg_value
def machine_name_type(arg_value: str) -> str:
if len(arg_value) > 63:
raise argparse.ArgumentTypeError(
"Machine name must be less than 63 characters long"
)
if not VALID_SECRET_NAME.match(arg_value):
raise argparse.ArgumentTypeError(
"Invalid character in machine name. Allowed characters are a-z, 0-9, ., -, and _. Must not start with a number"
)
return arg_value
def public_or_private_age_key_type(arg_value: str) -> str:
if os.path.isfile(arg_value):
arg_value = Path(arg_value).read_text().strip()

View File

@ -1,14 +1,14 @@
import argparse
from clan_cli.secrets import register_parser
from clan_cli import create_parser
class SecretCli:
class Cli:
def __init__(self) -> None:
self.parser = argparse.ArgumentParser()
register_parser(self.parser)
self.parser = create_parser(prog="clan")
def run(self, args: list[str]) -> argparse.Namespace:
parsed = self.parser.parse_args(args)
parsed.func(parsed)
if hasattr(parsed, "func"):
parsed.func(parsed)
return parsed

View File

@ -1,21 +1,14 @@
import argparse
from typing import Union
import pytest_subprocess.fake_process
from cli import Cli
from pytest_subprocess import utils
from clan_cli import admin
def test_make_parser() -> None:
parser = argparse.ArgumentParser()
admin.register_parser(parser)
# using fp fixture from pytest-subprocess
def test_create(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
cmd: list[Union[str, utils.Any]] = ["nix", "flake", "init", "-t", fp.any()]
fp.register(cmd)
args = argparse.Namespace(folder="./my-clan")
admin.create(args)
cli = Cli()
cli.run(["admin", "--folder", "./my-clan", "create"])
assert fp.call_count(cmd) == 1

View File

@ -1,22 +1,10 @@
import sys
import pytest
import clan_cli
from cli import Cli
def test_no_args(
capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setattr(sys, "argv", [""])
clan_cli.main()
captured = capsys.readouterr()
assert captured.out.startswith("usage:")
def test_help(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(sys, "argv", ["", "--help"])
def test_help(capsys: pytest.CaptureFixture) -> None:
cli = Cli()
with pytest.raises(SystemExit):
clan_cli.main()
cli.run(["--help"])
captured = capsys.readouterr()
assert captured.out.startswith("usage:")

View File

@ -1,11 +1,10 @@
import argparse
import json
import sys
import tempfile
from pathlib import Path
from typing import Any
import pytest
from cli import Cli
from clan_cli import config
from clan_cli.config import parsing
@ -15,7 +14,7 @@ example_options = f"{Path(config.__file__).parent}/jsonschema/options.json"
# use pytest.parametrize
@pytest.mark.parametrize(
"argv,expected",
"args,expected",
[
(["name", "DavHau"], {"name": "DavHau"}),
(
@ -27,25 +26,18 @@ example_options = f"{Path(config.__file__).parent}/jsonschema/options.json"
],
)
def test_set_some_option(
argv: list[str],
args: list[str],
expected: dict[str, Any],
monkeypatch: pytest.MonkeyPatch,
) -> None:
# monkeypatch sys.argv
monkeypatch.setenv("CLAN_OPTIONS_FILE", example_options)
# create temporary file for out_file
with tempfile.NamedTemporaryFile() as out_file:
with open(out_file.name, "w") as f:
json.dump({}, f)
monkeypatch.setattr(
sys, "argv", ["", "--quiet", "--settings-file", out_file.name] + argv
)
parser = argparse.ArgumentParser()
config._register_parser(
parser=parser,
options=json.loads(Path(example_options).read_text()),
)
args = parser.parse_args()
args.func(args)
cli = Cli()
cli.run(["config", "--quiet", "--settings-file", out_file.name] + args)
json_out = json.loads(open(out_file.name).read())
assert json_out == expected

View File

@ -2,8 +2,8 @@ from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from cli import Cli
from environment import mock_env
from secret_cli import SecretCli
if TYPE_CHECKING:
from age_keys import KeyPair
@ -15,19 +15,20 @@ def test_import_sops(
capsys: pytest.CaptureFixture,
age_keys: list["KeyPair"],
) -> None:
cli = SecretCli()
cli = Cli()
with mock_env(SOPS_AGE_KEY=age_keys[1].privkey):
cli.run(["machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["users", "add", "user1", age_keys[1].pubkey])
cli.run(["users", "add", "user2", age_keys[2].pubkey])
cli.run(["groups", "add-user", "group1", "user1"])
cli.run(["groups", "add-user", "group1", "user2"])
cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey])
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey])
cli.run(["secrets", "groups", "add-user", "group1", "user1"])
cli.run(["secrets", "groups", "add-user", "group1", "user2"])
# To edit:
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
cli.run(
[
"secrets",
"import-sops",
"--group",
"group1",
@ -37,10 +38,10 @@ def test_import_sops(
]
)
capsys.readouterr()
cli.run(["users", "list"])
cli.run(["secrets", "users", "list"])
users = sorted(capsys.readouterr().out.rstrip().split())
assert users == ["user1", "user2"]
capsys.readouterr()
cli.run(["get", "secret-key"])
cli.run(["secrets", "get", "secret-key"])
assert capsys.readouterr().out == "secret-value"

View File

@ -0,0 +1,21 @@
from pathlib import Path
import pytest
from cli import Cli
def test_machine_subcommands(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
cli = Cli()
cli.run(["machines", "create", "machine1"])
capsys.readouterr()
cli.run(["machines", "list"])
out = capsys.readouterr()
assert "machine1\n" == out.out
cli.run(["machines", "remove", "machine1"])
capsys.readouterr()
cli.run(["machines", "list"])
out = capsys.readouterr()
assert "" == out.out

View File

@ -1,21 +1,12 @@
import argparse
import os
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from environment import mock_env
from host_group import HostGroup
from clan_cli.machines.update import deploy_nixos, register_update_parser
def test_cli() -> None:
parser = argparse.ArgumentParser()
register_update_parser(parser)
with pytest.raises(SystemExit):
parser.parse_args(["--help"])
from clan_cli.machines.update import deploy_nixos
def test_update(clan_flake: Path, host_group: HostGroup) -> None:

View File

@ -3,8 +3,8 @@ from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from cli import Cli
from environment import mock_env
from secret_cli import SecretCli
from clan_cli.errors import ClanError
@ -18,16 +18,17 @@ def _test_identities(
capsys: pytest.CaptureFixture,
age_keys: list["KeyPair"],
) -> None:
cli = SecretCli()
cli = Cli()
sops_folder = clan_flake / "sops"
cli.run([what, "add", "foo", age_keys[0].pubkey])
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey])
assert (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError):
cli.run([what, "add", "foo", age_keys[0].pubkey])
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey])
cli.run(
[
"secrets",
what,
"add",
"-f",
@ -37,18 +38,18 @@ def _test_identities(
)
capsys.readouterr() # empty the buffer
cli.run([what, "list"])
cli.run(["secrets", what, "list"])
out = capsys.readouterr() # empty the buffer
assert "foo" in out.out
cli.run([what, "remove", "foo"])
cli.run(["secrets", what, "remove", "foo"])
assert not (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError): # already removed
cli.run([what, "remove", "foo"])
cli.run(["secrets", what, "remove", "foo"])
capsys.readouterr()
cli.run([what, "list"])
cli.run(["secrets", what, "list"])
out = capsys.readouterr()
assert "foo" not in out.out
@ -68,32 +69,32 @@ def test_machines(
def test_groups(
clan_flake: Path, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
) -> None:
cli = SecretCli()
cli = Cli()
capsys.readouterr() # empty the buffer
cli.run(["groups", "list"])
cli.run(["secrets", "groups", "list"])
assert capsys.readouterr().out == ""
with pytest.raises(ClanError): # machine does not exist yet
cli.run(["groups", "add-machine", "group1", "machine1"])
cli.run(["secrets", "groups", "add-machine", "group1", "machine1"])
with pytest.raises(ClanError): # user does not exist yet
cli.run(["groups", "add-user", "groupb1", "user1"])
cli.run(["machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["groups", "add-machine", "group1", "machine1"])
cli.run(["secrets", "groups", "add-user", "groupb1", "user1"])
cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["secrets", "groups", "add-machine", "group1", "machine1"])
# Should this fail?
cli.run(["groups", "add-machine", "group1", "machine1"])
cli.run(["secrets", "groups", "add-machine", "group1", "machine1"])
cli.run(["users", "add", "user1", age_keys[0].pubkey])
cli.run(["groups", "add-user", "group1", "user1"])
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey])
cli.run(["secrets", "groups", "add-user", "group1", "user1"])
capsys.readouterr() # empty the buffer
cli.run(["groups", "list"])
cli.run(["secrets", "groups", "list"])
out = capsys.readouterr().out
assert "user1" in out
assert "machine1" in out
cli.run(["groups", "remove-user", "group1", "user1"])
cli.run(["groups", "remove-machine", "group1", "machine1"])
cli.run(["secrets", "groups", "remove-user", "group1", "user1"])
cli.run(["secrets", "groups", "remove-machine", "group1", "machine1"])
groups = os.listdir(clan_flake / "sops" / "groups")
assert len(groups) == 0
@ -101,65 +102,65 @@ def test_groups(
def test_secrets(
clan_flake: Path, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
) -> None:
cli = SecretCli()
cli = Cli()
capsys.readouterr() # empty the buffer
cli.run(["list"])
cli.run(["secrets", "list"])
assert capsys.readouterr().out == ""
with mock_env(
SOPS_NIX_SECRET="foo", SOPS_AGE_KEY_FILE=str(clan_flake / ".." / "age.key")
):
with pytest.raises(ClanError): # does not exist yet
cli.run(["get", "nonexisting"])
cli.run(["set", "key"])
cli.run(["secrets", "get", "nonexisting"])
cli.run(["secrets", "set", "key"])
capsys.readouterr()
cli.run(["get", "key"])
cli.run(["secrets", "get", "key"])
assert capsys.readouterr().out == "foo"
capsys.readouterr()
cli.run(["users", "list"])
cli.run(["secrets", "users", "list"])
users = capsys.readouterr().out.rstrip().split("\n")
assert len(users) == 1, f"users: {users}"
owner = users[0]
capsys.readouterr() # empty the buffer
cli.run(["list"])
cli.run(["secrets", "list"])
assert capsys.readouterr().out == "key\n"
cli.run(["machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["machines", "add-secret", "machine1", "key"])
cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["secrets", "machines", "add-secret", "machine1", "key"])
with mock_env(SOPS_AGE_KEY=age_keys[0].privkey, SOPS_AGE_KEY_FILE=""):
capsys.readouterr()
cli.run(["get", "key"])
cli.run(["secrets", "get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(["machines", "remove-secret", "machine1", "key"])
cli.run(["secrets", "machines", "remove-secret", "machine1", "key"])
cli.run(["users", "add", "user1", age_keys[1].pubkey])
cli.run(["users", "add-secret", "user1", "key"])
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey])
cli.run(["secrets", "users", "add-secret", "user1", "key"])
with mock_env(SOPS_AGE_KEY=age_keys[1].privkey, SOPS_AGE_KEY_FILE=""):
capsys.readouterr()
cli.run(["get", "key"])
cli.run(["secrets", "get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(["users", "remove-secret", "user1", "key"])
cli.run(["secrets", "users", "remove-secret", "user1", "key"])
with pytest.raises(ClanError): # does not exist yet
cli.run(["groups", "add-secret", "admin-group", "key"])
cli.run(["groups", "add-user", "admin-group", "user1"])
cli.run(["groups", "add-user", "admin-group", owner])
cli.run(["groups", "add-secret", "admin-group", "key"])
cli.run(["secrets", "groups", "add-secret", "admin-group", "key"])
cli.run(["secrets", "groups", "add-user", "admin-group", "user1"])
cli.run(["secrets", "groups", "add-user", "admin-group", owner])
cli.run(["secrets", "groups", "add-secret", "admin-group", "key"])
capsys.readouterr() # empty the buffer
cli.run(["set", "--group", "admin-group", "key2"])
cli.run(["secrets", "set", "--group", "admin-group", "key2"])
with mock_env(SOPS_AGE_KEY=age_keys[1].privkey, SOPS_AGE_KEY_FILE=""):
capsys.readouterr()
cli.run(["get", "key"])
cli.run(["secrets", "get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(["groups", "remove-secret", "admin-group", "key"])
cli.run(["secrets", "groups", "remove-secret", "admin-group", "key"])
cli.run(["remove", "key"])
cli.run(["remove", "key2"])
cli.run(["secrets", "remove", "key"])
cli.run(["secrets", "remove", "key2"])
capsys.readouterr() # empty the buffer
cli.run(["list"])
cli.run(["secrets", "list"])
assert capsys.readouterr().out == ""