add secrets integration

clan-cli: also depend on age for secrets
This commit is contained in:
Jörg Thalheim 2023-07-26 15:21:57 +02:00 committed by Mic92
parent dd96877b9e
commit 658c76336f
12 changed files with 910 additions and 3 deletions

View File

@ -2,7 +2,7 @@
import argparse
import sys
from . import admin, ssh
from . import admin, secrets, ssh
from .errors import ClanError
has_argcomplete = True
@ -23,6 +23,9 @@ def main() -> None:
parser_ssh = subparsers.add_parser("ssh", help="ssh to a remote machine")
ssh.register_parser(parser_ssh)
parser_secrets = subparsers.add_parser("secrets", help="manage secrets")
secrets.register_parser(parser_secrets)
if has_argcomplete:
argcomplete.autocomplete(parser)

View File

@ -0,0 +1,28 @@
# !/usr/bin/env python3
import argparse
from .groups import register_groups_parser
from .machines import register_machines_parser
from .secrets import register_secrets_parser
from .users import register_users_parser
# takes a (sub)parser and configures it
def register_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
groups_parser = subparser.add_parser("groups", help="manage groups")
register_groups_parser(groups_parser)
users_parser = subparser.add_parser("users", help="manage users")
register_users_parser(users_parser)
machines_parser = subparser.add_parser("machines", help="manage machines")
register_machines_parser(machines_parser)
register_secrets_parser(subparser)

View File

@ -0,0 +1,71 @@
import json
import os
import shutil
from pathlib import Path
from typing import Callable
from ..dirs import get_clan_flake_toplevel
from ..errors import ClanError
def get_sops_folder() -> Path:
return get_clan_flake_toplevel() / "sops"
def gen_sops_subfolder(subdir: str) -> Callable[[], Path]:
def folder() -> Path:
return get_clan_flake_toplevel() / "sops" / subdir
return folder
sops_secrets_folder = gen_sops_subfolder("secrets")
sops_users_folder = gen_sops_subfolder("users")
sops_machines_folder = gen_sops_subfolder("machines")
sops_groups_folder = gen_sops_subfolder("groups")
def list_objects(path: Path, is_valid: Callable[[str], bool]) -> None:
if not path.exists():
return
for f in os.listdir(path):
if is_valid(f):
print(f)
def remove_object(path: Path, name: str) -> None:
try:
shutil.rmtree(path / name)
except FileNotFoundError:
raise ClanError(f"{name} not found in {path}")
if not os.listdir(path):
os.rmdir(path)
def add_key(path: Path, publickey: str, overwrite: bool) -> None:
path.mkdir(parents=True, exist_ok=True)
try:
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
if not overwrite:
flags |= os.O_EXCL
fd = os.open(path / "key.json", flags)
except FileExistsError:
raise ClanError(f"{path.name} already exists in {path}")
with os.fdopen(fd, "w") as f:
json.dump({"publickey": publickey, "type": "age"}, f, indent=2)
def read_key(path: Path) -> str:
with open(path / "key.json") as f:
try:
key = json.load(f)
except json.JSONDecodeError as e:
raise ClanError(f"Failed to decode {path.name}: {e}")
if key["type"] != "age":
raise ClanError(
f"{path.name} is not an age key but {key['type']}. This is not supported"
)
publickey = key.get("publickey")
if not publickey:
raise ClanError(f"{path.name} does not contain a public key")
return publickey

View File

