Merge pull request 'clan-vm-manager: Added clan icon to trayicon' (#855) from Qubasa-main into main
All checks were successful
checks-impure / test (push) Successful in 1m55s
checks / test (push) Successful in 2m45s

This commit is contained in:
clan-bot 2024-02-16 09:14:08 +00:00
commit a715364338
11 changed files with 87 additions and 114 deletions

View File

@ -18,17 +18,23 @@ log = logging.getLogger(__name__)
class VMAttr: class VMAttr:
def __init__(self, state_dir: Path) -> None: def __init__(self, state_dir: Path) -> None:
# These sockets here are just symlinks to the real sockets which
# are created by the run.py file. The reason being that we run into
# file path length issues on Linux. If no qemu process is running
# the symlink will be dangling.
self._qmp_socket: Path = state_dir / "qmp.sock" self._qmp_socket: Path = state_dir / "qmp.sock"
self._qga_socket: Path = state_dir / "qga.sock" self._qga_socket: Path = state_dir / "qga.sock"
self._qmp: QEMUMonitorProtocol | None = None self._qmp: QEMUMonitorProtocol | None = None
@contextmanager @contextmanager
def qmp(self) -> Generator[QEMUMonitorProtocol, None, None]: def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]:
if self._qmp is None: if self._qmp is None:
log.debug(f"qmp_socket: {self._qmp_socket}") log.debug(f"qmp_socket: {self._qmp_socket}")
rpath = self._qmp_socket.resolve() rpath = self._qmp_socket.resolve()
if not rpath.exists(): if not rpath.exists():
raise ClanError(f"qmp socket {rpath} does not exist") raise ClanError(
f"qmp socket {rpath} does not exist. Is the VM running?"
)
self._qmp = QEMUMonitorProtocol(str(rpath)) self._qmp = QEMUMonitorProtocol(str(rpath))
self._qmp.connect() self._qmp.connect()
try: try:

View File

@ -37,7 +37,7 @@ def facts_to_nixos_config(facts: dict[str, dict[str, bytes]]) -> dict:
# TODO move this to the Machines class # TODO move this to the Machines class
def build_vm( def build_vm(
machine: Machine, vm: VmConfig, tmpdir: Path, nix_options: list[str] machine: Machine, vm: VmConfig, tmpdir: Path, nix_options: list[str] = []
) -> dict[str, str]: ) -> dict[str, str]:
secrets_dir = get_secrets(machine, tmpdir) secrets_dir = get_secrets(machine, tmpdir)

View File

@ -43,7 +43,9 @@ def wait_vm_up(state_dir: Path) -> None:
timeout: float = 300 timeout: float = 300
while True: while True:
if timeout <= 0: if timeout <= 0:
raise TimeoutError(f"qga socket {socket_file} not found") raise TimeoutError(
f"qga socket {socket_file} not found. Is the VM running?"
)
if socket_file.exists(): if socket_file.exists():
break break
sleep(0.1) sleep(0.1)
@ -56,7 +58,9 @@ def wait_vm_down(state_dir: Path) -> None:
timeout: float = 300 timeout: float = 300
while socket_file.exists(): while socket_file.exists():
if timeout <= 0: if timeout <= 0:
raise TimeoutError(f"qga socket {socket_file} still exists") raise TimeoutError(
f"qga socket {socket_file} still exists. Is the VM down?"
)
sleep(0.1) sleep(0.1)
timeout -= 0.1 timeout -= 0.1

View File

@ -11,6 +11,10 @@ GTK4 has a demo application showing all widgets. You can run it by executing:
gtk4-widget-factory gtk4-widget-factory
``` ```
To find available icons execute:
```bash
gtk4-icon-browser
```

View File

