clan-vm-manager: Add register_on_change to GKVStore. Improve overall signal typing.
All checks were successful
checks / check-links (pull_request) Successful in 22s
checks / checks-impure (pull_request) Successful in 1m54s
checks / checks (pull_request) Successful in 2m22s

This commit is contained in:
Luis Hebendanz 2024-03-06 15:05:10 +07:00
parent b9ae911246
commit fb21a7378d
7 changed files with 75 additions and 62 deletions

View File

@ -52,7 +52,7 @@ class MainApplication(Adw.Application):
self.connect("activate", self.on_activate)
self.connect("shutdown", self.on_shutdown)
def on_shutdown(self, *_args: Any) -> None:
def on_shutdown(self, source: "MainApplication") -> None:
log.debug("Shutting down Adw.Application")
log.debug(f"get_windows: {self.get_windows()}")
if self.window:
@ -97,7 +97,7 @@ class MainApplication(Adw.Application):
def dummy_menu_entry(self) -> None:
log.info("Dummy menu entry called")
def on_activate(self, app: Any) -> None:
def on_activate(self, source: "MainApplication") -> None:
if not self.window:
self.init_style()
self.window = MainWindow(config=ClanConfig(initial_view="list"))

View File

@ -167,7 +167,7 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
def __setitem__(self, key: K, value: V) -> None:
# If the key already exists, remove it O(n)
if key in self._items:
log.warning("Updating an existing key in GKVStore is O(n)")
log.debug("Updating an existing key in GKVStore is O(n)")
position = self.keys().index(key)
self._items[key] = value
self.items_changed(position, 1, 1)
@ -213,3 +213,8 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
def last(self) -> V:
return self.values()[-1]
def register_on_change(
self, callback: Callable[["GKVStore[K,V]", int, int, int], None]
) -> None:
self.connect("items-changed", callback)

View File

@ -30,11 +30,11 @@ log = logging.getLogger(__name__)
class VMObject(GObject.Object):
# Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = {
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object])
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [])
}
def vm_status_changed_task(self) -> bool:
self.emit("vm_status_changed", self)
def _vm_status_changed_task(self) -> bool:
self.emit("vm_status_changed")
return GLib.SOURCE_REMOVE
def update(self, data: HistoryEntry) -> None:
@ -78,14 +78,14 @@ class VMObject(GObject.Object):
# and block the signal while we change the state. This is cursed.
self.switch = Gtk.Switch()
self.switch_handler_id: int = self.switch.connect(
"notify::active", self.on_switch_toggle
"notify::active", self._on_switch_toggle
)
self.connect("vm_status_changed", self.on_vm_status_changed)
self.connect("vm_status_changed", self._on_vm_status_changed)
# Make sure the VM is killed when the reference to this object is dropped
self._finalizer = weakref.finalize(self, self.kill_ref_drop)
self._finalizer = weakref.finalize(self, self._kill_ref_drop)
def on_vm_status_changed(self, vm: "VMObject", _vm: "VMObject") -> None:
def _on_vm_status_changed(self, source: "VMObject") -> None:
self.switch.set_state(self.is_running() and not self.is_building())
if self.switch.get_sensitive() is False and not self.is_building():
self.switch.set_sensitive(True)
@ -99,7 +99,7 @@ class VMObject(GObject.Object):
self.switch.handler_unblock(self.switch_handler_id)
log.error(f"VM exited with error. Exitcode: {exitc}")
def on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None:
def _on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None:
if switch.get_active():
switch.set_state(False)
self.start()
@ -111,7 +111,7 @@ class VMObject(GObject.Object):
# We use a context manager to create the machine object
# and make sure it is destroyed when the context is exited
@contextmanager
def create_machine(self) -> Generator[Machine, None, None]:
def _create_machine(self) -> Generator[Machine, None, None]:
uri = ClanURI.from_str(
url=self.data.flake.flake_url, flake_attr=self.data.flake.flake_attr
)
@ -137,7 +137,7 @@ class VMObject(GObject.Object):
return GLib.SOURCE_REMOVE
def __start(self) -> None:
with self.create_machine() as machine:
with self._create_machine() as machine:
# Start building VM
tstart = datetime.now()
log.info(f"Building VM {self.get_id()}")
@ -149,7 +149,7 @@ class VMObject(GObject.Object):
machine=machine,
tmpdir=log_dir,
)
GLib.idle_add(self.vm_status_changed_task)
GLib.idle_add(self._vm_status_changed_task)
# Start the logs watcher
self._logs_id = GLib.timeout_add(
@ -174,7 +174,7 @@ class VMObject(GObject.Object):
# Check if the VM was built successfully
if self.build_process.proc.exitcode != 0:
log.error(f"Failed to build VM {self.get_id()}")
GLib.idle_add(self.vm_status_changed_task)
GLib.idle_add(self._vm_status_changed_task)
return
log.info(f"Successfully built VM {self.get_id()}")
@ -186,7 +186,7 @@ class VMObject(GObject.Object):
vm=self.data.flake.vm,
)
log.debug(f"Started VM {self.get_id()}")
GLib.idle_add(self.vm_status_changed_task)
GLib.idle_add(self._vm_status_changed_task)
# Start the logs watcher
self._logs_id = GLib.timeout_add(50, self._get_logs_task, self.vm_process)
@ -197,7 +197,7 @@ class VMObject(GObject.Object):
# Wait for the VM to stop
self.vm_process.proc.join()
log.debug(f"VM {self.get_id()} has stopped")
GLib.idle_add(self.vm_status_changed_task)
GLib.idle_add(self._vm_status_changed_task)
def start(self) -> None:
if self.is_running():
@ -273,7 +273,7 @@ class VMObject(GObject.Object):
# Try 20 times to stop the VM
time.sleep(self.KILL_TIMEOUT / 20)
GLib.idle_add(self.vm_status_changed_task)
GLib.idle_add(self._vm_status_changed_task)
log.debug(f"VM {self.get_id()} has stopped")
def shutdown(self) -> None:
@ -288,7 +288,7 @@ class VMObject(GObject.Object):
self._stop_thread = threading.Thread(target=self.__stop)
self._stop_thread.start()
def kill_ref_drop(self) -> None:
def _kill_ref_drop(self) -> None:
if self.is_running():
log.warning("Killing VM due to reference drop")
self.kill()

