Merge pull request 'clan-cli secrets: add secret_store as python class' (#733) from lassulus-HEAD into main
All checks were successful
checks-impure / test (push) Successful in 1m20s
checks / test (push) Successful in 2m14s

This commit is contained in:
clan-bot 2024-01-24 14:52:14 +00:00
commit a3ef8ce832
25 changed files with 479 additions and 496 deletions

View File

@ -27,11 +27,12 @@ in
];
};
};
test_backup_client = { pkgs, lib, ... }:
test_backup_client = { pkgs, lib, config, ... }:
let
dependencies = [
self
pkgs.stdenv.drvPath
clan.clanInternals.machines.x86_64-linux.test_backup_client.config.system.clan.deployment.file
] ++ builtins.map (i: i.outPath) (builtins.attrValues self.inputs);
closureInfo = pkgs.closureInfo { rootPaths = dependencies; };
in

View File

@ -41,8 +41,9 @@ in
dependencies = [
self
pkgs.stdenv.drvPath
self.nixosConfigurations.test_install_machine.config.system.build.toplevel
self.nixosConfigurations.test_install_machine.config.system.build.diskoScript
clan.clanInternals.machines.x86_64-linux.test_install_machine.config.system.build.toplevel
clan.clanInternals.machines.x86_64-linux.test_install_machine.config.system.build.diskoScript
clan.clanInternals.machines.x86_64-linux.test_install_machine.config.system.clan.deployment.file
pkgs.nixos-anywhere
] ++ builtins.map (i: i.outPath) (builtins.attrValues self.inputs);
closureInfo = pkgs.closureInfo { rootPaths = dependencies; };

View File

