diff --git a/pkgs/clan-cli/clan_cli/secrets/groups.py b/pkgs/clan-cli/clan_cli/secrets/groups.py index 264c43b7..c94866eb 100644 --- a/pkgs/clan-cli/clan_cli/secrets/groups.py +++ b/pkgs/clan-cli/clan_cli/secrets/groups.py @@ -102,12 +102,12 @@ def add_group_argument(parser: argparse.ArgumentParser) -> None: def add_secret_command(args: argparse.Namespace) -> None: secrets.allow_member( - secrets.groups_folder(args.group), sops_machines_folder(), args.group + secrets.groups_folder(args.secret), sops_groups_folder(), args.group ) def remove_secret_command(args: argparse.Namespace) -> None: - secrets.disallow_member(secrets.groups_folder(args.group), args.group) + secrets.disallow_member(secrets.groups_folder(args.secret), args.group) def register_groups_parser(parser: argparse.ArgumentParser) -> None: diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py index 9f7692aa..d7a2ffb0 100644 --- a/pkgs/clan-cli/clan_cli/secrets/machines.py +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -25,12 +25,12 @@ def remove_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None: secrets.allow_member( - secrets.machines_folder(args.group), sops_machines_folder(), args.machine + secrets.machines_folder(args.secret), sops_machines_folder(), args.machine ) def remove_secret_command(args: argparse.Namespace) -> None: - secrets.disallow_member(secrets.machines_folder(args.group), args.machine) + secrets.disallow_member(secrets.machines_folder(args.secret), args.machine) def register_machines_parser(parser: argparse.ArgumentParser) -> None: diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index 52c62c6f..9e7344ea 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -11,8 +11,8 @@ from typing import IO from .. import tty from ..errors import ClanError from ..nix import nix_shell -from .folders import list_objects, sops_secrets_folder -from .sops import SopsKey, encrypt_file, ensure_sops_key, read_key +from .folders import list_objects, sops_secrets_folder, sops_users_folder +from .sops import SopsKey, encrypt_file, ensure_sops_key, read_key, update_keys from .types import VALID_SECRET_NAME, secret_name_type @@ -54,6 +54,9 @@ def set_command(args: argparse.Namespace) -> None: else: encrypt_secret(key, sops_secrets_folder() / args.secret, sys.stdin) + # make sure we add ourselves to the key + allow_member(users_folder(args.secret), sops_users_folder(), key.username) + def remove_command(args: argparse.Namespace) -> None: secret: str = args.secret @@ -67,6 +70,51 @@ def add_secret_argument(parser: argparse.ArgumentParser) -> None: parser.add_argument("secret", help="the name of the secret", type=secret_name_type) +def machines_folder(group: str) -> Path: + return sops_secrets_folder() / group / "machines" + + +def users_folder(group: str) -> Path: + return sops_secrets_folder() / group / "users" + + +def groups_folder(group: str) -> Path: + return sops_secrets_folder() / group / "groups" + + +def collect_keys_for_type(folder: Path) -> list[str]: + if not folder.exists(): + return [] + keys = [] + for p in folder.iterdir(): + if not p.is_symlink(): + continue + try: + target = p.resolve() + except FileNotFoundError: + tty.warn(f"Ignoring broken symlink {p}") + continue + kind = target.parent.name + if folder.name != kind: + tty.warn(f"Expected {p} to point to {folder} but points to {target.parent}") + continue + keys.append(read_key(target)) + return keys + + +def collect_keys_for_path(path: Path) -> list[str]: + keys = [] + keys += collect_keys_for_type(path / "machines") + keys += collect_keys_for_type(path / "users") + groups = path / "groups" + if not groups.is_dir(): + return keys + for group in groups.iterdir(): + keys += collect_keys_for_type(group / "machines") + keys += collect_keys_for_type(group / "users") + return keys + + def allow_member(group_folder: Path, source_folder: Path, name: str) -> None: source = source_folder / name if not source.exists(): @@ -80,12 +128,20 @@ def allow_member(group_folder: Path, source_folder: Path, name: str) -> None: ) os.remove(user_target) user_target.symlink_to(source) + update_keys(group_folder.parent, collect_keys_for_path(group_folder.parent)) def disallow_member(group_folder: Path, name: str) -> None: target = group_folder / name if not target.exists(): raise ClanError(f"{name} does not exist in group in {group_folder}") + + keys = collect_keys_for_path(group_folder.parent) + + if len(keys) < 2: + raise ClanError( + f"Cannot remove {name} from {group_folder.parent.name}. No keys left. Use 'clan secrets remove {name}' to remove the secret." + ) os.remove(target) if len(os.listdir(group_folder)) == 0: @@ -94,17 +150,7 @@ def disallow_member(group_folder: Path, name: str) -> None: if len(os.listdir(group_folder.parent)) == 0: os.rmdir(group_folder.parent) - -def machines_folder(group: str) -> Path: - return sops_secrets_folder() / group / "machines" - - -def users_folder(group: str) -> Path: - return sops_secrets_folder() / group / "users" - - -def groups_folder(group: str) -> Path: - return sops_secrets_folder() / group / "groups" + update_keys(target.parent.parent, collect_keys_for_path(group_folder.parent)) def register_secrets_parser(subparser: argparse._SubParsersAction) -> None: diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 61b80f5f..4d41c5a3 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -2,9 +2,10 @@ import json import os import shutil import subprocess +from contextlib import contextmanager from pathlib import Path from tempfile import NamedTemporaryFile -from typing import IO +from typing import IO, Iterator from .. import tty from ..dirs import user_config_dir @@ -14,8 +15,9 @@ from .folders import sops_users_folder class SopsKey: - def __init__(self, pubkey: str) -> None: + def __init__(self, pubkey: str, username: str) -> None: self.pubkey = pubkey + self.username = username def get_public_key(privkey: str) -> str: @@ -51,7 +53,7 @@ def get_user_name(user: str) -> str: def ensure_user(pub_key: str) -> SopsKey: - key = SopsKey(pub_key) + key = SopsKey(pub_key, username="") users_folder = sops_users_folder() # Check if the public key already exists for any user @@ -60,6 +62,7 @@ def ensure_user(pub_key: str) -> SopsKey: if not user.is_dir(): continue if read_key(user) == pub_key: + key.username = user.name return key # Find a unique user name if the public key is not found @@ -76,6 +79,8 @@ def ensure_user(pub_key: str) -> SopsKey: # Add the public key for the user write_key(users_folder / username, pub_key, False) + key.username = username + return key @@ -100,6 +105,32 @@ def ensure_sops_key() -> SopsKey: return ensure_user(get_public_key(path.read_text())) +@contextmanager +def sops_manifest(keys: list[str]) -> Iterator[Path]: + with NamedTemporaryFile(delete=False, mode="w") as manifest: + json.dump( + dict(creation_rules=[dict(key_groups=[dict(age=keys)])]), manifest, indent=2 + ) + manifest.flush() + yield Path(manifest.name) + + +def update_keys(secret_path: Path, keys: list[str]) -> None: + with sops_manifest(keys) as manifest: + cmd = nix_shell( + ["sops"], + [ + "sops", + "--config", + str(manifest), + "updatekeys", + "--yes", + str(secret_path / "secret"), + ], + ) + subprocess.run(cmd, check=True) + + def encrypt_file(secret_path: Path, content: IO[str], keys: list[str]) -> None: folder = secret_path.parent folder.mkdir(parents=True, exist_ok=True) diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index eccf8a44..ef0c29ec 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -29,12 +29,12 @@ def remove_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None: secrets.allow_member( - secrets.groups_folder(args.group), sops_users_folder(), args.group + secrets.users_folder(args.secret), sops_users_folder(), args.user ) def remove_secret_command(args: argparse.Namespace) -> None: - secrets.disallow_member(secrets.groups_folder(args.group), args.group) + secrets.disallow_member(secrets.users_folder(args.secret), args.user) def register_users_parser(parser: argparse.ArgumentParser) -> None: @@ -74,21 +74,10 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: ) add_secret_parser.set_defaults(func=add_secret_command) - add_secret_parser = subparser.add_parser( - "add-secret", help="allow a machine to access a secret" - ) - add_secret_parser.add_argument( - "user", help="the name of the group", type=user_name_type - ) - add_secret_parser.add_argument( - "secret", help="the name of the secret", type=secret_name_type - ) - add_secret_parser.set_defaults(func=add_secret_command) - remove_secret_parser = subparser.add_parser( "remove-secret", help="remove a user's access to a secret" ) - add_secret_parser.add_argument( + remove_secret_parser.add_argument( "user", help="the name of the group", type=user_name_type ) remove_secret_parser.add_argument( diff --git a/pkgs/clan-cli/tests/test_secrets.py b/pkgs/clan-cli/tests/test_secrets.py index b9425abf..31166633 100644 --- a/pkgs/clan-cli/tests/test_secrets.py +++ b/pkgs/clan-cli/tests/test_secrets.py @@ -23,6 +23,9 @@ class SecretCli: PUBKEY = "age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c" PRIVKEY = "AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK" +PUBKEY_2 = "age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62" +PRIVKEY_2 = "AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ" + def _test_identities( what: str, clan_flake: Path, capsys: pytest.CaptureFixture @@ -123,6 +126,34 @@ def test_secrets( cli.run(["list"]) assert capsys.readouterr().out == "key\n" + cli.run(["machines", "add", "machine1", PUBKEY]) + cli.run(["machines", "add-secret", "machine1", "key"]) + + with mock_env(SOPS_AGE_KEY=PRIVKEY, SOPS_AGE_KEY_FILE=""): + capsys.readouterr() + cli.run(["get", "key"]) + assert capsys.readouterr().out == "foo" + cli.run(["machines", "remove-secret", "machine1", "key"]) + + cli.run(["users", "add", "user1", PUBKEY_2]) + cli.run(["users", "add-secret", "user1", "key"]) + with mock_env(SOPS_AGE_KEY=PRIVKEY_2, SOPS_AGE_KEY_FILE=""): + capsys.readouterr() + cli.run(["get", "key"]) + assert capsys.readouterr().out == "foo" + cli.run(["users", "remove-secret", "user1", "key"]) + + with pytest.raises(ClanError): # does not exist yet + cli.run(["groups", "add-secret", "admin-group", "key"]) + cli.run(["groups", "add-user", "admin-group", "user1"]) + cli.run(["groups", "add-secret", "admin-group", "key"]) + + with mock_env(SOPS_AGE_KEY=PRIVKEY_2, SOPS_AGE_KEY_FILE=""): + capsys.readouterr() + cli.run(["get", "key"]) + assert capsys.readouterr().out == "foo" + cli.run(["groups", "remove-secret", "admin-group", "key"]) + cli.run(["remove", "key"]) capsys.readouterr() # empty the buffer