View File

@ -7,6 +7,7 @@ import gi
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry, add_history
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.singletons.use_vms import ClanStore
gi.require_version("Gtk", "4.0")
@ -20,14 +21,14 @@ class JoinValue(GObject.Object):
# TODO: custom signals for async join
__gsignals__: ClassVar = {
"join_finished": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object]),
"join_finished": (GObject.SignalFlags.RUN_FIRST, None, []),
}
url: ClanURI
entry: HistoryEntry | None
def _join_finished_task(self) -> bool:
self.emit("join_finished", self)
self.emit("join_finished")
return GLib.SOURCE_REMOVE
def __init__(self, url: ClanURI) -> None:
@ -64,25 +65,11 @@ class JoinList:
cls.list_store = Gio.ListStore.new(JoinValue)
# Rerendering the join list every time an item changes in the clan_store
ClanStore.use().clan_store.connect(
"items-changed", cls._instance.on_clan_store_items_changed
)
ClanStore.use().register_on_deep_change(cls._instance._rerender_join_list)
return cls._instance
def on_clan_store_items_changed(
self, source: Any, position: int, removed: int, added: int
) -> None:
if added > 0:
# Rerendering the join list every time an item changes in the vmstore
ClanStore.use().clan_store.values()[position].connect(
"items-changed", self.on_vm_store_items_changed
)
self.list_store.items_changed(
0, self.list_store.get_n_items(), self.list_store.get_n_items()
)
def on_vm_store_items_changed(
self, source: Any, position: int, removed: int, added: int
def _rerender_join_list(
self, source: GKVStore, position: int, removed: int, added: int
) -> None:
self.list_store.items_changed(
0, self.list_store.get_n_items(), self.list_store.get_n_items()
@ -91,9 +78,7 @@ class JoinList:
def is_empty(self) -> bool:
return self.list_store.get_n_items() == 0
def push(
self, uri: ClanURI, after_join: Callable[[JoinValue, JoinValue], None]
) -> None:
def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None:
"""
Add a join request.
This method can add multiple join requests if called subsequently for each request.
@ -109,10 +94,10 @@ class JoinList:
self.list_store.append(value)
def _on_join_finished(self, _source: GObject.Object, value: JoinValue) -> None:
log.info(f"Join finished: {value.url}")
self.discard(value)
ClanStore.use().push_history_entry(value.entry)
def _on_join_finished(self, source: JoinValue) -> None:
log.info(f"Join finished: {source.url}")
self.discard(source)
ClanStore.use().push_history_entry(source.entry)
def discard(self, value: JoinValue) -> None:
(has, idx) = self.list_store.find(value)

View File

@ -1,4 +1,5 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any
@ -40,6 +41,27 @@ class ClanStore:
return cls._instance
def register_on_deep_change(
self, callback: Callable[[GKVStore, int, int, int], None]
) -> None:
"""
Register a callback that is called when a clan_store or one of the included VMStores changes
"""
def on_vmstore_change(
store: VMStore, position: int, removed: int, added: int
) -> None:
callback(store, position, removed, added)
def on_clanstore_change(
store: "GKVStore", position: int, removed: int, added: int
) -> None:
if added > 0:
store.register_on_change(on_vmstore_change)
callback(store, position, removed, added)
self.clan_store.register_on_change(on_clanstore_change)
@property
def clan_store(self) -> GKVStore[str, VMStore]:
return self._clan_store

View File

@ -102,7 +102,7 @@ class ClanList(Gtk.Box):
return grp
def on_add(self, action: Any, parameter: Any) -> None:
def on_add(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Adding new machine", target)
@ -176,23 +176,25 @@ class ClanList(Gtk.Box):
return row
def on_edit(self, action: Any, parameter: Any) -> None:
def on_edit(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Editing settings for machine", target)
def render_join_row(self, boxed_list: Gtk.ListBox, item: JoinValue) -> Gtk.Widget:
def render_join_row(
self, boxed_list: Gtk.ListBox, join_val: JoinValue
) -> Gtk.Widget:
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
log.debug("Rendering join row for %s", item.url)
log.debug("Rendering join row for %s", join_val.url)
row = Adw.ActionRow()
row.set_title(item.url.params.flake_attr)
row.set_subtitle(item.url.get_internal())
row.set_title(join_val.url.params.flake_attr)
row.set_subtitle(join_val.url.get_internal())
row.add_css_class("trust")
vm = ClanStore.use().get_vm(item.url)
vm = ClanStore.use().get_vm(join_val.url)
# Can't do this here because clan store is empty at this point
if vm is not None:
@ -202,19 +204,19 @@ class ClanList(Gtk.Box):
)
avatar = Adw.Avatar()
avatar.set_text(str(item.url.params.flake_attr))
avatar.set_text(str(join_val.url.params.flake_attr))
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
cancel_button = Gtk.Button(label="Cancel")
cancel_button.add_css_class("error")
cancel_button.connect("clicked", partial(self.on_discard_clicked, item))
cancel_button.connect("clicked", partial(self.on_discard_clicked, join_val))
self.cancel_button = cancel_button
trust_button = Gtk.Button(label="Join")
trust_button.add_css_class("success")
trust_button.connect("clicked", partial(self.on_trust_clicked, item))
trust_button.connect("clicked", partial(self.on_trust_clicked, join_val))
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
@ -225,22 +227,22 @@ class ClanList(Gtk.Box):
return row
def on_join_request(self, widget: Any, url: str) -> None:
def on_join_request(self, source: Any, url: str) -> None:
log.debug("Join request: %s", url)
clan_uri = ClanURI.from_str(url)
JoinList.use().push(clan_uri, self.on_after_join)
def on_after_join(self, source: JoinValue, item: JoinValue) -> None:
def on_after_join(self, source: JoinValue) -> None:
# If the join request list is empty disable the shadow artefact
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")
def on_trust_clicked(self, value: JoinValue, widget: Gtk.Widget) -> None:
widget.set_sensitive(False)
def on_trust_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
source.set_sensitive(False)
self.cancel_button.set_sensitive(False)
value.join()
def on_discard_clicked(self, value: JoinValue, widget: Gtk.Widget) -> None:
def on_discard_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
JoinList.use().discard(value)
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")

View File

@ -1,6 +1,5 @@
import logging
import threading
from typing import Any
import gi
from clan_cli.history.list import list_history
@ -69,7 +68,7 @@ class MainWindow(Adw.ApplicationWindow):
log.debug("Killing all VMs")
ClanStore.use().kill_all()
def on_destroy(self, *_args: Any) -> None:
def on_destroy(self, source: "Adw.ApplicationWindow") -> None:
log.info("====Destroying Adw.ApplicationWindow===")
ClanStore.use().kill_all()
self.tray_icon.destroy()