@ -31,19 +31,24 @@
the directory on the deployment server where secrets are uploaded
'';
};
uploadSecrets = lib.mkOption {
type = lib.types.path;
secretsModule = lib.mkOption {
type = lib.types.str;
description = ''
script to upload secrets to the deployment server
the python import path to the secrets module
'';
default = "${pkgs.coreutils}/bin/true";
};
generateSecrets = lib.mkOption {
secretsData = lib.mkOption {
type = lib.types.path;
description = ''
script to generate secrets
secret data as json for the generator
'';
default = "${pkgs.coreutils}/bin/true";
default = pkgs.writers.writeJSON "secrets.json" (lib.mapAttrs
(_name: secret: {
secrets = builtins.attrNames secret.secrets;
facts = lib.mapAttrs (_: secret: secret.path) secret.facts;
generator = secret.generator.finalScript;
})
config.clanCore.secrets);
};
vm.create = lib.mkOption {
type = lib.types.path;
@ -60,7 +65,7 @@
# optimization for faster secret generate/upload and machines update
config = {
system.clan.deployment.data = {
inherit (config.system.clan) uploadSecrets generateSecrets;
inherit (config.system.clan) secretsModule secretsData;
inherit (config.clan.networking) deploymentAddress;
inherit (config.clanCore) secretsUploadDirectory;
};

View File

@ -6,7 +6,6 @@
description = ''
method to store secrets
custom can be used to define a custom secret store.
one would have to define system.clan.generateSecrets and system.clan.uploadSecrets
'';
};
@ -71,6 +70,7 @@
internal = true;
default = ''
export PATH="${lib.makeBinPath config.path}"
set -efu -o pipefail
${config.script}
'';
};

View File

@ -1,7 +1,4 @@
{ config, lib, pkgs, ... }:
let
passwordstoreDir = "\${PASSWORD_STORE_DIR:-$HOME/.password-store}";
in
{ config, lib, ... }:
{
options.clan.password-store.targetDirectory = lib.mkOption {
type = lib.types.path;
@ -13,104 +10,7 @@ in
config = lib.mkIf (config.clanCore.secretStore == "password-store") {
clanCore.secretsDirectory = config.clan.password-store.targetDirectory;
clanCore.secretsUploadDirectory = config.clan.password-store.targetDirectory;
system.clan.generateSecrets = lib.mkIf (config.clanCore.secrets != { }) (
pkgs.writeScript "generate-secrets" ''
#!/bin/sh
set -efu
test -d "$CLAN_DIR"
PATH=${lib.makeBinPath [
pkgs.pass
]}:$PATH
# TODO maybe initialize password store if it doesn't exist yet
${lib.foldlAttrs (acc: n: v: ''
${acc}
# ${n}
# if any of the secrets are missing, we regenerate all connected facts/secrets
(if ! (${lib.concatMapStringsSep " && " (x: "test -e ${passwordstoreDir}/machines/${config.clanCore.machineName}/${x.name}.gpg >/dev/null") (lib.attrValues v.secrets)}); then
tmpdir=$(mktemp -d)
trap "rm -rf $tmpdir" EXIT
cd $tmpdir
facts=$(mktemp -d)
trap "rm -rf $facts" EXIT
secrets=$(mktemp -d)
trap "rm -rf $secrets" EXIT
( ${v.generator.finalScript} )
${lib.concatMapStrings (fact: ''
mkdir -p "$CLAN_DIR"/"$(dirname ${fact.path})"
cp "$facts"/${fact.name} "$CLAN_DIR"/${fact.path}
'') (lib.attrValues v.facts)}
${lib.concatMapStrings (secret: ''
cat "$secrets"/${secret.name} | pass insert -m machines/${config.clanCore.machineName}/${secret.name}
'') (lib.attrValues v.secrets)}
fi)
'') "" config.clanCore.secrets}
''
);
system.clan.uploadSecrets = pkgs.writeScript "upload-secrets" ''
#!/bin/sh
set -efu
umask 0077
PATH=${lib.makeBinPath [
pkgs.pass
pkgs.git
pkgs.findutils
pkgs.rsync
]}:$PATH:${lib.getBin pkgs.openssh}
if test -e ${passwordstoreDir}/.git; then
local_pass_info=$(
git -C ${passwordstoreDir} log -1 --format=%H machines/${config.clanCore.machineName}
# we append a hash for every symlink, otherwise we would miss updates on
# files where the symlink points to
find ${passwordstoreDir}/machines/${config.clanCore.machineName} -type l \
-exec realpath {} + |
sort |
xargs -r -n 1 git -C ${passwordstoreDir} log -1 --format=%H
)
remote_pass_info=$(ssh ${config.clan.networking.deploymentAddress} -- ${lib.escapeShellArg ''
cat ${config.clan.password-store.targetDirectory}/.pass_info || :
''} || :)
if test "$local_pass_info" = "$remote_pass_info"; then
echo secrets already match
exit 23
fi
fi
find ${passwordstoreDir}/machines/${config.clanCore.machineName} -type f -follow ! -name .gpg-id |
while read -r gpg_path; do
rel_name=''${gpg_path#${passwordstoreDir}}
rel_name=''${rel_name%.gpg}
pass_date=$(
if test -e ${passwordstoreDir}/.git; then
git -C ${passwordstoreDir} log -1 --format=%aI "$gpg_path"
fi
)
pass_name=$rel_name
tmp_path="$SECRETS_DIR"/$(basename $rel_name)
mkdir -p "$(dirname "$tmp_path")"
pass show "$pass_name" > "$tmp_path"
if [ -n "$pass_date" ]; then
touch -d "$pass_date" "$tmp_path"
fi
done
if test -n "''${local_pass_info-}"; then
echo "$local_pass_info" > "$SECRETS_DIR"/.pass_info
fi
'';
system.clan.secretsModule = "clan_cli.secrets.modules.password_store";
};
}

View File

@ -25,33 +25,7 @@ in
config = lib.mkIf (config.clanCore.secretStore == "sops") {
clanCore.secretsDirectory = "/run/secrets";
clanCore.secretsPrefix = config.clanCore.machineName + "-";
system.clan = lib.mkIf (config.clanCore.secrets != { }) {
generateSecrets = pkgs.writeScript "generate-secrets" ''
#!${pkgs.python3}/bin/python
import json
import sys
from clan_cli.secrets.sops_generate import generate_secrets_from_nix
args = json.loads(${builtins.toJSON (builtins.toJSON {
machine_name = config.clanCore.machineName;
secret_submodules = lib.mapAttrs (_name: secret: {
secrets = builtins.attrNames secret.secrets;
facts = lib.mapAttrs (_: secret: secret.path) secret.facts;
generator = secret.generator.finalScript;
}) config.clanCore.secrets;
})})
generate_secrets_from_nix(**args)
'';
uploadSecrets = pkgs.writeScript "upload-secrets" ''
#!${pkgs.python3}/bin/python
import json
import sys
from clan_cli.secrets.sops_generate import upload_age_key_from_nix
# the second toJSON is needed to escape the string for the python
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; })})
upload_age_key_from_nix(**args)
'';
};
system.clan.secretsModule = "clan_cli.secrets.modules.sops";
sops.secrets = builtins.mapAttrs
(name: _: {
sopsFile = config.clanCore.clanDir + "/sops/secrets/${name}/secret";

View File

@ -140,8 +140,6 @@ in
toplevel = vmConfig.config.system.build.toplevel;
regInfo = (pkgs.closureInfo { rootPaths = vmConfig.config.virtualisation.additionalPaths; });
inherit (config.clan.virtualisation) memorySize cores graphics;
generateSecrets = config.system.clan.generateSecrets;
uploadSecrets = config.system.clan.uploadSecrets;
});
};

View File

@ -57,21 +57,15 @@ def create_parser(prog: str | None = None) -> argparse.ArgumentParser:
default=[],
)
def flake_path(arg: str) -> Path:
def flake_path(arg: str) -> str | Path:
flake_dir = Path(arg).resolve()
if not flake_dir.exists():
raise argparse.ArgumentTypeError(
f"flake directory {flake_dir} does not exist"
)
if not flake_dir.is_dir():
raise argparse.ArgumentTypeError(
f"flake directory {flake_dir} is not a directory"
)
return flake_dir
if flake_dir.exists() and flake_dir.is_dir():
return flake_dir
return arg
parser.add_argument(
"--flake",
help="path to the flake where the clan resides in",
help="path to the flake where the clan resides in, can be a remote flake or local",
default=get_clan_flake_toplevel(),
type=flake_path,
)

View File

@ -1,14 +1,16 @@
import argparse
import json
import logging
from ..errors import ClanError
from ..machines.machines import Machine
log = logging.getLogger(__name__)
def create_backup(machine: Machine, provider: str | None = None) -> None:
backup_scripts = json.loads(
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.backups")
)
log.info(f"creating backup for {machine.name}")
backup_scripts = json.loads(machine.eval_nix("config.clanCore.backups"))
if provider is None:
for provider in backup_scripts["providers"]:
proc = machine.host.run(
@ -31,7 +33,7 @@ def create_backup(machine: Machine, provider: str | None = None) -> None:
def create_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake)
machine = Machine(name=args.machine, flake=args.flake)
create_backup(machine=machine, provider=args.provider)