@ -0,0 +1,177 @@
import argparse
import os
from pathlib import Path
from ..errors import ClanError
from . import secrets
from .folders import sops_groups_folder, sops_machines_folder, sops_users_folder
from .types import (
VALID_USER_NAME,
group_name_type,
machine_name_type,
secret_name_type,
user_name_type,
validate_hostname,
)
def machines_folder(group: str) -> Path:
return sops_groups_folder() / group / "machines"
def users_folder(group: str) -> Path:
return sops_groups_folder() / group / "users"
# TODO: make this a tree
def list_command(args: argparse.Namespace) -> None:
folder = sops_groups_folder()
if not folder.exists():
return
for group in os.listdir(folder):
group_folder = folder / group
if not group_folder.is_dir():
continue
print(group)
machines = machines_folder(group)
if machines.is_dir():
print("machines:")
for f in machines.iterdir():
if validate_hostname(f.name):
print(f.name)
users = users_folder(group)
if users.is_dir():
print("users:")
for f in users.iterdir():
if VALID_USER_NAME.match(f.name):
print(f)
def add_member(group_folder: Path, source_folder: Path, name: str) -> None:
source = source_folder / name
if not source.exists():
raise ClanError(f"{name} does not exist in {source_folder}")
group_folder.mkdir(parents=True, exist_ok=True)
user_target = group_folder / name
if user_target.exists():
if not user_target.is_symlink():
raise ClanError(
f"Cannot add user {name}. {user_target} exists but is not a symlink"
)
os.remove(user_target)
user_target.symlink_to(source)
def remove_member(group_folder: Path, name: str) -> None:
target = group_folder / name
if not target.exists():
raise ClanError(f"{name} does not exist in group in {group_folder}")
os.remove(target)
if len(os.listdir(group_folder)) == 0:
os.rmdir(group_folder)
if len(os.listdir(group_folder.parent)) == 0:
os.rmdir(group_folder.parent)
def add_user_command(args: argparse.Namespace) -> None:
add_member(users_folder(args.group), sops_users_folder(), args.user)
def remove_user_command(args: argparse.Namespace) -> None:
remove_member(users_folder(args.group), args.user)
def add_machine_command(args: argparse.Namespace) -> None:
add_member(
machines_folder(args.group),
sops_machines_folder(),
args.machine,
)
def remove_machine_command(args: argparse.Namespace) -> None:
remove_member(machines_folder(args.group), args.machine)
def add_group_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument("group", help="the name of the secret", type=group_name_type)
def add_secret_command(args: argparse.Namespace) -> None:
secrets.allow_member(
secrets.groups_folder(args.group), sops_machines_folder(), args.group
)
def remove_secret_command(args: argparse.Namespace) -> None:
secrets.disallow_member(secrets.groups_folder(args.group), args.group)
def register_groups_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
list_parser = subparser.add_parser("list", help="list groups")
list_parser.set_defaults(func=list_command)
add_machine_parser = subparser.add_parser(
"add-machine", help="add a machine to group"
)
add_group_argument(add_machine_parser)
add_machine_parser.add_argument(
"machine", help="the name of the machines to add", type=machine_name_type
)
add_machine_parser.set_defaults(func=add_machine_command)
remove_machine_parser = subparser.add_parser(
"remove-machine", help="remove a machine from group"
)
add_group_argument(remove_machine_parser)
remove_machine_parser.add_argument(
"machine", help="the name of the machines to remove", type=machine_name_type
)
remove_machine_parser.set_defaults(func=remove_machine_command)
add_user_parser = subparser.add_parser("add-user", help="add a user to group")
add_group_argument(add_user_parser)
add_user_parser.add_argument(
"user", help="the name of the user to add", type=user_name_type
)
add_user_parser.set_defaults(func=add_user_command)
remove_user_parser = subparser.add_parser(
"remove-user", help="remove a user from group"
)
add_group_argument(remove_user_parser)
remove_user_parser.add_argument(
"user", help="the name of the user to remove", type=user_name_type
)
remove_user_parser.set_defaults(func=remove_user_command)
add_secret_parser = subparser.add_parser(
"add-secret", help="allow a user to access a secret"
)
add_secret_parser.add_argument(
"group", help="the name of the user", type=group_name_type
)
add_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
add_secret_parser.set_defaults(func=add_secret_command)
remove_secret_parser = subparser.add_parser(
"remove-secret", help="remove a group's access to a secret"
)
remove_secret_parser.add_argument(
"group", help="the name of the group", type=group_name_type
)
remove_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
remove_secret_parser.set_defaults(func=remove_secret_command)

View File

