clan_vm_manager: Added VM shutdown timeout
All checks were successful
checks-impure / test (pull_request) Successful in 1m42s
checks / test (pull_request) Successful in 2m48s

This commit is contained in:
Luis Hebendanz 2024-02-12 14:16:44 +07:00
parent 7b48535a98
commit 0ce8bcd018
8 changed files with 65 additions and 81 deletions

View File

@ -177,7 +177,7 @@ class Machine:
[
"--impure",
"--expr",
f'(builtins.fetchTree {{ type = "file"; url = "{config_json.name}"; }}).narHash',
f'(builtins.fetchTree {{ type = "file"; url = "file://{config_json.name}"; }}).narHash',
]
)
).stdout.strip()

View File

@ -12,7 +12,6 @@ from collections.abc import Iterator
from dataclasses import dataclass, field
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import IO
from ..cmd import Log, run
from ..dirs import machine_gcroot, module_root, user_cache_dir, vm_state_dir
@ -147,9 +146,7 @@ def qemu_command(
# TODO move this to the Machines class
def get_vm_create_info(
machine: Machine, vm: VmConfig, nix_options: list[str]
) -> dict[str, str]:
def build_vm(machine: Machine, vm: VmConfig, nix_options: list[str]) -> dict[str, str]:
config = nix_config()
system = config["system"]
@ -272,19 +269,12 @@ def start_waypipe(cid: int | None, title_prefix: str) -> Iterator[None]:
proc.kill()
def run_vm(
vm: VmConfig,
nix_options: list[str] = [],
log_fd: IO[str] | None = None,
) -> None:
"""
log_fd can be used to stream the output of all commands to a UI
"""
def run_vm(vm: VmConfig, nix_options: list[str] = []) -> None:
machine = Machine(vm.machine_name, vm.flake_url)
log.debug(f"Creating VM for {machine}")
# TODO: We should get this from the vm argument
nixos_config = get_vm_create_info(machine, vm, nix_options)
nixos_config = build_vm(machine, vm, nix_options)
# store the temporary rootfs inside XDG_CACHE_HOME on the host
# otherwise, when using /tmp, we risk running out of memory

View File

@ -26,7 +26,6 @@ def test_history_add(
"add",
str(uri),
]
cli.run(cmd)
history_file = user_history_file()

View File

@ -1,5 +1,4 @@
from dataclasses import dataclass
from enum import StrEnum
import gi
@ -9,8 +8,3 @@ gi.require_version("Gtk", "4.0")
@dataclass
class ClanConfig:
initial_view: str
class VMStatus(StrEnum):
RUNNING = "Running"
STOPPED = "Stopped"

View File

@ -1,6 +1,7 @@
import os
import tempfile
import weakref
from datetime import datetime
from pathlib import Path
from typing import IO, Any, ClassVar
@ -13,7 +14,6 @@ from clan_cli.history.list import list_history
from clan_vm_manager import assets
from clan_vm_manager.errors.show_error import show_error_dialog
from clan_vm_manager.models.interfaces import VMStatus
from .executor import MPProcess, spawn
@ -98,27 +98,26 @@ class VM(GObject.Object):
# Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = {
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object]),
"build_vm": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object, bool]),
}
def __init__(
self,
icon: Path,
status: VMStatus,
data: HistoryEntry,
) -> None:
super().__init__()
self.data = data
self.process = MPProcess("dummy", mp.Process(), Path("./dummy"))
self._watcher_id: int = 0
self._stop_watcher_id: int = 0
self._stop_timer_init: datetime | None = None
self._logs_id: int = 0
self._log_file: IO[str] | None = None
self.status = status
self._last_liveness: bool = False
self.log_dir = tempfile.TemporaryDirectory(
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
)
self._finalizer = weakref.finalize(self, self.stop)
self.connect("vm_status_changed", self._start_logs_task)
uri = ClanURI.from_str(
url=self.data.flake.flake_url, flake_attr=self.data.flake.flake_attr
@ -136,50 +135,47 @@ class VM(GObject.Object):
)
def __start(self) -> None:
if self.is_running():
log.warn("VM is already running")
return
log.info(f"Starting VM {self.get_id()}")
vm = vms.run.inspect_vm(self.machine)
GLib.idle_add(self.emit, "build_vm", self, True)
vms.run.build_vm(self.machine, vm, [])
GLib.idle_add(self.emit, "build_vm", self, False)
self.process = spawn(
on_except=None,
log_dir=Path(str(self.log_dir.name)),
func=vms.run.run_vm,
vm=vm,
)
log.debug("Starting VM")
log.debug(f"Started VM {self.get_id()}")
GLib.idle_add(self.emit, "vm_status_changed", self)
log.debug(f"Starting logs watcher on file: {self.process.out_file}")
self._logs_id = GLib.timeout_add(50, self._get_logs_task)
if self._logs_id == 0:
raise ClanError("Failed to add logs watcher")
log.debug(f"Starting VM watcher for: {self.machine.name}")
self._watcher_id = GLib.timeout_add(50, self._vm_watcher_task)
if self._watcher_id == 0:
raise ClanError("Failed to add watcher")
self.machine.qmp_connect()
def start(self) -> None:
if self.is_running():
log.warn("VM is already running")
return
threading.Thread(target=self.__start).start()
# Every 50ms check if the VM is still running
self._watcher_id = GLib.timeout_add(50, self._vm_watcher_task)
if self._watcher_id == 0:
raise ClanError("Failed to add watcher")
def _vm_watcher_task(self) -> bool:
if self.is_running() != self._last_liveness:
if not self.is_running():
self.emit("vm_status_changed", self)
prev_liveness = self._last_liveness
self._last_liveness = self.is_running()
log.debug("Removing VM watcher")
return GLib.SOURCE_REMOVE
# If the VM was running and now it is not, remove the watcher
if prev_liveness and not self.is_running():
log.debug("Removing VM watcher")
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
def _start_logs_task(self, obj: Any, vm: Any) -> None:
if self.is_running():
log.debug(f"Starting logs watcher on file: {self.process.out_file}")
self._logs_id = GLib.timeout_add(50, self._get_logs_task)
else:
log.debug("Not starting logs watcher")
def _get_logs_task(self) -> bool:
if not self.process.out_file.exists():
return GLib.SOURCE_CONTINUE
@ -192,15 +188,15 @@ class VM(GObject.Object):
self._log_file = None
return GLib.SOURCE_REMOVE
line = os.read(self._log_file.fileno(), 4096)
if len(line) != 0:
print(line.decode("utf-8"), end="", flush=True)
if not self.is_running():
log.debug("Removing logs watcher")
self._log_file = None
return GLib.SOURCE_REMOVE
line = os.read(self._log_file.fileno(), 4096)
if len(line) != 0:
print(line.decode("utf-8"), end="", flush=True)
return GLib.SOURCE_CONTINUE
def is_running(self) -> bool:
@ -209,12 +205,32 @@ class VM(GObject.Object):
def get_id(self) -> str:
return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"
def __shutdown_watchdog(self) -> None:
if self.is_running():
assert self._stop_timer_init is not None
diff = datetime.now() - self._stop_timer_init
if diff.seconds > 10:
log.error(f"VM {self.get_id()} has not stopped. Killing it")
self.process.kill_group()
return GLib.SOURCE_CONTINUE
else:
log.info(f"VM {self.get_id()} has stopped")
return GLib.SOURCE_REMOVE
def __stop(self) -> None:
log.info(f"Stopping VM {self.get_id()}")
self.machine.qmp_command("system_powerdown")
self._stop_timer_init = datetime.now()
self._stop_watcher_id = GLib.timeout_add(100, self.__shutdown_watchdog)
if self._stop_watcher_id == 0:
raise ClanError("Failed to add stop watcher")
def stop(self) -> None:
if not self.is_running():
return
log.info(f"Stopping VM {self.get_id()}")
# TODO: add fallback to kill the process if the QMP command fails
self.machine.qmp_command("system_powerdown")
threading.Thread(target=self.__stop).start()
def read_whole_log(self) -> str:
if not self.process.out_file.exists():
@ -296,7 +312,6 @@ def get_saved_vms() -> list[VM]:
base = VM(
icon=Path(icon),
status=VMStatus.STOPPED,
data=entry,
)
vm_list.append(base)