View File

@ -18,9 +18,7 @@ class Backup:
def list_provider(machine: Machine, provider: str) -> list[Backup]:
results = []
backup_metadata = json.loads(
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.backups")
)
backup_metadata = json.loads(machine.eval_nix("config.clanCore.backups"))
proc = machine.host.run(
["bash", "-c", backup_metadata["providers"][provider]["list"]],
stdout=subprocess.PIPE,
@ -45,9 +43,7 @@ def list_provider(machine: Machine, provider: str) -> list[Backup]:
def list_backups(machine: Machine, provider: str | None = None) -> list[Backup]:
backup_metadata = json.loads(
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.backups")
)
backup_metadata = json.loads(machine.eval_nix("config.clanCore.backups"))
results = []
if provider is None:
for _provider in backup_metadata["providers"]:
@ -60,7 +56,7 @@ def list_backups(machine: Machine, provider: str | None = None) -> list[Backup]:
def list_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake)
machine = Machine(name=args.machine, flake=args.flake)
backups = list_backups(machine=machine, provider=args.provider)
print(backups)

View File

@ -11,12 +11,8 @@ from .list import Backup, list_backups
def restore_service(
machine: Machine, backup: Backup, provider: str, service: str
) -> None:
backup_metadata = json.loads(
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.backups")
)
backup_folders = json.loads(
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.state")
)
backup_metadata = json.loads(machine.eval_nix("config.clanCore.backups"))
backup_folders = json.loads(machine.eval_nix("config.clanCore.state"))
folders = backup_folders[service]["folders"]
env = os.environ.copy()
env["ARCHIVE_ID"] = backup.archive_id
@ -77,11 +73,7 @@ def restore_backup(
if service is None:
for backup in backups:
if backup.archive_id == archive_id:
backup_folders = json.loads(
machine.eval_nix(
f"nixosConfigurations.{machine.name}.config.clanCore.state"
)
)
backup_folders = json.loads(machine.eval_nix("config.clanCore.state"))
for _service in backup_folders:
restore_service(machine, backup, provider, _service)
else:
@ -91,7 +83,7 @@ def restore_backup(
def restore_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake)
machine = Machine(name=args.machine, flake=args.flake)
backups = list_backups(machine=machine, provider=args.provider)
restore_backup(
machine=machine,

View File

@ -47,7 +47,7 @@ def user_gcroot_dir() -> Path:
return p
def specific_groot_dir(*, clan_name: str, flake_url: str) -> Path:
def machine_gcroot(*, clan_name: str, flake_url: str) -> Path:
# Always build icon so that we can symlink it to the gcroot
gcroot_dir = user_gcroot_dir()
clan_gcroot = gcroot_dir / clan_key_safe(clan_name, flake_url)

View File

@ -3,9 +3,10 @@ from dataclasses import dataclass
from pathlib import Path
from ..cmd import run
from ..dirs import specific_groot_dir
from ..dirs import machine_gcroot
from ..errors import ClanError
from ..machines.list import list_machines
from ..machines.machines import Machine
from ..nix import nix_build, nix_config, nix_eval, nix_metadata
from ..vms.inspect import VmConfig, inspect_vm
@ -29,23 +30,24 @@ def run_cmd(cmd: list[str]) -> str:
return proc.stdout.strip()
def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
def inspect_flake(flake_url: str | Path, machine_name: str) -> FlakeConfig:
config = nix_config()
system = config["system"]
# Check if the machine exists
machines = list_machines(flake_url)
if flake_attr not in machines:
if machine_name not in machines:
raise ClanError(
f"Machine {flake_attr} not found in {flake_url}. Available machines: {', '.join(machines)}"
f"Machine {machine_name} not found in {flake_url}. Available machines: {', '.join(machines)}"
)
vm = inspect_vm(flake_url, flake_attr)
machine = Machine(machine_name, flake_url)
vm = inspect_vm(machine)
# Get the cLAN name
cmd = nix_eval(
[
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.clanName'
f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clanCore.clanName'
]
)
res = run_cmd(cmd)
@ -54,7 +56,7 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
# Get the clan icon path
cmd = nix_eval(
[
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.clanIcon'
f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clanCore.clanIcon'
]
)
res = run_cmd(cmd)
@ -67,10 +69,9 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
cmd = nix_build(
[
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.clanIcon'
f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clanCore.clanIcon'
],
specific_groot_dir(clan_name=clan_name, flake_url=str(flake_url))
/ "clanIcon",
machine_gcroot(clan_name=clan_name, flake_url=str(flake_url)) / "clanIcon",
)
run_cmd(cmd)
@ -81,7 +82,7 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
vm=vm,
flake_url=flake_url,
clan_name=clan_name,
flake_attr=flake_attr,
flake_attr=machine_name,
nar_hash=meta["locked"]["narHash"],
icon=icon_path,
description=meta.get("description"),
@ -102,7 +103,7 @@ def inspect_command(args: argparse.Namespace) -> None:
flake=args.flake or Path.cwd(),
)
res = inspect_flake(
flake_url=inspect_options.flake, flake_attr=inspect_options.machine
flake_url=inspect_options.flake, machine_name=inspect_options.machine
)
print("cLAN name:", res.clan_name)
print("Icon:", res.icon)

View File

@ -1,4 +1,6 @@
import argparse
import importlib
import logging
from dataclasses import dataclass
from pathlib import Path
from tempfile import TemporaryDirectory
@ -8,10 +10,20 @@ from ..machines.machines import Machine
from ..nix import nix_shell
from ..secrets.generate import generate_secrets
log = logging.getLogger(__name__)
def install_nixos(machine: Machine, kexec: str | None = None) -> None:
log.info(f"deployment address1: {machine.deployment_info['deploymentAddress']}")
secrets_module = importlib.import_module(machine.secrets_module)
log.info(f"installing {machine.name}")
log.info(f"using secret store: {secrets_module.SecretStore}")
secret_store = secrets_module.SecretStore(machine=machine)
h = machine.host
log.info(f"deployment address2: {machine.deployment_info['deploymentAddress']}")
target_host = f"{h.user or 'root'}@{h.host}"
log.info(f"target host: {target_host}")
flake_attr = h.meta.get("flake_attr", "")
@ -19,18 +31,18 @@ def install_nixos(machine: Machine, kexec: str | None = None) -> None:
with TemporaryDirectory() as tmpdir_:
tmpdir = Path(tmpdir_)
upload_dir = machine.secrets_upload_directory
upload_dir_ = machine.secrets_upload_directory
if upload_dir.startswith("/"):
upload_dir = upload_dir[1:]
upload_dir = tmpdir / upload_dir
if upload_dir_.startswith("/"):
upload_dir_ = upload_dir_[1:]
upload_dir = tmpdir / upload_dir_
upload_dir.mkdir(parents=True)
machine.run_upload_secrets(upload_dir)
secret_store.upload(upload_dir)
cmd = [
"nixos-anywhere",
"-f",
f"{machine.flake_dir}#{flake_attr}",
f"{machine.flake}#{flake_attr}",
"-t",
"--no-reboot",
"--extra-files",
@ -64,8 +76,11 @@ def install_command(args: argparse.Namespace) -> None:
target_host=args.target_host,
kexec=args.kexec,
)
machine = Machine(opts.machine, flake_dir=opts.flake)
machine.deployment_address = opts.target_host
machine = Machine(opts.machine, flake=opts.flake)
machine.get_deployment_info()
machine.deployment_info["deploymentAddress"] = opts.target_host
log.info(f"target host: {opts.target_host}")
log.info(f"deployment address: {machine.deployment_info['deploymentAddress']}")
install_nixos(machine, kexec=opts.kexec)

View File

@ -1,36 +1,20 @@
import json
import os
import sys
import logging
from pathlib import Path
from ..cmd import Log, run
from ..cmd import run
from ..nix import nix_build, nix_config, nix_eval
from ..ssh import Host, parse_deployment_address
def build_machine_data(machine_name: str, clan_dir: Path) -> dict:
config = nix_config()
system = config["system"]
proc = run(
nix_build(
[
f'{clan_dir}#clanInternals.machines."{system}"."{machine_name}".config.system.clan.deployment.file'
]
),
log=Log.BOTH,
error_msg="failed to build machine data",
)
return json.loads(Path(proc.stdout.strip()).read_text())
log = logging.getLogger(__name__)
class Machine:
def __init__(
self,
name: str,
flake_dir: Path,
machine_data: dict | None = None,
flake: Path | str,
deployment_info: dict | None = None,
) -> None:
"""
Creates a Machine
@ -38,64 +22,93 @@ class Machine:
@clan_dir: the directory of the clan, optional, if not set it will be determined from the current working directory
@machine_json: can be optionally used to skip evaluation of the machine, location of the json file with machine data
"""
self.name = name
self.flake_dir = flake_dir
self.name: str = name
self.flake: str | Path = flake
if machine_data is None:
self.machine_data = build_machine_data(name, self.flake_dir)
else:
self.machine_data = machine_data
self.deployment_address = self.machine_data["deploymentAddress"]
self.upload_secrets = self.machine_data["uploadSecrets"]
self.generate_secrets = self.machine_data["generateSecrets"]
self.secrets_upload_directory = self.machine_data["secretsUploadDirectory"]
self.eval_cache: dict[str, str] = {}
self.build_cache: dict[str, Path] = {}
if deployment_info is not None:
self.deployment_info = deployment_info
def get_deployment_info(self) -> None:
self.deployment_info = json.loads(
self.build_nix("config.system.clan.deployment.file").read_text()
)
@property
def deployment_address(self) -> str:
if not hasattr(self, "deployment_info"):
self.get_deployment_info()
return self.deployment_info["deploymentAddress"]
@property
def secrets_module(self) -> str:
if not hasattr(self, "deployment_info"):
self.get_deployment_info()
return self.deployment_info["secretsModule"]
@property
def secrets_data(self) -> dict:
if not hasattr(self, "deployment_info"):
self.get_deployment_info()
if self.deployment_info["secretsData"]:
try:
return json.loads(Path(self.deployment_info["secretsData"]).read_text())
except json.JSONDecodeError:
log.error(
f"Failed to parse secretsData for machine {self.name} as json"
)
return {}
return {}
@property
def secrets_upload_directory(self) -> str:
if not hasattr(self, "deployment_info"):
self.get_deployment_info()
return self.deployment_info["secretsUploadDirectory"]
@property
def flake_dir(self) -> Path:
if isinstance(self.flake, Path):
return self.flake
if hasattr(self, "flake_path"):
return Path(self.flake_path)
print(nix_eval([f"{self.flake}"]))
self.flake_path = run(nix_eval([f"{self.flake}"])).stdout.strip()
return Path(self.flake_path)
@property
def host(self) -> Host:
return parse_deployment_address(
self.name, self.deployment_address, meta={"machine": self}
)
def run_upload_secrets(self, secrets_dir: Path) -> bool:
"""
Upload the secrets to the provided directory
@secrets_dir: the directory to store the secrets in
"""
env = os.environ.copy()
env["CLAN_DIR"] = str(self.flake_dir)
env["PYTHONPATH"] = str(
":".join(sys.path)
) # TODO do this in the clanCore module
env["SECRETS_DIR"] = str(secrets_dir)
print(f"uploading secrets... {self.upload_secrets}")
proc = run(
[self.upload_secrets],
env=env,
check=False,
)
if proc.returncode == 23:
print("no secrets to upload")
return False
elif proc.returncode != 0:
print("failed generate secrets directory")
exit(1)
return True
def eval_nix(self, attr: str, refresh: bool = False) -> str:
"""
eval a nix attribute of the machine
@attr: the attribute to get
"""
config = nix_config()
system = config["system"]
attr = f'clanInternals.machines."{system}".{self.name}.{attr}'
if attr in self.eval_cache and not refresh:
return self.eval_cache[attr]
output = run(
nix_eval([f"path:{self.flake_dir}#{attr}"]),
).stdout.strip()
if isinstance(self.flake, Path):
if (self.flake / ".git").exists():
flake = f"git+file://{self.flake}"
else:
flake = f"path:{self.flake}"
else:
flake = self.flake
log.info(f"evaluating {flake}#{attr}")
output = run(nix_eval([f"{flake}#{attr}"])).stdout.strip()
self.eval_cache[attr] = output
return output
@ -104,10 +117,21 @@ class Machine:
build a nix attribute of the machine
@attr: the attribute to get
"""
config = nix_config()
system = config["system"]
attr = f'clanInternals.machines."{system}".{self.name}.{attr}'
if attr in self.build_cache and not refresh:
return self.build_cache[attr]
outpath = run(
nix_build([f"path:{self.flake_dir}#{attr}"]),
).stdout.strip()
if isinstance(self.flake, Path):
flake = f"path:{self.flake}"
else:
flake = self.flake
log.info(f"building {flake}#{attr}")
outpath = run(nix_build([f"{flake}#{attr}"])).stdout.strip()
self.build_cache[attr] = Path(outpath)
return Path(outpath)

View File

@ -94,7 +94,7 @@ def get_all_machines(clan_dir: Path) -> HostGroup:
machine_data["deploymentAddress"],
meta={
"machine": Machine(
name=name, flake_dir=clan_dir, machine_data=machine_data
name=name, flake=clan_dir, deployment_info=machine_data
)
},
)
@ -105,7 +105,7 @@ def get_all_machines(clan_dir: Path) -> HostGroup:
def get_selected_machines(machine_names: list[str], flake_dir: Path) -> HostGroup:
hosts = []
for name in machine_names:
machine = Machine(name=name, flake_dir=flake_dir)
machine = Machine(name=name, flake=flake_dir)
hosts.append(machine.host)
return HostGroup(hosts)
@ -115,8 +115,8 @@ def update(args: argparse.Namespace) -> None:
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
if len(args.machines) == 1 and args.target_host is not None:
machine = Machine(name=args.machines[0], flake_dir=args.flake)
machine.deployment_address = args.target_host
machine = Machine(name=args.machines[0], flake=args.flake)
machine.deployment_info["deploymentAddress"] = args.target_host
host = parse_deployment_address(
args.machines[0],
args.target_host,

View File

@ -1,33 +1,80 @@
import argparse
import importlib
import logging
import os
import sys
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_cli.cmd import Log, run
from clan_cli.cmd import run
from ..errors import ClanError
from ..machines.machines import Machine
from ..nix import nix_shell
log = logging.getLogger(__name__)
def generate_secrets(machine: Machine) -> None:
env = os.environ.copy()
env["CLAN_DIR"] = str(machine.flake_dir)
env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module
secrets_module = importlib.import_module(machine.secrets_module)
secret_store = secrets_module.SecretStore(machine=machine)
print(f"generating secrets... {machine.generate_secrets}")
run(
[machine.generate_secrets],
env=env,
error_msg="failed to generate secrets",
log=Log.BOTH,
)
with TemporaryDirectory() as d:
for service in machine.secrets_data:
print(service)
tmpdir = Path(d) / service
# check if all secrets exist and generate them if at least one is missing
needs_regeneration = any(
not secret_store.exists(service, secret)
for secret in machine.secrets_data[service]["secrets"]
) or any(
not (machine.flake / fact).exists()
for fact in machine.secrets_data[service]["facts"].values()
)
for fact in machine.secrets_data[service]["facts"].values():
if not (machine.flake / fact).exists():
print(f"fact {fact} is missing")
if needs_regeneration:
env = os.environ.copy()
facts_dir = tmpdir / "facts"
facts_dir.mkdir(parents=True)
env["facts"] = str(facts_dir)
secrets_dir = tmpdir / "secrets"
secrets_dir.mkdir(parents=True)
env["secrets"] = str(secrets_dir)
# TODO use bubblewrap here
cmd = nix_shell(
["nixpkgs#bash"],
["bash", "-c", machine.secrets_data[service]["generator"]],
)
run(
cmd,
env=env,
)
# store secrets
for secret in machine.secrets_data[service]["secrets"]:
secret_file = secrets_dir / secret
if not secret_file.is_file():
msg = f"did not generate a file for '{secret}' when running the following command:\n"
msg += machine.secrets_data[service]["generator"]
raise ClanError(msg)
secret_store.set(service, secret, secret_file.read_text())
# store facts
for name, fact_path in machine.secrets_data[service]["facts"].items():
fact_file = facts_dir / name
if not fact_file.is_file():
msg = f"did not generate a file for '{name}' when running the following command:\n"
msg += machine.secrets_data[service]["generator"]
raise ClanError(msg)
fact_path = machine.flake / fact_path
fact_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(fact_file, fact_path)
print("successfully generated secrets")
def generate_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake)
machine = Machine(name=args.machine, flake=args.flake)
generate_secrets(machine)

View File

@ -0,0 +1,106 @@
import os
import subprocess
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
class SecretStore:
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(self, service: str, name: str, value: str) -> None:
subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
),
input=value.encode("utf-8"),
check=True,
)
def get(self, service: str, name: str) -> bytes:
return subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, service: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
print(f"checking {secret_path}")
return secret_path.exists()
def generate_hash(self) -> bytes:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
hashes = []
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
f"machines/{self.machine.name}",
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
if symlink.is_symlink():
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
str(symlink),
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
# we sort the hashes to make sure that the order is always the same
hashes.sort()
return b"\n".join(hashes)
def update_check(self) -> bool:
local_hash = self.generate_hash()
remote_hash = self.machine.host.run(
# TODO get the path to the secrets from the machine
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
check=False,
stdout=subprocess.PIPE,
).stdout.strip()
if not remote_hash:
print("remote hash is empty")
return False
return local_hash.decode() == remote_hash
def upload(self, output_dir: Path) -> None:
for service in self.machine.secrets_data:
for secret in self.machine.secrets_data[service]["secrets"]:
(output_dir / secret).write_bytes(self.get(service, secret))
(output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@ -0,0 +1,54 @@
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.machines import add_machine, has_machine
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key
class SecretStore:
def __init__(self, machine: Machine) -> None:
self.machine = machine
# no need to generate keys if we don't manage secrets
if not hasattr(self.machine, "secrets_data"):
return
if not self.machine.secrets_data:
return
if has_machine(self.machine.flake_dir, self.machine.name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir)
/ f"{self.machine.name}-age.key",
priv_key,
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(self, _service: str, name: str, value: str) -> None:
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
value,
add_machines=[self.machine.name],
)
def get(self, _service: str, _name: str) -> bytes:
raise NotImplementedError()
def exists(self, _service: str, name: str) -> bool:
return has_secret(
self.machine.flake_dir,
f"{self.machine.name}-{name}",
)
def upload(self, output_dir: Path) -> None:
key_name = f"{self.machine.name}-age.key"
if not has_secret(self.machine.flake_dir, key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(self.machine.flake_dir, key_name)
(output_dir / "key.txt").write_text(key)

View File

@ -1,128 +0,0 @@
import logging
import os
import shlex
import shutil
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
from clan_cli.cmd import Log, run
from clan_cli.nix import nix_shell
from ..errors import ClanError
from .folders import sops_secrets_folder
from .machines import add_machine, has_machine
from .secrets import decrypt_secret, encrypt_secret, has_secret
from .sops import generate_private_key
log = logging.getLogger(__name__)
def generate_host_key(flake_dir: Path, machine_name: str) -> None:
if has_machine(flake_dir, machine_name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
flake_dir,
sops_secrets_folder(flake_dir) / f"{machine_name}-age.key",
priv_key,
)
add_machine(flake_dir, machine_name, pub_key, False)
def generate_secrets_group(
flake_dir: Path,
secret_group: str,
machine_name: str,
tempdir: Path,
secret_options: dict[str, Any],
) -> None:
clan_dir = flake_dir
secrets = secret_options["secrets"]
needs_regeneration = any(
not has_secret(flake_dir, f"{machine_name}-{name}") for name in secrets
) or any(
not (flake_dir / fact).exists() for fact in secret_options["facts"].values()
)
generator = secret_options["generator"]
subdir = tempdir / secret_group
if needs_regeneration:
facts_dir = subdir / "facts"
facts_dir.mkdir(parents=True)
secrets_dir = subdir / "secrets"
secrets_dir.mkdir(parents=True)
text = f"""
set -euo pipefail
export facts={shlex.quote(str(facts_dir))}
export secrets={shlex.quote(str(secrets_dir))}
{generator}
"""
cmd = nix_shell(["nixpkgs#bash"], ["bash", "-c", text])
run(cmd, log=Log.BOTH)
for name in secrets:
secret_file = secrets_dir / name
if not secret_file.is_file():
msg = f"did not generate a file for '{name}' when running the following command:\n"
msg += text
raise ClanError(msg)
encrypt_secret(
flake_dir,
sops_secrets_folder(flake_dir) / f"{machine_name}-{name}",
secret_file.read_text(),
add_machines=[machine_name],
)
for name, fact_path in secret_options["facts"].items():
fact_file = facts_dir / name
if not fact_file.is_file():
msg = f"did not generate a file for '{name}' when running the following command:\n"
msg += text
raise ClanError(msg)
fact_path = clan_dir / fact_path
fact_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(fact_file, fact_path)
# this is called by the sops.nix clan core module
def generate_secrets_from_nix(
machine_name: str,
secret_submodules: dict[str, Any],
) -> None:
flake_dir = Path(os.environ["CLAN_DIR"])
generate_host_key(flake_dir, machine_name)
errors = {}
log.debug("Generating secrets for machine %s and flake %s", machine_name, flake_dir)
with TemporaryDirectory() as d:
# if any of the secrets are missing, we regenerate all connected facts/secrets
for secret_group, secret_options in secret_submodules.items():
try:
generate_secrets_group(
flake_dir, secret_group, machine_name, Path(d), secret_options
)
except ClanError as e:
errors[secret_group] = e
for secret_group, error in errors.items():
print(f"failed to generate secrets for {machine_name}/{secret_group}:")
print(error, file=sys.stderr)
if len(errors) > 0:
sys.exit(1)
# this is called by the sops.nix clan core module
def upload_age_key_from_nix(
machine_name: str,
) -> None:
flake_dir = Path(os.environ["CLAN_DIR"])
log.debug("Uploading secrets for machine %s and flake %s", machine_name, flake_dir)
secret_name = f"{machine_name}-age.key"
if not has_secret(
flake_dir, secret_name
): # skip uploading the secret, not managed by us
return
secret = decrypt_secret(flake_dir, secret_name)
secrets_dir = Path(os.environ["SECRETS_DIR"])
(secrets_dir / "key.txt").write_text(secret)

View File

@ -1,4 +1,5 @@
import argparse
import importlib
import logging
from pathlib import Path
from tempfile import TemporaryDirectory
@ -11,33 +12,38 @@ log = logging.getLogger(__name__)
def upload_secrets(machine: Machine) -> None:
with TemporaryDirectory() as tempdir_:
tempdir = Path(tempdir_)
should_upload = machine.run_upload_secrets(tempdir)
secrets_module = importlib.import_module(machine.secrets_module)
secret_store = secrets_module.SecretStore(machine=machine)
if should_upload:
host = machine.host
update_check = getattr(secret_store, "update_check", None)
if callable(update_check):
if update_check():
log.info("Secrets already up to date")
return
with TemporaryDirectory() as tempdir:
secret_store.upload(Path(tempdir))
host = machine.host
ssh_cmd = host.ssh_cmd()
run(
nix_shell(
["nixpkgs#rsync"],
[
"rsync",
"-e",
" ".join(["ssh"] + ssh_cmd[2:]),
"-az",
"--delete",
f"{tempdir!s}/",
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
],
),
log=Log.BOTH,
)
ssh_cmd = host.ssh_cmd()
run(
nix_shell(
["nixpkgs#rsync"],
[
"rsync",
"-e",
" ".join(["ssh"] + ssh_cmd[2:]),
"-az",
"--delete",
f"{tempdir!s}/",
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
],
),
log=Log.BOTH,
)
def upload_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake)
machine = Machine(name=args.machine, flake=args.flake)
upload_secrets(machine)

View File

@ -3,15 +3,14 @@ import json
from dataclasses import dataclass
from pathlib import Path
from ..cmd import run
from ..nix import nix_config, nix_eval
from ..machines.machines import Machine
@dataclass
class VmConfig:
clan_name: str
machine_name: str
flake_url: str | Path
flake_attr: str
clan_name: str
cores: int
memory_size: int
@ -19,21 +18,9 @@ class VmConfig:
wayland: bool = False
def inspect_vm(flake_url: str | Path, flake_attr: str) -> VmConfig:
config = nix_config()
system = config["system"]
cmd = nix_eval(
[
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.vm.inspect',
"--refresh",
]
)
proc = run(cmd)
data = json.loads(proc.stdout)
return VmConfig(flake_url=flake_url, flake_attr=flake_attr, **data)
def inspect_vm(machine: Machine) -> VmConfig:
data = json.loads(machine.eval_nix("config.clanCore.vm.inspect"))
return VmConfig(machine_name=machine.name, flake_url=machine.flake, **data)
@dataclass
@ -47,9 +34,9 @@ def inspect_command(args: argparse.Namespace) -> None:
machine=args.machine,
flake=args.flake or Path.cwd(),
)
res = inspect_vm(
flake_url=inspect_options.flake, flake_attr=inspect_options.machine
)
machine = Machine(inspect_options.machine, inspect_options.flake)
res = inspect_vm(machine)
print("Cores:", res.cores)
print("Memory size:", res.memory_size)
print("Graphics:", res.graphics)

View File

@ -1,17 +1,19 @@
import argparse
import importlib
import json
import logging
import os
import sys
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import IO
from ..cmd import Log, run
from ..dirs import module_root, specific_groot_dir, vm_state_dir
from ..dirs import machine_gcroot, module_root, vm_state_dir
from ..errors import ClanError
from ..machines.machines import Machine
from ..nix import nix_build, nix_config, nix_shell
from ..secrets.generate import generate_secrets
from .inspect import VmConfig, inspect_vm
log = logging.getLogger(__name__)
@ -73,7 +75,7 @@ def qemu_command(
# fmt: off
command = [
"qemu-kvm",
"-name", vm.flake_attr,
"-name", vm.machine_name,
"-m", f'{nixos_config["memorySize"]}M',
"-smp", str(nixos_config["cores"]),
"-cpu", "max",
@ -102,56 +104,48 @@ def qemu_command(
return command
def get_vm_create_info(vm: VmConfig, nix_options: list[str]) -> dict[str, str]:
# TODO move this to the Machines class
def get_vm_create_info(
machine: Machine, vm: VmConfig, nix_options: list[str]
) -> dict[str, str]:
config = nix_config()
system = config["system"]
clan_dir = vm.flake_url
machine = vm.flake_attr
clan_dir = machine.flake
cmd = nix_build(
[
f'{clan_dir}#clanInternals.machines."{system}"."{machine}".config.system.clan.vm.create',
f'{clan_dir}#clanInternals.machines."{system}"."{machine.name}".config.system.clan.vm.create',
*nix_options,
],
specific_groot_dir(clan_name=vm.clan_name, flake_url=str(vm.flake_url))
/ f"vm-{machine}",
machine_gcroot(clan_name=vm.clan_name, flake_url=str(vm.flake_url))
/ f"vm-{machine.name}",
)
proc = run(
cmd, log=Log.BOTH, error_msg=f"Could not build vm config for {machine.name}"
)
proc = run(cmd, log=Log.BOTH, error_msg=f"Could not build vm config for {machine}")
try:
return json.loads(Path(proc.stdout.strip()).read_text())
except json.JSONDecodeError as e:
raise ClanError(f"Failed to parse vm config: {e}")
def generate_secrets(
vm: VmConfig,
nixos_config: dict[str, str],
def get_secrets(
machine: Machine,
tmpdir: Path,
log_fd: IO[str] | None,
) -> Path:
secrets_dir = tmpdir / "secrets"
secrets_dir.mkdir(exist_ok=True)
env = os.environ.copy()
env["CLAN_DIR"] = str(vm.flake_url)
env["PYTHONPATH"] = str(":".join(sys.path)) # TODO do this in the clanCore module
env["SECRETS_DIR"] = str(secrets_dir)
secrets_module = importlib.import_module(machine.secrets_module)
secret_store = secrets_module.SecretStore(machine=machine)
# Only generate secrets for local clans
if isinstance(vm.flake_url, Path) and vm.flake_url.is_dir():
if Path(vm.flake_url).is_dir():
run([nixos_config["generateSecrets"], vm.clan_name], env=env)
else:
log.warning("won't generate secrets for non local clan")
if isinstance(machine.flake, Path) and machine.flake.is_dir():
generate_secrets(machine)
else:
log.warning("won't generate secrets for non local clan")
cmd = [nixos_config["uploadSecrets"]]
run(
cmd,
env=env,
log=Log.BOTH,
error_msg=f"Could not upload secrets for {vm.flake_attr}",
)
secret_store.upload(secrets_dir)
return secrets_dir
@ -192,26 +186,28 @@ def prepare_disk(tmpdir: Path, log_fd: IO[str] | None) -> Path:
def run_vm(
vm: VmConfig, nix_options: list[str] = [], log_fd: IO[str] | None = None
vm: VmConfig,
nix_options: list[str] = [],
log_fd: IO[str] | None = None,
) -> None:
"""
log_fd can be used to stream the output of all commands to a UI
"""
machine = vm.flake_attr
machine = Machine(vm.machine_name, vm.flake_url)
log.debug(f"Creating VM for {machine}")
# TODO: We should get this from the vm argument
nixos_config = get_vm_create_info(vm, nix_options)
nixos_config = get_vm_create_info(machine, vm, nix_options)
with tempfile.TemporaryDirectory() as tmpdir_:
tmpdir = Path(tmpdir_)
xchg_dir = tmpdir / "xchg"
xchg_dir.mkdir(exist_ok=True)
secrets_dir = generate_secrets(vm, nixos_config, tmpdir, log_fd)
secrets_dir = get_secrets(machine, tmpdir)
disk_img = prepare_disk(tmpdir, log_fd)
state_dir = vm_state_dir(vm.clan_name, str(vm.flake_url), machine)
state_dir = vm_state_dir(vm.clan_name, str(machine.flake), machine.name)
state_dir.mkdir(parents=True, exist_ok=True)
qemu_cmd = qemu_command(
@ -244,7 +240,6 @@ def run_vm(
@dataclass
class RunOptions:
machine: str
flake_url: str | None
flake: Path
nix_options: list[str] = field(default_factory=list)
wayland: bool = False
@ -253,14 +248,14 @@ class RunOptions:
def run_command(args: argparse.Namespace) -> None:
run_options = RunOptions(
machine=args.machine,
flake_url=args.flake_url,
flake=args.flake or Path.cwd(),
flake=args.flake,
nix_options=args.option,
wayland=args.wayland,
)
flake_url = run_options.flake_url or run_options.flake
vm = inspect_vm(flake_url=flake_url, flake_attr=run_options.machine)
machine = Machine(run_options.machine, run_options.flake)
vm = inspect_vm(machine=machine)
# TODO: allow to set this in the config
vm.wayland = run_options.wayland

View File

@ -52,13 +52,16 @@ def test_run(
def test_vm_persistence(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "new-clan",
substitutions=dict(
__CHANGE_ME__="_test_vm_persistence",
),
substitutions={
"__CHANGE_ME__": "_test_vm_persistence",
"git+https://git.clan.lol/clan/clan-core": "path://" + str(CLAN_CORE),
},
machine_configs=dict(
my_machine=dict(
clanCore=dict(state=dict(my_state=dict(folders=["/var/my-state"]))),
@ -90,7 +93,17 @@ def test_vm_persistence(
),
)
monkeypatch.chdir(flake.path)
Cli().run(["vms", "run", "my_machine"])
cli = Cli()
cli.run(
[
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
]
)
cli.run(["vms", "run", "my_machine"])
test_file = (
vm_state_dir("_test_vm_persistence", str(flake.path), "my_machine")
/ "var"