clan-core/pkgs/clan-cli/tests/test_vms_cli.py

288 lines
10 KiB
Python
Raw Normal View History

import base64
import json
2023-10-03 15:18:36 +00:00
import os
import socket
import threading
from pathlib import Path
from time import sleep
2023-10-05 13:56:15 +00:00
from typing import TYPE_CHECKING
2023-10-23 20:34:43 +00:00
2023-10-03 15:18:36 +00:00
import pytest
from cli import Cli
from fixtures_flakes import FlakeForTest, generate_flake
from root import CLAN_CORE
2023-10-03 15:18:36 +00:00
from clan_cli.dirs import vm_state_dir
2023-10-05 13:56:15 +00:00
if TYPE_CHECKING:
from age_keys import KeyPair
2023-10-03 15:18:36 +00:00
no_kvm = not os.path.exists("/dev/kvm")
# qga is almost like qmp, but not quite, because:
# - server doesn't send initial message
# - no need to initialize by asking for capabilities
# - results need to be base64 decoded
# TODO: move this to an extra file and make it available to other parts like GUI
class QgaSession:
def __init__(self, socket_file: Path | str) -> None:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# try to reconnect a couple of times if connetion refused
for _ in range(100):
try:
self.sock.connect(str(socket_file))
return
except ConnectionRefusedError:
sleep(0.1)
self.sock.connect(str(socket_file))
def get_response(self) -> dict:
result = self.sock.recv(9999999)
return json.loads(result)
# only execute, don't wait for response
def exec_cmd(self, cmd: str) -> None:
self.sock.send(
json.dumps(
{
"execute": "guest-exec",
"arguments": {
"path": "/bin/sh",
"arg": ["-l", "-c", cmd],
"capture-output": True,
},
}
).encode("utf-8")
)
# run, wait for result, return exitcode and output
def run(self, cmd: str) -> tuple[int, str]:
self.exec_cmd(cmd)
result_pid = self.get_response()
pid = result_pid["return"]["pid"]
# loop until exited=true
status_payload = json.dumps(
{
"execute": "guest-exec-status",
"arguments": {
"pid": pid,
},
}
).encode("utf-8")
while True:
self.sock.send(status_payload)
result = self.get_response()
if "error" in result and result["error"]["desc"].startswith("PID"):
raise Exception("PID could not be found")
if result["return"]["exited"]:
break
sleep(0.1)
exitcode = result["return"]["exitcode"]
if exitcode == 0:
out = (
""
if "out-data" not in result["return"]
else base64.b64decode(result["return"]["out-data"]).decode("utf-8")
)
else:
out = (
""
if "err-data" not in result["return"]
else base64.b64decode(result["return"]["err-data"]).decode("utf-8")
)
return exitcode, out
2023-10-03 15:18:36 +00:00
@pytest.mark.impure
2023-10-23 20:34:43 +00:00
def test_inspect(
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture
) -> None:
2023-10-03 15:18:36 +00:00
cli = Cli()
2023-11-15 13:28:40 +00:00
cli.run(["--flake", str(test_flake_with_core.path), "vms", "inspect", "vm1"])
2023-10-03 15:18:36 +00:00
out = capsys.readouterr() # empty the buffer
assert "Cores" in out.out
@pytest.mark.skipif(no_kvm, reason="Requires KVM")
@pytest.mark.impure
2023-11-24 13:52:38 +00:00
def test_run(
2023-10-05 13:56:15 +00:00
monkeypatch: pytest.MonkeyPatch,
2023-10-23 20:31:12 +00:00
test_flake_with_core: FlakeForTest,
2023-10-05 13:56:15 +00:00
age_keys: list["KeyPair"],
) -> None:
2023-10-23 20:31:12 +00:00
monkeypatch.chdir(test_flake_with_core.path)
2023-10-05 13:56:15 +00:00
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
2023-10-03 15:18:36 +00:00
cli = Cli()
2023-10-23 20:34:43 +00:00
cli.run(
[
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
]
)
2023-11-24 13:52:38 +00:00
cli.run(["vms", "run", "vm1"])
@pytest.mark.skipif(no_kvm, reason="Requires KVM")
@pytest.mark.impure
def test_vm_persistence(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
2024-01-19 13:24:40 +00:00
age_keys: list["KeyPair"],
) -> None:
2024-01-19 13:24:40 +00:00
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "new-clan",
substitutions={
"__CHANGE_ME__": "_test_vm_persistence",
"git+https://git.clan.lol/clan/clan-core": "path://" + str(CLAN_CORE),
},
machine_configs=dict(
my_machine=dict(
services=dict(getty=dict(autologinUser="root")),
clanCore=dict(
state=dict(
my_state=dict(
folders=[
# to be owned by root
"/var/my-state"
# to be owned by user 'test'
"/var/user-state"
]
)
)
),
# create test user
# TODO: test persisting files via that user
users=dict(
users=dict(
test=dict(
password="test",
isNormalUser=True,
),
root=dict(password="root"),
)
),
systemd=dict(
services=dict(
create_state=dict(
description="Create a file in the state folder",
wantedBy=["multi-user.target"],
script="""
if [ ! -f /var/my-state/root ]; then
echo "Creating a file in the state folder"
echo "dream2nix" > /var/my-state/root
# create /var/my-state/test owned by user test
echo "dream2nix" > /var/my-state/test
chown test /var/my-state/test
# make sure /var/user-state is owned by test
chown test /var/user-state
fi
""",
serviceConfig=dict(
Type="oneshot",
),
),
reboot=dict(
description="Reboot the machine",
wantedBy=["multi-user.target"],
after=["my-state.service"],
script="""
if [ ! -f /var/my-state/rebooting ]; then
echo "Rebooting the machine"
touch /var/my-state/rebooting
reboot
else
touch /var/my-state/rebooted
fi
""",
),
read_after_reboot=dict(
description="Read a file in the state folder",
wantedBy=["multi-user.target"],
after=["reboot.service"],
# TODO: currently state folders itself cannot be owned by users
script="""
if ! cat /var/my-state/test; then
echo "cannot read from state file" > /var/my-state/error
# ensure root file is owned by root
elif [ "$(stat -c '%U' /var/my-state/root)" != "root" ]; then
echo "state file /var/my-state/root is not owned by user root" > /var/my-state/error
# ensure test file is owned by test
elif [ "$(stat -c '%U' /var/my-state/test)" != "test" ]; then
echo "state file /var/my-state/test is not owned by user test" > /var/my-state/error
fi
# ensure /var/user-state is owned by test
# if [ "$(stat -c '%U' /var/user-state)" != "test" ]; then
# echo "state folder /var/user-state is not owned by user test" > /var/my-state/error
# fi
""",
serviceConfig=dict(
Type="oneshot",
),
),
# TODO: implement shutdown via qmp instead of this hack
poweroff=dict(
description="Poweroff the machine",
wantedBy=["multi-user.target"],
after=["read_after_reboot.service"],
script="""
sleep 5
poweroff
""",
),
)
),
clan=dict(virtualisation=dict(graphics=False)),
)
),
)
monkeypatch.chdir(flake.path)
# run the machine in a separate thread
def run() -> None:
Cli().run(["vms", "run", "my_machine"])
t = threading.Thread(target=run, name="run")
t.daemon = True
t.start()
state_dir = vm_state_dir("_test_vm_persistence", str(flake.path), "my_machine")
# wait until socket file exists
while True:
if (state_dir / "qga.sock").exists():
break
sleep(0.1)
qga = QgaSession(os.path.realpath(str(state_dir / "qga.sock")))
# wait for the machine to reboot
while True:
try:
# this might crash as the operation is not atomic
exitcode, out = qga.run("cat /var/my-state/rebooted")
if exitcode == 0:
break
except Exception:
pass
finally:
sleep(0.1)
# ensure that /etc get persisted (required to persist user IDs)
exitcode, out = qga.run("ls /vmstate/.rw-etc/upper")
assert exitcode == 0, out
exitcode, out = qga.run("cat /var/my-state/test")
assert exitcode == 0, out
assert out == "dream2nix\n", out
# check for errors
exitcode, out = qga.run("cat /var/my-state/error")
assert exitcode == 1, out