re-encrypt secrets after rotating users/machines keys
All checks were successful
checks / check-links (pull_request) Successful in 13s
checks / checks-impure (pull_request) Successful in 1m53s
checks / checks (pull_request) Successful in 4m5s

This commit is contained in:
Jörg Thalheim 2024-03-25 11:18:20 +01:00
parent b6d5f8a6ce
commit 0fa36252c2
6 changed files with 84 additions and 23 deletions

View File

@ -3,14 +3,8 @@ from pathlib import Path
from .. import tty
from ..errors import ClanError
from .folders import sops_secrets_folder
from .secrets import collect_keys_for_path, list_secrets
from .sops import (
default_sops_key_path,
generate_private_key,
get_public_key,
update_keys,
)
from .secrets import update_secrets
from .sops import default_sops_key_path, generate_private_key, get_public_key
def generate_key() -> str:
@ -44,12 +38,7 @@ def show_command(args: argparse.Namespace) -> None:
def update_command(args: argparse.Namespace) -> None:
flake_dir = Path(args.flake)
for name in list_secrets(flake_dir):
secret_path = sops_secrets_folder(flake_dir) / name
update_keys(
secret_path,
list(sorted(collect_keys_for_path(secret_path))),
)
update_secrets(flake_dir)
def register_key_parser(parser: argparse.ArgumentParser) -> None:

View File

@ -6,6 +6,7 @@ from ..git import commit_files
from ..machines.types import machine_name_type, validate_hostname
from . import secrets
from .folders import list_objects, remove_object, sops_machines_folder
from .secrets import update_secrets
from .sops import read_key, write_key
from .types import public_or_private_age_key_type, secret_name_type
@ -13,6 +14,12 @@ from .types import public_or_private_age_key_type, secret_name_type
def add_machine(flake_dir: Path, name: str, key: str, force: bool) -> None:
path = sops_machines_folder(flake_dir) / name
write_key(path, key, force)
paths = [path]
def filter_machine_secrets(secret: Path) -> bool:
return secret.joinpath("machines", name).exists()
paths.extend(update_secrets(flake_dir, filter_secrets=filter_machine_secrets))
commit_files(
[path],
flake_dir,

View File

@ -3,6 +3,7 @@ import getpass
import os
import shutil
import sys
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import IO
@ -21,6 +22,23 @@ from .sops import decrypt_file, encrypt_file, ensure_sops_key, read_key, update_
from .types import VALID_SECRET_NAME, secret_name_type
def update_secrets(
flake_dir: Path, filter_secrets: Callable[[Path], bool] = lambda _: True
) -> list[Path]:
changed_files = []
for name in list_secrets(flake_dir):
secret_path = sops_secrets_folder(flake_dir) / name
if not filter_secrets(secret_path):
continue
changed_files.extend(
update_keys(
secret_path,
list(sorted(collect_keys_for_path(secret_path))),
)
)
return changed_files
def collect_keys_for_type(folder: Path) -> set[str]:
if not folder.exists():
return set()

View File

@ -117,8 +117,10 @@ def sops_manifest(keys: list[str]) -> Iterator[Path]:
yield Path(manifest.name)
def update_keys(secret_path: Path, keys: list[str]) -> None:
def update_keys(secret_path: Path, keys: list[str]) -> list[Path]:
with sops_manifest(keys) as manifest:
secret_path = secret_path / "secret"
time_before = secret_path.stat().st_mtime
cmd = nix_shell(
["nixpkgs#sops"],
[
@ -127,10 +129,13 @@ def update_keys(secret_path: Path, keys: list[str]) -> None:
str(manifest),
"updatekeys",
"--yes",
str(secret_path / "secret"),
str(secret_path),
],
)
run(cmd, log=Log.BOTH, error_msg=f"Could not update keys for {secret_path}")
if time_before == secret_path.stat().st_mtime:
return []
return [secret_path]
def encrypt_file(
@ -202,7 +207,9 @@ def write_key(path: Path, publickey: str, overwrite: bool) -> None:
flags |= os.O_EXCL
fd = os.open(path / "key.json", flags)
except FileExistsError:
raise ClanError(f"{path.name} already exists in {path}. Use --force to overwrite.")
raise ClanError(
f"{path.name} already exists in {path}. Use --force to overwrite."
)
with os.fdopen(fd, "w") as f:
json.dump({"publickey": publickey, "type": "age"}, f, indent=2)

View File

@ -5,6 +5,7 @@ from ..errors import ClanError
from ..git import commit_files
from . import secrets
from .folders import list_objects, remove_object, sops_users_folder
from .secrets import update_secrets
from .sops import read_key, write_key
from .types import (
VALID_USER_NAME,
@ -16,9 +17,15 @@ from .types import (
def add_user(flake_dir: Path, name: str, key: str, force: bool) -> None:
path = sops_users_folder(flake_dir) / name
def filter_user_secrets(secret: Path) -> bool:
return secret.joinpath("users", name).exists()
write_key(path, key, force)
paths = [path]
paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets))
commit_files(
[path],
paths,
flake_dir,
f"Add user {name} to secrets",
)

View File

@ -37,9 +37,21 @@ def _test_identities(
]
)
assert (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError):
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey])
with pytest.raises(ClanError): # raises "foo already exists"
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
what,
"add",
"foo",
age_keys[0].pubkey,
]
)
# rotate the key
cli.run(
[
"--flake",
@ -49,7 +61,7 @@ def _test_identities(
"add",
"-f",
"foo",
age_keys[0].privkey,
age_keys[1].privkey,
]
)
@ -65,7 +77,7 @@ def _test_identities(
]
)
out = capsys.readouterr() # empty the buffer
assert age_keys[0].pubkey in out.out
assert age_keys[1].pubkey in out.out
capsys.readouterr() # empty the buffer
cli.run(["--flake", str(test_flake.path), "secrets", what, "list"])
@ -291,7 +303,7 @@ def test_secrets(
"machines",
"add",
"machine1",
age_keys[0].pubkey,
age_keys[1].pubkey,
]
)
cli.run(
@ -309,6 +321,27 @@ def test_secrets(
cli.run(["--flake", str(test_flake.path), "secrets", "machines", "list"])
assert capsys.readouterr().out == "machine1\n"
with use_key(age_keys[1].privkey, monkeypatch):
capsys.readouterr()
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
assert capsys.readouterr().out == "foo"
# rotate machines key
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"machines",
"add",
"-f",
"machine1",
age_keys[0].privkey,
]
)
# should also rotate the encrypted secret
with use_key(age_keys[0].privkey, monkeypatch):
capsys.readouterr()
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])