@ -0,0 +1,89 @@
import argparse
from . import secrets
from .folders import add_key, list_objects, remove_object, sops_machines_folder
from .types import (
machine_name_type,
public_or_private_age_key_type,
secret_name_type,
validate_hostname,
)
def list_command(args: argparse.Namespace) -> None:
list_objects(sops_machines_folder(), lambda x: validate_hostname(x))
def add_command(args: argparse.Namespace) -> None:
add_key(sops_machines_folder() / args.machine, args.key, args.force)
def remove_command(args: argparse.Namespace) -> None:
remove_object(sops_machines_folder(), args.machine)
def add_secret_command(args: argparse.Namespace) -> None:
secrets.allow_member(
secrets.machines_folder(args.group), sops_machines_folder(), args.machine
)
def remove_secret_command(args: argparse.Namespace) -> None:
secrets.disallow_member(secrets.machines_folder(args.group), args.machine)
def register_machines_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
list_parser = subparser.add_parser("list", help="list machines")
list_parser.set_defaults(func=list_command)
add_parser = subparser.add_parser("add", help="add a machine")
add_parser.add_argument(
"-f",
"--force",
help="overwrite existing machine",
action="store_true",
default=False,
)
add_parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type
)
add_parser.add_argument(
"key",
help="public key or private key of the user",
type=public_or_private_age_key_type,
)
add_parser.set_defaults(func=add_command)
remove_parser = subparser.add_parser("remove", help="remove a machine")
remove_parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type
)
remove_parser.set_defaults(func=remove_command)
add_secret_parser = subparser.add_parser(
"add-secret", help="allow a machine to access a secret"
)
add_secret_parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type
)
add_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
add_secret_parser.set_defaults(func=add_secret_command)
remove_secret_parser = subparser.add_parser(
"remove-secret", help="remove a group's access to a secret"
)
remove_secret_parser.add_argument(
"machine", help="the name of the group", type=machine_name_type
)
remove_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
remove_secret_parser.set_defaults(func=remove_secret_command)

View File

@ -0,0 +1,124 @@
import argparse
import getpass
import os
import subprocess
import sys
from io import StringIO
from pathlib import Path
from typing import IO
from .. import tty
from ..errors import ClanError
from ..nix import nix_shell
from .folders import list_objects, sops_secrets_folder
from .sops import SopsKey, encrypt_file, ensure_sops_key, read_key
from .types import VALID_SECRET_NAME, secret_name_type
def list_command(args: argparse.Namespace) -> None:
list_objects(
sops_secrets_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None
)
def get_command(args: argparse.Namespace) -> None:
secret: str = args.secret
ensure_sops_key()
secret_path = sops_secrets_folder() / secret / "secret"
if not secret_path.exists():
raise ClanError(f"Secret '{secret}' does not exist")
cmd = nix_shell(["sops"], ["sops", "--decrypt", str(secret_path)])
res = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True)
print(res.stdout, end="")
def encrypt_secret(key: SopsKey, secret: Path, value: IO[str]) -> None:
keys = set([key.pubkey])
for kind in ["users", "machines", "groups"]:
if not (sops_secrets_folder() / kind).is_dir():
continue
k = read_key(sops_secrets_folder() / kind)
keys.add(k)
encrypt_file(secret / "secret", value, list(sorted(keys)))
def set_command(args: argparse.Namespace) -> None:
secret: str = args.secret
key = ensure_sops_key()
secret_value = os.environ.get("SOPS_NIX_SECRET")
if secret_value:
encrypt_secret(key, sops_secrets_folder() / secret, StringIO(secret_value))
elif tty.is_interactive():
secret = getpass.getpass(prompt="Paste your secret: ")
encrypt_secret(key, sops_secrets_folder() / secret, StringIO(secret))
else:
encrypt_secret(key, sops_secrets_folder() / secret, sys.stdin)
def remove_command(args: argparse.Namespace) -> None:
secret: str = args.secret
path = sops_secrets_folder() / secret
if not path.exists():
raise ClanError(f"Secret '{secret}' does not exist")
path.unlink()
def add_secret_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
def allow_member(group_folder: Path, source_folder: Path, name: str) -> None:
source = source_folder / name
if not source.exists():
raise ClanError(f"{name} does not exist in {source_folder}")
group_folder.mkdir(parents=True, exist_ok=True)
user_target = group_folder / name
if user_target.exists():
if not user_target.is_symlink():
raise ClanError(
f"Cannot add user {name}. {user_target} exists but is not a symlink"
)
os.remove(user_target)
user_target.symlink_to(source)
def disallow_member(group_folder: Path, name: str) -> None:
target = group_folder / name
if not target.exists():
raise ClanError(f"{name} does not exist in group in {group_folder}")
os.remove(target)
if len(os.listdir(group_folder)) == 0:
os.rmdir(group_folder)
if len(os.listdir(group_folder.parent)) == 0:
os.rmdir(group_folder.parent)
def machines_folder(group: str) -> Path:
return sops_secrets_folder() / group / "machines"
def users_folder(group: str) -> Path:
return sops_secrets_folder() / group / "users"
def groups_folder(group: str) -> Path:
return sops_secrets_folder() / group / "groups"
def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
parser_list = subparser.add_parser("list", help="list secrets")
parser_list.set_defaults(func=list_command)
parser_get = subparser.add_parser("get", help="get a secret")
add_secret_argument(parser_get)
parser_get.set_defaults(func=get_command)
parser_set = subparser.add_parser("set", help="set a secret")
add_secret_argument(parser_set)
parser_set.set_defaults(func=set_command)
parser_delete = subparser.add_parser("remove", help="remove a secret")
add_secret_argument(parser_delete)
parser_delete.set_defaults(func=remove_command)

