forked from clan/clan-core
clan_cli: Added lazy qmp
This commit is contained in:
parent
92ec3fb9f9
commit
03b9183e04
@ -2,10 +2,10 @@ import json
|
||||
import logging
|
||||
from collections.abc import Generator
|
||||
from contextlib import contextmanager
|
||||
from os import path
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
from clan_cli.dirs import vm_state_dir
|
||||
from qemu.qmp import QEMUMonitorProtocol
|
||||
|
||||
from ..cmd import run
|
||||
@ -17,37 +17,25 @@ log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VMAttr:
|
||||
def __init__(self, machine_name: str) -> None:
|
||||
self.temp_dir = TemporaryDirectory(prefix="clan_vm-", suffix=f"-{machine_name}")
|
||||
self._qmp_socket: Path = Path(self.temp_dir.name) / "qmp.sock"
|
||||
self._qga_socket: Path = Path(self.temp_dir.name) / "qga.sock"
|
||||
def __init__(self, state_dir: Path) -> None:
|
||||
self._qmp_socket: Path = state_dir / "qmp.sock"
|
||||
self._qga_socket: Path = state_dir / "qga.sock"
|
||||
self._qmp: QEMUMonitorProtocol | None = None
|
||||
|
||||
@contextmanager
|
||||
def qmp(self) -> Generator[QEMUMonitorProtocol, None, None]:
|
||||
if self._qmp is None:
|
||||
log.debug(f"qmp_socket: {self._qmp_socket}")
|
||||
self._qmp = QEMUMonitorProtocol(path.realpath(self._qmp_socket))
|
||||
rpath = self._qmp_socket.resolve()
|
||||
if not rpath.exists():
|
||||
raise ClanError(f"qmp socket {rpath} does not exist")
|
||||
self._qmp = QEMUMonitorProtocol(str(rpath))
|
||||
self._qmp.connect()
|
||||
try:
|
||||
yield self._qmp
|
||||
finally:
|
||||
self._qmp.close()
|
||||
|
||||
@property
|
||||
def qmp_socket(self) -> Path:
|
||||
if self._qmp is None:
|
||||
log.debug(f"qmp_socket: {self._qmp_socket}")
|
||||
self._qmp = QEMUMonitorProtocol(path.realpath(self._qmp_socket))
|
||||
return self._qmp_socket
|
||||
|
||||
@property
|
||||
def qga_socket(self) -> Path:
|
||||
if self._qmp is None:
|
||||
log.debug(f"qmp_socket: {self.qga_socket}")
|
||||
self._qmp = QEMUMonitorProtocol(path.realpath(self._qmp_socket))
|
||||
return self._qga_socket
|
||||
|
||||
|
||||
class Machine:
|
||||
def __init__(
|
||||
@ -70,7 +58,9 @@ class Machine:
|
||||
|
||||
self._deployment_info: None | dict[str, str] = deployment_info
|
||||
|
||||
self.vm: VMAttr = VMAttr(name)
|
||||
state_dir = vm_state_dir(flake_url=str(self.flake), vm_name=self.name)
|
||||
|
||||
self.vm: VMAttr = VMAttr(state_dir)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Machine(name={self.name}, flake={self.flake})"
|
||||
|
@ -1,10 +1,7 @@
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import traceback
|
||||
from collections.abc import Generator
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
from typing import TYPE_CHECKING
|
||||
@ -24,34 +21,6 @@ if TYPE_CHECKING:
|
||||
no_kvm = not os.path.exists("/dev/kvm")
|
||||
|
||||
|
||||
@contextmanager
|
||||
def monkeypatch_tempdir_with_custom_path(
|
||||
*, monkeypatch: pytest.MonkeyPatch, custom_path: str, prefix_condition: str
|
||||
) -> Generator[None, None, None]:
|
||||
# Custom wrapper function that checks the prefix and either modifies the behavior or falls back to the original
|
||||
class CustomTemporaryDirectory(tempfile.TemporaryDirectory):
|
||||
def __init__(
|
||||
self,
|
||||
suffix: str | None = None,
|
||||
prefix: str | None = None,
|
||||
dir: str | None = None, # noqa: A002
|
||||
) -> None:
|
||||
if prefix == prefix_condition:
|
||||
self.name = custom_path # Use the custom path
|
||||
self._finalizer = None # Prevent cleanup attempts on the custom path by the original finalizer
|
||||
else:
|
||||
super().__init__(suffix=suffix, prefix=prefix, dir=dir)
|
||||
|
||||
# Use ExitStack to ensure unpatching
|
||||
try:
|
||||
# Patch the TemporaryDirectory with our custom class
|
||||
monkeypatch.setattr(tempfile, "TemporaryDirectory", CustomTemporaryDirectory)
|
||||
yield # This allows the code within the 'with' block of this context manager to run
|
||||
finally:
|
||||
# Unpatch the TemporaryDirectory
|
||||
monkeypatch.undo()
|
||||
|
||||
|
||||
def run_vm_in_thread(machine_name: str) -> None:
|
||||
# runs machine and prints exceptions
|
||||
def run() -> None:
|
||||
@ -71,14 +40,14 @@ def run_vm_in_thread(machine_name: str) -> None:
|
||||
# wait for qmp socket to exist
|
||||
def wait_vm_up(state_dir: Path) -> None:
|
||||
socket_file = state_dir / "qga.sock"
|
||||
timeout: float = 50
|
||||
while timeout > 0:
|
||||
timeout = 5.0
|
||||
while True:
|
||||
if timeout <= 0:
|
||||
raise TimeoutError(f"qga socket {socket_file} not found")
|
||||
if socket_file.exists():
|
||||
break
|
||||
sleep(0.1)
|
||||
timeout -= 0.1
|
||||
if timeout <= 0:
|
||||
raise TimeoutError(f"{socket_file} did not appear")
|
||||
|
||||
|
||||
# wait for vm to be down by checking if qga socket is down
|
||||
@ -161,18 +130,18 @@ def test_vm_qmp(
|
||||
monkeypatch.chdir(flake.path)
|
||||
|
||||
# the state dir is a point of reference for qemu interactions as it links to the qga/qmp sockets
|
||||
qmp_state_dir = temporary_home / "vm-tmp"
|
||||
state_dir = vm_state_dir(str(flake.path), "my_machine")
|
||||
|
||||
with monkeypatch_tempdir_with_custom_path(
|
||||
monkeypatch=monkeypatch,
|
||||
custom_path=str(qmp_state_dir),
|
||||
prefix_condition="machine_vm-",
|
||||
):
|
||||
# start the VM
|
||||
run_vm_in_thread("my_machine")
|
||||
# start the VM
|
||||
run_vm_in_thread("my_machine")
|
||||
|
||||
# connect with qmp
|
||||
qmp = qmp_connect(qmp_state_dir)
|
||||
qmp = qmp_connect(state_dir)
|
||||
|
||||
# verify that issuing a command works
|
||||
# result = qmp.cmd_obj({"execute": "query-status"})
|
||||
result = qmp.command("query-status")
|
||||
assert result["status"] == "running", result
|
||||
|
||||
# shutdown machine (prevent zombie qemu processes)
|
||||
qmp.command("system_powerdown")
|
||||
|
@ -169,8 +169,13 @@ class VM(GObject.Object):
|
||||
vm=vm,
|
||||
)
|
||||
self.process.proc.join()
|
||||
|
||||
GLib.idle_add(self.emit, "build_vm", self, False)
|
||||
|
||||
if self.process.proc.exitcode != 0:
|
||||
log.error(f"Failed to build VM {self.get_id()}")
|
||||
return
|
||||
|
||||
self.process = spawn(
|
||||
on_except=None,
|
||||
log_dir=Path(str(self.log_dir.name)),
|
||||
@ -189,8 +194,6 @@ class VM(GObject.Object):
|
||||
if self._watcher_id == 0:
|
||||
raise ClanError("Failed to add watcher")
|
||||
|
||||
self.machine.qmp_connect()
|
||||
|
||||
def start(self) -> None:
|
||||
if self.is_running():
|
||||
log.warn("VM is already running")
|
||||
@ -249,7 +252,12 @@ class VM(GObject.Object):
|
||||
def __stop(self) -> None:
|
||||
log.info(f"Stopping VM {self.get_id()}")
|
||||
|
||||
self.machine.qmp_command("system_powerdown")
|
||||
try:
|
||||
with self.machine.vm.qmp() as qmp:
|
||||
qmp.command("system_powerdown")
|
||||
except ClanError as e:
|
||||
log.debug(e)
|
||||
|
||||
self._stop_timer_init = datetime.now()
|
||||
self._stop_watcher_id = GLib.timeout_add(100, self.__shutdown_watchdog)
|
||||
if self._stop_watcher_id == 0:
|
||||
|
Loading…
Reference in New Issue
Block a user