clan_vm_manager: Improve VM start and stop switch. Switch will be disabled while stopping vm
All checks were successful
checks / check-links (pull_request) Successful in 22s
checks / checks-impure (pull_request) Successful in 2m0s
checks / checks (pull_request) Successful in 2m57s

This commit is contained in:
Luis Hebendanz 2024-02-26 01:04:09 +07:00
parent 36771f3ecd
commit 27b9c8915b
4 changed files with 157 additions and 115 deletions

View File

@ -30,7 +30,6 @@ class VMAttr:
@contextmanager @contextmanager
def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]: def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]:
if self._qmp is None: if self._qmp is None:
log.debug(f"qmp_socket: {self._qmp_socket}")
rpath = self._qmp_socket.resolve() rpath = self._qmp_socket.resolve()
if not rpath.exists(): if not rpath.exists():
raise ClanError( raise ClanError(

View File

@ -76,7 +76,6 @@ def _init_proc(
# Print some information # Print some information
pid = os.getpid() pid = os.getpid()
gpid = os.getpgid(pid=pid) gpid = os.getpgid(pid=pid)
print(f"Started new process pid={pid} gpid={gpid}", file=sys.stderr)
# Set the process name # Set the process name
_set_proc_name(proc_name) _set_proc_name(proc_name)
@ -84,20 +83,26 @@ def _init_proc(
# Close stdin # Close stdin
sys.stdin.close() sys.stdin.close()
linebreak = "=" * 5
# Execute the main function # Execute the main function
print(f"Executing function {func.__name__} now", file=sys.stderr) print(linebreak + f"{func.__name__}:{pid}" + linebreak, file=sys.stderr)
try: try:
func(**kwargs) func(**kwargs)
except Exception as ex: except Exception as ex:
traceback.print_exc() traceback.print_exc()
if on_except is not None: if on_except is not None:
on_except(ex, mp.current_process()) on_except(ex, mp.current_process())
finally:
# Kill the new process and all its children by sending a SIGTERM signal to the process group
pid = os.getpid() pid = os.getpid()
gpid = os.getpgid(pid=pid) gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr) print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
os.killpg(gpid, signal.SIGTERM) os.killpg(gpid, signal.SIGTERM)
# Don't use a finally block here, because we want the exitcode to be set to
# 0 if the function returns normally
def spawn( def spawn(
*, *,
@ -122,10 +127,6 @@ def spawn(
) )
proc.start() proc.start()
# Print some information
cmd = f"tail -f {out_file}"
log.info(f"Connect to stdout with: {cmd}")
# Return the process # Return the process
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file) mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)

View File

@ -1,6 +1,9 @@
import os import os
import tempfile import tempfile
import time
import weakref import weakref
from collections.abc import Generator
from contextlib import contextmanager
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import IO, Any, ClassVar from typing import IO, Any, ClassVar
@ -97,8 +100,7 @@ class Clans:
class VM(GObject.Object): class VM(GObject.Object):
# Define a custom signal with the name "vm_stopped" and a string argument for the message # Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = { __gsignals__: ClassVar = {
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object]), "vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object])
"build_vm": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object, bool]),
} }
def __init__( def __init__(
@ -107,23 +109,40 @@ class VM(GObject.Object):
data: HistoryEntry, data: HistoryEntry,
) -> None: ) -> None:
super().__init__() super().__init__()
self.KILL_TIMEOUT = 6 # seconds
# Store the data from the history entry
self.data = data self.data = data
self.process = MPProcess("dummy", mp.Process(), Path("./dummy"))
self._watcher_id: int = 0 # Create a process object to store the VM process
self._stop_watcher_id: int = 0 self.vm_process = MPProcess("vm_dummy", mp.Process(), Path("./dummy"))
self._stop_timer_init: datetime | None = None self.build_process = MPProcess("build_dummy", mp.Process(), Path("./dummy"))
self._logs_id: int = 0 self._start_thread: threading.Thread = threading.Thread()
self._log_file: IO[str] | None = None self.machine: Machine | None = None
# Watcher to stop the VM
self.KILL_TIMEOUT = 20 # seconds
self._stop_thread: threading.Thread = threading.Thread()
# Build progress bar vars
self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar() self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar()
self.progress_bar.hide() self.progress_bar.hide()
self.progress_bar.set_hexpand(True) # Horizontally expand self.progress_bar.set_hexpand(True) # Horizontally expand
self.prog_bar_id: int = 0 self.prog_bar_id: int = 0
# Create a temporary directory to store the logs
self.log_dir = tempfile.TemporaryDirectory( self.log_dir = tempfile.TemporaryDirectory(
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}" prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
) )
self._logs_id: int = 0
self._log_file: IO[str] | None = None
# Make sure the VM is killed when the reference to this object is dropped
self._finalizer = weakref.finalize(self, self.kill_ref_drop) self._finalizer = weakref.finalize(self, self.kill_ref_drop)
self.connect("build_vm", self.build_vm)
# We use a context manager to create the machine object
# and make sure it is destroyed when the context is exited
@contextmanager
def create_machine(self) -> Generator[Machine, None, None]:
uri = ClanURI.from_str( uri = ClanURI.from_str(
url=self.data.flake.flake_url, flake_attr=self.data.flake.flake_attr url=self.data.flake.flake_url, flake_attr=self.data.flake.flake_attr
) )
@ -138,83 +157,82 @@ class VM(GObject.Object):
name=self.data.flake.flake_attr, name=self.data.flake.flake_attr,
flake=url, # type: ignore flake=url, # type: ignore
) )
yield self.machine
self.machine = None
def _pulse_progress_bar(self) -> bool: def _pulse_progress_bar(self) -> bool:
if self.progress_bar.is_visible():
self.progress_bar.pulse() self.progress_bar.pulse()
return GLib.SOURCE_CONTINUE return GLib.SOURCE_CONTINUE
else:
return GLib.SOURCE_REMOVE
def build_vm(self, vm: "VM", _vm: "VM", building: bool) -> None: def __start(self) -> None:
if building: with self.create_machine() as machine:
log.info("Building VM") # Start building VM
log.info(f"Building VM {self.get_id()}")
self.build_process = spawn(
on_except=None,
out_file=Path(str(self.log_dir.name)) / "build.log",
func=vms.run.build_vm,
machine=machine,
vm=self.data.flake.vm,
)
GLib.idle_add(self.emit, "vm_status_changed", self)
# Start the progress bar and show it
self.progress_bar.show() self.progress_bar.show()
self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar) self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar)
if self.prog_bar_id == 0: if self.prog_bar_id == 0:
raise ClanError("Couldn't spawn a progess bar task") log.error("Couldn't spawn a progess bar task")
else:
# Wait for the build to finish then hide the progress bar
self.build_process.proc.join()
self.progress_bar.hide() self.progress_bar.hide()
if not GLib.Source.remove(self.prog_bar_id):
log.error("Failed to remove progress bar task")
log.info("VM built")
def __start(self) -> None: # Check if the VM was built successfully
log.info(f"Starting VM {self.get_id()}") if self.build_process.proc.exitcode != 0:
vm = vms.run.inspect_vm(self.machine) log.error(f"Failed to build VM {self.get_id()}")
GLib.idle_add(self.emit, "vm_status_changed", self)
return
log.info(f"Successfully built VM {self.get_id()}")
# GLib.idle_add(self.emit, "build_vm", self, True) # Start the VM
# self.process = spawn( self.vm_process = spawn(
# on_except=None,
# log_dir=Path(str(self.log_dir.name)),
# func=vms.run.build_vm,
# machine=self.machine,
# vm=vm,
# )
# self.process.proc.join()
# GLib.idle_add(self.emit, "build_vm", self, False)
# if self.process.proc.exitcode != 0:
# log.error(f"Failed to build VM {self.get_id()}")
# return
self.process = spawn(
on_except=None, on_except=None,
out_file=Path(str(self.log_dir.name)) / "vm.log", out_file=Path(str(self.log_dir.name)) / "vm.log",
func=vms.run.run_vm, func=vms.run.run_vm,
vm=vm, vm=self.data.flake.vm,
) )
log.debug(f"Started VM {self.get_id()}") log.debug(f"Started VM {self.get_id()}")
GLib.idle_add(self.emit, "vm_status_changed", self) GLib.idle_add(self.emit, "vm_status_changed", self)
log.debug(f"Starting logs watcher on file: {self.process.out_file}")
# Start the logs watcher
self._logs_id = GLib.timeout_add(50, self._get_logs_task) self._logs_id = GLib.timeout_add(50, self._get_logs_task)
if self._logs_id == 0: if self._logs_id == 0:
raise ClanError("Failed to add logs watcher") log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}")
log.debug(f"Starting VM watcher for: {self.machine.name}") # Wait for the VM to stop
self._watcher_id = GLib.timeout_add(50, self._vm_watcher_task) self.vm_process.proc.join()
if self._watcher_id == 0: log.debug(f"VM {self.get_id()} has stopped")
raise ClanError("Failed to add watcher") GLib.idle_add(self.emit, "vm_status_changed", self)
def start(self) -> None: def start(self) -> None:
if self.is_running(): if self.is_running():
log.warn("VM is already running") log.warn("VM is already running. Ignoring start request")
return
threading.Thread(target=self.__start).start()
def _vm_watcher_task(self) -> bool:
if not self.is_running():
self.emit("vm_status_changed", self) self.emit("vm_status_changed", self)
log.debug("Removing VM watcher") return
return GLib.SOURCE_REMOVE self._start_thread = threading.Thread(target=self.__start)
self._start_thread.start()
return GLib.SOURCE_CONTINUE
def _get_logs_task(self) -> bool: def _get_logs_task(self) -> bool:
if not self.process.out_file.exists(): if not self.vm_process.out_file.exists():
return GLib.SOURCE_CONTINUE return GLib.SOURCE_CONTINUE
if not self._log_file: if not self._log_file:
try: try:
self._log_file = open(self.process.out_file) self._log_file = open(self.vm_process.out_file)
except Exception as ex: except Exception as ex:
log.exception(ex) log.exception(ex)
self._log_file = None self._log_file = None
@ -232,42 +250,60 @@ class VM(GObject.Object):
return GLib.SOURCE_CONTINUE return GLib.SOURCE_CONTINUE
def is_running(self) -> bool: def is_running(self) -> bool:
return self.process.proc.is_alive() return self._start_thread.is_alive()
def is_building(self) -> bool:
return self.build_process.proc.is_alive()
def is_shutting_down(self) -> bool:
return self._stop_thread.is_alive()
def get_id(self) -> str: def get_id(self) -> str:
return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}" return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"
def __shutdown_watchdog(self) -> None:
if self.is_running():
assert self._stop_timer_init is not None
diff = datetime.now() - self._stop_timer_init
if diff.seconds > self.KILL_TIMEOUT:
log.error(f"VM {self.get_id()} has not stopped. Killing it")
self.process.kill_group()
return GLib.SOURCE_CONTINUE
else:
log.info(f"VM {self.get_id()} has stopped")
return GLib.SOURCE_REMOVE
def __stop(self) -> None: def __stop(self) -> None:
log.info(f"Stopping VM {self.get_id()}") log.info(f"Stopping VM {self.get_id()}")
start_time = datetime.now()
while self.is_running():
diff = datetime.now() - start_time
if diff.seconds > self.KILL_TIMEOUT:
log.error(
f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it"
)
self.vm_process.kill_group()
return
if self.is_building():
log.info(f"VM {self.get_id()} is still building. Killing it")
self.build_process.kill_group()
return
if not self.machine:
log.error(f"Machine object is None. Killing VM {self.get_id()}")
self.vm_process.kill_group()
return
# Try to shutdown the VM gracefully using QMP
try: try:
with self.machine.vm.qmp_ctx() as qmp: with self.machine.vm.qmp_ctx() as qmp:
qmp.command("system_powerdown") qmp.command("system_powerdown")
except ClanError as e: except (OSError, ClanError):
log.debug(e) # log.debug(f"QMP command 'system_powerdown' ignored. Error: {e}")
pass
self._stop_timer_init = datetime.now() # Try 2 times a second
self._stop_watcher_id = GLib.timeout_add(100, self.__shutdown_watchdog) time.sleep(0.5)
if self._stop_watcher_id == 0: GLib.idle_add(self.emit, "vm_status_changed", self)
raise ClanError("Failed to add stop watcher") log.debug(f"VM {self.get_id()} has stopped")
def shutdown(self) -> None: def shutdown(self) -> None:
if not self.is_running(): if not self.is_running():
log.warning("VM not running. Ignoring shutdown request.")
return return
log.info(f"Stopping VM {self.get_id()}") if self.is_shutting_down():
threading.Thread(target=self.__stop).start() log.warning("Shutdown already in progress")
return
self._stop_thread = threading.Thread(target=self.__stop)
self._stop_thread.start()
def kill_ref_drop(self) -> None: def kill_ref_drop(self) -> None:
if self.is_running(): if self.is_running():
@ -279,13 +315,13 @@ class VM(GObject.Object):
log.warning(f"Tried to kill VM {self.get_id()} is not running") log.warning(f"Tried to kill VM {self.get_id()} is not running")
return return
log.info(f"Killing VM {self.get_id()} now") log.info(f"Killing VM {self.get_id()} now")
self.process.kill_group() self.vm_process.kill_group()
def read_whole_log(self) -> str: def read_whole_log(self) -> str:
if not self.process.out_file.exists(): if not self.vm_process.out_file.exists():
log.error(f"Log file {self.process.out_file} does not exist") log.error(f"Log file {self.vm_process.out_file} does not exist")
return "" return ""
return self.process.out_file.read_text() return self.vm_process.out_file.read_text()
class VMs: class VMs:

