Fixing a multitude of tests
This commit is contained in:
parent
d1c35301e3
commit
3581e0c9a8
|
@ -39,6 +39,7 @@ in
|
|||
uploadSecrets = pkgs.writeScript "upload-secrets" ''
|
||||
#!${pkgs.python3}/bin/python
|
||||
import json
|
||||
import sys
|
||||
from clan_cli.secrets.sops_generate import upload_age_key_from_nix
|
||||
# the second toJSON is needed to escape the string for the python
|
||||
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; })})
|
||||
|
|
|
@ -60,11 +60,17 @@ By default tests run in parallel using pytest-parallel.
|
|||
pytest-parallel however breaks `breakpoint()`. To disable it, use this:
|
||||
|
||||
```console
|
||||
pytest --workers "" -s
|
||||
pytest -n0 -s
|
||||
```
|
||||
|
||||
You can also run a single test like this:
|
||||
|
||||
```console
|
||||
pytest --workers "" -s tests/test_secrets_cli.py::test_users
|
||||
pytest -n0 -s tests/test_secrets_cli.py::test_users
|
||||
```
|
||||
|
||||
## Debugging functions
|
||||
Debugging functions can be found under `src/debug.py`
|
||||
quite interesting is the function repro_env_break() which drops you into a shell
|
||||
with the test environment loaded.
|
||||
|
||||
|
|
|
@ -23,10 +23,6 @@ def repro_env_break(work_dir: Path, env: Optional[Dict[str, str]] = None, cmd: O
|
|||
else:
|
||||
env = env.copy()
|
||||
|
||||
# Error checking
|
||||
if "bash" in env["SHELL"]:
|
||||
raise Exception("I assumed you use zsh, not bash")
|
||||
|
||||
# Cmd appending
|
||||
args = ["xterm", "-e", "zsh", "-df"]
|
||||
if cmd is not None:
|
||||
|
@ -48,7 +44,9 @@ def write_command(command: str, loc:Path) -> None:
|
|||
os.chmod(loc, st.st_mode | stat.S_IEXEC)
|
||||
|
||||
def spawn_process(func: Callable, **kwargs:Any) -> mp.Process:
|
||||
mp.set_start_method(method="spawn")
|
||||
if mp.get_start_method(allow_none=True) is None:
|
||||
mp.set_start_method(method="spawn")
|
||||
|
||||
proc = mp.Process(target=func, kwargs=kwargs)
|
||||
proc.start()
|
||||
return proc
|
||||
|
|
|
@ -4,7 +4,6 @@ import argparse
|
|||
from .create import register_create_parser
|
||||
from .list import register_list_parser
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
subparser = parser.add_subparsers(
|
||||
|
|
|
@ -71,7 +71,7 @@ class Machine:
|
|||
env["SECRETS_DIR"] = str(secrets_dir)
|
||||
print(f"uploading secrets... {self.upload_secrets}")
|
||||
proc = subprocess.run(
|
||||
[self.upload_secrets],
|
||||
[self.upload_secrets, self.flake_dir.name],
|
||||
env=env,
|
||||
stdout=subprocess.PIPE,
|
||||
text=True,
|
||||
|
|
|
@ -8,18 +8,19 @@ from clan_cli.errors import ClanError
|
|||
|
||||
from ..dirs import specific_flake_dir
|
||||
from ..machines.machines import Machine
|
||||
from ..types import FlakeName
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def generate_secrets(machine: Machine) -> None:
|
||||
def generate_secrets(machine: Machine, flake_name: FlakeName) -> None:
|
||||
env = os.environ.copy()
|
||||
env["CLAN_DIR"] = str(machine.flake_dir)
|
||||
env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module
|
||||
|
||||
print(f"generating secrets... {machine.generate_secrets}")
|
||||
proc = subprocess.run(
|
||||
[machine.generate_secrets],
|
||||
[machine.generate_secrets, flake_name],
|
||||
env=env,
|
||||
)
|
||||
|
||||
|
@ -31,7 +32,7 @@ def generate_secrets(machine: Machine) -> None:
|
|||
|
||||
def generate_command(args: argparse.Namespace) -> None:
|
||||
machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
|
||||
generate_secrets(machine)
|
||||
generate_secrets(machine, args.flake)
|
||||
|
||||
|
||||
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
|
||||
|
|
|
@ -91,4 +91,10 @@ def register_import_sops_parser(parser: argparse.ArgumentParser) -> None:
|
|||
type=str,
|
||||
help="the sops file to import (- for stdin)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake",
|
||||
)
|
||||
|
||||
parser.set_defaults(func=import_sops)
|
||||
|
|
|
@ -4,6 +4,7 @@ import json
|
|||
import os
|
||||
import shlex
|
||||
import sys
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Iterator, Dict
|
||||
from uuid import UUID
|
||||
|
@ -17,6 +18,17 @@ from ..errors import ClanError
|
|||
from ..debug import repro_env_break
|
||||
|
||||
|
||||
def is_path_or_url(s: str) -> str | None:
|
||||
# check if s is a valid path
|
||||
if os.path.exists(s):
|
||||
return "path"
|
||||
# check if s is a valid URL
|
||||
elif re.match(r"^https?://[a-zA-Z0-9.-]+/[a-zA-Z0-9.-]+", s):
|
||||
return "URL"
|
||||
# otherwise, return None
|
||||
else:
|
||||
return None
|
||||
|
||||
class BuildVmTask(BaseTask):
|
||||
def __init__(self, uuid: UUID, vm: VmConfig) -> None:
|
||||
super().__init__(uuid, num_cmds=7)
|
||||
|
@ -78,19 +90,25 @@ class BuildVmTask(BaseTask):
|
|||
) # TODO do this in the clanCore module
|
||||
env["SECRETS_DIR"] = str(secrets_dir)
|
||||
|
||||
cmd = next(cmds)
|
||||
repro_env_break(work_dir=flake_dir, env=env, cmd=[vm_config["generateSecrets"], clan_name])
|
||||
if Path(self.vm.flake_url).is_dir():
|
||||
cmd.run(
|
||||
[vm_config["generateSecrets"], clan_name],
|
||||
env=env,
|
||||
res = is_path_or_url(str(self.vm.flake_url))
|
||||
if res is None:
|
||||
raise ClanError(
|
||||
f"flake_url must be a valid path or URL, got {self.vm.flake_url}"
|
||||
)
|
||||
else:
|
||||
self.log.warning("won't generate secrets for non local clan")
|
||||
elif res == "path": # Only generate secrets for local clans
|
||||
cmd = next(cmds)
|
||||
if Path(self.vm.flake_url).is_dir():
|
||||
cmd.run(
|
||||
[vm_config["generateSecrets"], clan_name],
|
||||
env=env,
|
||||
)
|
||||
else:
|
||||
self.log.warning("won't generate secrets for non local clan")
|
||||
|
||||
|
||||
cmd = next(cmds)
|
||||
cmd.run(
|
||||
[vm_config["uploadSecrets"]],
|
||||
[vm_config["uploadSecrets"], clan_name],
|
||||
env=env,
|
||||
)
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ import logging
|
|||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Body
|
||||
|
||||
from clan_cli.debug import repro_env_break
|
||||
from ...config.machine import (
|
||||
config_for_machine,
|
||||
schema_for_machine,
|
||||
|
@ -33,6 +33,7 @@ async def list_machines(flake_name: FlakeName) -> MachinesResponse:
|
|||
machines = []
|
||||
for m in _list_machines(flake_name):
|
||||
machines.append(Machine(name=m, status=Status.UNKNOWN))
|
||||
|
||||
return MachinesResponse(machines=machines)
|
||||
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ from typing import Iterator
|
|||
# XXX: can we dynamically load this using nix develop?
|
||||
import uvicorn
|
||||
from pydantic import AnyUrl, IPvAnyAddress
|
||||
|
||||
from pydantic.tools import parse_obj_as
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -25,7 +25,8 @@ def open_browser(base_url: AnyUrl, sub_url: str) -> None:
|
|||
break
|
||||
except OSError:
|
||||
time.sleep(i)
|
||||
url = AnyUrl(f"{base_url}/{sub_url.removeprefix('/')}")
|
||||
url = parse_obj_as(
|
||||
AnyUrl,f"{base_url}/{sub_url.removeprefix('/')}")
|
||||
_open_browser(url)
|
||||
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ class FlakeForTest(NamedTuple):
|
|||
|
||||
def create_flake(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
temporary_dir: Path,
|
||||
temporary_home: Path,
|
||||
flake_name: FlakeName,
|
||||
clan_core_flake: Path | None = None,
|
||||
machines: list[str] = [],
|
||||
|
@ -51,7 +51,7 @@ def create_flake(
|
|||
template = Path(__file__).parent / flake_name
|
||||
|
||||
# copy the template to a new temporary location
|
||||
home = Path(temporary_dir)
|
||||
home = Path(temporary_home)
|
||||
flake = home / ".local/state/clan/flake" / flake_name
|
||||
shutil.copytree(template, flake)
|
||||
|
||||
|
@ -87,21 +87,21 @@ def test_flake(
|
|||
|
||||
@pytest.fixture
|
||||
def test_flake_with_core(
|
||||
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path
|
||||
monkeypatch: pytest.MonkeyPatch, temporary_home: Path
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
raise Exception(
|
||||
"clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
)
|
||||
yield from create_flake(
|
||||
monkeypatch, temporary_dir, FlakeName("test_flake_with_core"), CLAN_CORE
|
||||
monkeypatch, temporary_home, FlakeName("test_flake_with_core"), CLAN_CORE
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_flake_with_core_and_pass(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
temporary_dir: Path,
|
||||
temporary_home: Path,
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
raise Exception(
|
||||
|
@ -109,7 +109,8 @@ def test_flake_with_core_and_pass(
|
|||
)
|
||||
yield from create_flake(
|
||||
monkeypatch,
|
||||
temporary_dir,
|
||||
temporary_home,
|
||||
FlakeName("test_flake_with_core_and_pass"),
|
||||
CLAN_CORE,
|
||||
|
||||
)
|
||||
|
|
|
@ -7,15 +7,15 @@ from clan_cli.errors import ClanError
|
|||
|
||||
|
||||
def test_get_clan_flake_toplevel(
|
||||
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path
|
||||
monkeypatch: pytest.MonkeyPatch, temporary_home: Path
|
||||
) -> None:
|
||||
monkeypatch.chdir(temporary_dir)
|
||||
monkeypatch.chdir(temporary_home)
|
||||
with pytest.raises(ClanError):
|
||||
print(_get_clan_flake_toplevel())
|
||||
(temporary_dir / ".git").touch()
|
||||
assert _get_clan_flake_toplevel() == temporary_dir
|
||||
(temporary_home / ".git").touch()
|
||||
assert _get_clan_flake_toplevel() == temporary_home
|
||||
|
||||
subdir = temporary_dir / "subdir"
|
||||
subdir = temporary_home / "subdir"
|
||||
subdir.mkdir()
|
||||
monkeypatch.chdir(subdir)
|
||||
(subdir / ".clan-flake").touch()
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from fixtures_flakes import FlakeForTest
|
||||
import pytest
|
||||
from api import TestClient
|
||||
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_inspect_ok(api: TestClient, test_flake_with_core: Path) -> None:
|
||||
params = {"url": str(test_flake_with_core)}
|
||||
def test_inspect_ok(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
|
||||
params = {"url": str(test_flake_with_core.path)}
|
||||
response = api.get(
|
||||
"/api/flake/attrs",
|
||||
params=params,
|
||||
|
@ -32,8 +32,8 @@ def test_inspect_err(api: TestClient) -> None:
|
|||
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_inspect_flake(api: TestClient, test_flake_with_core: Path) -> None:
|
||||
params = {"url": str(test_flake_with_core)}
|
||||
def test_inspect_flake(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
|
||||
params = {"url": str(test_flake_with_core.path)}
|
||||
response = api.get(
|
||||
"/api/flake",
|
||||
params=params,
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
let
|
||||
clan = clan-core.lib.buildClan {
|
||||
directory = self;
|
||||
clanName = "test_with_core_clan";
|
||||
clanName = "test_flake_with_core";
|
||||
machines = {
|
||||
vm1 = { lib, ... }: {
|
||||
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
let
|
||||
clan = clan-core.lib.buildClan {
|
||||
directory = self;
|
||||
clanName = "test_with_core_and_pass_clan";
|
||||
clanName = "test_flake_with_core_and_pass";
|
||||
machines = {
|
||||
vm1 = { lib, ... }: {
|
||||
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
let
|
||||
clan = clan-core.lib.buildClan {
|
||||
directory = self;
|
||||
clanName = "core_dynamic_machine_clan";
|
||||
clanName = "test_flake_with_core_dynamic_machines";
|
||||
machines =
|
||||
let
|
||||
machineModules = builtins.readDir (self + "/machines");
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from fixtures_flakes import FlakeForTest
|
||||
from clan_cli.debug import repro_env_break
|
||||
|
||||
import pytest
|
||||
from cli import Cli
|
||||
|
@ -10,7 +12,7 @@ if TYPE_CHECKING:
|
|||
|
||||
def test_import_sops(
|
||||
test_root: Path,
|
||||
test_flake: Path,
|
||||
test_flake: FlakeForTest,
|
||||
capsys: pytest.CaptureFixture,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
age_keys: list["KeyPair"],
|
||||
|
@ -18,16 +20,15 @@ def test_import_sops(
|
|||
cli = Cli()
|
||||
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[1].privkey)
|
||||
cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey])
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey])
|
||||
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey])
|
||||
cli.run(["secrets", "groups", "add-user", "group1", "user1"])
|
||||
cli.run(["secrets", "groups", "add-user", "group1", "user2"])
|
||||
cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name])
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey, test_flake.name])
|
||||
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey, test_flake.name])
|
||||
cli.run(["secrets", "groups", "add-user", "group1", "user1", test_flake.name])
|
||||
cli.run(["secrets", "groups", "add-user", "group1", "user2", test_flake.name])
|
||||
|
||||
# To edit:
|
||||
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
|
||||
cli.run(
|
||||
[
|
||||
cmd = [
|
||||
"secrets",
|
||||
"import-sops",
|
||||
"--group",
|
||||
|
@ -35,13 +36,17 @@ def test_import_sops(
|
|||
"--machine",
|
||||
"machine1",
|
||||
str(test_root.joinpath("data", "secrets.yaml")),
|
||||
test_flake.name
|
||||
]
|
||||
repro_env_break(work_dir=test_flake.path, cmd=cmd)
|
||||
cli.run(
|
||||
cmd
|
||||
)
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "users", "list"])
|
||||
cli.run(["secrets", "users", "list", test_flake.name])
|
||||
users = sorted(capsys.readouterr().out.rstrip().split())
|
||||
assert users == ["user1", "user2"]
|
||||
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "get", "secret-key"])
|
||||
cli.run(["secrets", "get", "secret-key", test_flake.name])
|
||||
assert capsys.readouterr().out == "secret-value"
|
||||
|
|
|
@ -1,46 +1,47 @@
|
|||
from pathlib import Path
|
||||
|
||||
from api import TestClient
|
||||
from fixtures_flakes import FlakeForTest
|
||||
from clan_cli.debug import repro_env_break
|
||||
|
||||
|
||||
def test_machines(api: TestClient, test_flake: Path) -> None:
|
||||
response = api.get("/api/machines")
|
||||
def test_machines(api: TestClient, test_flake: FlakeForTest) -> None:
|
||||
response = api.get(f"/api/{test_flake.name}/machines")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"machines": []}
|
||||
|
||||
response = api.post("/api/machines", json={"name": "test"})
|
||||
response = api.post(f"/api/{test_flake.name}/machines", json={"name": "test"})
|
||||
assert response.status_code == 201
|
||||
assert response.json() == {"machine": {"name": "test", "status": "unknown"}}
|
||||
|
||||
response = api.get("/api/machines/test")
|
||||
response = api.get(f"/api/{test_flake.name}/machines/test")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"machine": {"name": "test", "status": "unknown"}}
|
||||
|
||||
response = api.get("/api/machines")
|
||||
response = api.get(f"/api/{test_flake.name}/machines")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"machines": [{"name": "test", "status": "unknown"}]}
|
||||
|
||||
|
||||
def test_configure_machine(api: TestClient, test_flake: Path) -> None:
|
||||
def test_configure_machine(api: TestClient, test_flake: FlakeForTest) -> None:
|
||||
# ensure error 404 if machine does not exist when accessing the config
|
||||
response = api.get("/api/machines/machine1/config")
|
||||
response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
|
||||
assert response.status_code == 404
|
||||
|
||||
# ensure error 404 if machine does not exist when writing to the config
|
||||
response = api.put("/api/machines/machine1/config", json={})
|
||||
response = api.put(f"/api/{test_flake.name}/machines/machine1/config", json={})
|
||||
assert response.status_code == 404
|
||||
|
||||
# create the machine
|
||||
response = api.post("/api/machines", json={"name": "machine1"})
|
||||
response = api.post(f"/api/{test_flake.name}/machines", json={"name": "machine1"})
|
||||
assert response.status_code == 201
|
||||
|
||||
# ensure an empty config is returned by default for a new machine
|
||||
response = api.get("/api/machines/machine1/config")
|
||||
response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"config": {}}
|
||||
|
||||
# get jsonschema for machine
|
||||
response = api.get("/api/machines/machine1/schema")
|
||||
response = api.get(f"/api/{test_flake.name}/machines/machine1/schema")
|
||||
assert response.status_code == 200
|
||||
json_response = response.json()
|
||||
assert "schema" in json_response and "properties" in json_response["schema"]
|
||||
|
@ -91,6 +92,11 @@ def test_configure_machine(api: TestClient, test_flake: Path) -> None:
|
|||
devices=["/dev/fake_disk"],
|
||||
),
|
||||
),
|
||||
f"/api/{test_flake.name}machines/machine1/config",
|
||||
json=dict(
|
||||
clan=dict(
|
||||
jitsi=True,
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
|
@ -110,8 +116,8 @@ def test_configure_machine(api: TestClient, test_flake: Path) -> None:
|
|||
assert response.status_code == 200
|
||||
assert response.json() == {"config": config2}
|
||||
|
||||
# ensure that the config has actually been updated
|
||||
response = api.get("/api/machines/machine1/config")
|
||||
# get the config again
|
||||
response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"config": config2}
|
||||
|
||||
|
|
|
@ -21,8 +21,8 @@ def test_generate_secret(
|
|||
monkeypatch.chdir(test_flake_with_core.path)
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
|
||||
cli = Cli()
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey])
|
||||
cli.run(["secrets", "generate", "vm1"])
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake_with_core.name])
|
||||
cli.run(["secrets", "generate", "vm1", test_flake_with_core.name])
|
||||
has_secret(test_flake_with_core.name, "vm1-age.key")
|
||||
has_secret(test_flake_with_core.name, "vm1-zerotier-identity-secret")
|
||||
network_id = machine_get_fact(
|
||||
|
@ -43,7 +43,7 @@ def test_generate_secret(
|
|||
secret1_mtime = identity_secret.lstat().st_mtime_ns
|
||||
|
||||
# test idempotency
|
||||
cli.run(["secrets", "generate", "vm1"])
|
||||
cli.run(["secrets", "generate", "vm1", test_flake_with_core.name])
|
||||
assert age_key.lstat().st_mtime_ns == age_key_mtime
|
||||
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
|
||||
|
||||
|
|
|
@ -14,15 +14,15 @@ from clan_cli.ssh import HostGroup
|
|||
def test_upload_secret(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
test_flake_with_core_and_pass: FlakeForTest,
|
||||
temporary_dir: Path,
|
||||
temporary_home: Path,
|
||||
host_group: HostGroup,
|
||||
) -> None:
|
||||
monkeypatch.chdir(test_flake_with_core_and_pass.path)
|
||||
gnupghome = temporary_dir / "gpg"
|
||||
gnupghome = temporary_home / "gpg"
|
||||
gnupghome.mkdir(mode=0o700)
|
||||
monkeypatch.setenv("GNUPGHOME", str(gnupghome))
|
||||
monkeypatch.setenv("PASSWORD_STORE_DIR", str(temporary_dir / "pass"))
|
||||
gpg_key_spec = temporary_dir / "gpg_key_spec"
|
||||
monkeypatch.setenv("PASSWORD_STORE_DIR", str(temporary_home / "pass"))
|
||||
gpg_key_spec = temporary_home / "gpg_key_spec"
|
||||
gpg_key_spec.write_text(
|
||||
"""
|
||||
Key-Type: 1
|
||||
|
@ -39,18 +39,18 @@ def test_upload_secret(
|
|||
check=True,
|
||||
)
|
||||
subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True)
|
||||
cli.run(["secrets", "generate", "vm1"])
|
||||
cli.run(["secrets", "generate", "vm1", test_flake_with_core_and_pass.name])
|
||||
network_id = machine_get_fact(
|
||||
test_flake_with_core_and_pass.name, "vm1", "zerotier-network-id"
|
||||
)
|
||||
assert len(network_id) == 16
|
||||
identity_secret = (
|
||||
temporary_dir / "pass" / "machines" / "vm1" / "zerotier-identity-secret.gpg"
|
||||
temporary_home / "pass" / "machines" / "vm1" / "zerotier-identity-secret.gpg"
|
||||
)
|
||||
secret1_mtime = identity_secret.lstat().st_mtime_ns
|
||||
|
||||
# test idempotency
|
||||
cli.run(["secrets", "generate", "vm1"])
|
||||
cli.run(["secrets", "generate", "vm1", test_flake_with_core_and_pass.name])
|
||||
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
|
||||
|
||||
flake = test_flake_with_core_and_pass.path.joinpath("flake.nix")
|
||||
|
@ -58,7 +58,7 @@ def test_upload_secret(
|
|||
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
|
||||
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
|
||||
flake.write_text(new_text)
|
||||
cli.run(["secrets", "upload", "vm1"])
|
||||
cli.run(["secrets", "upload", "vm1", test_flake_with_core_and_pass.name])
|
||||
zerotier_identity_secret = (
|
||||
test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret"
|
||||
)
|
||||
|
|
|
@ -3,7 +3,7 @@ from typing import TYPE_CHECKING
|
|||
|
||||
import pytest
|
||||
from cli import Cli
|
||||
|
||||
from fixtures_flakes import FlakeForTest
|
||||
from clan_cli.ssh import HostGroup
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -13,29 +13,29 @@ if TYPE_CHECKING:
|
|||
@pytest.mark.impure
|
||||
def test_secrets_upload(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
test_flake_with_core: Path,
|
||||
test_flake_with_core: FlakeForTest,
|
||||
host_group: HostGroup,
|
||||
age_keys: list["KeyPair"],
|
||||
) -> None:
|
||||
monkeypatch.chdir(test_flake_with_core)
|
||||
monkeypatch.chdir(test_flake_with_core.path)
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
|
||||
|
||||
cli = Cli()
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey])
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake_with_core.name])
|
||||
|
||||
cli.run(["secrets", "machines", "add", "vm1", age_keys[1].pubkey])
|
||||
cli.run(["secrets", "machines", "add", "vm1", age_keys[1].pubkey, test_flake_with_core.name])
|
||||
monkeypatch.setenv("SOPS_NIX_SECRET", age_keys[0].privkey)
|
||||
cli.run(["secrets", "set", "vm1-age.key"])
|
||||
cli.run(["secrets", "set", "vm1-age.key", test_flake_with_core.name])
|
||||
|
||||
flake = test_flake_with_core.joinpath("flake.nix")
|
||||
flake = test_flake_with_core.path.joinpath("flake.nix")
|
||||
host = host_group.hosts[0]
|
||||
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
|
||||
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
|
||||
|
||||
flake.write_text(new_text)
|
||||
cli.run(["secrets", "upload", "vm1"])
|
||||
cli.run(["secrets", "upload", "vm1", test_flake_with_core.name])
|
||||
|
||||
# the flake defines this path as the location where the sops key should be installed
|
||||
sops_key = test_flake_with_core.joinpath("key.txt")
|
||||
sops_key = test_flake_with_core.path.joinpath("key.txt")
|
||||
assert sops_key.exists()
|
||||
assert sops_key.read_text() == age_keys[0].privkey
|
||||
|
|
|
@ -2,13 +2,13 @@ from pathlib import Path
|
|||
|
||||
import pytest
|
||||
from api import TestClient
|
||||
|
||||
from fixtures_flakes import FlakeForTest
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_inspect(api: TestClient, test_flake_with_core: Path) -> None:
|
||||
def test_inspect(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
|
||||
response = api.post(
|
||||
"/api/vms/inspect",
|
||||
json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"),
|
||||
json=dict(flake_url=str(test_flake_with_core.path), flake_attr="vm1"),
|
||||
)
|
||||
|
||||
assert response.status_code == 200, f"Failed to inspect vm: {response.text}"
|
||||
|
|
|
@ -9,6 +9,7 @@ from fixtures_flakes import FlakeForTest, create_flake
|
|||
from httpx import SyncByteStream
|
||||
from root import CLAN_CORE
|
||||
|
||||
from clan_cli.debug import repro_env_break
|
||||
from clan_cli.types import FlakeName
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -17,7 +18,8 @@ if TYPE_CHECKING:
|
|||
|
||||
@pytest.fixture
|
||||
def flake_with_vm_with_secrets(
|
||||
monkeypatch: pytest.MonkeyPatch, temporary_home: Path
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
temporary_home: Path
|
||||
) -> Iterator[FlakeForTest]:
|
||||
yield from create_flake(
|
||||
monkeypatch,
|
||||
|
@ -42,15 +44,6 @@ def remote_flake_with_vm_without_secrets(
|
|||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def create_user_with_age_key(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
test_flake: FlakeForTest,
|
||||
age_keys: list["KeyPair"],
|
||||
) -> None:
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
|
||||
cli = Cli()
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake.name])
|
||||
|
||||
|
||||
def generic_create_vm_test(api: TestClient, flake: Path, vm: str) -> None:
|
||||
|
@ -97,8 +90,13 @@ def test_create_local(
|
|||
api: TestClient,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
flake_with_vm_with_secrets: FlakeForTest,
|
||||
create_user_with_age_key: None,
|
||||
age_keys: list["KeyPair"],
|
||||
) -> None:
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
|
||||
cli = Cli()
|
||||
cmd = ["secrets", "users", "add", "user1", age_keys[0].pubkey, flake_with_vm_with_secrets.name]
|
||||
cli.run(cmd)
|
||||
|
||||
generic_create_vm_test(api, flake_with_vm_with_secrets.path, "vm_with_secrets")
|
||||
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures_flakes import FlakeForTest
|
||||
import pytest
|
||||
from cli import Cli
|
||||
|
||||
|
@ -12,9 +12,9 @@ no_kvm = not os.path.exists("/dev/kvm")
|
|||
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_inspect(test_flake_with_core: Path, capsys: pytest.CaptureFixture) -> None:
|
||||
def test_inspect(test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture) -> None:
|
||||
cli = Cli()
|
||||
cli.run(["vms", "inspect", "vm1"])
|
||||
cli.run(["vms", "inspect", "vm1", test_flake_with_core.name])
|
||||
out = capsys.readouterr() # empty the buffer
|
||||
assert "Cores" in out.out
|
||||
|
||||
|
@ -23,11 +23,11 @@ def test_inspect(test_flake_with_core: Path, capsys: pytest.CaptureFixture) -> N
|
|||
@pytest.mark.impure
|
||||
def test_create(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
test_flake_with_core: Path,
|
||||
test_flake_with_core: FlakeForTest,
|
||||
age_keys: list["KeyPair"],
|
||||
) -> None:
|
||||
monkeypatch.chdir(test_flake_with_core)
|
||||
monkeypatch.chdir(test_flake_with_core.path)
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
|
||||
cli = Cli()
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey])
|
||||
cli.run(["vms", "create", "vm1"])
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake_with_core.name])
|
||||
cli.run(["vms", "create", "vm1", test_flake_with_core.name])
|
||||
|
|
|
@ -10,12 +10,12 @@ from ports import PortFunction
|
|||
|
||||
|
||||
@pytest.mark.timeout(10)
|
||||
def test_start_server(unused_tcp_port: PortFunction, temporary_dir: Path) -> None:
|
||||
def test_start_server(unused_tcp_port: PortFunction, temporary_home: Path) -> None:
|
||||
port = unused_tcp_port()
|
||||
|
||||
fifo = temporary_dir / "fifo"
|
||||
fifo = temporary_home / "fifo"
|
||||
os.mkfifo(fifo)
|
||||
notify_script = temporary_dir / "firefox"
|
||||
notify_script = temporary_home / "firefox"
|
||||
bash = shutil.which("bash")
|
||||
assert bash is not None
|
||||
notify_script.write_text(
|
||||
|
@ -27,8 +27,8 @@ echo "1" > {fifo}
|
|||
notify_script.chmod(0o700)
|
||||
|
||||
env = os.environ.copy()
|
||||
print(str(temporary_dir.absolute()))
|
||||
env["PATH"] = ":".join([str(temporary_dir.absolute())] + env["PATH"].split(":"))
|
||||
print(str(temporary_home.absolute()))
|
||||
env["PATH"] = ":".join([str(temporary_home.absolute())] + env["PATH"].split(":"))
|
||||
with subprocess.Popen(
|
||||
[sys.executable, "-m", "clan_cli.webui", "--port", str(port)], env=env
|
||||
) as p:
|
||||
|
|
Loading…
Reference in New Issue
Block a user