1
0
forked from clan/clan-core

Merge pull request 'vars: add support for password-store' (#1794) from DavHau/clan-core:DavHau-vars into main

This commit is contained in:
clan-bot 2024-07-23 07:59:50 +00:00
commit aec1238f20
4 changed files with 130 additions and 12 deletions

View File

@ -17,7 +17,7 @@ in
imports = [
./public/in_repo.nix
# ./public/vm.nix
# ./secret/password-store.nix
./secret/password-store.nix
./secret/sops.nix
# ./secret/vm.nix
];

View File

@ -0,0 +1,12 @@
{ config, lib, ... }:
{
config.clan.core.vars.settings =
lib.mkIf (config.clan.core.vars.settings.secretStore == "password-store")
{
fileModule = file: {
path = lib.mkIf file.secret "${config.clan.core.password-store.targetDirectory}/${config.clan.core.machineName}-${file.config.generatorName}-${file.config.name}";
};
secretUploadDirectory = lib.mkDefault "/etc/secrets";
secretModule = "clan_cli.vars.secret_modules.password_store";
};
}

View File

@ -13,33 +13,45 @@ class SecretStore(SecretStoreBase):
self.machine = machine
def set(
self, service: str, name: str, value: bytes, groups: list[str]
self, generator_name: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
[
"pass",
"insert",
"-m",
f"machines/{self.machine.name}/{generator_name}/{name}",
],
),
input=value,
check=True,
)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
def get(self, generator_name: str, name: str) -> bytes:
return subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"],
[
"pass",
"show",
f"machines/{self.machine.name}/{generator_name}/{name}",
],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, service: str, name: str) -> bool:
def exists(self, generator_name: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
secret_path = (
Path(password_store)
/ f"machines/{self.machine.name}/{generator_name}/{name}.gpg"
)
return secret_path.exists()
def generate_hash(self) -> bytes:

View File

@ -1,4 +1,5 @@
import os
import subprocess
from collections import defaultdict
from collections.abc import Callable
from io import StringIO
@ -14,7 +15,8 @@ from root import CLAN_CORE
from clan_cli.clan_uri import FlakeId
from clan_cli.machines.machines import Machine
from clan_cli.vars.secret_modules.sops import SecretStore
from clan_cli.nix import nix_shell
from clan_cli.vars.secret_modules import password_store, sops
def def_value() -> defaultdict:
@ -95,7 +97,45 @@ def test_generate_public_var(
@pytest.mark.impure
def test_generate_secret_var_with_default_group(
def test_generate_secret_var_sops(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
user = os.environ.get("USER", "user")
config = nested_dict()
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
my_generator["files"]["my_secret"]["secret"] = True
my_generator["script"] = "echo hello > $out/my_secret"
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config),
)
monkeypatch.chdir(flake.path)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(flake.path),
user,
sops_setup.keys[0].pubkey,
]
)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
var_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
)
assert not var_file_path.is_file()
sops_store = sops.SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path)))
assert sops_store.exists("my_generator", "my_secret")
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
@pytest.mark.impure
def test_generate_secret_var_sops_with_default_group(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
@ -128,7 +168,7 @@ def test_generate_secret_var_with_default_group(
assert not (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
).is_file()
sops_store = SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path)))
sops_store = sops.SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path)))
assert sops_store.exists("my_generator", "my_secret")
assert (
flake.path
@ -141,6 +181,60 @@ def test_generate_secret_var_with_default_group(
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
@pytest.mark.impure
def test_generate_secret_var_password_store(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
) -> None:
config = nested_dict()
my_generator = config["clan"]["core"]["vars"]["settings"]["secretStore"] = (
"password-store"
)
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
my_generator["files"]["my_secret"]["secret"] = True
my_generator["script"] = "echo hello > $out/my_secret"
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config),
)
monkeypatch.chdir(flake.path)
gnupghome = temporary_home / "gpg"
gnupghome.mkdir(mode=0o700)
monkeypatch.setenv("GNUPGHOME", str(gnupghome))
monkeypatch.setenv("PASSWORD_STORE_DIR", str(temporary_home / "pass"))
gpg_key_spec = temporary_home / "gpg_key_spec"
gpg_key_spec.write_text(
"""
Key-Type: 1
Key-Length: 1024
Name-Real: Root Superuser
Name-Email: test@local
Expire-Date: 0
%no-protection
"""
)
subprocess.run(
nix_shell(
["nixpkgs#gnupg"], ["gpg", "--batch", "--gen-key", str(gpg_key_spec)]
),
check=True,
)
subprocess.run(
nix_shell(["nixpkgs#pass"], ["pass", "init", "test@local"]), check=True
)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
var_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
)
assert not var_file_path.is_file()
store = password_store.SecretStore(
Machine(name="my_machine", flake=FlakeId(flake.path))
)
assert store.exists("my_generator", "my_secret")
assert store.get("my_generator", "my_secret").decode() == "hello\n"
@pytest.mark.impure
def test_generate_secret_for_multiple_machines(
monkeypatch: pytest.MonkeyPatch,
@ -196,8 +290,8 @@ def test_generate_secret_for_multiple_machines(
assert machine2_var_file_path.is_file()
assert machine2_var_file_path.read_text() == "machine2\n"
# check if secret vars have been created correctly
sops_store1 = SecretStore(Machine(name="machine1", flake=FlakeId(flake.path)))
sops_store2 = SecretStore(Machine(name="machine2", flake=FlakeId(flake.path)))
sops_store1 = sops.SecretStore(Machine(name="machine1", flake=FlakeId(flake.path)))
sops_store2 = sops.SecretStore(Machine(name="machine2", flake=FlakeId(flake.path)))
assert sops_store1.exists("my_generator", "my_secret")
assert sops_store2.exists("my_generator", "my_secret")
assert sops_store1.get("my_generator", "my_secret").decode() == "machine1\n"