forked from clan/clan-core
tests: improve testing framework for impure vm tests
Also fix computet sops secret paths for vars
This commit is contained in:
parent
b93aa1896e
commit
b2646aa0fe
@ -10,9 +10,10 @@ let
|
||||
|
||||
inherit (import ./funcs.nix { inherit lib; }) listVars;
|
||||
|
||||
varsDir = config.clan.core.clanDir + "/sops/vars";
|
||||
varsDirMachines = config.clan.core.clanDir + "/sops/vars/per-machine";
|
||||
varsDirShared = config.clan.core.clanDir + "/sops/vars/shared";
|
||||
|
||||
vars = listVars varsDir;
|
||||
vars = (listVars varsDirMachines) ++ (listVars varsDirShared);
|
||||
|
||||
in
|
||||
{
|
||||
@ -33,7 +34,7 @@ in
|
||||
flip map vars (secret: {
|
||||
name = secret.id;
|
||||
value = {
|
||||
sopsFile = config.clan.core.clanDir + "/sops/vars/${secret.id}/secret";
|
||||
sopsFile = secret.sopsFile;
|
||||
format = "binary";
|
||||
};
|
||||
})
|
||||
|
@ -23,6 +23,7 @@ rec {
|
||||
generator = generator_name;
|
||||
name = secret_name;
|
||||
id = "${machine_name}/${generator_name}/${secret_name}";
|
||||
sopsFile = "${varsDir}/${machine_name}/${generator_name}/${secret_name}/secret";
|
||||
})
|
||||
)
|
||||
);
|
||||
|
@ -39,9 +39,6 @@ let
|
||||
|
||||
boot.initrd.systemd.enable = true;
|
||||
|
||||
# currently needed for system.etc.overlay.enable
|
||||
boot.kernelPackages = pkgs.linuxPackages_latest;
|
||||
|
||||
boot.initrd.systemd.storePaths = [
|
||||
pkgs.util-linux
|
||||
pkgs.e2fsprogs
|
||||
|
83
pkgs/clan-cli/tests/helpers/vms.py
Normal file
83
pkgs/clan-cli/tests/helpers/vms.py
Normal file
@ -0,0 +1,83 @@
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
|
||||
from clan_cli.dirs import vm_state_dir
|
||||
from clan_cli.qemu.qga import QgaSession
|
||||
from clan_cli.qemu.qmp import QEMUMonitorProtocol
|
||||
|
||||
from . import cli
|
||||
|
||||
|
||||
def run_vm_in_thread(machine_name: str) -> None:
|
||||
# runs machine and prints exceptions
|
||||
def run() -> None:
|
||||
try:
|
||||
cli.run(["vms", "run", machine_name])
|
||||
except Exception:
|
||||
# print exception details
|
||||
print(traceback.format_exc(), file=sys.stderr)
|
||||
print(sys.exc_info()[2], file=sys.stderr)
|
||||
|
||||
# run the machine in a separate thread
|
||||
t = threading.Thread(target=run, name="run")
|
||||
t.daemon = True
|
||||
t.start()
|
||||
return
|
||||
|
||||
|
||||
# wait for qmp socket to exist
|
||||
def wait_vm_up(machine_name: str, flake_url: str | None = None) -> None:
|
||||
if flake_url is None:
|
||||
flake_url = str(Path.cwd())
|
||||
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
|
||||
timeout: float = 100
|
||||
while True:
|
||||
if timeout <= 0:
|
||||
raise TimeoutError(
|
||||
f"qmp socket {socket_file} not found. Is the VM running?"
|
||||
)
|
||||
if socket_file.exists():
|
||||
break
|
||||
sleep(0.1)
|
||||
timeout -= 0.1
|
||||
|
||||
|
||||
# wait for vm to be down by checking if qmp socket is down
|
||||
def wait_vm_down(machine_name: str, flake_url: str | None = None) -> None:
|
||||
if flake_url is None:
|
||||
flake_url = str(Path.cwd())
|
||||
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
|
||||
timeout: float = 300
|
||||
while socket_file.exists():
|
||||
if timeout <= 0:
|
||||
raise TimeoutError(
|
||||
f"qmp socket {socket_file} still exists. Is the VM down?"
|
||||
)
|
||||
sleep(0.1)
|
||||
timeout -= 0.1
|
||||
|
||||
|
||||
# wait for vm to be up then connect and return qmp instance
|
||||
def qmp_connect(machine_name: str, flake_url: str | None = None) -> QEMUMonitorProtocol:
|
||||
if flake_url is None:
|
||||
flake_url = str(Path.cwd())
|
||||
state_dir = vm_state_dir(flake_url, machine_name)
|
||||
wait_vm_up(machine_name, flake_url)
|
||||
qmp = QEMUMonitorProtocol(
|
||||
address=str(os.path.realpath(state_dir / "qmp.sock")),
|
||||
)
|
||||
qmp.connect()
|
||||
return qmp
|
||||
|
||||
|
||||
# wait for vm to be up then connect and return qga instance
|
||||
def qga_connect(machine_name: str, flake_url: str | None = None) -> QgaSession:
|
||||
if flake_url is None:
|
||||
flake_url = str(Path.cwd())
|
||||
state_dir = vm_state_dir(flake_url, machine_name)
|
||||
wait_vm_up(machine_name, flake_url)
|
||||
return QgaSession(os.path.realpath(state_dir / "qga.sock"))
|
@ -4,17 +4,17 @@ from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
import pytest
|
||||
from age_keys import SopsSetup
|
||||
from fixtures_flakes import generate_flake
|
||||
from helpers import cli
|
||||
from helpers.nixos_config import nested_dict
|
||||
from root import CLAN_CORE
|
||||
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.nix import nix_shell
|
||||
from clan_cli.vars.public_modules import in_repo
|
||||
from clan_cli.vars.secret_modules import password_store, sops
|
||||
from tests.age_keys import SopsSetup
|
||||
from tests.fixtures_flakes import generate_flake
|
||||
from tests.helpers import cli
|
||||
from tests.helpers.nixos_config import nested_dict
|
||||
from tests.root import CLAN_CORE
|
||||
|
||||
|
||||
def test_get_subgraph() -> None:
|
||||
|
@ -1,98 +1,21 @@
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from fixtures_flakes import FlakeForTest, generate_flake
|
||||
from helpers import cli
|
||||
from helpers.nixos_config import nested_dict
|
||||
from root import CLAN_CORE
|
||||
|
||||
from clan_cli.dirs import vm_state_dir
|
||||
from clan_cli.qemu.qga import QgaSession
|
||||
from clan_cli.qemu.qmp import QEMUMonitorProtocol
|
||||
from tests.fixtures_flakes import FlakeForTest, generate_flake
|
||||
from tests.helpers import cli
|
||||
from tests.helpers.nixos_config import nested_dict
|
||||
from tests.helpers.vms import qga_connect, qmp_connect, run_vm_in_thread, wait_vm_down
|
||||
from tests.root import CLAN_CORE
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from age_keys import KeyPair
|
||||
from tests.age_keys import KeyPair
|
||||
|
||||
no_kvm = not os.path.exists("/dev/kvm")
|
||||
|
||||
|
||||
def run_vm_in_thread(machine_name: str) -> None:
|
||||
# runs machine and prints exceptions
|
||||
def run() -> None:
|
||||
try:
|
||||
cli.run(["vms", "run", machine_name])
|
||||
except Exception:
|
||||
# print exception details
|
||||
print(traceback.format_exc(), file=sys.stderr)
|
||||
print(sys.exc_info()[2], file=sys.stderr)
|
||||
|
||||
# run the machine in a separate thread
|
||||
t = threading.Thread(target=run, name="run")
|
||||
t.daemon = True
|
||||
t.start()
|
||||
return
|
||||
|
||||
|
||||
# wait for qmp socket to exist
|
||||
def wait_vm_up(machine_name: str, flake_url: str | None = None) -> None:
|
||||
if flake_url is None:
|
||||
flake_url = str(Path.cwd())
|
||||
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
|
||||
timeout: float = 100
|
||||
while True:
|
||||
if timeout <= 0:
|
||||
raise TimeoutError(
|
||||
f"qmp socket {socket_file} not found. Is the VM running?"
|
||||
)
|
||||
if socket_file.exists():
|
||||
break
|
||||
sleep(0.1)
|
||||
timeout -= 0.1
|
||||
|
||||
|
||||
# wait for vm to be down by checking if qmp socket is down
|
||||
def wait_vm_down(machine_name: str, flake_url: str | None = None) -> None:
|
||||
if flake_url is None:
|
||||
flake_url = str(Path.cwd())
|
||||
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
|
||||
timeout: float = 300
|
||||
while socket_file.exists():
|
||||
if timeout <= 0:
|
||||
raise TimeoutError(
|
||||
f"qmp socket {socket_file} still exists. Is the VM down?"
|
||||
)
|
||||
sleep(0.1)
|
||||
timeout -= 0.1
|
||||
|
||||
|
||||
# wait for vm to be up then connect and return qmp instance
|
||||
def qmp_connect(machine_name: str, flake_url: str | None = None) -> QEMUMonitorProtocol:
|
||||
if flake_url is None:
|
||||
flake_url = str(Path.cwd())
|
||||
state_dir = vm_state_dir(flake_url, machine_name)
|
||||
wait_vm_up(machine_name, flake_url)
|
||||
qmp = QEMUMonitorProtocol(
|
||||
address=str(os.path.realpath(state_dir / "qmp.sock")),
|
||||
)
|
||||
qmp.connect()
|
||||
return qmp
|
||||
|
||||
|
||||
# wait for vm to be up then connect and return qga instance
|
||||
def qga_connect(machine_name: str, flake_url: str | None = None) -> QgaSession:
|
||||
if flake_url is None:
|
||||
flake_url = str(Path.cwd())
|
||||
state_dir = vm_state_dir(flake_url, machine_name)
|
||||
wait_vm_up(machine_name, flake_url)
|
||||
return QgaSession(os.path.realpath(state_dir / "qga.sock"))
|
||||
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_inspect(
|
||||
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture
|
||||
@ -141,7 +64,7 @@ def test_vm_qmp(
|
||||
# set up a simple clan flake
|
||||
flake = generate_flake(
|
||||
temporary_home,
|
||||
flake_template=CLAN_CORE / "templates" / "new-clan",
|
||||
flake_template=CLAN_CORE / "templates" / "minimal",
|
||||
machine_configs=dict(
|
||||
my_machine=dict(
|
||||
clan=dict(
|
||||
@ -197,7 +120,7 @@ def test_vm_persistence(
|
||||
|
||||
flake = generate_flake(
|
||||
temporary_home,
|
||||
flake_template=CLAN_CORE / "templates" / "new-clan",
|
||||
flake_template=CLAN_CORE / "templates" / "minimal",
|
||||
machine_configs=config,
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user