View File

@ -0,0 +1,124 @@
import os
import shutil
import subprocess
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import IO
from .. import tty
from ..dirs import user_config_dir
from ..nix import nix_shell
from .folders import add_key, read_key, sops_users_folder
class SopsKey:
def __init__(self, pubkey: str) -> None:
self.pubkey = pubkey
def get_public_key(privkey: str) -> str:
cmd = nix_shell(["age"], ["age-keygen", "-y"])
res = subprocess.run(
cmd, input=privkey, check=True, stdout=subprocess.PIPE, text=True
)
return res.stdout.strip()
def get_unique_user(users_folder: Path, user: str) -> str:
"""Return a unique path in the users_folder for the given user."""
i = 0
path = users_folder / user
while path.exists():
i += 1
user = user + str(i)
path = users_folder / user
return user
def get_user_name(user: str) -> str:
"""Ask the user for their name until a unique one is provided."""
while True:
name = input(
f"Enter your user name for which the key will be stored as [{user}]: "
)
if name:
user = name
if not (sops_users_folder() / user).exists():
return user
print(f"{sops_users_folder() / user} already exists")
def ensure_user(pub_key: str) -> SopsKey:
key = SopsKey(pub_key)
users_folder = sops_users_folder()
# Check if the public key already exists for any user
if users_folder.exists():
for user in users_folder.iterdir():
if not user.is_dir():
continue
if read_key(user) == pub_key:
return key
# Find a unique user name if the public key is not found
try:
loginname = os.getlogin()
except OSError:
loginname = os.environ.get("USER", "nobody")
username = get_unique_user(users_folder, loginname)
if tty.is_interactive():
# Ask the user for their name until a unique one is provided
username = get_user_name(username)
# Add the public key for the user
add_key(users_folder / username, pub_key, False)
return key
def ensure_sops_key() -> SopsKey:
key = os.environ.get("SOPS_AGE_KEY")
if key:
return ensure_user(get_public_key(key))
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
if raw_path:
path = Path(raw_path)
else:
path = user_config_dir() / "sops" / "age" / "keys.txt"
if path.exists():
return ensure_user(get_public_key(path.read_text()))
path.parent.mkdir(parents=True, exist_ok=True)
cmd = nix_shell(["age"], ["age-keygen", "-o", str(path)])
subprocess.run(cmd, check=True)
tty.info(
f"Generated age key at '{path}'. Please back it up on a secure location or you will lose access to your secrets."
)
return ensure_user(get_public_key(path.read_text()))
def encrypt_file(secret_path: Path, content: IO[str], keys: list[str]) -> None:
folder = secret_path.parent
folder.mkdir(parents=True, exist_ok=True)
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
with NamedTemporaryFile(delete=False) as f:
try:
with open(f.name, "w") as fd:
shutil.copyfileobj(content, fd)
args = ["sops"]
for key in keys:
args.extend(["--age", key])
args.extend(["-i", "--encrypt", str(f.name)])
cmd = nix_shell(["sops"], args)
subprocess.run(cmd, check=True)
# atomic copy of the encrypted file
with NamedTemporaryFile(dir=folder, delete=False) as f2:
shutil.copyfile(f.name, f2.name)
os.rename(f2.name, secret_path)
finally:
try:
os.remove(f.name)
except OSError:
pass