@ -14,7 +14,7 @@ from gi.repository import Adw, Gdk, Gio, Gtk
from clan_vm_manager.models.interfaces import ClanConfig from clan_vm_manager.models.interfaces import ClanConfig
from clan_vm_manager.models.use_join import GLib, GObject from clan_vm_manager.models.use_join import GLib, GObject
from clan_vm_manager.models.use_vms import VMS from clan_vm_manager.models.use_vms import VMs
from .trayicon import TrayIcon from .trayicon import TrayIcon
from .windows.main_window import MainWindow from .windows.main_window import MainWindow
@ -44,7 +44,8 @@ class MainApplication(Adw.Application):
"enable debug mode", "enable debug mode",
None, None,
) )
self.vms = VMs.use()
log.debug(f"VMS object: {self.vms}")
self.window: Adw.ApplicationWindow | None = None self.window: Adw.ApplicationWindow | None = None
self.connect("shutdown", self.on_shutdown) self.connect("shutdown", self.on_shutdown)
self.connect("activate", self.show_window) self.connect("activate", self.show_window)
@ -69,24 +70,22 @@ class MainApplication(Adw.Application):
log.debug(f"Join request: {args[1]}") log.debug(f"Join request: {args[1]}")
uri = args[1] uri = args[1]
self.emit("join_request", uri) self.emit("join_request", uri)
return 0 return 0
def on_shutdown(self, app: Gtk.Application) -> None: def on_shutdown(self, app: Gtk.Application) -> None:
log.debug("Shutting down") log.debug("Shutting down")
self.vms.kill_all()
if self.tray_icon is not None: if self.tray_icon is not None:
self.tray_icon.destroy() self.tray_icon.destroy()
VMS.use().kill_all()
def on_window_hide_unhide(self, *_args: Any) -> None: def on_window_hide_unhide(self, *_args: Any) -> None:
assert self.window is not None assert self.window is not None
if self.window.is_visible(): if self.window.is_visible():
self.window.hide() self.window.hide()
return else:
self.window.present()
self.window.present()
def dummy_menu_entry(self) -> None: def dummy_menu_entry(self) -> None:
log.info("Dummy menu entry called") log.info("Dummy menu entry called")

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.1 KiB

View File

