1
0
forked from clan/clan-core

machines/machines: drop unused qmp wrapper

This commit is contained in:
Jörg Thalheim 2024-07-02 11:41:53 +02:00
parent d27e474b66
commit 47010f458c

View File

@ -1,14 +1,10 @@
import json import json
import logging import logging
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Any from typing import Any
from clan_cli.clan_uri import ClanURI, MachineData from clan_cli.clan_uri import ClanURI, MachineData
from clan_cli.dirs import vm_state_dir
from clan_cli.qemu.qmp import QEMUMonitorProtocol
from ..cmd import run_no_stdout from ..cmd import run_no_stdout
from ..errors import ClanError from ..errors import ClanError
@ -18,28 +14,6 @@ from ..ssh import Host, parse_deployment_address
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class QMPWrapper:
def __init__(self, state_dir: Path) -> None:
# These sockets here are just symlinks to the real sockets which
# are created by the run.py file. The reason being that we run into
# file path length issues on Linux. If no qemu process is running
# the symlink will be dangling.
self._qmp_socket: Path = state_dir / "qmp.sock"
self._qga_socket: Path = state_dir / "qga.sock"
@contextmanager
def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]:
rpath = self._qmp_socket.resolve()
if not rpath.exists():
raise ClanError(f"qmp socket {rpath} does not exist. Is the VM running?")
qmp = QEMUMonitorProtocol(str(rpath))
qmp.connect()
try:
yield qmp
finally:
qmp.close()
class Machine: class Machine:
name: str name: str
flake: str | Path flake: str | Path
@ -49,7 +23,6 @@ class Machine:
build_cache: dict[str, Path] build_cache: dict[str, Path]
_flake_path: Path | None _flake_path: Path | None
_deployment_info: None | dict _deployment_info: None | dict
vm: QMPWrapper
def __init__( def __init__(
self, self,
@ -80,10 +53,6 @@ class Machine:
self._deployment_info: None | dict = deployment_info self._deployment_info: None | dict = deployment_info
self.nix_options = nix_options self.nix_options = nix_options
state_dir = vm_state_dir(flake_url=str(self.flake), vm_name=self.data.name)
self.vm: QMPWrapper = QMPWrapper(state_dir)
def flush_caches(self) -> None: def flush_caches(self) -> None:
self._deployment_info = None self._deployment_info = None
self._flake_path = None self._flake_path = None