View File

@ -0,0 +1,71 @@
import argparse
import os
import re
from pathlib import Path
from typing import Callable
from ..errors import ClanError
from .sops import get_public_key
VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$")
VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$")
VALID_HOSTNAME = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", re.IGNORECASE)
def validate_hostname(hostname: str) -> bool:
if len(hostname) > 63:
return False
return VALID_HOSTNAME.match(hostname) is not None
def secret_name_type(arg_value: str) -> str:
if not VALID_SECRET_NAME.match(arg_value):
raise argparse.ArgumentTypeError(
"Invalid character in secret name. Allowed characters are a-z, A-Z, 0-9, ., -, and _"
)
return arg_value
def machine_name_type(arg_value: str) -> str:
if len(arg_value) > 63:
raise argparse.ArgumentTypeError(
"Machine name must be less than 63 characters long"
)
if not VALID_SECRET_NAME.match(arg_value):
raise argparse.ArgumentTypeError(
"Invalid character in machine name. Allowed characters are a-z, 0-9, ., -, and _. Must not start with a number"
)
return arg_value
def public_or_private_age_key_type(arg_value: str) -> str:
if os.path.isfile(arg_value):
arg_value = Path(arg_value).read_text().strip()
if arg_value.startswith("age1"):
return arg_value.strip()
if arg_value.startswith("AGE-SECRET-KEY-"):
return get_public_key(arg_value)
if not arg_value.startswith("age1"):
raise ClanError(
f"Please provide an age key starting with age1, got: '{arg_value}'"
)
return arg_value
def group_or_user_name_type(what: str) -> Callable[[str], str]:
def name_type(arg_value: str) -> str:
if len(arg_value) > 32:
raise argparse.ArgumentTypeError(
f"{what.capitalize()} name must be less than 32 characters long"
)
if not VALID_USER_NAME.match(arg_value):
raise argparse.ArgumentTypeError(
f"Invalid character in {what} name. Allowed characters are a-z, 0-9, -, and _. Must start with a letter or _"
)
return arg_value
return name_type
user_name_type = group_or_user_name_type("user")
group_name_type = group_or_user_name_type("group")

View File

@ -0,0 +1,96 @@
import argparse
from . import secrets
from .folders import add_key, list_objects, remove_object, sops_users_folder
from .types import (
VALID_SECRET_NAME,
public_or_private_age_key_type,
secret_name_type,
user_name_type,
)
def add_user(name: str, key: str, force: bool) -> None:
add_key(sops_users_folder() / name, key, force)
def list_command(args: argparse.Namespace) -> None:
list_objects(sops_users_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None)
def add_command(args: argparse.Namespace) -> None:
add_user(args.user, args.key, args.force)
def remove_command(args: argparse.Namespace) -> None:
remove_object(sops_users_folder(), args.user)
def add_secret_command(args: argparse.Namespace) -> None:
secrets.allow_member(
secrets.groups_folder(args.group), sops_users_folder(), args.group
)
def remove_secret_command(args: argparse.Namespace) -> None:
secrets.disallow_member(secrets.groups_folder(args.group), args.group)
def register_users_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
list_parser = subparser.add_parser("list", help="list users")
list_parser.set_defaults(func=list_command)
add_parser = subparser.add_parser("add", help="add a user")
add_parser.add_argument(
"-f", "--force", help="overwrite existing user", action="store_true"
)
add_parser.add_argument("user", help="the name of the user", type=user_name_type)
add_parser.add_argument(
"key",
help="public key or private key of the user",
type=public_or_private_age_key_type,
)
add_parser.set_defaults(func=add_command)
remove_parser = subparser.add_parser("remove", help="remove a user")
remove_parser.add_argument("user", help="the name of the user", type=user_name_type)
remove_parser.set_defaults(func=remove_command)
add_secret_parser = subparser.add_parser(
"add-secret", help="allow a user to access a secret"
)
add_secret_parser.add_argument(
"user", help="the name of the group", type=user_name_type
)
add_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
add_secret_parser.set_defaults(func=add_secret_command)
add_secret_parser = subparser.add_parser(
"add-secret", help="allow a machine to access a secret"
)
add_secret_parser.add_argument(
"user", help="the name of the group", type=user_name_type
)
add_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
add_secret_parser.set_defaults(func=add_secret_command)
remove_secret_parser = subparser.add_parser(
"remove-secret", help="remove a user's access to a secret"
)
add_secret_parser.add_argument(
"user", help="the name of the group", type=user_name_type
)
remove_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
remove_secret_parser.set_defaults(func=remove_secret_command)

