Merge pull request 'fix rotating keys with sops' (#94) from Mic92-mic92 into main
All checks were successful
build / test (push) Successful in 24s
All checks were successful
build / test (push) Successful in 24s
This commit is contained in:
commit
6f22717ffd
@ -102,12 +102,12 @@ def add_group_argument(parser: argparse.ArgumentParser) -> None:
|
||||
|
||||
def add_secret_command(args: argparse.Namespace) -> None:
|
||||
secrets.allow_member(
|
||||
secrets.groups_folder(args.group), sops_machines_folder(), args.group
|
||||
secrets.groups_folder(args.secret), sops_groups_folder(), args.group
|
||||
)
|
||||
|
||||
|
||||
def remove_secret_command(args: argparse.Namespace) -> None:
|
||||
secrets.disallow_member(secrets.groups_folder(args.group), args.group)
|
||||
secrets.disallow_member(secrets.groups_folder(args.secret), args.group)
|
||||
|
||||
|
||||
def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
||||
|
@ -25,12 +25,12 @@ def remove_command(args: argparse.Namespace) -> None:
|
||||
|
||||
def add_secret_command(args: argparse.Namespace) -> None:
|
||||
secrets.allow_member(
|
||||
secrets.machines_folder(args.group), sops_machines_folder(), args.machine
|
||||
secrets.machines_folder(args.secret), sops_machines_folder(), args.machine
|
||||
)
|
||||
|
||||
|
||||
def remove_secret_command(args: argparse.Namespace) -> None:
|
||||
secrets.disallow_member(secrets.machines_folder(args.group), args.machine)
|
||||
secrets.disallow_member(secrets.machines_folder(args.secret), args.machine)
|
||||
|
||||
|
||||
def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||
|
@ -11,8 +11,8 @@ from typing import IO
|
||||
from .. import tty
|
||||
from ..errors import ClanError
|
||||
from ..nix import nix_shell
|
||||
from .folders import list_objects, sops_secrets_folder
|
||||
from .sops import SopsKey, encrypt_file, ensure_sops_key, read_key
|
||||
from .folders import list_objects, sops_secrets_folder, sops_users_folder
|
||||
from .sops import SopsKey, encrypt_file, ensure_sops_key, read_key, update_keys
|
||||
from .types import VALID_SECRET_NAME, secret_name_type
|
||||
|
||||
|
||||
@ -54,6 +54,9 @@ def set_command(args: argparse.Namespace) -> None:
|
||||
else:
|
||||
encrypt_secret(key, sops_secrets_folder() / args.secret, sys.stdin)
|
||||
|
||||
# make sure we add ourselves to the key
|
||||
allow_member(users_folder(args.secret), sops_users_folder(), key.username)
|
||||
|
||||
|
||||
def remove_command(args: argparse.Namespace) -> None:
|
||||
secret: str = args.secret
|
||||
@ -67,6 +70,51 @@ def add_secret_argument(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
|
||||
|
||||
|
||||
def machines_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "machines"
|
||||
|
||||
|
||||
def users_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "users"
|
||||
|
||||
|
||||
def groups_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "groups"
|
||||
|
||||
|
||||
def collect_keys_for_type(folder: Path) -> list[str]:
|
||||
if not folder.exists():
|
||||
return []
|
||||
keys = []
|
||||
for p in folder.iterdir():
|
||||
if not p.is_symlink():
|
||||
continue
|
||||
try:
|
||||
target = p.resolve()
|
||||
except FileNotFoundError:
|
||||
tty.warn(f"Ignoring broken symlink {p}")
|
||||
continue
|
||||
kind = target.parent.name
|
||||
if folder.name != kind:
|
||||
tty.warn(f"Expected {p} to point to {folder} but points to {target.parent}")
|
||||
continue
|
||||
keys.append(read_key(target))
|
||||
return keys
|
||||
|
||||
|
||||
def collect_keys_for_path(path: Path) -> list[str]:
|
||||
keys = []
|
||||
keys += collect_keys_for_type(path / "machines")
|
||||
keys += collect_keys_for_type(path / "users")
|
||||
groups = path / "groups"
|
||||
if not groups.is_dir():
|
||||
return keys
|
||||
for group in groups.iterdir():
|
||||
keys += collect_keys_for_type(group / "machines")
|
||||
keys += collect_keys_for_type(group / "users")
|
||||
return keys
|
||||
|
||||
|
||||
def allow_member(group_folder: Path, source_folder: Path, name: str) -> None:
|
||||
source = source_folder / name
|
||||
if not source.exists():
|
||||
@ -80,12 +128,20 @@ def allow_member(group_folder: Path, source_folder: Path, name: str) -> None:
|
||||
)
|
||||
os.remove(user_target)
|
||||
user_target.symlink_to(source)
|
||||
update_keys(group_folder.parent, collect_keys_for_path(group_folder.parent))
|
||||
|
||||
|
||||
def disallow_member(group_folder: Path, name: str) -> None:
|
||||
target = group_folder / name
|
||||
if not target.exists():
|
||||
raise ClanError(f"{name} does not exist in group in {group_folder}")
|
||||
|
||||
keys = collect_keys_for_path(group_folder.parent)
|
||||
|
||||
if len(keys) < 2:
|
||||
raise ClanError(
|
||||
f"Cannot remove {name} from {group_folder.parent.name}. No keys left. Use 'clan secrets remove {name}' to remove the secret."
|
||||
)
|
||||
os.remove(target)
|
||||
|
||||
if len(os.listdir(group_folder)) == 0:
|
||||
@ -94,17 +150,7 @@ def disallow_member(group_folder: Path, name: str) -> None:
|
||||
if len(os.listdir(group_folder.parent)) == 0:
|
||||
os.rmdir(group_folder.parent)
|
||||
|
||||
|
||||
def machines_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "machines"
|
||||
|
||||
|
||||
def users_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "users"
|
||||
|
||||
|
||||
def groups_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "groups"
|
||||
update_keys(target.parent.parent, collect_keys_for_path(group_folder.parent))
|
||||
|
||||
|
||||
def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
||||
|
@ -2,9 +2,10 @@ import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import IO
|
||||
from typing import IO, Iterator
|
||||
|
||||
from .. import tty
|
||||
from ..dirs import user_config_dir
|
||||
@ -14,8 +15,9 @@ from .folders import sops_users_folder
|
||||
|
||||
|
||||
class SopsKey:
|
||||
def __init__(self, pubkey: str) -> None:
|
||||
def __init__(self, pubkey: str, username: str) -> None:
|
||||
self.pubkey = pubkey
|
||||
self.username = username
|
||||
|
||||
|
||||
def get_public_key(privkey: str) -> str:
|
||||
@ -51,7 +53,7 @@ def get_user_name(user: str) -> str:
|
||||
|
||||
|
||||
def ensure_user(pub_key: str) -> SopsKey:
|
||||
key = SopsKey(pub_key)
|
||||
key = SopsKey(pub_key, username="")
|
||||
users_folder = sops_users_folder()
|
||||
|
||||
# Check if the public key already exists for any user
|
||||
@ -60,6 +62,7 @@ def ensure_user(pub_key: str) -> SopsKey:
|
||||
if not user.is_dir():
|
||||
continue
|
||||
if read_key(user) == pub_key:
|
||||
key.username = user.name
|
||||
return key
|
||||
|
||||
# Find a unique user name if the public key is not found
|
||||
@ -76,6 +79,8 @@ def ensure_user(pub_key: str) -> SopsKey:
|
||||
# Add the public key for the user
|
||||
write_key(users_folder / username, pub_key, False)
|
||||
|
||||
key.username = username
|
||||
|
||||
return key
|
||||
|
||||
|
||||
@ -100,6 +105,32 @@ def ensure_sops_key() -> SopsKey:
|
||||
return ensure_user(get_public_key(path.read_text()))
|
||||
|
||||
|
||||
@contextmanager
|
||||
def sops_manifest(keys: list[str]) -> Iterator[Path]:
|
||||
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
||||
json.dump(
|
||||
dict(creation_rules=[dict(key_groups=[dict(age=keys)])]), manifest, indent=2
|
||||
)
|
||||
manifest.flush()
|
||||
yield Path(manifest.name)
|
||||
|
||||
|
||||
def update_keys(secret_path: Path, keys: list[str]) -> None:
|
||||
with sops_manifest(keys) as manifest:
|
||||
cmd = nix_shell(
|
||||
["sops"],
|
||||
[
|
||||
"sops",
|
||||
"--config",
|
||||
str(manifest),
|
||||
"updatekeys",
|
||||
"--yes",
|
||||
str(secret_path / "secret"),
|
||||
],
|
||||
)
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
|
||||
def encrypt_file(secret_path: Path, content: IO[str], keys: list[str]) -> None:
|
||||
folder = secret_path.parent
|
||||
folder.mkdir(parents=True, exist_ok=True)
|
||||
|
@ -29,12 +29,12 @@ def remove_command(args: argparse.Namespace) -> None:
|
||||
|
||||
def add_secret_command(args: argparse.Namespace) -> None:
|
||||
secrets.allow_member(
|
||||
secrets.groups_folder(args.group), sops_users_folder(), args.group
|
||||
secrets.users_folder(args.secret), sops_users_folder(), args.user
|
||||
)
|
||||
|
||||
|
||||
def remove_secret_command(args: argparse.Namespace) -> None:
|
||||
secrets.disallow_member(secrets.groups_folder(args.group), args.group)
|
||||
secrets.disallow_member(secrets.users_folder(args.secret), args.user)
|
||||
|
||||
|
||||
def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
@ -74,21 +74,10 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
)
|
||||
add_secret_parser.set_defaults(func=add_secret_command)
|
||||
|
||||
add_secret_parser = subparser.add_parser(
|
||||
"add-secret", help="allow a machine to access a secret"
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"user", help="the name of the group", type=user_name_type
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"secret", help="the name of the secret", type=secret_name_type
|
||||
)
|
||||
add_secret_parser.set_defaults(func=add_secret_command)
|
||||
|
||||
remove_secret_parser = subparser.add_parser(
|
||||
"remove-secret", help="remove a user's access to a secret"
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
remove_secret_parser.add_argument(
|
||||
"user", help="the name of the group", type=user_name_type
|
||||
)
|
||||
remove_secret_parser.add_argument(
|
||||
|
@ -23,6 +23,9 @@ class SecretCli:
|
||||
PUBKEY = "age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c"
|
||||
PRIVKEY = "AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK"
|
||||
|
||||
PUBKEY_2 = "age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62"
|
||||
PRIVKEY_2 = "AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ"
|
||||
|
||||
|
||||
def _test_identities(
|
||||
what: str, clan_flake: Path, capsys: pytest.CaptureFixture
|
||||
@ -123,6 +126,34 @@ def test_secrets(
|
||||
cli.run(["list"])
|
||||
assert capsys.readouterr().out == "key\n"
|
||||
|
||||
cli.run(["machines", "add", "machine1", PUBKEY])
|
||||
cli.run(["machines", "add-secret", "machine1", "key"])
|
||||
|
||||
with mock_env(SOPS_AGE_KEY=PRIVKEY, SOPS_AGE_KEY_FILE=""):
|
||||
capsys.readouterr()
|
||||
cli.run(["get", "key"])
|
||||
assert capsys.readouterr().out == "foo"
|
||||
cli.run(["machines", "remove-secret", "machine1", "key"])
|
||||
|
||||
cli.run(["users", "add", "user1", PUBKEY_2])
|
||||
cli.run(["users", "add-secret", "user1", "key"])
|
||||
with mock_env(SOPS_AGE_KEY=PRIVKEY_2, SOPS_AGE_KEY_FILE=""):
|
||||
capsys.readouterr()
|
||||
cli.run(["get", "key"])
|
||||
assert capsys.readouterr().out == "foo"
|
||||
cli.run(["users", "remove-secret", "user1", "key"])
|
||||
|
||||
with pytest.raises(ClanError): # does not exist yet
|
||||
cli.run(["groups", "add-secret", "admin-group", "key"])
|
||||
cli.run(["groups", "add-user", "admin-group", "user1"])
|
||||
cli.run(["groups", "add-secret", "admin-group", "key"])
|
||||
|
||||
with mock_env(SOPS_AGE_KEY=PRIVKEY_2, SOPS_AGE_KEY_FILE=""):
|
||||
capsys.readouterr()
|
||||
cli.run(["get", "key"])
|
||||
assert capsys.readouterr().out == "foo"
|
||||
cli.run(["groups", "remove-secret", "admin-group", "key"])
|
||||
|
||||
cli.run(["remove", "key"])
|
||||
|
||||
capsys.readouterr() # empty the buffer
|
||||
|
Loading…
Reference in New Issue
Block a user