diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index db869b07..ef998d44 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -19,6 +19,39 @@ from .sops import decrypt_file, encrypt_file, ensure_sops_key, read_key, update_ from .types import VALID_SECRET_NAME, secret_name_type +def collect_keys_for_type(folder: Path) -> set[str]: + if not folder.exists(): + return set() + keys = set() + for p in folder.iterdir(): + if not p.is_symlink(): + continue + try: + target = p.resolve() + except FileNotFoundError: + tty.warn(f"Ignoring broken symlink {p}") + continue + kind = target.parent.name + if folder.name != kind: + tty.warn(f"Expected {p} to point to {folder} but points to {target.parent}") + continue + keys.add(read_key(target)) + return keys + + +def collect_keys_for_path(path: Path) -> set[str]: + keys = set([]) + keys.update(collect_keys_for_type(path / "machines")) + keys.update(collect_keys_for_type(path / "users")) + groups = path / "groups" + if not groups.is_dir(): + return keys + for group in groups.iterdir(): + keys.update(collect_keys_for_type(group / "machines")) + keys.update(collect_keys_for_type(group / "users")) + return keys + + def encrypt_secret( secret: Path, value: Union[IO[str], str], @@ -40,11 +73,7 @@ def encrypt_secret( for group in add_groups: allow_member(groups_folder(secret.name), sops_groups_folder(), group, False) - for kind in ["users", "machines", "groups"]: - if not (sops_secrets_folder() / kind).is_dir(): - continue - k = read_key(sops_secrets_folder() / kind) - keys.add(k) + keys = collect_keys_for_path(secret) if key.pubkey not in keys: keys.add(key.pubkey) @@ -79,39 +108,6 @@ def groups_folder(group: str) -> Path: return sops_secrets_folder() / group / "groups" -def collect_keys_for_type(folder: Path) -> list[str]: - if not folder.exists(): - return [] - keys = [] - for p in folder.iterdir(): - if not p.is_symlink(): - continue - try: - target = p.resolve() - except FileNotFoundError: - tty.warn(f"Ignoring broken symlink {p}") - continue - kind = target.parent.name - if folder.name != kind: - tty.warn(f"Expected {p} to point to {folder} but points to {target.parent}") - continue - keys.append(read_key(target)) - return keys - - -def collect_keys_for_path(path: Path) -> list[str]: - keys = [] - keys += collect_keys_for_type(path / "machines") - keys += collect_keys_for_type(path / "users") - groups = path / "groups" - if not groups.is_dir(): - return keys - for group in groups.iterdir(): - keys += collect_keys_for_type(group / "machines") - keys += collect_keys_for_type(group / "users") - return keys - - def allow_member( group_folder: Path, source_folder: Path, name: str, do_update_keys: bool = True ) -> None: @@ -129,7 +125,10 @@ def allow_member( user_target.symlink_to(os.path.relpath(source, user_target.parent)) if do_update_keys: - update_keys(group_folder.parent, collect_keys_for_path(group_folder.parent)) + update_keys( + group_folder.parent, + list(sorted(collect_keys_for_path(group_folder.parent))), + ) def disallow_member(group_folder: Path, name: str) -> None: @@ -151,7 +150,9 @@ def disallow_member(group_folder: Path, name: str) -> None: if len(os.listdir(group_folder.parent)) == 0: os.rmdir(group_folder.parent) - update_keys(target.parent.parent, collect_keys_for_path(group_folder.parent)) + update_keys( + target.parent.parent, list(sorted(collect_keys_for_path(group_folder.parent))) + ) def list_command(args: argparse.Namespace) -> None: diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index dc9595b2..80650ee4 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -138,9 +138,7 @@ def encrypt_file( folder.mkdir(parents=True, exist_ok=True) # hopefully /tmp is written to an in-memory file to avoid leaking secrets - with NamedTemporaryFile(delete=False) as dummy_manifest_file, NamedTemporaryFile( - delete=False - ) as f: + with sops_manifest(keys) as manifest, NamedTemporaryFile(delete=False) as f: try: with open(f.name, "w") as fd: if isinstance(content, str): @@ -148,9 +146,7 @@ def encrypt_file( else: shutil.copyfileobj(content, fd) # we pass an empty manifest to pick up existing configuration of the user - args = ["sops", "--config", dummy_manifest_file.name] - for key in keys: - args.extend(["--age", key]) + args = ["sops", "--config", str(manifest)] args.extend(["-i", "--encrypt", str(f.name)]) cmd = nix_shell(["sops"], args) subprocess.run(cmd, check=True) diff --git a/pkgs/clan-cli/default.nix b/pkgs/clan-cli/default.nix index f9623c89..9205d0dc 100644 --- a/pkgs/clan-cli/default.nix +++ b/pkgs/clan-cli/default.nix @@ -79,6 +79,11 @@ python3.pkgs.buildPythonPackage { ''; checkPhase = '' PYTHONPATH= $out/bin/clan --help + if grep --include \*.py -q "breakpoint()" $out; then + echo "breakpoint() found in $out:" + grep --include \*.py -Rn "breakpoint()" $out + exit 1 + fi ''; meta.mainProgram = "clan"; } diff --git a/pkgs/clan-cli/tests/conftest.py b/pkgs/clan-cli/tests/conftest.py index 48134764..7bde26d7 100644 --- a/pkgs/clan-cli/tests/conftest.py +++ b/pkgs/clan-cli/tests/conftest.py @@ -3,4 +3,4 @@ import sys sys.path.append(os.path.join(os.path.dirname(__file__), "helpers")) -pytest_plugins = ["temporary_dir", "clan_flake", "root"] +pytest_plugins = ["temporary_dir", "clan_flake", "root", "test_keys"] diff --git a/pkgs/clan-cli/tests/helpers/secret_cli.py b/pkgs/clan-cli/tests/helpers/secret_cli.py new file mode 100644 index 00000000..d43408d0 --- /dev/null +++ b/pkgs/clan-cli/tests/helpers/secret_cli.py @@ -0,0 +1,14 @@ +import argparse + +from clan_cli.secrets import register_parser + + +class SecretCli: + def __init__(self) -> None: + self.parser = argparse.ArgumentParser() + register_parser(self.parser) + + def run(self, args: list[str]) -> argparse.Namespace: + parsed = self.parser.parse_args(args) + parsed.func(parsed) + return parsed diff --git a/pkgs/clan-cli/tests/test_import_sops.py b/pkgs/clan-cli/tests/test_import_sops.py new file mode 100644 index 00000000..b578dd0f --- /dev/null +++ b/pkgs/clan-cli/tests/test_import_sops.py @@ -0,0 +1,46 @@ +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest +from environment import mock_env +from secret_cli import SecretCli + +if TYPE_CHECKING: + from test_keys import KeyPair + + +def test_import_sops( + test_root: Path, + clan_flake: Path, + capsys: pytest.CaptureFixture, + test_keys: list["KeyPair"], +) -> None: + cli = SecretCli() + + with mock_env(SOPS_AGE_KEY=test_keys[1].privkey): + cli.run(["machines", "add", "machine1", test_keys[0].pubkey]) + cli.run(["users", "add", "user1", test_keys[1].pubkey]) + cli.run(["users", "add", "user2", test_keys[2].pubkey]) + cli.run(["groups", "add-user", "group1", "user1"]) + cli.run(["groups", "add-user", "group1", "user2"]) + + # To edit: + # SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml + cli.run( + [ + "import-sops", + "--group", + "group1", + "--machine", + "machine1", + str(test_root.joinpath("data", "secrets.yaml")), + ] + ) + capsys.readouterr() + cli.run(["users", "list"]) + users = sorted(capsys.readouterr().out.rstrip().split()) + assert users == ["user1", "user2"] + + capsys.readouterr() + cli.run(["get", "secret-key"]) + assert capsys.readouterr().out == "secret-value" diff --git a/pkgs/clan-cli/tests/test_keys.py b/pkgs/clan-cli/tests/test_keys.py new file mode 100644 index 00000000..518a2bec --- /dev/null +++ b/pkgs/clan-cli/tests/test_keys.py @@ -0,0 +1,31 @@ +import pytest + + +class KeyPair: + def __init__(self, pubkey: str, privkey: str) -> None: + self.pubkey = pubkey + self.privkey = privkey + + +KEYS = [ + KeyPair( + "age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c", + "AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK", + ), + KeyPair( + "age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62", + "AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ", + ), + KeyPair( + "age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp", + "AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF", + ), +] + + +@pytest.fixture +def test_keys() -> list[KeyPair]: + """ + Root directory of the tests + """ + return KEYS diff --git a/pkgs/clan-cli/tests/test_secrets.py b/pkgs/clan-cli/tests/test_secrets.py index 50c9f7b7..9aac1e68 100644 --- a/pkgs/clan-cli/tests/test_secrets.py +++ b/pkgs/clan-cli/tests/test_secrets.py @@ -1,45 +1,30 @@ -import argparse import os from pathlib import Path +from typing import TYPE_CHECKING import pytest from environment import mock_env +from secret_cli import SecretCli from clan_cli.errors import ClanError -from clan_cli.secrets import register_parser - -class SecretCli: - def __init__(self) -> None: - self.parser = argparse.ArgumentParser() - register_parser(self.parser) - - def run(self, args: list[str]) -> argparse.Namespace: - parsed = self.parser.parse_args(args) - parsed.func(parsed) - return parsed - - -PUBKEY = "age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c" -PRIVKEY = "AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK" - -PUBKEY_2 = "age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62" -PRIVKEY_2 = "AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ" - -PUBKEY_3 = "age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp" -PRIVKEY_3 = "AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF" +if TYPE_CHECKING: + from test_keys import KeyPair def _test_identities( - what: str, clan_flake: Path, capsys: pytest.CaptureFixture + what: str, + clan_flake: Path, + capsys: pytest.CaptureFixture, + test_keys: list["KeyPair"], ) -> None: cli = SecretCli() sops_folder = clan_flake / "sops" - cli.run([what, "add", "foo", PUBKEY]) + cli.run([what, "add", "foo", test_keys[0].pubkey]) assert (sops_folder / what / "foo" / "key.json").exists() with pytest.raises(ClanError): - cli.run([what, "add", "foo", PUBKEY]) + cli.run([what, "add", "foo", test_keys[0].pubkey]) cli.run( [ @@ -47,7 +32,7 @@ def _test_identities( "add", "-f", "foo", - PRIVKEY, + test_keys[0].privkey, ] ) capsys.readouterr() # empty the buffer @@ -68,15 +53,21 @@ def _test_identities( assert "foo" not in out.out -def test_users(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: - _test_identities("users", clan_flake, capsys) +def test_users( + clan_flake: Path, capsys: pytest.CaptureFixture, test_keys: list["KeyPair"] +) -> None: + _test_identities("users", clan_flake, capsys, test_keys) -def test_machines(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: - _test_identities("machines", clan_flake, capsys) +def test_machines( + clan_flake: Path, capsys: pytest.CaptureFixture, test_keys: list["KeyPair"] +) -> None: + _test_identities("machines", clan_flake, capsys, test_keys) -def test_groups(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: +def test_groups( + clan_flake: Path, capsys: pytest.CaptureFixture, test_keys: list["KeyPair"] +) -> None: cli = SecretCli() capsys.readouterr() # empty the buffer cli.run(["groups", "list"]) @@ -86,13 +77,13 @@ def test_groups(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: cli.run(["groups", "add-machine", "group1", "machine1"]) with pytest.raises(ClanError): # user does not exist yet cli.run(["groups", "add-user", "groupb1", "user1"]) - cli.run(["machines", "add", "machine1", PUBKEY]) + cli.run(["machines", "add", "machine1", test_keys[0].pubkey]) cli.run(["groups", "add-machine", "group1", "machine1"]) # Should this fail? cli.run(["groups", "add-machine", "group1", "machine1"]) - cli.run(["users", "add", "user1", PUBKEY]) + cli.run(["users", "add", "user1", test_keys[0].pubkey]) cli.run(["groups", "add-user", "group1", "user1"]) capsys.readouterr() # empty the buffer @@ -107,7 +98,9 @@ def test_groups(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: assert len(groups) == 0 -def test_secrets(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: +def test_secrets( + clan_flake: Path, capsys: pytest.CaptureFixture, test_keys: list["KeyPair"] +) -> None: cli = SecretCli() capsys.readouterr() # empty the buffer cli.run(["list"]) @@ -132,18 +125,18 @@ def test_secrets(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: cli.run(["list"]) assert capsys.readouterr().out == "key\n" - cli.run(["machines", "add", "machine1", PUBKEY]) + cli.run(["machines", "add", "machine1", test_keys[0].pubkey]) cli.run(["machines", "add-secret", "machine1", "key"]) - with mock_env(SOPS_AGE_KEY=PRIVKEY, SOPS_AGE_KEY_FILE=""): + with mock_env(SOPS_AGE_KEY=test_keys[1].privkey, SOPS_AGE_KEY_FILE=""): capsys.readouterr() cli.run(["get", "key"]) assert capsys.readouterr().out == "foo" cli.run(["machines", "remove-secret", "machine1", "key"]) - cli.run(["users", "add", "user1", PUBKEY_2]) + cli.run(["users", "add", "user1", test_keys[1].pubkey]) cli.run(["users", "add-secret", "user1", "key"]) - with mock_env(SOPS_AGE_KEY=PRIVKEY_2, SOPS_AGE_KEY_FILE=""): + with mock_env(SOPS_AGE_KEY=test_keys[1].privkey, SOPS_AGE_KEY_FILE=""): capsys.readouterr() cli.run(["get", "key"]) assert capsys.readouterr().out == "foo" @@ -158,7 +151,7 @@ def test_secrets(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: capsys.readouterr() # empty the buffer cli.run(["set", "--group", "admin-group", "key2"]) - with mock_env(SOPS_AGE_KEY=PRIVKEY_2, SOPS_AGE_KEY_FILE=""): + with mock_env(SOPS_AGE_KEY=test_keys[1].privkey, SOPS_AGE_KEY_FILE=""): capsys.readouterr() cli.run(["get", "key"]) assert capsys.readouterr().out == "foo" @@ -170,35 +163,3 @@ def test_secrets(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: capsys.readouterr() # empty the buffer cli.run(["list"]) assert capsys.readouterr().out == "" - - -def test_import_sops( - test_root: Path, clan_flake: Path, capsys: pytest.CaptureFixture -) -> None: - cli = SecretCli() - - with mock_env(SOPS_AGE_KEY=PRIVKEY_2): - cli.run(["machines", "add", "machine1", PUBKEY]) - cli.run(["users", "add", "user1", PUBKEY_2]) - cli.run(["users", "add", "user2", PUBKEY_3]) - - # To edit: - # SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml - cli.run( - [ - "import-sops", - "--user", - "user1", - "--machine", - "machine1", - str(test_root.joinpath("data", "secrets.yaml")), - ] - ) - capsys.readouterr() - cli.run(["users", "list"]) - users = sorted(capsys.readouterr().out.rstrip().split()) - assert users == ["user1", "user2"] - - capsys.readouterr() - cli.run(["get", "secret-key"]) - assert capsys.readouterr().out == "secret-value"