1
0
forked from clan/clan-core

vm-manager: move signals to emitter

This commit is contained in:
Johannes Kirschbauer 2024-04-24 14:41:29 +02:00
parent 216c560830
commit 4a66cdffaf
Signed by: hsjobeki
SSH Key Fingerprint: SHA256:vX3utDqig7Ph5L0JPv87ZTPb/w7cMzREKVZzzLFg9qU
5 changed files with 30 additions and 18 deletions

View File

@ -1,6 +1,6 @@
import logging
from collections.abc import Callable
from typing import Any, ClassVar, Generic, TypeVar
from typing import Any, Generic, TypeVar
import gi
@ -24,10 +24,6 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
This class could be optimized by having the objects remember their position in the list.
"""
__gsignals__: ClassVar = {
"is_ready": (GObject.SignalFlags.RUN_FIRST, None, []),
}
def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
super().__init__()
self.gtype = gtype

View File

@ -43,8 +43,11 @@ class EmptySplash(Gtk.Box):
join_button = Gtk.Button(label="Join")
join_button.connect("clicked", self._on_join, join_entry)
join_entry.connect("activate", lambda e: self._on_join(join_button, e))
clamp = Adw.Clamp()
clamp.set_maximum_size(400)
clamp.set_margin_bottom(40)
vbox.append(empty_label)
hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
hbox.append(join_entry)

View File

@ -1,7 +1,7 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any
from typing import Any, ClassVar
import gi
from clan_cli.clan_uri import ClanURI
@ -15,7 +15,7 @@ from clan_vm_manager.views.logs import Logs
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
@ -25,10 +25,18 @@ class VMStore(GKVStore):
super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr)
class Emitter(GObject.GObject):
__gsignals__: ClassVar = {
"is_ready": (GObject.SignalFlags.RUN_FIRST, None, []),
}
class ClanStore:
_instance: "None | ClanStore" = None
_clan_store: GKVStore[str, VMStore]
_emitter: Emitter
# set the vm that is outputting logs
# build logs are automatically streamed to the logs-view
_logging_vm: VMObject | None = None
@ -44,9 +52,16 @@ class ClanStore:
cls._clan_store = GKVStore(
VMStore, lambda store: store.first().data.flake.flake_url
)
cls._emitter = Emitter()
return cls._instance
def emit(self, signal: str) -> None:
self._emitter.emit(signal)
def connect(self, signal: str, cb: Callable[(...), Any]) -> None:
self._emitter.connect(signal, cb)
def set_logging_vm(self, ident: str) -> VMObject | None:
vm = self.get_vm(ClanURI(f"clan://{ident}"))
if vm is not None:

View File

@ -74,11 +74,11 @@ class ClanList(Gtk.Box):
self.join_boxed_list.add_css_class("join-list")
self.append(self.join_boxed_list)
clan_store = ClanStore.use().clan_store
clan_store = ClanStore.use()
clan_store.connect("is_ready", self.display_splash)
self.group_list = create_boxed_list(
model=clan_store, render_row=self.render_group_row
model=clan_store.clan_store, render_row=self.render_group_row
)
self.group_list.add_css_class("group-list")
self.append(self.group_list)
@ -338,7 +338,7 @@ class ClanList(Gtk.Box):
def on_after_join(self, source: JoinValue) -> None:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"Added/updated {source.url.machine.name}").toast,
SuccessToast(f"Updated {source.url.machine.name}").toast,
"success.join",
)
# If the join request list is empty disable the shadow artefact

View File

@ -1,6 +1,5 @@
import logging
import threading
from collections.abc import Callable
import gi
from clan_cli.history.list import list_history
@ -42,9 +41,7 @@ class MainWindow(Adw.ApplicationWindow):
self.tray_icon: TrayIcon = TrayIcon(app)
# Initialize all ClanStore
threading.Thread(
target=self._populate_vms, args=[self._set_clan_store_ready]
).start()
threading.Thread(target=self._populate_vms).start()
# Initialize all views
stack_view = ViewStack.use().view
@ -68,16 +65,17 @@ class MainWindow(Adw.ApplicationWindow):
self.connect("destroy", self.on_destroy)
def _set_clan_store_ready(self) -> None:
ClanStore.use().clan_store.emit("is_ready")
def _set_clan_store_ready(self) -> bool:
ClanStore.use().emit("is_ready")
return GLib.SOURCE_REMOVE
def _populate_vms(self, done: Callable[[], None]) -> None:
def _populate_vms(self) -> None:
# Execute `clan flakes add <path>` to democlan for this to work
# TODO: Make list_history a generator function
for entry in list_history():
GLib.idle_add(ClanStore.use().create_vm_task, entry)
GLib.idle_add(done)
GLib.idle_add(self._set_clan_store_ready)
def kill_vms(self) -> None:
log.debug("Killing all VMs")