View File

@ -194,6 +194,10 @@ class ClanList(Gtk.Box):
box.append(pref_button) box.append(pref_button)
switch.connect("notify::active", partial(self.on_row_toggle, vm)) switch.connect("notify::active", partial(self.on_row_toggle, vm))
# def on_switch_state_set(switch: Any, state: bool) -> bool:
# return True
# switch.connect("state-set", on_switch_state_set)
vm.connect("vm_status_changed", partial(self.vm_status_changed, switch)) vm.connect("vm_status_changed", partial(self.vm_status_changed, switch))
# suffix.append(box) # suffix.append(box)
@ -286,18 +290,20 @@ class ClanList(Gtk.Box):
if not Join.use().list_store.get_n_items(): if not Join.use().list_store.get_n_items():
self.join_boxed_list.add_css_class("no-shadow") self.join_boxed_list.add_css_class("no-shadow")
def on_row_toggle(self, vm: VM, row: Adw.SwitchRow, state: bool) -> None: def on_row_toggle(self, vm: VM, switch: Gtk.Switch, user_state: bool) -> None:
if row.get_active(): if switch.get_active():
row.set_state(False) switch.set_state(False)
vm.start() vm.start()
else:
if not row.get_active(): switch.set_state(True)
row.set_state(True)
vm.shutdown() vm.shutdown()
switch.set_sensitive(False)
def vm_status_changed(self, switch: Gtk.Switch, vm: VM, _vm: VM) -> None: def vm_status_changed(self, switch: Gtk.Switch, vm: VM, _vm: VM) -> None:
switch.set_active(vm.is_running()) switch.set_state(vm.is_running() and not vm.is_building())
switch.set_state(vm.is_running()) if switch.get_sensitive() is False and not vm.is_building():
exitc = vm.process.proc.exitcode switch.set_sensitive(True)
exitc = vm.vm_process.proc.exitcode
if not vm.is_running() and exitc != 0: if not vm.is_running() and exitc != 0:
log.error(f"VM exited with error. Exitcode: {exitc}") log.error(f"VM exited with error. Exitcode: {exitc}")