secrets: use clanInternal for crosscompiling, move sops generators to new file

This commit is contained in:
lassulus 2023-09-20 18:08:47 +02:00
parent 18c360f729
commit aeed648bd0
6 changed files with 183 additions and 165 deletions

View File

@ -11,22 +11,44 @@ let
(builtins.fromJSON
(builtins.readFile (directory + /machines/${machineName}/settings.json)));
nixosConfiguration = { system ? "x86_64-linux", name }: nixpkgs.lib.nixosSystem {
modules = [
self.nixosModules.clanCore
(machineSettings name)
(machines.${name} or { })
{
clanCore.machineName = name;
clanCore.clanDir = directory;
# TODO: remove this once we have a hardware-config mechanism
nixpkgs.hostPlatform = lib.mkDefault system;
}
];
inherit specialArgs;
};
nixosConfigurations = lib.mapAttrs
(name: _:
nixpkgs.lib.nixosSystem {
modules = [
self.nixosModules.clanCore
(machineSettings name)
(machines.${name} or { })
{
clanCore.machineName = name;
clanCore.clanDir = directory;
# TODO: remove this once we have a hardware-config mechanism
nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";
}
];
inherit specialArgs;
})
nixosConfiguration { inherit name; })
(machinesDirs // machines);
systems = [
"x86_64-linux"
"aarch64-linux"
"riscv64-linux"
"x86_64-darwin"
"aarch64-darwin"
];
clanInternals = {
machines = lib.mapAttrs
(name: _:
(builtins.listToAttrs (map
(system:
lib.nameValuePair system (nixosConfiguration { inherit name system; })
)
systems))
)
(machinesDirs // machines);
};
in
nixosConfigurations
{ inherit nixosConfigurations clanInternals; }

View File

@ -19,33 +19,26 @@ let
groups = builtins.attrNames (filterDir (containsMachine groupsDir) groupsDir);
secrets = filterDir containsMachineOrGroups secretsDir;
systems = [ "i686-linux" "x86_64-linux" "riscv64-linux" "aarch64-linux" "x86_64-darwin" ];
in
{
config = lib.mkIf (config.clanCore.secretStore == "sops") {
system.clan = lib.genAttrs systems (system:
let
# Maybe use inputs.nixpkgs.legacyPackages here?
# don't reimport nixpkgs if we are on the same system (optimization)
pkgs' = if pkgs.hostPlatform.system == system then pkgs else import pkgs.path { system = system; };
in
{
generateSecrets = pkgs.writeScript "generate-secrets" ''
#!${pkgs'.python3}/bin/python
import json
from clan_cli.secrets.generate import generate_secrets_from_nix
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; secret_submodules = config.clanCore.secrets; })})
generate_secrets_from_nix(**args)
'';
uploadSecrets = pkgs.writeScript "upload-secrets" ''
#!${pkgs'.python3}/bin/python
import json
from clan_cli.secrets.upload import upload_age_key_from_nix
# the second toJSON is needed to escape the string for the python
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; deployment_address = config.clan.networking.deploymentAddress; age_key_file = config.sops.age.keyFile; })})
upload_age_key_from_nix(**args)
'';
});
system.clan = {
generateSecrets = pkgs.writeScript "generate-secrets" ''
#!${pkgs.python3}/bin/python
import json
from clan_cli.secrets.sops_generate import generate_secrets_from_nix
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; secret_submodules = config.clanCore.secrets; })})
generate_secrets_from_nix(**args)
'';
uploadSecrets = pkgs.writeScript "upload-secrets" ''
#!${pkgs.python3}/bin/python
import json
from clan_cli.secrets.sops_generate import upload_age_key_from_nix
# the second toJSON is needed to escape the string for the python
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; deployment_address = config.clan.networking.deploymentAddress; age_key_file = config.sops.age.keyFile; })})
upload_age_key_from_nix(**args)
'';
};
sops.secrets = builtins.mapAttrs
(name: _: {
sopsFile = config.clanCore.clanDir + "/sops/secrets/${name}/secret";

View File

@ -1,34 +1,25 @@
import argparse
import os
import shlex
import shutil
import subprocess
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
from clan_cli.errors import ClanError
from ..dirs import get_clan_flake_toplevel, module_root
from ..nix import nix_build, nix_config
from .folders import sops_secrets_folder
from .machines import add_machine, has_machine
from .secrets import encrypt_secret, has_secret
from .sops import generate_private_key
def generate_secrets(machine: str) -> None:
clan_dir = get_clan_flake_toplevel().as_posix().strip()
env = os.environ.copy()
env["CLAN_DIR"] = clan_dir
env["PYTHONPATH"] = str(module_root().parent)
env["PYTHONPATH"] = str(module_root().parent) # TODO do this in the clanCore module
config = nix_config()
system = config["system"]
cmd = nix_build(
[
f'path:{clan_dir}#nixosConfigurations."{machine}".config.system.clan.{system}.generateSecrets'
f'path:{clan_dir}#clanInternals.machines."{machine}".{system}.config.system.clan.generateSecrets'
]
)
proc = subprocess.run(cmd, stdout=subprocess.PIPE, text=True)
@ -50,87 +41,6 @@ def generate_secrets(machine: str) -> None:
print("successfully generated secrets")
def generate_host_key(machine_name: str) -> None:
if has_machine(machine_name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(sops_secrets_folder() / f"{machine_name}-age.key", priv_key)
add_machine(machine_name, pub_key, False)
def generate_secrets_group(
secret_group: str, machine_name: str, tempdir: Path, secret_options: dict[str, Any]
) -> None:
clan_dir = get_clan_flake_toplevel()
secrets = secret_options["secrets"]
needs_regeneration = any(
not has_secret(f"{machine_name}-{secret['name']}")
for secret in secrets.values()
)
generator = secret_options["generator"]
subdir = tempdir / secret_group
if needs_regeneration:
facts_dir = subdir / "facts"
facts_dir.mkdir(parents=True)
secrets_dir = subdir / "secrets"
secrets_dir.mkdir(parents=True)
text = f"""\
set -euo pipefail
facts={shlex.quote(str(facts_dir))}
secrets={shlex.quote(str(secrets_dir))}
{generator}
"""
try:
subprocess.run(["bash", "-c", text], check=True)
except subprocess.CalledProcessError:
msg = "failed to the following command:\n"
msg += text
raise ClanError(msg)
for secret in secrets.values():
secret_file = secrets_dir / secret["name"]
if not secret_file.is_file():
msg = f"did not generate a file for '{secret['name']}' when running the following command:\n"
msg += text
raise ClanError(msg)
encrypt_secret(
sops_secrets_folder() / f"{machine_name}-{secret['name']}",
secret_file.read_text(),
)
for fact in secret_options["facts"].values():
fact_file = facts_dir / fact["name"]
if not fact_file.is_file():
msg = f"did not generate a file for '{fact['name']}' when running the following command:\n"
msg += text
raise ClanError(msg)
fact_path = clan_dir.joinpath(fact["path"])
fact_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(fact_file, fact_path)
# this is called by the sops.nix clan core module
def generate_secrets_from_nix(
machine_name: str,
secret_submodules: dict[str, Any],
) -> None:
generate_host_key(machine_name)
errors = {}
with TemporaryDirectory() as d:
# if any of the secrets are missing, we regenerate all connected facts/secrets
for secret_group, secret_options in secret_submodules.items():
try:
generate_secrets_group(
secret_group, machine_name, Path(d), secret_options
)
except ClanError as e:
errors[secret_group] = e
for secret_group, error in errors.items():
print(f"failed to generate secrets for {machine_name}/{secret_group}:")
print(error, file=sys.stderr)
if len(errors) > 0:
sys.exit(1)
def generate_command(args: argparse.Namespace) -> None:
generate_secrets(args.machine)

View File

@ -0,0 +1,124 @@
import shlex
import shutil
import subprocess
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
from ..dirs import get_clan_flake_toplevel
from ..errors import ClanError
from ..ssh import parse_deployment_address
from .folders import sops_secrets_folder
from .machines import add_machine, has_machine
from .secrets import decrypt_secret, encrypt_secret, has_secret
from .sops import generate_private_key
def generate_host_key(machine_name: str) -> None:
if has_machine(machine_name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(sops_secrets_folder() / f"{machine_name}-age.key", priv_key)
add_machine(machine_name, pub_key, False)
def generate_secrets_group(
secret_group: str, machine_name: str, tempdir: Path, secret_options: dict[str, Any]
) -> None:
clan_dir = get_clan_flake_toplevel()
secrets = secret_options["secrets"]
needs_regeneration = any(
not has_secret(f"{machine_name}-{secret['name']}")
for secret in secrets.values()
)
generator = secret_options["generator"]
subdir = tempdir / secret_group
if needs_regeneration:
facts_dir = subdir / "facts"
facts_dir.mkdir(parents=True)
secrets_dir = subdir / "secrets"
secrets_dir.mkdir(parents=True)
text = f"""\
set -euo pipefail
facts={shlex.quote(str(facts_dir))}
secrets={shlex.quote(str(secrets_dir))}
{generator}
"""
try:
subprocess.run(["bash", "-c", text], check=True)
except subprocess.CalledProcessError:
msg = "failed to the following command:\n"
msg += text
raise ClanError(msg)
for secret in secrets.values():
secret_file = secrets_dir / secret["name"]
if not secret_file.is_file():
msg = f"did not generate a file for '{secret['name']}' when running the following command:\n"
msg += text
raise ClanError(msg)
encrypt_secret(
sops_secrets_folder() / f"{machine_name}-{secret['name']}",
secret_file.read_text(),
)
for fact in secret_options["facts"].values():
fact_file = facts_dir / fact["name"]
if not fact_file.is_file():
msg = f"did not generate a file for '{fact['name']}' when running the following command:\n"
msg += text
raise ClanError(msg)
fact_path = clan_dir.joinpath(fact["path"])
fact_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(fact_file, fact_path)
# this is called by the sops.nix clan core module
def generate_secrets_from_nix(
machine_name: str,
secret_submodules: dict[str, Any],
) -> None:
generate_host_key(machine_name)
errors = {}
with TemporaryDirectory() as d:
# if any of the secrets are missing, we regenerate all connected facts/secrets
for secret_group, secret_options in secret_submodules.items():
try:
generate_secrets_group(
secret_group, machine_name, Path(d), secret_options
)
except ClanError as e:
errors[secret_group] = e
for secret_group, error in errors.items():
print(f"failed to generate secrets for {machine_name}/{secret_group}:")
print(error, file=sys.stderr)
if len(errors) > 0:
sys.exit(1)
# this is called by the sops.nix clan core module
def upload_age_key_from_nix(
machine_name: str, deployment_address: str, age_key_file: str
) -> None:
secret_name = f"{machine_name}-age.key"
if not has_secret(secret_name): # skip uploading the secret, not managed by us
return
secret = decrypt_secret(secret_name)
h = parse_deployment_address(machine_name, deployment_address)
path = Path(age_key_file)
proc = h.run(
[
"bash",
"-c",
'mkdir -p "$0" && echo -n "$1" > "$2"',
str(path.parent),
secret,
age_key_file,
],
check=False,
)
if proc.returncode != 0:
print(f"failed to upload age key to {deployment_address}")
sys.exit(1)

View File

@ -1,14 +1,10 @@
import argparse
import json
import subprocess
import sys
from pathlib import Path
from ..dirs import get_clan_flake_toplevel
from ..errors import ClanError
from ..nix import nix_build, nix_config, nix_eval
from ..ssh import parse_deployment_address
from .secrets import decrypt_secret, has_secret
def upload_secrets(machine: str) -> None:
@ -19,7 +15,7 @@ def upload_secrets(machine: str) -> None:
proc = subprocess.run(
nix_build(
[
f'{clan_dir}#nixosConfigurations."{machine}".config.system.clan.{system}.uploadSecrets'
f'{clan_dir}#clanInternals.machines."{machine}".{system}.config.system.clan.uploadSecrets'
]
),
stdout=subprocess.PIPE,
@ -30,7 +26,7 @@ def upload_secrets(machine: str) -> None:
subprocess.run(
nix_eval(
[
f'{clan_dir}#nixosConfigurations."{machine}".config.clan.networking.deploymentAddress'
f'{clan_dir}#clanInternals.machines."{machine}".{system}.config.clan.networking.deploymentAddress'
]
),
stdout=subprocess.PIPE,
@ -53,34 +49,6 @@ def upload_secrets(machine: str) -> None:
print("successfully uploaded secrets")
# this is called by the sops.nix clan core module
def upload_age_key_from_nix(
machine_name: str, deployment_address: str, age_key_file: str
) -> None:
secret_name = f"{machine_name}-age.key"
if not has_secret(secret_name): # skip uploading the secret, not managed by us
return
secret = decrypt_secret(secret_name)
h = parse_deployment_address(machine_name, deployment_address)
path = Path(age_key_file)
proc = h.run(
[
"bash",
"-c",
'mkdir -p "$0" && echo -n "$1" > "$2"',
str(path.parent),
secret,
age_key_file,
],
check=False,
)
if proc.returncode != 0:
print(f"failed to upload age key to {deployment_address}")
sys.exit(1)
def upload_command(args: argparse.Namespace) -> None:
upload_secrets(args.machine)

View File

@ -1,3 +1,4 @@
{ lib, ... }:
{
perSystem = { self', pkgs, ... }: {
devShells.clan-cli = pkgs.callPackage ./shell.nix {