From 8338944062a3e878cd2fe21104541da9b4e67738 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Wed, 17 Jan 2024 14:02:37 +0100 Subject: [PATCH] move python code in nixos Module to external file --- nixosModules/clanCore/secrets/sops.nix | 52 +---------- .../clanCore/secrets/sops/default.nix | 93 +++++++++++++++++++ nixosModules/clanCore/secrets/sops/sops.py | 47 ++++++++++ 3 files changed, 141 insertions(+), 51 deletions(-) create mode 100644 nixosModules/clanCore/secrets/sops/default.nix create mode 100644 nixosModules/clanCore/secrets/sops/sops.py diff --git a/nixosModules/clanCore/secrets/sops.nix b/nixosModules/clanCore/secrets/sops.nix index bf6ee797..c507c9bf 100644 --- a/nixosModules/clanCore/secrets/sops.nix +++ b/nixosModules/clanCore/secrets/sops.nix @@ -26,57 +26,7 @@ in clanCore.secretsDirectory = "/run/secrets"; clanCore.secretsPrefix = config.clanCore.machineName + "-"; system.clan = lib.mkIf (config.clanCore.secrets != { }) { - - secretsModule = pkgs.writeText "sops.py" '' - from pathlib import Path - - from clan_cli.secrets.folders import sops_secrets_folder - from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret - from clan_cli.secrets.sops import generate_private_key - from clan_cli.secrets.machines import has_machine, add_machine - from clan_cli.machines.machines import Machine - - - class SecretStore: - def __init__(self, machine: Machine) -> None: - self.machine = machine - if has_machine(self.machine.flake_dir, self.machine.name): - return - priv_key, pub_key = generate_private_key() - encrypt_secret( - self.machine.flake_dir, - sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-age.key", - priv_key, - ) - add_machine(self.machine.flake_dir, self.machine.name, pub_key, False) - - def set(self, service: str, name: str, value: str): - encrypt_secret( - self.machine.flake_dir, - sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}", - value, - add_machines=[self.machine.name], - ) - - def get(self, service: str, name: str) -> bytes: - # TODO: add support for getting a secret - pass - - def exists(self, service: str, name: str) -> bool: - return has_secret( - self.machine.flake_dir, - f"{self.machine.name}-{name}", - ) - - def upload(self, output_dir: Path, secrets: list[str, str]) -> None: - key_name = f"{self.machine.name}-age.key" - if not has_secret(self.machine.flake_dir, key_name): - # skip uploading the secret, not managed by us - return - key = decrypt_secret(self.machine.flake_dir, key_name) - - (output_dir / "key.txt").write_text(key) - ''; + secretsModule = ./sops/sops.py; }; sops.secrets = builtins.mapAttrs (name: _: { diff --git a/nixosModules/clanCore/secrets/sops/default.nix b/nixosModules/clanCore/secrets/sops/default.nix new file mode 100644 index 00000000..d01feb76 --- /dev/null +++ b/nixosModules/clanCore/secrets/sops/default.nix @@ -0,0 +1,93 @@ +{ config, lib, pkgs, ... }: +let + secretsDir = config.clanCore.clanDir + "/sops/secrets"; + groupsDir = config.clanCore.clanDir + "/sops/groups"; + + + # My symlink is in the nixos module detected as a directory also it works in the repl. Is this because of pure evaluation? + containsSymlink = path: + builtins.pathExists path && (builtins.readFileType path == "directory" || builtins.readFileType path == "symlink"); + + containsMachine = parent: name: type: + type == "directory" && containsSymlink "${parent}/${name}/machines/${config.clanCore.machineName}"; + + containsMachineOrGroups = name: type: + (containsMachine secretsDir name type) || lib.any (group: type == "directory" && containsSymlink "${secretsDir}/${name}/groups/${group}") groups; + + filterDir = filter: dir: + lib.optionalAttrs (builtins.pathExists dir) + (lib.filterAttrs filter (builtins.readDir dir)); + + groups = builtins.attrNames (filterDir (containsMachine groupsDir) groupsDir); + secrets = filterDir containsMachineOrGroups secretsDir; +in +{ + config = lib.mkIf (config.clanCore.secretStore == "sops") { + clanCore.secretsDirectory = "/run/secrets"; + clanCore.secretsPrefix = config.clanCore.machineName + "-"; + system.clan = lib.mkIf (config.clanCore.secrets != { }) { + secretsModule = pkgs.writeText "sops.py" '' + from pathlib import Path + + from clan_cli.secrets.folders import sops_secrets_folder + from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret + from clan_cli.secrets.sops import generate_private_key + from clan_cli.secrets.machines import has_machine, add_machine + from clan_cli.machines.machines import Machine + + + class SecretStore: + def __init__(self, machine: Machine) -> None: + self.machine = machine + if has_machine(self.machine.flake_dir, self.machine.name): + return + priv_key, pub_key = generate_private_key() + encrypt_secret( + self.machine.flake_dir, + sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-age.key", + priv_key, + ) + add_machine(self.machine.flake_dir, self.machine.name, pub_key, False) + + def set(self, service: str, name: str, value: str): + encrypt_secret( + self.machine.flake_dir, + sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}", + value, + add_machines=[self.machine.name], + ) + + def get(self, service: str, name: str) -> bytes: + # TODO: add support for getting a secret + pass + + def exists(self, service: str, name: str) -> bool: + return has_secret( + self.machine.flake_dir, + f"{self.machine.name}-{name}", + ) + + def upload(self, output_dir: Path, secrets: list[str, str]) -> None: + key_name = f"{self.machine.name}-age.key" + if not has_secret(self.machine.flake_dir, key_name): + # skip uploading the secret, not managed by us + return + key = decrypt_secret(self.machine.flake_dir, key_name) + + (output_dir / "key.txt").write_text(key) + ''; + }; + sops.secrets = builtins.mapAttrs + (name: _: { + sopsFile = config.clanCore.clanDir + "/sops/secrets/${name}/secret"; + format = "binary"; + }) + secrets; + # To get proper error messages about missing secrets we need a dummy secret file that is always present + sops.defaultSopsFile = lib.mkIf config.sops.validateSopsFiles (lib.mkDefault (builtins.toString (pkgs.writeText "dummy.yaml" ""))); + + sops.age.keyFile = lib.mkIf (builtins.pathExists (config.clanCore.clanDir + "/sops/secrets/${config.clanCore.machineName}-age.key/secret")) + (lib.mkDefault "/var/lib/sops-nix/key.txt"); + clanCore.secretsUploadDirectory = lib.mkDefault "/var/lib/sops-nix"; + }; +} diff --git a/nixosModules/clanCore/secrets/sops/sops.py b/nixosModules/clanCore/secrets/sops/sops.py new file mode 100644 index 00000000..1ad42372 --- /dev/null +++ b/nixosModules/clanCore/secrets/sops/sops.py @@ -0,0 +1,47 @@ +from pathlib import Path + +from clan_cli.machines.machines import Machine +from clan_cli.secrets.folders import sops_secrets_folder +from clan_cli.secrets.machines import add_machine, has_machine +from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret +from clan_cli.secrets.sops import generate_private_key + + +class SecretStore: + def __init__(self, machine: Machine) -> None: + self.machine = machine + if has_machine(self.machine.flake_dir, self.machine.name): + return + priv_key, pub_key = generate_private_key() + encrypt_secret( + self.machine.flake_dir, + sops_secrets_folder(self.machine.flake_dir) + / f"{self.machine.name}-age.key", + priv_key, + ) + add_machine(self.machine.flake_dir, self.machine.name, pub_key, False) + + def set(self, _service: str, name: str, value: str): + encrypt_secret( + self.machine.flake_dir, + sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}", + value, + add_machines=[self.machine.name], + ) + + def get(self, _service: str, _name: str) -> bytes: + raise NotImplementedError() + + def exists(self, _service: str, name: str) -> bool: + return has_secret( + self.machine.flake_dir, + f"{self.machine.name}-{name}", + ) + + def upload(self, output_dir: Path, _secrets: list[str]) -> None: + key_name = f"{self.machine.name}-age.key" + if not has_secret(self.machine.flake_dir, key_name): + # skip uploading the secret, not managed by us + return + key = decrypt_secret(self.machine.flake_dir, key_name) + (output_dir / "key.txt").write_text(key)