@ -7,7 +7,6 @@ from pathlib import Path
from typing import Any from typing import Any
import gi import gi
from clan_cli.errors import ClanError
gi.require_version("GdkPixbuf", "2.0") gi.require_version("GdkPixbuf", "2.0")
@ -24,7 +23,7 @@ def _kill_group(proc: mp.Process) -> None:
if proc.is_alive() and pid: if proc.is_alive() and pid:
os.killpg(pid, signal.SIGTERM) os.killpg(pid, signal.SIGTERM)
else: else:
log.warning(f"Process {proc.name} with pid {pid} is already dead") log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
@ -102,7 +101,7 @@ def _init_proc(
def spawn( def spawn(
*, *,
log_dir: Path, out_file: Path,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None, on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
func: Callable, func: Callable,
**kwargs: Any, **kwargs: Any,
@ -111,13 +110,8 @@ def spawn(
if mp.get_start_method(allow_none=True) is None: if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="forkserver") mp.set_start_method(method="forkserver")
if not log_dir.is_dir():
raise ClanError(f"Log path {log_dir} is not a directory")
log_dir.mkdir(parents=True, exist_ok=True)
# Set names # Set names
proc_name = f"MPExec:{func.__name__}" proc_name = f"MPExec:{func.__name__}"
out_file = log_dir / "out.log"
# Start the process # Start the process
proc = mp.Process( proc = mp.Process(

View File

@ -9,7 +9,7 @@ from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import add_history from clan_cli.history.add import add_history
from clan_vm_manager.errors.show_error import show_error_dialog from clan_vm_manager.errors.show_error import show_error_dialog
from clan_vm_manager.models.use_vms import VMS, Clans from clan_vm_manager.models.use_vms import Clans
gi.require_version("Gtk", "4.0") gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1") gi.require_version("Adw", "1")
@ -76,7 +76,7 @@ class Join:
def after_join(item: JoinValue, _: Any) -> None: def after_join(item: JoinValue, _: Any) -> None:
self.discard(item) self.discard(item)
Clans.use().refresh() Clans.use().refresh()
VMS.use().refresh() # VMS.use().refresh()
print("Refreshed list after join") print("Refreshed list after join")
on_join(item) on_join(item)

View File

@ -107,6 +107,7 @@ class VM(GObject.Object):
data: HistoryEntry, data: HistoryEntry,
) -> None: ) -> None:
super().__init__() super().__init__()
self.KILL_TIMEOUT = 6 # seconds
self.data = data self.data = data
self.process = MPProcess("dummy", mp.Process(), Path("./dummy")) self.process = MPProcess("dummy", mp.Process(), Path("./dummy"))
self._watcher_id: int = 0 self._watcher_id: int = 0
@ -121,9 +122,8 @@ class VM(GObject.Object):
self.log_dir = tempfile.TemporaryDirectory( self.log_dir = tempfile.TemporaryDirectory(
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}" prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
) )
self._finalizer = weakref.finalize(self, self.stop) self._finalizer = weakref.finalize(self, self.kill)
self.connect("build_vm", self.build_vm) self.connect("build_vm", self.build_vm)
uri = ClanURI.from_str( uri = ClanURI.from_str(
url=self.data.flake.flake_url, flake_attr=self.data.flake.flake_attr url=self.data.flake.flake_url, flake_attr=self.data.flake.flake_attr
) )
@ -160,25 +160,25 @@ class VM(GObject.Object):
log.info(f"Starting VM {self.get_id()}") log.info(f"Starting VM {self.get_id()}")
vm = vms.run.inspect_vm(self.machine) vm = vms.run.inspect_vm(self.machine)
GLib.idle_add(self.emit, "build_vm", self, True) # GLib.idle_add(self.emit, "build_vm", self, True)
self.process = spawn( # self.process = spawn(
on_except=None, # on_except=None,
log_dir=Path(str(self.log_dir.name)), # log_dir=Path(str(self.log_dir.name)),
func=vms.run.build_vm, # func=vms.run.build_vm,
machine=self.machine, # machine=self.machine,
vm=vm, # vm=vm,
) # )
self.process.proc.join() # self.process.proc.join()
GLib.idle_add(self.emit, "build_vm", self, False) # GLib.idle_add(self.emit, "build_vm", self, False)
if self.process.proc.exitcode != 0: # if self.process.proc.exitcode != 0:
log.error(f"Failed to build VM {self.get_id()}") # log.error(f"Failed to build VM {self.get_id()}")
return # return
self.process = spawn( self.process = spawn(
on_except=None, on_except=None,
log_dir=Path(str(self.log_dir.name)), out_file=Path(str(self.log_dir.name)) / "vm.log",
func=vms.run.run_vm, func=vms.run.run_vm,
vm=vm, vm=vm,
) )
@ -241,7 +241,7 @@ class VM(GObject.Object):
if self.is_running(): if self.is_running():
assert self._stop_timer_init is not None assert self._stop_timer_init is not None
diff = datetime.now() - self._stop_timer_init diff = datetime.now() - self._stop_timer_init
if diff.seconds > 10: if diff.seconds > self.KILL_TIMEOUT:
log.error(f"VM {self.get_id()} has not stopped. Killing it") log.error(f"VM {self.get_id()} has not stopped. Killing it")
self.process.kill_group() self.process.kill_group()
return GLib.SOURCE_CONTINUE return GLib.SOURCE_CONTINUE
@ -253,7 +253,7 @@ class VM(GObject.Object):
log.info(f"Stopping VM {self.get_id()}") log.info(f"Stopping VM {self.get_id()}")
try: try:
with self.machine.vm.qmp() as qmp: with self.machine.vm.qmp_ctx() as qmp:
qmp.command("system_powerdown") qmp.command("system_powerdown")
except ClanError as e: except ClanError as e:
log.debug(e) log.debug(e)
@ -263,12 +263,19 @@ class VM(GObject.Object):
if self._stop_watcher_id == 0: if self._stop_watcher_id == 0:
raise ClanError("Failed to add stop watcher") raise ClanError("Failed to add stop watcher")
def stop(self) -> None: def shutdown(self) -> None:
if not self.is_running(): if not self.is_running():
return return
log.info(f"Stopping VM {self.get_id()}") log.info(f"Stopping VM {self.get_id()}")
threading.Thread(target=self.__stop).start() threading.Thread(target=self.__stop).start()
def kill(self) -> None:
if not self.is_running():
log.warning(f"Tried to kill VM {self.get_id()} is not running")
return
log.info(f"Killing VM {self.get_id()} now")
self.process.kill_group()
def read_whole_log(self) -> str: def read_whole_log(self) -> str:
if not self.process.out_file.exists(): if not self.process.out_file.exists():
log.error(f"Log file {self.process.out_file} does not exist") log.error(f"Log file {self.process.out_file} does not exist")
@ -276,28 +283,16 @@ class VM(GObject.Object):
return self.process.out_file.read_text() return self.process.out_file.read_text()
class VMS: class VMs:
"""
This is a singleton.
It is initialized with the first call of use()
Usage:
VMS.use().get_running_vms()
VMS.use() can also be called before the data is needed. e.g. to eliminate/reduce waiting time.
"""
list_store: Gio.ListStore list_store: Gio.ListStore
_instance: "None | VMS" = None _instance: "None | VMs" = None
# Make sure the VMS class is used as a singleton # Make sure the VMS class is used as a singleton
def __init__(self) -> None: def __init__(self) -> None:
raise RuntimeError("Call use() instead") raise RuntimeError("Call use() instead")
@classmethod @classmethod
def use(cls: Any) -> "VMS": def use(cls: Any) -> "VMs":
if cls._instance is None: if cls._instance is None:
cls._instance = cls.__new__(cls) cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(VM) cls.list_store = Gio.ListStore.new(VM)
@ -327,10 +322,13 @@ class VMS:
return list(filter(lambda vm: vm.is_running(), self.list_store)) return list(filter(lambda vm: vm.is_running(), self.list_store))
def kill_all(self) -> None: def kill_all(self) -> None:
log.debug(f"Running vms: {self.get_running_vms()}")
for vm in self.get_running_vms(): for vm in self.get_running_vms():
vm.stop() vm.kill()
def refresh(self) -> None: def refresh(self) -> None:
log.error("NEVER FUCKING DO THIS")
return
self.list_store.remove_all() self.list_store.remove_all()
for vm in get_saved_vms(): for vm in get_saved_vms():
self.list_store.append(vm) self.list_store.append(vm)
@ -338,7 +336,7 @@ class VMS:
def get_saved_vms() -> list[VM]: def get_saved_vms() -> list[VM]:
vm_list = [] vm_list = []
log.info("=====CREATING NEW VM OBJ====")
try: try:
# Execute `clan flakes add <path>` to democlan for this to work # Execute `clan flakes add <path>` to democlan for this to work
for entry in list_history(): for entry in list_history():

View File

@ -172,7 +172,7 @@ class BaseImplementation:
self.application = application self.application = application
self.menu_items: dict[int, Any] = {} self.menu_items: dict[int, Any] = {}
self.menu_item_id: int = 1 self.menu_item_id: int = 1
self.activate_callback: Callable = application.on_window_hide_unhide self.activate_callback: Callable = lambda a, b: self.update_window_visibility
self.is_visible: bool = True self.is_visible: bool = True
self.create_menu() self.create_menu()
@ -213,16 +213,12 @@ class BaseImplementation:
def create_menu(self) -> None: def create_menu(self) -> None:
self.show_hide_item = self.create_item( self.show_hide_item = self.create_item(
"default", self.application.dummy_menu_entry "default", self.application.on_window_hide_unhide
) )
self.connect_disconnect_item = self.create_item( # self.create_item()
"default", self.application.dummy_menu_entry
)
self.create_item() # self.create_item("_Quit", self.application.on_shutdown)
self.create_item("_Quit", self.application.dummy_menu_entry)
def update_window_visibility(self) -> None: def update_window_visibility(self) -> None:
if self.application.window is None: if self.application.window is None:
@ -237,14 +233,6 @@ class BaseImplementation:
self.update_menu() self.update_menu()
def update_user_status(self) -> None: def update_user_status(self) -> None:
sensitive = core.users.login_status != slskmessages.UserStatus.OFFLINE
label = "_Disconnect" if sensitive else "_Connect"
# self.set_item_sensitive(self.away_item, sensitive)
self.set_item_text(self.connect_disconnect_item, label)
# self.set_item_toggled(self.away_item, core.users.login_status == slskmessages.UserStatus.AWAY)
self.update_icon() self.update_icon()
self.update_menu() self.update_menu()
@ -266,9 +254,9 @@ class BaseImplementation:
# icon_name = "disconnect" # icon_name = "disconnect"
# icon_name = f"{pynicotine.__application_id__}-{icon_name}" # icon_name = f"{pynicotine.__application_id__}-{icon_name}"
# self.set_icon_name(icon_name) # self.set_icon(icon_name)
def set_icon_name(self, icon_name: str) -> None: def set_icon(self, icon_name: str) -> None:
# Implemented in subclasses # Implemented in subclasses
pass pass
@ -410,6 +398,7 @@ class StatusNotifierImplementation(BaseImplementation):
): ):
method = self.methods[method_name] method = self.methods[method_name]
result = method.callback(*parameters.unpack()) result = method.callback(*parameters.unpack())
out_arg_types = "".join(method.out_args) out_arg_types = "".join(method.out_args)
return_value = None return_value = None
@ -570,6 +559,11 @@ class StatusNotifierImplementation(BaseImplementation):
) )
self.tray_icon.register() self.tray_icon.register()
from .assets import loc
icon_path = str(loc / "clan_white_notext.png")
self.set_icon(icon_path)
self.bus.call_sync( self.bus.call_sync(
bus_name="org.kde.StatusNotifierWatcher", bus_name="org.kde.StatusNotifierWatcher",
object_path="/StatusNotifierWatcher", object_path="/StatusNotifierWatcher",
@ -617,38 +611,12 @@ class StatusNotifierImplementation(BaseImplementation):
"""Returns an icon path to use for tray icons, or None to fall back to """Returns an icon path to use for tray icons, or None to fall back to
system-wide icons.""" system-wide icons."""
self.custom_icons = False # icon_path = self.application.get_application_icon_path()
custom_icon_path = os.path.join(config.data_folder_path, ".nicotine-icon-theme")
if hasattr(sys, "real_prefix") or sys.base_prefix != sys.prefix:
# Virtual environment
local_icon_path = os.path.join(
sys.prefix, "share", "icons", "hicolor", "scalable", "apps"
)
else:
# Git folder
local_icon_path = os.path.join(
GTK_GUI_FOLDER_PATH, "icons", "hicolor", "scalable", "apps"
)
for icon_name in ("away", "connect", "disconnect", "msg"):
# Check if custom icons exist
if self.check_icon_path(icon_name, custom_icon_path):
self.custom_icons = True
return custom_icon_path
# Check if local icons exist
if self.check_icon_path(icon_name, local_icon_path):
return local_icon_path
return "" return ""
def set_icon_name(self, icon_name): def set_icon(self, icon_path) -> None:
if self.custom_icons: self.tray_icon.properties["IconName"].value = icon_path
# Use alternative icon names to enforce custom icons, since system-wide icons take precedence
icon_name = icon_name.replace(pynicotine.__application_id__, "nplus-tray")
self.tray_icon.properties["IconName"].value = icon_name
self.tray_icon.emit_signal("NewIcon") self.tray_icon.emit_signal("NewIcon")
if not self.is_visible: if not self.is_visible:
@ -1064,7 +1032,7 @@ class Win32Implementation(BaseImplementation):
self._menu, item_id, False, byref(item_info) self._menu, item_id, False, byref(item_info)
) )
def set_icon_name(self, icon_name): def set_icon(self, icon_name):
self._update_notify_icon(icon_name=icon_name) self._update_notify_icon(icon_name=icon_name)
def show_notification(self, title, message): def show_notification(self, title, message):

