diff --git a/pkgs/clan-cli/clan_cli/secrets/import_sops.py b/pkgs/clan-cli/clan_cli/secrets/import_sops.py index 4bce9a34..a8355606 100644 --- a/pkgs/clan-cli/clan_cli/secrets/import_sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/import_sops.py @@ -29,20 +29,61 @@ def import_sops(args: argparse.Namespace) -> None: raise ClanError(f"Could not import sops file {file}: {e}") from e secrets = json.loads(res.stdout) for k, v in secrets.items(): + k = args.prefix + k if not isinstance(v, str): print( f"WARNING: {k} is not a string but {type(v)}, skipping", file=sys.stderr, ) continue - encrypt_secret(sops_secrets_folder() / k, v) + if (sops_secrets_folder() / k).exists(): + print( + f"WARNING: {k} already exists, skipping", + file=sys.stderr, + ) + continue + encrypt_secret( + sops_secrets_folder() / k, + v, + add_groups=args.group, + add_machines=args.machine, + add_users=args.user, + ) def register_import_sops_parser(parser: argparse.ArgumentParser) -> None: parser.add_argument( - "--input_type", + "--input-type", type=str, - help="the input type of the sops file (yaml, json, ...)", + default=None, + help="the input type of the sops file (yaml, json, ...). If not specified, it will be guessed from the file extension", + ) + parser.add_argument( + "--group", + type=str, + action="append", + default=[], + help="the group to import the secrets to", + ) + parser.add_argument( + "--machine", + type=str, + action="append", + default=[], + help="the machine to import the secrets to", + ) + parser.add_argument( + "--user", + type=str, + action="append", + default=[], + help="the user to import the secrets to", + ) + parser.add_argument( + "--prefix", + type=str, + default="", + help="the prefix to use for the secret names", ) parser.add_argument( "sops_file", diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index 319b92e6..420fc672 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -9,50 +9,52 @@ from typing import IO, Union from .. import tty from ..errors import ClanError -from .folders import list_objects, sops_secrets_folder, sops_users_folder +from .folders import ( + list_objects, + sops_groups_folder, + sops_machines_folder, + sops_secrets_folder, + sops_users_folder, +) from .sops import decrypt_file, encrypt_file, ensure_sops_key, read_key, update_keys from .types import VALID_SECRET_NAME, secret_name_type -def list_command(args: argparse.Namespace) -> None: - list_objects( - sops_secrets_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None - ) - - -def get_command(args: argparse.Namespace) -> None: - secret: str = args.secret - ensure_sops_key() - secret_path = sops_secrets_folder() / secret / "secret" - if not secret_path.exists(): - raise ClanError(f"Secret '{secret}' does not exist") - print(decrypt_file(secret_path), end="") - - -def encrypt_secret(secret: Path, value: Union[IO[str], str]) -> None: +def encrypt_secret( + secret: Path, + value: Union[IO[str], str], + add_users: list[str] = [], + add_machines: list[str] = [], + add_groups: list[str] = [], +) -> None: key = ensure_sops_key() - keys = set([key.pubkey]) + keys = set([]) + + for user in add_users: + allow_member(users_folder(secret.name), sops_users_folder(), user, False) + + for machine in add_machines: + allow_member( + machines_folder(secret.name), sops_machines_folder(), machine, False + ) + + for group in add_groups: + allow_member(groups_folder(secret.name), sops_groups_folder(), group, False) + for kind in ["users", "machines", "groups"]: if not (sops_secrets_folder() / kind).is_dir(): continue k = read_key(sops_secrets_folder() / kind) keys.add(k) + + if key.pubkey not in keys: + keys.add(key.pubkey) + allow_member( + users_folder(secret.name), sops_users_folder(), key.username, False + ) + encrypt_file(secret / "secret", value, list(sorted(keys))) - # make sure we add ourselves to the key - allow_member(users_folder(secret.name), sops_users_folder(), key.username) - - -def set_command(args: argparse.Namespace) -> None: - secret_value = os.environ.get("SOPS_NIX_SECRET") - if secret_value: - encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret_value)) - elif tty.is_interactive(): - secret = getpass.getpass(prompt="Paste your secret: ") - encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret)) - else: - encrypt_secret(sops_secrets_folder() / args.secret, sys.stdin) - def remove_command(args: argparse.Namespace) -> None: secret: str = args.secret @@ -111,7 +113,9 @@ def collect_keys_for_path(path: Path) -> list[str]: return keys -def allow_member(group_folder: Path, source_folder: Path, name: str) -> None: +def allow_member( + group_folder: Path, source_folder: Path, name: str, do_update_keys: bool = True +) -> None: source = source_folder / name if not source.exists(): raise ClanError(f"{name} does not exist in {source_folder}") @@ -125,7 +129,8 @@ def allow_member(group_folder: Path, source_folder: Path, name: str) -> None: os.remove(user_target) user_target.symlink_to(os.path.relpath(source, user_target.parent)) - update_keys(group_folder.parent, collect_keys_for_path(group_folder.parent)) + if do_update_keys: + update_keys(group_folder.parent, collect_keys_for_path(group_folder.parent)) def disallow_member(group_folder: Path, name: str) -> None: @@ -150,6 +155,32 @@ def disallow_member(group_folder: Path, name: str) -> None: update_keys(target.parent.parent, collect_keys_for_path(group_folder.parent)) +def list_command(args: argparse.Namespace) -> None: + list_objects( + sops_secrets_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None + ) + + +def get_command(args: argparse.Namespace) -> None: + secret: str = args.secret + ensure_sops_key() + secret_path = sops_secrets_folder() / secret / "secret" + if not secret_path.exists(): + raise ClanError(f"Secret '{secret}' does not exist") + print(decrypt_file(secret_path), end="") + + +def set_command(args: argparse.Namespace) -> None: + secret_value = os.environ.get("SOPS_NIX_SECRET") + if secret_value: + encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret_value)) + elif tty.is_interactive(): + secret = getpass.getpass(prompt="Paste your secret: ") + encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret)) + else: + encrypt_secret(sops_secrets_folder() / args.secret, sys.stdin) + + def register_secrets_parser(subparser: argparse._SubParsersAction) -> None: parser_list = subparser.add_parser("list", help="list secrets") parser_list.set_defaults(func=list_command) diff --git a/pkgs/clan-cli/tests/test_secrets.py b/pkgs/clan-cli/tests/test_secrets.py index 2e8ca612..d944f816 100644 --- a/pkgs/clan-cli/tests/test_secrets.py +++ b/pkgs/clan-cli/tests/test_secrets.py @@ -26,6 +26,9 @@ PRIVKEY = "AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSS PUBKEY_2 = "age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62" PRIVKEY_2 = "AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ" +PUBKEY_3 = "age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp" +PRIVKEY_3 = "AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF" + def _test_identities( what: str, clan_flake: Path, capsys: pytest.CaptureFixture @@ -110,11 +113,11 @@ def test_secrets(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: cli.run(["list"]) assert capsys.readouterr().out == "" - with pytest.raises(ClanError): # does not exist yet - cli.run(["get", "nonexisting"]) with mock_env( SOPS_NIX_SECRET="foo", SOPS_AGE_KEY_FILE=str(clan_flake / ".." / "age.key") ): + with pytest.raises(ClanError): # does not exist yet + cli.run(["get", "nonexisting"]) cli.run(["set", "key"]) capsys.readouterr() cli.run(["get", "key"]) @@ -165,9 +168,21 @@ def test_import_sops( cli = SecretCli() with mock_env(SOPS_AGE_KEY=PRIVKEY_2): + cli.run(["machines", "add", "machine1", PUBKEY]) + cli.run(["users", "add", "user1", PUBKEY_3]) + # To edit: # SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml - cli.run(["import-sops", str(test_root.joinpath("data", "secrets.yaml"))]) + cli.run( + [ + "import-sops", + "--user", + "user1", + "--machine", + "machine1", + str(test_root.joinpath("data", "secrets.yaml")), + ] + ) capsys.readouterr() cli.run(["get", "secret-key"]) assert capsys.readouterr().out == "secret-value"