From 70ca824e889f8e8c08b5bcfe2623e3f8808e11c2 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Mon, 24 Jun 2024 21:41:16 +0200 Subject: [PATCH] clan-cli: Acutally test SecretStore for age and password-store. --- .../clan_cli/facts/secret_modules/sops.py | 4 +- pkgs/clan-cli/tests/age_keys.py | 14 ---- pkgs/clan-cli/tests/helpers/validator.py | 34 ++++++++++ .../test_flake_with_core_and_pass/flake.nix | 8 +++ pkgs/clan-cli/tests/test_secrets_generate.py | 68 +++++++------------ .../tests/test_secrets_password_store.py | 37 ++++++++++ 6 files changed, 105 insertions(+), 60 deletions(-) create mode 100644 pkgs/clan-cli/tests/helpers/validator.py diff --git a/pkgs/clan-cli/clan_cli/facts/secret_modules/sops.py b/pkgs/clan-cli/clan_cli/facts/secret_modules/sops.py index 9b6733cf..38ce96c7 100644 --- a/pkgs/clan-cli/clan_cli/facts/secret_modules/sops.py +++ b/pkgs/clan-cli/clan_cli/facts/secret_modules/sops.py @@ -47,7 +47,9 @@ class SecretStore(SecretStoreBase): return path def get(self, service: str, name: str) -> bytes: - raise NotImplementedError() + return decrypt_secret( + self.machine.flake_dir, f"{self.machine.name}-{name}" + ).encode("utf-8") def exists(self, service: str, name: str) -> bool: return has_secret( diff --git a/pkgs/clan-cli/tests/age_keys.py b/pkgs/clan-cli/tests/age_keys.py index 2c7943ec..5a0e038a 100644 --- a/pkgs/clan-cli/tests/age_keys.py +++ b/pkgs/clan-cli/tests/age_keys.py @@ -1,5 +1,3 @@ -import subprocess - import pytest @@ -9,18 +7,6 @@ class KeyPair: self.privkey = privkey -def is_valid_age_key(secret_key: str) -> bool: - # Run the age-keygen command with the -y flag to check the key format - result = subprocess.run( - ["age-keygen", "-y"], input=secret_key, capture_output=True, text=True - ) - - if result.returncode == 0: - return True - else: - raise ValueError(f"Invalid age key: {secret_key}") - - KEYS = [ KeyPair( "age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c", diff --git a/pkgs/clan-cli/tests/helpers/validator.py b/pkgs/clan-cli/tests/helpers/validator.py new file mode 100644 index 00000000..1aafecab --- /dev/null +++ b/pkgs/clan-cli/tests/helpers/validator.py @@ -0,0 +1,34 @@ +import subprocess +import tempfile + + +def is_valid_age_key(secret_key: str) -> bool: + # Run the age-keygen command with the -y flag to check the key format + result = subprocess.run( + ["age-keygen", "-y"], input=secret_key, capture_output=True, text=True + ) + + if result.returncode == 0: + return True + else: + raise ValueError(f"Invalid age key: {secret_key}") + + +def is_valid_ssh_key(secret_key: str, ssh_pub: str) -> bool: + # create tempfile and write secret_key to it + with tempfile.NamedTemporaryFile() as temp: + temp.write(secret_key.encode("utf-8")) + temp.flush() + # Run the ssh-keygen command with the -y flag to check the key format + result = subprocess.run( + ["ssh-keygen", "-y", "-f", temp.name], capture_output=True, text=True + ) + + if result.returncode == 0: + if result.stdout != ssh_pub: + raise ValueError( + f"Expected '{ssh_pub}' got '{result.stdout}' for ssh key: {secret_key}" + ) + return True + else: + raise ValueError(f"Invalid ssh key: {secret_key}") diff --git a/pkgs/clan-cli/tests/test_flake_with_core_and_pass/flake.nix b/pkgs/clan-cli/tests/test_flake_with_core_and_pass/flake.nix index 68e3f2a1..ed869336 100644 --- a/pkgs/clan-cli/tests/test_flake_with_core_and_pass/flake.nix +++ b/pkgs/clan-cli/tests/test_flake_with_core_and_pass/flake.nix @@ -15,6 +15,14 @@ vm1 = { lib, ... }: { + imports = [ + clan-core.clanModules.sshd + clan-core.clanModules.root-password + clan-core.clanModules.user-password + ]; + clan.user-password.user = "alice"; + clan.user-password.prompt = false; + clan.networking.targetHost = "__CLAN_TARGET_ADDRESS__"; system.stateVersion = lib.version; clan.core.secretStore = "password-store"; diff --git a/pkgs/clan-cli/tests/test_secrets_generate.py b/pkgs/clan-cli/tests/test_secrets_generate.py index 1306cc35..71e4f675 100644 --- a/pkgs/clan-cli/tests/test_secrets_generate.py +++ b/pkgs/clan-cli/tests/test_secrets_generate.py @@ -1,41 +1,20 @@ import ipaddress -import subprocess -import tempfile from typing import TYPE_CHECKING import pytest -from age_keys import is_valid_age_key from cli import Cli from fixtures_flakes import FlakeForTest +from validator import is_valid_age_key, is_valid_ssh_key +from clan_cli.facts.secret_modules.sops import SecretStore from clan_cli.machines.facts import machine_get_fact +from clan_cli.machines.machines import Machine from clan_cli.secrets.folders import sops_secrets_folder -from clan_cli.secrets.secrets import decrypt_secret, has_secret if TYPE_CHECKING: from age_keys import KeyPair -def is_valid_ssh_key(secret_key: str, ssh_pub: str) -> bool: - # create tempfile and write secret_key to it - with tempfile.NamedTemporaryFile() as temp: - temp.write(secret_key.encode("utf-8")) - temp.flush() - # Run the ssh-keygen command with the -y flag to check the key format - result = subprocess.run( - ["ssh-keygen", "-y", "-f", temp.name], capture_output=True, text=True - ) - - if result.returncode == 0: - if result.stdout != ssh_pub: - raise ValueError( - f"Expected '{ssh_pub}' got '{result.stdout}' for ssh key: {secret_key}" - ) - return True - else: - raise ValueError(f"Invalid ssh key: {secret_key}") - - @pytest.mark.impure def test_generate_secret( monkeypatch: pytest.MonkeyPatch, @@ -69,8 +48,10 @@ def test_generate_secret( ) cmd = ["facts", "generate", "--flake", str(test_flake_with_core.path), "vm1"] cli.run(cmd) - assert has_secret(test_flake_with_core.path, "vm1-age.key") - assert has_secret(test_flake_with_core.path, "vm1-zerotier-identity-secret") + store1 = SecretStore(Machine(name="vm1", flake=test_flake_with_core.path)) + + assert store1.exists("", "age.key") + assert store1.exists("", "zerotier-identity-secret") network_id = machine_get_fact( test_flake_with_core.path, "vm1", "zerotier-network-id" ) @@ -82,15 +63,10 @@ def test_generate_secret( secret1_mtime = identity_secret.lstat().st_mtime_ns # Assert that the age key is valid - age_secret = decrypt_secret(test_flake_with_core.path, "vm1-age.key") + age_secret = store1.get("", "age.key").decode() assert age_secret.isprintable() assert is_valid_age_key(age_secret) - # # Assert that the ssh key is valid - # ssh_secret = decrypt_secret(test_flake_with_core.path, "vm1-ssh.id_ed25519") - # ssh_pub = machine_get_fact(test_flake_with_core.path, "vm1", "ssh.id_ed25519.pub") - # assert is_valid_ssh_key(ssh_secret, ssh_pub) - # test idempotency for vm1 and also generate for vm2 cli.run(["facts", "generate", "--flake", str(test_flake_with_core.path)]) assert age_key.lstat().st_mtime_ns == age_key_mtime @@ -100,39 +76,41 @@ def test_generate_secret( secrets_folder / "vm1-zerotier-identity-secret" / "machines" / "vm1" ).exists() - assert has_secret(test_flake_with_core.path, "vm2-password") - assert has_secret(test_flake_with_core.path, "vm2-password-hash") - assert has_secret(test_flake_with_core.path, "vm2-user-password") - assert has_secret(test_flake_with_core.path, "vm2-user-password-hash") - assert has_secret(test_flake_with_core.path, "vm2-ssh.id_ed25519") - assert has_secret(test_flake_with_core.path, "vm2-age.key") - assert has_secret(test_flake_with_core.path, "vm2-zerotier-identity-secret") + store2 = SecretStore(Machine(name="vm2", flake=test_flake_with_core.path)) + + assert store2.exists("", "password") + assert store2.exists("", "password-hash") + assert store2.exists("", "user-password") + assert store2.exists("", "user-password-hash") + assert store2.exists("", "ssh.id_ed25519") + assert store2.exists("", "age.key") + assert store2.exists("", "zerotier-identity-secret") ip = machine_get_fact(test_flake_with_core.path, "vm1", "zerotier-ip") assert ipaddress.IPv6Address(ip).is_private # Assert that the age key is valid - age_secret = decrypt_secret(test_flake_with_core.path, "vm2-age.key") + age_secret = store2.get("", "age.key").decode() assert age_secret.isprintable() assert is_valid_age_key(age_secret) # Assert that the ssh key is valid - ssh_secret = decrypt_secret(test_flake_with_core.path, "vm2-ssh.id_ed25519") + ssh_secret = store2.get("", "ssh.id_ed25519").decode() ssh_pub = machine_get_fact(test_flake_with_core.path, "vm2", "ssh.id_ed25519.pub") assert is_valid_ssh_key(ssh_secret, ssh_pub) # Assert that root-password is valid - pwd_secret = decrypt_secret(test_flake_with_core.path, "vm2-password") + pwd_secret = store2.get("", "password").decode() assert pwd_secret.isprintable() assert pwd_secret.isascii() - pwd_hash = decrypt_secret(test_flake_with_core.path, "vm2-password-hash") + pwd_hash = store2.get("", "password-hash").decode() assert pwd_hash.isprintable() assert pwd_hash.isascii() # Assert that user-password is valid - pwd_secret = decrypt_secret(test_flake_with_core.path, "vm2-user-password") + pwd_secret = store2.get("", "user-password").decode() assert pwd_secret.isprintable() assert pwd_secret.isascii() - pwd_hash = decrypt_secret(test_flake_with_core.path, "vm2-user-password-hash") + pwd_hash = store2.get("", "user-password-hash").decode() assert pwd_hash.isprintable() assert pwd_hash.isascii() diff --git a/pkgs/clan-cli/tests/test_secrets_password_store.py b/pkgs/clan-cli/tests/test_secrets_password_store.py index b59c278d..5ce9ce67 100644 --- a/pkgs/clan-cli/tests/test_secrets_password_store.py +++ b/pkgs/clan-cli/tests/test_secrets_password_store.py @@ -4,8 +4,11 @@ from pathlib import Path import pytest from cli import Cli from fixtures_flakes import FlakeForTest +from validator import is_valid_ssh_key +from clan_cli.facts.secret_modules.password_store import SecretStore from clan_cli.machines.facts import machine_get_fact +from clan_cli.machines.machines import Machine from clan_cli.nix import nix_shell from clan_cli.ssh import HostGroup @@ -44,6 +47,9 @@ def test_upload_secret( nix_shell(["nixpkgs#pass"], ["pass", "init", "test@local"]), check=True ) cli.run(["facts", "generate", "vm1"]) + + store = SecretStore(Machine(name="vm1", flake=test_flake_with_core_and_pass.path)) + network_id = machine_get_fact( test_flake_with_core_and_pass.path, "vm1", "zerotier-network-id" ) @@ -66,3 +72,34 @@ def test_upload_secret( test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret" ) assert zerotier_identity_secret.exists() + assert store.exists("", "zerotier-identity-secret") + + assert store.exists("", "password") + assert store.exists("", "password-hash") + assert store.exists("", "user-password") + assert store.exists("", "user-password-hash") + assert store.exists("", "ssh.id_ed25519") + assert store.exists("", "zerotier-identity-secret") + + # Assert that the ssh key is valid + ssh_secret = store.get("", "ssh.id_ed25519").decode() + ssh_pub = machine_get_fact( + test_flake_with_core_and_pass.path, "vm1", "ssh.id_ed25519.pub" + ) + assert is_valid_ssh_key(ssh_secret, ssh_pub) + + # Assert that root-password is valid + pwd_secret = store.get("", "password").decode() + assert pwd_secret.isprintable() + assert pwd_secret.isascii() + pwd_hash = store.get("", "password-hash").decode() + assert pwd_hash.isprintable() + assert pwd_hash.isascii() + + # Assert that user-password is valid + pwd_secret = store.get("", "user-password").decode() + assert pwd_secret.isprintable() + assert pwd_secret.isascii() + pwd_hash = store.get("", "user-password-hash").decode() + assert pwd_hash.isprintable() + assert pwd_hash.isascii()