View File

@ -14,7 +14,7 @@ from clan_vm_manager.models.use_views import Views
gi.require_version("Adw", "1") gi.require_version("Adw", "1")
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
from clan_vm_manager.models.use_vms import VM, VMS, ClanGroup, Clans from clan_vm_manager.models.use_vms import VM, ClanGroup, Clans
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -48,8 +48,8 @@ class ClanList(Gtk.Box):
def __init__(self, config: ClanConfig) -> None: def __init__(self, config: ClanConfig) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL) super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default() self.app = Gio.Application.get_default()
app.connect("join_request", self.on_join_request) self.app.connect("join_request", self.on_join_request)
groups = Clans.use() groups = Clans.use()
join = Join.use() join = Join.use()
@ -123,7 +123,7 @@ class ClanList(Gtk.Box):
def on_search_changed(self, entry: Gtk.SearchEntry) -> None: def on_search_changed(self, entry: Gtk.SearchEntry) -> None:
Clans.use().filter_by_name(entry.get_text()) Clans.use().filter_by_name(entry.get_text())
# Disable the shadow if the list is empty # Disable the shadow if the list is empty
if not VMS.use().list_store.get_n_items(): if not self.app.vms.list_store.get_n_items():
self.group_list.add_css_class("no-shadow") self.group_list.add_css_class("no-shadow")
def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VM) -> Gtk.Widget: def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VM) -> Gtk.Widget:
@ -202,7 +202,7 @@ class ClanList(Gtk.Box):
def on_edit(self, action: Any, parameter: Any) -> None: def on_edit(self, action: Any, parameter: Any) -> None:
target = parameter.get_string() target = parameter.get_string()
vm = VMS.use().get_by_id(target) vm = self.app.vms.get_by_id(target)
if not vm: if not vm:
raise ClanError("Something went wrong. Please restart the app.") raise ClanError("Something went wrong. Please restart the app.")
@ -220,7 +220,7 @@ class ClanList(Gtk.Box):
row.add_css_class("trust") row.add_css_class("trust")
# TODO: figure out how to detect that # TODO: figure out how to detect that
exist = VMS.use().get_by_id(item.url.get_id()) exist = self.app.vms.use().get_by_id(item.url.get_id())
if exist: if exist:
sub = row.get_subtitle() sub = row.get_subtitle()
row.set_subtitle( row.set_subtitle(
@ -292,7 +292,7 @@ class ClanList(Gtk.Box):
if not row.get_active(): if not row.get_active():
row.set_state(True) row.set_state(True)
vm.stop() vm.shutdown()
def vm_status_changed(self, switch: Gtk.Switch, vm: VM, _vm: VM) -> None: def vm_status_changed(self, switch: Gtk.Switch, vm: VM, _vm: VM) -> None:
switch.set_active(vm.is_running()) switch.set_active(vm.is_running())