add option to set defaultGroups for secrets

This commit is contained in:
Jörg Thalheim 2024-02-16 17:03:14 +01:00
parent 714f3b0378
commit 57e9b27ff8
13 changed files with 84 additions and 12 deletions

View File

@ -64,7 +64,13 @@
'';
default = pkgs.writers.writeJSON "secrets.json" (lib.mapAttrs
(_name: secret: {
secrets = builtins.attrNames secret.secrets;
secrets = lib.mapAttrsToList
(name: secret: {
inherit name;
} // lib.optionalAttrs (secret ? groups) {
inherit (secret) groups;
})
secret.secrets;
facts = lib.mapAttrs (_: secret: secret.path) secret.facts;
generator = secret.generator.finalScript;
})

View File

@ -108,6 +108,14 @@
'';
default = "${config'.clanCore.secretsDirectory}/${config'.clanCore.secretsPrefix}${config.name}";
};
} // lib.optionalAttrs (config'.clanCore.secretStore == "sops") {
groups = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = config'.clanCore.sops.defaultGroups;
description = ''
Groups to decrypt the secret for. By default we always use the user's key.
'';
};
};
}));
description = ''

View File

@ -22,6 +22,14 @@ let
secrets = filterDir containsMachineOrGroups secretsDir;
in
{
options = {
clanCore.sops.defaultGroups = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = [ ];
example = [ "admins" ];
description = "The default groups to for encryption use when no groups are specified.";
};
};
config = lib.mkIf (config.clanCore.secretStore == "sops") {
clanCore.secretsDirectory = "/run/secrets";
clanCore.secretsPrefix = config.clanCore.machineName + "-";

View File

@ -4,6 +4,7 @@ from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any
from clan_cli.dirs import vm_state_dir
from qemu.qmp import QEMUMonitorProtocol
@ -101,7 +102,7 @@ class Machine:
return self.deployment_info["factsModule"]
@property
def secrets_data(self) -> dict:
def secrets_data(self) -> dict[str, dict[str, Any]]:
if self.deployment_info["secretsData"]:
try:
return json.loads(Path(self.deployment_info["secretsData"]).read_text())

View File

@ -17,9 +17,13 @@ def check_secrets(machine: Machine) -> bool:
missing_facts = []
for service in machine.secrets_data:
for secret in machine.secrets_data[service]["secrets"]:
if not secret_store.exists(service, secret):
if isinstance(secret, str):
secret_name = secret
else:
secret_name = secret["name"]
if not secret_store.exists(service, secret_name):
log.info(f"Secret {secret} for service {service} is missing")
missing_secrets.append((service, secret))
missing_secrets.append((service, secret_name))
for fact in machine.secrets_data[service]["facts"]:
if not fact_store.exists(service, fact):

View File

@ -69,12 +69,22 @@ def generate_service_secrets(
files_to_commit = []
# store secrets
for secret in machine.secrets_data[service]["secrets"]:
secret_file = secrets_dir / secret
if isinstance(secret, str):
# TODO: This is the old NixOS module, can be dropped everyone has updated.
secret_name = secret
groups = []
else:
secret_name = secret["name"]
groups = secret.get("groups", [])
secret_file = secrets_dir / secret_name
if not secret_file.is_file():
msg = f"did not generate a file for '{secret}' when running the following command:\n"
msg = f"did not generate a file for '{secret_name}' when running the following command:\n"
msg += machine.secrets_data[service]["generator"]
raise ClanError(msg)
secret_path = secret_store.set(service, secret, secret_file.read_bytes())
secret_path = secret_store.set(
service, secret_name, secret_file.read_bytes(), groups
)
if secret_path:
files_to_commit.append(secret_path)

View File

@ -10,7 +10,9 @@ class SecretStoreBase(ABC):
pass
@abstractmethod
def set(self, service: str, name: str, value: bytes) -> Path | None:
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
pass
@abstractmethod

View File

@ -12,7 +12,9 @@ class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(self, service: str, name: str, value: bytes) -> Path | None:
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
subprocess.run(
nix_shell(
["nixpkgs#pass"],
@ -104,5 +106,10 @@ class SecretStore(SecretStoreBase):
def upload(self, output_dir: Path) -> None:
for service in self.machine.secrets_data:
for secret in self.machine.secrets_data[service]["secrets"]:
(output_dir / secret).write_bytes(self.get(service, secret))
if isinstance(secret, dict):
secret_name = secret["name"]
else:
# TODO: drop old format soon
secret_name = secret
(output_dir / secret_name).write_bytes(self.get(service, secret_name))
(output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@ -28,7 +28,9 @@ class SecretStore:
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(self, service: str, name: str, value: bytes) -> Path | None:
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
path = (
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}"
)
@ -37,6 +39,7 @@ class SecretStore:
path,
value.decode(),
add_machines=[self.machine.name],
add_groups=groups,
)
return path

View File

@ -14,7 +14,9 @@ class SecretStore(SecretStoreBase):
self.dir = vm_state_dir(str(machine.flake), machine.name) / "secrets"
self.dir.mkdir(parents=True, exist_ok=True)
def set(self, service: str, name: str, value: bytes) -> Path | None:
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
secret_file = self.dir / service / name
secret_file.parent.mkdir(parents=True, exist_ok=True)
secret_file.write_bytes(value)

View File

@ -16,6 +16,7 @@
system.stateVersion = lib.version;
sops.age.keyFile = "__CLAN_SOPS_KEY_PATH__";
clanCore.secretsUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clanCore.sops.defaultGroups = [ "admins" ];
clan.virtualisation.graphics = false;
clan.networking.zerotier.controller.enable = true;

View File

@ -33,6 +33,17 @@ def test_generate_secret(
age_keys[0].pubkey,
]
)
cli.run(
[
"--flake",
str(test_flake_with_core.path),
"secrets",
"groups",
"add-user",
"admins",
"user1",
]
)
cmd = ["--flake", str(test_flake_with_core.path), "secrets", "generate", "vm1"]
cli.run(cmd)
has_secret(test_flake_with_core.path, "vm1-age.key")

View File

@ -106,6 +106,15 @@ def test_run(
age_keys[0].pubkey,
]
)
cli.run(
[
"secrets",
"groups",
"add-user",
"admins",
"user1",
]
)
cli.run(["vms", "run", "vm1"])