View File

@ -109,6 +109,7 @@ class MyLog:
self.log = logging.getLogger(__name__)
def add_debug(self, *args: Any, **kwargs: Any) -> None:
return
self.log.debug(*args, **kwargs)
@ -215,30 +216,12 @@ class BaseImplementation:
"default", self.application.dummy_menu_entry
)
self.create_item()
self.downloads_item = self.create_item(
"Downloads", self.application.dummy_menu_entry
)
self.uploads_item = self.create_item(
"Uploads", self.application.dummy_menu_entry
)
self.create_item()
self.create_item("Private Chat", self.application.dummy_menu_entry)
self.create_item("Chat Rooms", self.application.dummy_menu_entry)
self.create_item("Searches", self.application.dummy_menu_entry)
self.create_item()
self.connect_disconnect_item = self.create_item(
"default", self.application.dummy_menu_entry
)
self.create_item()
self.create_item("Preferences", self.application.dummy_menu_entry)
self.create_item("_Quit", self.application.dummy_menu_entry)
def update_window_visibility(self) -> None:
@ -246,9 +229,9 @@ class BaseImplementation:
return
if self.application.window.is_visible():
label = "Hide Nicotine+"
label = "Hide VM Manager"
else:
label = "Show Nicotine+"
label = "Show VM Manager"
self.set_item_text(self.show_hide_item, label)
self.update_menu()
@ -298,11 +281,9 @@ class BaseImplementation:
pass
def set_download_status(self, status: str) -> None:
self.set_item_text(self.downloads_item, status)
self.update_menu()
def set_upload_status(self, status) -> None:
self.set_item_text(self.uploads_item, status)
self.update_menu()
def show_notification(self, title, message) -> None:

View File

@ -190,6 +190,7 @@ class ClanList(Gtk.Box):
switch.connect("notify::active", partial(self.on_row_toggle, vm))
vm.connect("vm_status_changed", partial(self.vm_status_changed, switch))
vm.connect("build_vm", self.build_vm)
# suffix.append(box)
row.add_suffix(box)
@ -293,6 +294,12 @@ class ClanList(Gtk.Box):
row.set_state(True)
vm.stop()
def build_vm(self, vm: VM, _vm: VM, building: bool) -> None:
if building:
log.info("Building VM")
else:
log.info("VM built")
def vm_status_changed(self, switch: Gtk.Switch, vm: VM, _vm: VM) -> None:
switch.set_active(vm.is_running())
switch.set_state(vm.is_running())

View File

@ -10,7 +10,6 @@
, clan-cli
, makeDesktopItem
, libadwaita
, libayatana-appindicator
}:
let
source = ./.;
@ -31,7 +30,6 @@ python3.pkgs.buildPythonApplication {
copyDesktopItems
wrapGAppsHook
gobject-introspection
libayatana-appindicator
];
buildInputs = [ gtk4 libadwaita gnome.adwaita-icon-theme ];