View File

@ -6,6 +6,8 @@
, installShellFiles
, zerotierone
, bubblewrap
, sops
, age
, self
}:
let
@ -71,7 +73,7 @@ let
clan-pytest = runCommand "${name}-tests"
{
nativeBuildInputs = [ zerotierone bubblewrap ];
nativeBuildInputs = [ zerotierone bubblewrap sops age ];
} ''
cp -r ${src} ./src
chmod +w -R ./src

View File

@ -18,7 +18,9 @@
openssh
sshpass
zbar
tor;
tor
sops
age;
# Override license so that we can build zerotierone without
# having to re-import nixpkgs.
zerotierone = pkgs.zerotierone.overrideAttrs (_old: { meta = { }; });

View File

@ -0,0 +1,120 @@
import argparse
import os
from pathlib import Path
import pytest
from environment import mock_env
from clan_cli.errors import ClanError
from clan_cli.secrets import register_parser
class SecretCli:
def __init__(self) -> None:
self.parser = argparse.ArgumentParser()
register_parser(self.parser)
def run(self, args: list[str]) -> argparse.Namespace:
parsed = self.parser.parse_args(args)
parsed.func(parsed)
return parsed
PUBKEY = "age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c"
PRIVKEY = "AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK"
def _test_identities(
what: str, clan_flake: Path, capsys: pytest.CaptureFixture
) -> None:
cli = SecretCli()
sops_folder = clan_flake / "sops"
cli.run([what, "add", "foo", PUBKEY])
assert (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError):
cli.run([what, "add", "foo", PUBKEY])
cli.run(
[
what,
"add",
"-f",
"foo",
PRIVKEY,
]
)
capsys.readouterr() # empty the buffer
cli.run([what, "list"])
out = capsys.readouterr() # empty the buffer
assert "foo" in out.out
cli.run([what, "remove", "foo"])
assert not (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError): # already removed
cli.run([what, "remove", "foo"])
capsys.readouterr()
cli.run([what, "list"])
out = capsys.readouterr()
assert "foo" not in out.out
def test_users(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
_test_identities("users", clan_flake, capsys)
def test_machines(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
_test_identities("machines", clan_flake, capsys)
def test_groups(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
cli = SecretCli()
capsys.readouterr() # empty the buffer
cli.run(["groups", "list"])
assert capsys.readouterr().out == ""
with pytest.raises(ClanError): # machine does not exist yet
cli.run(["groups", "add-machine", "group1", "machine1"])
with pytest.raises(ClanError): # user does not exist yet
cli.run(["groups", "add-user", "groupb1", "user1"])
cli.run(["machines", "add", "machine1", PUBKEY])
cli.run(["groups", "add-machine", "group1", "machine1"])
# Should this fail?
cli.run(["groups", "add-machine", "group1", "machine1"])
cli.run(["users", "add", "user1", PUBKEY])
cli.run(["groups", "add-user", "group1", "user1"])
capsys.readouterr() # empty the buffer
cli.run(["groups", "list"])
out = capsys.readouterr().out
assert "user1" in out
assert "machine1" in out
cli.run(["groups", "remove-user", "group1", "user1"])
cli.run(["groups", "remove-machine", "group1", "machine1"])
groups = os.listdir(clan_flake / "sops" / "groups")
assert len(groups) == 0
def test_secrets(
clan_flake: Path, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch
) -> None:
cli = SecretCli()
capsys.readouterr() # empty the buffer
cli.run(["list"])
assert capsys.readouterr().out == ""
with pytest.raises(ClanError): # does not exist yet
cli.run(["get", "nonexisting"])
with mock_env(
SOPS_NIX_SECRET="foo", SOPS_AGE_KEY_FILE=str(clan_flake / ".." / "age.key")
):
cli.run(["set", "nonexisting"])
capsys.readouterr()
cli.run(["get", "nonexisting"])
assert capsys.readouterr().out == "foo"