diff --git a/pkgs/clan-cli/clan_cli/machines/machines.py b/pkgs/clan-cli/clan_cli/machines/machines.py index 9e6af35c..63148143 100644 --- a/pkgs/clan-cli/clan_cli/machines/machines.py +++ b/pkgs/clan-cli/clan_cli/machines/machines.py @@ -177,7 +177,7 @@ class Machine: [ "--impure", "--expr", - f'(builtins.fetchTree {{ type = "file"; url = "{config_json.name}"; }}).narHash', + f'(builtins.fetchTree {{ type = "file"; url = "file://{config_json.name}"; }}).narHash', ] ) ).stdout.strip() diff --git a/pkgs/clan-cli/clan_cli/vms/run.py b/pkgs/clan-cli/clan_cli/vms/run.py index 067a2c55..40a3454e 100644 --- a/pkgs/clan-cli/clan_cli/vms/run.py +++ b/pkgs/clan-cli/clan_cli/vms/run.py @@ -12,7 +12,6 @@ from collections.abc import Iterator from dataclasses import dataclass, field from pathlib import Path from tempfile import TemporaryDirectory -from typing import IO from ..cmd import Log, run from ..dirs import machine_gcroot, module_root, user_cache_dir, vm_state_dir @@ -147,9 +146,7 @@ def qemu_command( # TODO move this to the Machines class -def get_vm_create_info( - machine: Machine, vm: VmConfig, nix_options: list[str] -) -> dict[str, str]: +def build_vm(machine: Machine, vm: VmConfig, nix_options: list[str]) -> dict[str, str]: config = nix_config() system = config["system"] @@ -272,19 +269,12 @@ def start_waypipe(cid: int | None, title_prefix: str) -> Iterator[None]: proc.kill() -def run_vm( - vm: VmConfig, - nix_options: list[str] = [], - log_fd: IO[str] | None = None, -) -> None: - """ - log_fd can be used to stream the output of all commands to a UI - """ +def run_vm(vm: VmConfig, nix_options: list[str] = []) -> None: machine = Machine(vm.machine_name, vm.flake_url) log.debug(f"Creating VM for {machine}") # TODO: We should get this from the vm argument - nixos_config = get_vm_create_info(machine, vm, nix_options) + nixos_config = build_vm(machine, vm, nix_options) # store the temporary rootfs inside XDG_CACHE_HOME on the host # otherwise, when using /tmp, we risk running out of memory diff --git a/pkgs/clan-cli/tests/test_history_cli.py b/pkgs/clan-cli/tests/test_history_cli.py index 9acc7401..d5eb34bf 100644 --- a/pkgs/clan-cli/tests/test_history_cli.py +++ b/pkgs/clan-cli/tests/test_history_cli.py @@ -26,7 +26,6 @@ def test_history_add( "add", str(uri), ] - cli.run(cmd) history_file = user_history_file() diff --git a/pkgs/clan-vm-manager/clan_vm_manager/__init__.py b/pkgs/clan-vm-manager/clan_vm_manager/__init__.py index b42ede31..2fbe0d7e 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/__init__.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/__init__.py @@ -6,6 +6,8 @@ from .app import MainApplication log = logging.getLogger(__name__) +# TODO: Trayicon support +# https://github.com/nicotine-plus/nicotine-plus/blob/b08552584eb6f35782ad77da93ae4aae3362bf64/pynicotine/gtkgui/widgets/trayicon.py#L982 def main() -> None: app = MainApplication() return app.run(sys.argv) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/app.py b/pkgs/clan-vm-manager/clan_vm_manager/app.py index 35fcb378..8248d1f0 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/app.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/app.py @@ -16,6 +16,7 @@ from clan_vm_manager.models.interfaces import ClanConfig from clan_vm_manager.models.use_join import GLib, GObject from clan_vm_manager.models.use_vms import VMS +from .trayicon import TrayIcon from .windows.main_window import MainWindow log = logging.getLogger(__name__) @@ -29,9 +30,11 @@ class MainApplication(Adw.Application): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__( *args, + application_id="lol.clan.vm.manager", flags=Gio.ApplicationFlags.HANDLES_COMMAND_LINE, **kwargs, ) + self.tray_icon: TrayIcon | None = None self.add_main_option( "debug", @@ -42,7 +45,7 @@ class MainApplication(Adw.Application): None, ) - self.win: Adw.ApplicationWindow | None = None + self.window: Adw.ApplicationWindow | None = None self.connect("shutdown", self.on_shutdown) self.connect("activate", self.show_window) @@ -70,19 +73,33 @@ class MainApplication(Adw.Application): def on_shutdown(self, app: Gtk.Application) -> None: log.debug("Shutting down") + + if self.tray_icon is not None: + self.tray_icon.destroy() + VMS.use().kill_all() + def on_window_hide_unhide(self, *_args: Any) -> None: + assert self.window is not None + if self.window.is_visible(): + self.window.hide() + return + + self.window.present() + + def dummy_menu_entry(self) -> None: + log.info("Dummy menu entry called") + def do_activate(self) -> None: self.show_window() def show_window(self, app: Any = None) -> None: - if not self.win: + if not self.window: self.init_style() - self.win = MainWindow(config=ClanConfig(initial_view="list")) - self.win.set_application(self) - icon_path = assets.loc / "clan_black.png" - self.win.set_default_icon_name(str(icon_path)) - self.win.present() + self.window = MainWindow(config=ClanConfig(initial_view="list")) + self.window.set_application(self) + self.tray_icon = TrayIcon(self) + self.window.present() # TODO: For css styling def init_style(self) -> None: diff --git a/pkgs/clan-vm-manager/clan_vm_manager/models/interfaces.py b/pkgs/clan-vm-manager/clan_vm_manager/models/interfaces.py index 1b8f36cc..28bd4649 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/models/interfaces.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/models/interfaces.py @@ -1,5 +1,4 @@ from dataclasses import dataclass -from enum import StrEnum import gi @@ -9,8 +8,3 @@ gi.require_version("Gtk", "4.0") @dataclass class ClanConfig: initial_view: str - - -class VMStatus(StrEnum): - RUNNING = "Running" - STOPPED = "Stopped" diff --git a/pkgs/clan-vm-manager/clan_vm_manager/models/use_vms.py b/pkgs/clan-vm-manager/clan_vm_manager/models/use_vms.py index ec6d2931..2e7f735c 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/models/use_vms.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/models/use_vms.py @@ -1,6 +1,7 @@ import os import tempfile import weakref +from datetime import datetime from pathlib import Path from typing import IO, Any, ClassVar @@ -13,7 +14,6 @@ from clan_cli.history.list import list_history from clan_vm_manager import assets from clan_vm_manager.errors.show_error import show_error_dialog -from clan_vm_manager.models.interfaces import VMStatus from .executor import MPProcess, spawn @@ -98,27 +98,26 @@ class VM(GObject.Object): # Define a custom signal with the name "vm_stopped" and a string argument for the message __gsignals__: ClassVar = { "vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object]), + "build_vm": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object, bool]), } def __init__( self, icon: Path, - status: VMStatus, data: HistoryEntry, ) -> None: super().__init__() self.data = data self.process = MPProcess("dummy", mp.Process(), Path("./dummy")) self._watcher_id: int = 0 + self._stop_watcher_id: int = 0 + self._stop_timer_init: datetime | None = None self._logs_id: int = 0 self._log_file: IO[str] | None = None - self.status = status - self._last_liveness: bool = False self.log_dir = tempfile.TemporaryDirectory( prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}" ) self._finalizer = weakref.finalize(self, self.stop) - self.connect("vm_status_changed", self._start_logs_task) uri = ClanURI.from_str( url=self.data.flake.flake_url, flake_attr=self.data.flake.flake_attr @@ -136,50 +135,47 @@ class VM(GObject.Object): ) def __start(self) -> None: - if self.is_running(): - log.warn("VM is already running") - return + log.info(f"Starting VM {self.get_id()}") vm = vms.run.inspect_vm(self.machine) + + GLib.idle_add(self.emit, "build_vm", self, True) + vms.run.build_vm(self.machine, vm, []) + GLib.idle_add(self.emit, "build_vm", self, False) + self.process = spawn( on_except=None, log_dir=Path(str(self.log_dir.name)), func=vms.run.run_vm, vm=vm, ) - log.debug("Starting VM") + log.debug(f"Started VM {self.get_id()}") + GLib.idle_add(self.emit, "vm_status_changed", self) + log.debug(f"Starting logs watcher on file: {self.process.out_file}") + self._logs_id = GLib.timeout_add(50, self._get_logs_task) + if self._logs_id == 0: + raise ClanError("Failed to add logs watcher") + + log.debug(f"Starting VM watcher for: {self.machine.name}") + self._watcher_id = GLib.timeout_add(50, self._vm_watcher_task) + if self._watcher_id == 0: + raise ClanError("Failed to add watcher") + self.machine.qmp_connect() def start(self) -> None: if self.is_running(): log.warn("VM is already running") return - threading.Thread(target=self.__start).start() - # Every 50ms check if the VM is still running - self._watcher_id = GLib.timeout_add(50, self._vm_watcher_task) - if self._watcher_id == 0: - raise ClanError("Failed to add watcher") - def _vm_watcher_task(self) -> bool: - if self.is_running() != self._last_liveness: + if not self.is_running(): self.emit("vm_status_changed", self) - prev_liveness = self._last_liveness - self._last_liveness = self.is_running() + log.debug("Removing VM watcher") + return GLib.SOURCE_REMOVE - # If the VM was running and now it is not, remove the watcher - if prev_liveness and not self.is_running(): - log.debug("Removing VM watcher") - return GLib.SOURCE_REMOVE return GLib.SOURCE_CONTINUE - def _start_logs_task(self, obj: Any, vm: Any) -> None: - if self.is_running(): - log.debug(f"Starting logs watcher on file: {self.process.out_file}") - self._logs_id = GLib.timeout_add(50, self._get_logs_task) - else: - log.debug("Not starting logs watcher") - def _get_logs_task(self) -> bool: if not self.process.out_file.exists(): return GLib.SOURCE_CONTINUE @@ -192,15 +188,15 @@ class VM(GObject.Object): self._log_file = None return GLib.SOURCE_REMOVE + line = os.read(self._log_file.fileno(), 4096) + if len(line) != 0: + print(line.decode("utf-8"), end="", flush=True) + if not self.is_running(): log.debug("Removing logs watcher") self._log_file = None return GLib.SOURCE_REMOVE - line = os.read(self._log_file.fileno(), 4096) - if len(line) != 0: - print(line.decode("utf-8"), end="", flush=True) - return GLib.SOURCE_CONTINUE def is_running(self) -> bool: @@ -209,12 +205,32 @@ class VM(GObject.Object): def get_id(self) -> str: return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}" + def __shutdown_watchdog(self) -> None: + if self.is_running(): + assert self._stop_timer_init is not None + diff = datetime.now() - self._stop_timer_init + if diff.seconds > 10: + log.error(f"VM {self.get_id()} has not stopped. Killing it") + self.process.kill_group() + return GLib.SOURCE_CONTINUE + else: + log.info(f"VM {self.get_id()} has stopped") + return GLib.SOURCE_REMOVE + + def __stop(self) -> None: + log.info(f"Stopping VM {self.get_id()}") + + self.machine.qmp_command("system_powerdown") + self._stop_timer_init = datetime.now() + self._stop_watcher_id = GLib.timeout_add(100, self.__shutdown_watchdog) + if self._stop_watcher_id == 0: + raise ClanError("Failed to add stop watcher") + def stop(self) -> None: if not self.is_running(): return log.info(f"Stopping VM {self.get_id()}") - # TODO: add fallback to kill the process if the QMP command fails - self.machine.qmp_command("system_powerdown") + threading.Thread(target=self.__stop).start() def read_whole_log(self) -> str: if not self.process.out_file.exists(): @@ -296,7 +312,6 @@ def get_saved_vms() -> list[VM]: base = VM( icon=Path(icon), - status=VMStatus.STOPPED, data=entry, ) vm_list.append(base) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/trayicon.py b/pkgs/clan-vm-manager/clan_vm_manager/trayicon.py new file mode 100644 index 00000000..f827a608 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/trayicon.py @@ -0,0 +1,1218 @@ +# mypy: allow-untyped-defs +# ruff: noqa: ANN201, ANN001, ANN202 + +# COPYRIGHT (C) 2020-2024 Nicotine+ Contributors +# +# GNU GENERAL PUBLIC LICENSE +# Version 3, 29 June 2007 +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +import sys +from collections.abc import Callable +from typing import Any, ClassVar + +from gi.repository import GdkPixbuf, Gio, GLib, Gtk + + +# DUMMY IMPLEMENTATION +################################################ +### import pynicotine +class Pynicotine: + __application_id__ = "nicotine-plus" + __application_name__ = "Nicotine+" + __version__ = "3.0.0" + + +pynicotine = Pynicotine() + + +### from pynicotine import slskmessages +class UserStatus: + OFFLINE = 0 + ONLINE = 1 + AWAY = 2 + + +class Slskmessages: + UserStatus: Any = UserStatus + + +slskmessages = Slskmessages() + + +### from pynicotine.config import config +class Config: + sections: ClassVar = { + "notifications": {"notification_popup_sound": False}, + "ui": {"trayicon": True}, + } + data_folder_path: Any = "data_folder_path" + + +config = Config() + + +### from pynicotine.core import core +class User: + login_status: Any = UserStatus.OFFLINE + + +class Core: + users = User() + + +core = Core() +### from pynicotine.gtkgui.application import GTK_API_VERSION +GTK_API_VERSION = 4 + +## from pynicotine.gtkgui.application import GTK_GUI_FOLDER_PATH +GTK_GUI_FOLDER_PATH = "assets" +LONG_PATH_PREFIX = "\\\\?\\" + + +# from pynicotine.gtkgui.widgets.theme import ICON_THEME +class IconTheme: + def lookup_icon(self, icon_name: str, **kwargs: Any) -> None: + return None + + +ICON_THEME = IconTheme() + + +# from pynicotine.gtkgui.widgets.window import Window +class CWindow: + activation_token = None + + +Window = CWindow() + +### from pynicotine.logfacility import log + +import logging + + +class MyLog: + def __init__(self) -> None: + self.log = logging.getLogger(__name__) + + def add_debug(self, *args: Any, **kwargs: Any) -> None: + return + self.log.debug(*args, **kwargs) + + +log = MyLog() + + +### from pynicotine.utils import encode_path + + +def encode_path(path: str, prefix: bool = True) -> bytes: + """Converts a file path to bytes for processing by the system. + + On Windows, also append prefix to enable extended-length path. + """ + + if sys.platform == "win32" and prefix: + path = path.replace("/", "\\") + + if path.startswith("\\\\"): + path = "UNC" + path[1:] + + path = LONG_PATH_PREFIX + path + + return path.encode("utf-8") + + +# from pynicotine.utils import truncate_string_byte +def truncate_string_byte( + string: str, byte_limit: int, encoding: str = "utf-8", ellipsize: bool = False +) -> str: + """Truncates a string to fit inside a byte limit.""" + + string_bytes = string.encode(encoding) + + if len(string_bytes) <= byte_limit: + # Nothing to do, return original string + return string + + if ellipsize: + ellipsis_char = "…".encode(encoding) + string_bytes = ( + string_bytes[: max(byte_limit - len(ellipsis_char), 0)].rstrip() + + ellipsis_char + ) + else: + string_bytes = string_bytes[:byte_limit] + + return string_bytes.decode(encoding, "ignore") + + +################################################ + + +class ImplUnavailableError(Exception): + pass + + +class BaseImplementation: + def __init__(self, application: Gtk.Application) -> None: + self.application = application + self.menu_items: dict[int, Any] = {} + self.menu_item_id: int = 1 + self.activate_callback: Callable = application.on_window_hide_unhide + self.is_visible: bool = True + + self.create_menu() + + def create_item( + self, + text: str | None = None, + callback: Callable | None = None, + check: bool = False, + ) -> dict[str, Any]: + item: dict[str, Any] = {"id": self.menu_item_id, "sensitive": True} + + if text is not None: + item["text"] = text + + if callback is not None: + item["callback"] = callback + + if check: + item["toggled"] = False + + self.menu_items[self.menu_item_id] = item + self.menu_item_id += 1 + + return item + + @staticmethod + def set_item_text(item: dict[str, Any], text: str | None) -> None: + item["text"] = text + + @staticmethod + def set_item_sensitive(item: dict[str, Any], sensitive: bool) -> None: + item["sensitive"] = sensitive + + @staticmethod + def set_item_toggled(item: dict[str, Any], toggled: bool) -> None: + item["toggled"] = toggled + + def create_menu(self) -> None: + self.show_hide_item = self.create_item( + "default", self.application.dummy_menu_entry + ) + + self.connect_disconnect_item = self.create_item( + "default", self.application.dummy_menu_entry + ) + + self.create_item() + + self.create_item("_Quit", self.application.dummy_menu_entry) + + def update_window_visibility(self) -> None: + if self.application.window is None: + return + + if self.application.window.is_visible(): + label = "Hide VM Manager" + else: + label = "Show VM Manager" + + self.set_item_text(self.show_hide_item, label) + self.update_menu() + + def update_user_status(self) -> None: + sensitive = core.users.login_status != slskmessages.UserStatus.OFFLINE + label = "_Disconnect" if sensitive else "_Connect" + + # self.set_item_sensitive(self.away_item, sensitive) + + self.set_item_text(self.connect_disconnect_item, label) + # self.set_item_toggled(self.away_item, core.users.login_status == slskmessages.UserStatus.AWAY) + + self.update_icon() + self.update_menu() + + def update_icon(self) -> None: + pass + # # Check for highlights, and display highlight icon if there is a highlighted room or private chat + # if (self.application.window + # and (self.application.window.chatrooms.highlighted_rooms + # or self.application.window.privatechat.highlighted_users)): + # icon_name = "msg" + + # elif core.users.login_status == slskmessages.UserStatus.ONLINE: + # icon_name = "connect" + + # elif core.users.login_status == slskmessages.UserStatus.AWAY: + # icon_name = "away" + + # else: + # icon_name = "disconnect" + + # icon_name = f"{pynicotine.__application_id__}-{icon_name}" + # self.set_icon_name(icon_name) + + def set_icon_name(self, icon_name: str) -> None: + # Implemented in subclasses + pass + + def update_icon_theme(self) -> None: + # Implemented in subclasses + pass + + def update_menu(self) -> None: + # Implemented in subclasses + pass + + def set_download_status(self, status: str) -> None: + self.update_menu() + + def set_upload_status(self, status) -> None: + self.update_menu() + + def show_notification(self, title, message) -> None: + # Implemented in subclasses + pass + + def unload(self, is_shutdown=False) -> None: + # Implemented in subclasses + pass + + +class StatusNotifierImplementation(BaseImplementation): + class DBusProperty: + def __init__(self, name, signature, value) -> None: + self.name = name + self.signature = signature + self.value = value + + class DBusSignal: + def __init__(self, name, args) -> None: + self.name = name + self.args = args + + class DBusMethod: + def __init__(self, name, in_args, out_args, callback) -> None: + self.name = name + self.in_args = in_args + self.out_args = out_args + self.callback = callback + + class DBusService: + def __init__(self, interface_name, object_path, bus_type) -> None: + self._interface_name = interface_name + self._object_path = object_path + + self._bus = Gio.bus_get_sync(bus_type) + self._registration_id = None + self.properties: Any = {} + self.signals: Any = {} + self.methods: Any = {} + + def register(self): + xml_output = f"" + + for property_name, prop in self.properties.items(): + xml_output += f"" + + for method_name, method in self.methods.items(): + xml_output += f"" + + for in_signature in method.in_args: + xml_output += f"" + for out_signature in method.out_args: + xml_output += f"" + + xml_output += "" + + for signal_name, signal in self.signals.items(): + xml_output += f"" + + for signature in signal.args: + xml_output += f"" + + xml_output += "" + + xml_output += "" + + registration_id = self._bus.register_object( + object_path=self._object_path, + interface_info=Gio.DBusNodeInfo.new_for_xml(xml_output).interfaces[0], + method_call_closure=self.on_method_call, + get_property_closure=self.on_get_property, + ) + + if not registration_id: + raise GLib.Error( + f"Failed to register object with path {self._object_path}" + ) + + self._registration_id = registration_id + + def unregister(self) -> None: + if self._registration_id is None: + return + + self._bus.unregister_object(self._registration_id) + self._registration_id = None + + def add_property(self, name: str, signature: Any, value: Any) -> None: + self.properties[name] = StatusNotifierImplementation.DBusProperty( + name, signature, value + ) + + def add_signal(self, name: str, args: Any) -> None: + self.signals[name] = StatusNotifierImplementation.DBusSignal(name, args) + + def add_method( + self, name: str, in_args: Any, out_args: Any, callback: Any + ) -> None: + self.methods[name] = StatusNotifierImplementation.DBusMethod( + name, in_args, out_args, callback + ) + + def emit_signal(self, name: str, *args: Any) -> None: + arg_types = "".join(self.signals[name].args) + + self._bus.emit_signal( + destination_bus_name=None, + object_path=self._object_path, + interface_name=self._interface_name, + signal_name=name, + parameters=GLib.Variant(f"({arg_types})", args), + ) + + def on_method_call( + self, + _connection, + _sender, + _path, + _interface_name, + method_name, + parameters, + invocation, + ): + method = self.methods[method_name] + result = method.callback(*parameters.unpack()) + out_arg_types = "".join(method.out_args) + return_value = None + + if method.out_args: + return_value = GLib.Variant(f"({out_arg_types})", result) + + invocation.return_value(return_value) + + def on_get_property( + self, _connection, _sender, _path, _interface_name, property_name + ): + prop = self.properties[property_name] + return GLib.Variant(prop.signature, prop.value) + + class DBusMenuService(DBusService): + def __init__(self) -> None: + super().__init__( + interface_name="com.canonical.dbusmenu", + object_path="/org/ayatana/NotificationItem/Nicotine/Menu", + bus_type=Gio.BusType.SESSION, + ) + + self._items: Any = {} + self._revision: Any = 0 + + for method_name, in_args, out_args, callback in ( + ( + "GetGroupProperties", + ("ai", "as"), + ("a(ia{sv})",), + self.on_get_group_properties, + ), + ( + "GetLayout", + ("i", "i", "as"), + ("u", "(ia{sv}av)"), + self.on_get_layout, + ), + ("Event", ("i", "s", "v", "u"), (), self.on_event), + ): + self.add_method(method_name, in_args, out_args, callback) + + for signal_name, value in (("LayoutUpdated", ("u", "i")),): + self.add_signal(signal_name, value) + + def set_items(self, items) -> None: + self._items = items + + self._revision += 1 + self.emit_signal("LayoutUpdated", self._revision, 0) + + @staticmethod + def _serialize_item(item) -> dict[str, Any]: + if "text" in item: + props = { + "label": GLib.Variant("s", item["text"]), + "enabled": GLib.Variant("b", item["sensitive"]), + } + + if item.get("toggled") is not None: + props["toggle-type"] = GLib.Variant("s", "checkmark") + props["toggle-state"] = GLib.Variant("i", int(item["toggled"])) + + return props + + return {"type": GLib.Variant("s", "separator")} + + def on_get_group_properties(self, ids, _properties): + item_properties = [] + + for idx, item in self._items.items(): + if idx in ids: + item_properties.append((idx, self._serialize_item(item))) + + return (item_properties,) + + def on_get_layout(self, _parent_id, _recursion_depth, _property_names): + serialized_items = [] + + for idx, item in self._items.items(): + serialized_item = GLib.Variant( + "(ia{sv}av)", (idx, self._serialize_item(item), []) + ) + serialized_items.append(serialized_item) + + return self._revision, (0, {}, serialized_items) + + def on_event(self, idx, event_id, _data, _timestamp) -> None: + if event_id == "clicked": + self._items[idx]["callback"]() + + class StatusNotifierItemService(DBusService): + def __init__(self, activate_callback) -> None: + super().__init__( + interface_name="org.kde.StatusNotifierItem", + object_path="/org/ayatana/NotificationItem/Nicotine", + bus_type=Gio.BusType.SESSION, + ) + + self.menu = StatusNotifierImplementation.DBusMenuService() + + for property_name, signature, value in ( + ("Category", "s", "Communications"), + ("Id", "s", pynicotine.__application_id__), + ("Title", "s", pynicotine.__application_name__), + ( + "ToolTip", + "(sa(iiay)ss)", + ("", [], pynicotine.__application_name__, ""), + ), + ("Menu", "o", "/org/ayatana/NotificationItem/Nicotine/Menu"), + ("ItemIsMenu", "b", False), + ("IconName", "s", ""), + ("IconThemePath", "s", ""), + ("Status", "s", "Active"), + ): + self.add_property(property_name, signature, value) + + for method_name, in_args, out_args, callback in ( + ("Activate", ("i", "i"), (), activate_callback), + ( + "ProvideXdgActivationToken", + ("s",), + (), + self.on_provide_activation_token, + ), + ): + self.add_method(method_name, in_args, out_args, callback) + + for signal_name, value in ( + ("NewIcon", ()), + ("NewIconThemePath", ("s",)), + ("NewStatus", ("s",)), + ): + self.add_signal(signal_name, value) + + def register(self): + self.menu.register() + super().register() + + def unregister(self): + super().unregister() + self.menu.unregister() + + def on_provide_activation_token(self, token): + Window.activation_token = token + + def __init__(self, application) -> None: + super().__init__(application) + + self.tray_icon: Any = None + self.custom_icons: bool = False + + try: + self.bus = Gio.bus_get_sync(bus_type=Gio.BusType.SESSION) + self.tray_icon = self.StatusNotifierItemService( + activate_callback=self.activate_callback + ) + self.tray_icon.register() + + self.bus.call_sync( + bus_name="org.kde.StatusNotifierWatcher", + object_path="/StatusNotifierWatcher", + interface_name="org.kde.StatusNotifierWatcher", + method_name="RegisterStatusNotifierItem", + parameters=GLib.Variant( + "(s)", ("/org/ayatana/NotificationItem/Nicotine",) + ), + reply_type=None, + flags=Gio.DBusCallFlags.NONE, + timeout_msec=-1, + ) + + except GLib.Error as error: + self.unload() + raise ImplUnavailableError( + f"StatusNotifier implementation not available: {error}" + ) from error + + self.update_menu() + + @staticmethod + def check_icon_path(icon_name, icon_path) -> bool: + """Check if tray icons exist in the specified icon path.""" + + if not icon_path: + return False + + icon_scheme = f"{pynicotine.__application_id__}-{icon_name}." + + try: + with os.scandir(encode_path(icon_path)) as entries: + for entry in entries: + if entry.is_file() and entry.name.decode( + "utf-8", "replace" + ).startswith(icon_scheme): + return True + + except OSError as error: + log.add_debug(f"Error accessing tray icon path {icon_path}: {error}") + + return False + + def get_icon_path(self): + """Returns an icon path to use for tray icons, or None to fall back to + system-wide icons.""" + + self.custom_icons = False + custom_icon_path = os.path.join(config.data_folder_path, ".nicotine-icon-theme") + + if hasattr(sys, "real_prefix") or sys.base_prefix != sys.prefix: + # Virtual environment + local_icon_path = os.path.join( + sys.prefix, "share", "icons", "hicolor", "scalable", "apps" + ) + else: + # Git folder + local_icon_path = os.path.join( + GTK_GUI_FOLDER_PATH, "icons", "hicolor", "scalable", "apps" + ) + + for icon_name in ("away", "connect", "disconnect", "msg"): + # Check if custom icons exist + if self.check_icon_path(icon_name, custom_icon_path): + self.custom_icons = True + return custom_icon_path + + # Check if local icons exist + if self.check_icon_path(icon_name, local_icon_path): + return local_icon_path + + return "" + + def set_icon_name(self, icon_name): + if self.custom_icons: + # Use alternative icon names to enforce custom icons, since system-wide icons take precedence + icon_name = icon_name.replace(pynicotine.__application_id__, "nplus-tray") + + self.tray_icon.properties["IconName"].value = icon_name + self.tray_icon.emit_signal("NewIcon") + + if not self.is_visible: + return + + status = "Active" + + if self.tray_icon.properties["Status"].value != status: + self.tray_icon.properties["Status"].value = status + self.tray_icon.emit_signal("NewStatus", status) + + def update_icon_theme(self): + # If custom icon path was found, use it, otherwise we fall back to system icons + icon_path = self.get_icon_path() + self.tray_icon.properties["IconThemePath"].value = icon_path + self.tray_icon.emit_signal("NewIconThemePath", icon_path) + + if icon_path: + log.add_debug("Using tray icon path %s", icon_path) + + def update_menu(self) -> None: + self.tray_icon.menu.set_items(self.menu_items) + + def unload(self, is_shutdown: bool = False) -> None: + if self.tray_icon is None: + return + + status = "Passive" + + self.tray_icon.properties["Status"].value = status + self.tray_icon.emit_signal("NewStatus", status) + + if is_shutdown: + self.tray_icon.unregister() + + +class Win32Implementation(BaseImplementation): + """Windows NotifyIcon implementation. + + https://learn.microsoft.com/en-us/windows/win32/shell/notification-area + https://learn.microsoft.com/en-us/windows/win32/shell/taskbar + """ + + WINDOW_CLASS_NAME = "NicotineTrayIcon" + + NIM_ADD = 0 + NIM_MODIFY = 1 + NIM_DELETE = 2 + + NIF_MESSAGE = 1 + NIF_ICON = 2 + NIF_TIP = 4 + NIF_INFO = 16 + NIIF_NOSOUND = 16 + + MIIM_STATE = 1 + MIIM_ID = 2 + MIIM_STRING = 64 + + MFS_ENABLED = 0 + MFS_UNCHECKED = 0 + MFS_DISABLED = 3 + MFS_CHECKED = 8 + + MFT_SEPARATOR = 2048 + + WM_NULL = 0 + WM_DESTROY = 2 + WM_CLOSE = 16 + WM_COMMAND = 273 + WM_LBUTTONUP = 514 + WM_RBUTTONUP = 517 + WM_USER = 1024 + WM_TRAYICON = WM_USER + 1 + NIN_BALLOONHIDE = WM_USER + 3 + NIN_BALLOONTIMEOUT = WM_USER + 4 + NIN_BALLOONUSERCLICK = WM_USER + 5 + + CS_VREDRAW = 1 + CS_HREDRAW = 2 + COLOR_WINDOW = 5 + IDC_ARROW = 32512 + + WS_OVERLAPPED = 0 + WS_SYSMENU = 524288 + CW_USEDEFAULT = -2147483648 + + IMAGE_ICON = 1 + LR_LOADFROMFILE = 16 + SM_CXSMICON = 49 + + if sys.platform == "win32": + from ctypes import Structure + + class WNDCLASSW(Structure): + from ctypes import CFUNCTYPE, wintypes + + LPFN_WND_PROC = CFUNCTYPE( + wintypes.INT, + wintypes.HWND, + wintypes.UINT, + wintypes.WPARAM, + wintypes.LPARAM, + ) + _fields_: ClassVar = [ + ("style", wintypes.UINT), + ("lpfn_wnd_proc", LPFN_WND_PROC), + ("cb_cls_extra", wintypes.INT), + ("cb_wnd_extra", wintypes.INT), + ("h_instance", wintypes.HINSTANCE), + ("h_icon", wintypes.HICON), + ("h_cursor", wintypes.HANDLE), + ("hbr_background", wintypes.HBRUSH), + ("lpsz_menu_name", wintypes.LPCWSTR), + ("lpsz_class_name", wintypes.LPCWSTR), + ] + + class MENUITEMINFOW(Structure): + from ctypes import wintypes + + _fields_: ClassVar = [ + ("cb_size", wintypes.UINT), + ("f_mask", wintypes.UINT), + ("f_type", wintypes.UINT), + ("f_state", wintypes.UINT), + ("w_id", wintypes.UINT), + ("h_sub_menu", wintypes.HMENU), + ("hbmp_checked", wintypes.HBITMAP), + ("hbmp_unchecked", wintypes.HBITMAP), + ("dw_item_data", wintypes.LPVOID), + ("dw_type_data", wintypes.LPWSTR), + ("cch", wintypes.UINT), + ("hbmp_item", wintypes.HBITMAP), + ] + + class NOTIFYICONDATAW(Structure): + from ctypes import wintypes + + _fields_: ClassVar = [ + ("cb_size", wintypes.DWORD), + ("h_wnd", wintypes.HWND), + ("u_id", wintypes.UINT), + ("u_flags", wintypes.UINT), + ("u_callback_message", wintypes.UINT), + ("h_icon", wintypes.HICON), + ("sz_tip", wintypes.WCHAR * 128), + ("dw_state", wintypes.DWORD), + ("dw_state_mask", wintypes.DWORD), + ("sz_info", wintypes.WCHAR * 256), + ("u_version", wintypes.UINT), + ("sz_info_title", wintypes.WCHAR * 64), + ("dw_info_flags", wintypes.DWORD), + ("guid_item", wintypes.CHAR * 16), + ("h_balloon_icon", wintypes.HICON), + ] + + def __init__(self, application: Gtk.Application) -> None: + from ctypes import windll # type: ignore + + super().__init__(application) + + self._window_class: Any = None + self._h_wnd = None + self._notify_id = None + self._h_icon = None + self._menu = None + self._wm_taskbarcreated = windll.user32.RegisterWindowMessageW("TaskbarCreated") + + self._register_class() + self._create_window() + self.update_icon() + + def _register_class(self) -> None: + from ctypes import byref, windll # type: ignore + + self._window_class = self.WNDCLASSW( # type: ignore + style=(self.CS_VREDRAW | self.CS_HREDRAW), + lpfn_wnd_proc=self.WNDCLASSW.LPFN_WND_PROC(self.on_process_window_message), # type: ignore + h_cursor=windll.user32.LoadCursorW(0, self.IDC_ARROW), + hbr_background=self.COLOR_WINDOW, + lpsz_class_name=self.WINDOW_CLASS_NAME, + ) + + windll.user32.RegisterClassW(byref(self._window_class)) + + def _unregister_class(self): + if self._window_class is None: + return + + from ctypes import windll + + windll.user32.UnregisterClassW( + self.WINDOW_CLASS_NAME, self._window_class.h_instance + ) + self._window_class = None + + def _create_window(self) -> None: + from ctypes import windll # type: ignore + + style = self.WS_OVERLAPPED | self.WS_SYSMENU + self._h_wnd = windll.user32.CreateWindowExW( + 0, + self.WINDOW_CLASS_NAME, + self.WINDOW_CLASS_NAME, + style, + 0, + 0, + self.CW_USEDEFAULT, + self.CW_USEDEFAULT, + 0, + 0, + 0, + None, + ) + + windll.user32.UpdateWindow(self._h_wnd) + + def _destroy_window(self): + if self._h_wnd is None: + return + + from ctypes import windll + + windll.user32.DestroyWindow(self._h_wnd) + self._h_wnd = None + + def _load_ico_buffer(self, icon_name, icon_size): + ico_buffer = b"" + + if GTK_API_VERSION >= 4: + icon = ICON_THEME.lookup_icon( + icon_name, fallbacks=None, size=icon_size, scale=1, direction=0, flags=0 + ) + icon_path = icon.get_file().get_path() + + if not icon_path: + return ico_buffer + + pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_size( + icon_path, icon_size, icon_size + ) + else: + icon = ICON_THEME.lookup_icon(icon_name, size=icon_size, flags=0) + + if not icon: + return ico_buffer + + pixbuf = icon.load_icon() + + _success, ico_buffer = pixbuf.save_to_bufferv("ico") + return ico_buffer + + def _load_h_icon(self, icon_name): + from ctypes import windll + + # Attempt to load custom icons first + icon_size = windll.user32.GetSystemMetrics(self.SM_CXSMICON) + ico_buffer = self._load_ico_buffer( + icon_name.replace(f"{pynicotine.__application_id__}-", "nplus-tray-"), + icon_size, + ) + + if not ico_buffer: + # No custom icons present, fall back to default icons + ico_buffer = self._load_ico_buffer(icon_name, icon_size) + + try: + import tempfile + + file_handle = tempfile.NamedTemporaryFile(delete=False) + + with file_handle: + file_handle.write(ico_buffer) + + return windll.user32.LoadImageA( + 0, + encode_path(file_handle.name), + self.IMAGE_ICON, + icon_size, + icon_size, + self.LR_LOADFROMFILE, + ) + + finally: + os.remove(file_handle.name) + + def _destroy_h_icon(self): + from ctypes import windll + + if self._h_icon: + windll.user32.DestroyIcon(self._h_icon) + self._h_icon = None + + def _update_notify_icon(self, title="", message="", icon_name=None): + # pylint: disable=attribute-defined-outside-init,no-member + + if self._h_wnd is None: + return + + if icon_name: + self._destroy_h_icon() + self._h_icon = self._load_h_icon(icon_name) + + if not self.is_visible and not (title or message): + # When disabled by user, temporarily show tray icon when displaying a notification + return + + from ctypes import byref, sizeof, windll + + action = self.NIM_MODIFY + + if self._notify_id is None: + self._notify_id = self.NOTIFYICONDATAW( + cb_size=sizeof(self.NOTIFYICONDATAW), + h_wnd=self._h_wnd, + u_id=0, + u_flags=( + self.NIF_ICON | self.NIF_MESSAGE | self.NIF_TIP | self.NIF_INFO + ), + u_callback_message=self.WM_TRAYICON, + sz_tip=truncate_string_byte( + pynicotine.__application_name__, byte_limit=127 + ), + ) + action = self.NIM_ADD + + if config.sections["notifications"]["notification_popup_sound"]: + self._notify_id.dw_info_flags &= ~self.NIIF_NOSOUND + else: + self._notify_id.dw_info_flags |= self.NIIF_NOSOUND + + self._notify_id.h_icon = self._h_icon + self._notify_id.sz_info_title = truncate_string_byte( + title, byte_limit=63, ellipsize=True + ) + self._notify_id.sz_info = truncate_string_byte( + message, byte_limit=255, ellipsize=True + ) + + windll.shell32.Shell_NotifyIconW(action, byref(self._notify_id)) + + def _remove_notify_icon(self): + from ctypes import byref, windll + + if self._notify_id: + windll.shell32.Shell_NotifyIconW(self.NIM_DELETE, byref(self._notify_id)) + self._notify_id = None + + if self._menu: + windll.user32.DestroyMenu(self._menu) + self._menu = None + + def _serialize_menu_item(self, item): + # pylint: disable=attribute-defined-outside-init,no-member + + from ctypes import sizeof + + item_info = self.MENUITEMINFOW(cb_size=sizeof(self.MENUITEMINFOW)) + w_id = item["id"] + text = item.get("text") + is_checked = item.get("toggled") + is_sensitive = item.get("sensitive") + + item_info.f_mask |= self.MIIM_ID + item_info.w_id = w_id + + if text is not None: + item_info.f_mask |= self.MIIM_STRING + item_info.dw_type_data = text.replace("_", "&") # Mnemonics use & + else: + item_info.f_type |= self.MFT_SEPARATOR + + if is_checked is not None: + item_info.f_mask |= self.MIIM_STATE + item_info.f_state |= self.MFS_CHECKED if is_checked else self.MFS_UNCHECKED + + if is_sensitive is not None: + item_info.f_mask |= self.MIIM_STATE + item_info.f_state |= self.MFS_ENABLED if is_sensitive else self.MFS_DISABLED + + return item_info + + def _show_menu(self): + from ctypes import byref, windll, wintypes + + if self._menu is None: + self.update_menu() + + pos = wintypes.POINT() + windll.user32.GetCursorPos(byref(pos)) + + # PRB: Menus for Notification Icons Do Not Work Correctly + # https://web.archive.org/web/20121015064650/http://support.microsoft.com/kb/135788 + + windll.user32.SetForegroundWindow(self._h_wnd) + windll.user32.TrackPopupMenu(self._menu, 0, pos.x, pos.y, 0, self._h_wnd, None) + windll.user32.PostMessageW(self._h_wnd, self.WM_NULL, 0, 0) + + def update_menu(self): + from ctypes import byref, windll + + if self._menu is None: + self._menu = windll.user32.CreatePopupMenu() + + for item in self.menu_items.values(): + item_id = item["id"] + item_info = self._serialize_menu_item(item) + + if not windll.user32.SetMenuItemInfoW( + self._menu, item_id, False, byref(item_info) + ): + windll.user32.InsertMenuItemW( + self._menu, item_id, False, byref(item_info) + ) + + def set_icon_name(self, icon_name): + self._update_notify_icon(icon_name=icon_name) + + def show_notification(self, title, message): + self._update_notify_icon(title=title, message=message) + + def on_process_window_message(self, h_wnd, msg, w_param, l_param): + from ctypes import windll, wintypes + + if msg == self.WM_TRAYICON: + if l_param == self.WM_RBUTTONUP: + # Icon pressed + self._show_menu() + + elif l_param == self.WM_LBUTTONUP: + # Icon pressed + self.activate_callback() + + elif l_param in ( + self.NIN_BALLOONHIDE, + self.NIN_BALLOONTIMEOUT, + self.NIN_BALLOONUSERCLICK, + ): + if not config.sections["ui"]["trayicon"]: + # Notification dismissed, but user has disabled tray icon + self._remove_notify_icon() + + elif msg == self.WM_COMMAND: + # Menu item pressed + menu_item_id = w_param & 0xFFFF + menu_item_callback = self.menu_items[menu_item_id]["callback"] + menu_item_callback() + + elif msg == self._wm_taskbarcreated: + # Taskbar process restarted, create new icon + self._remove_notify_icon() + self._update_notify_icon() + + return windll.user32.DefWindowProcW( + wintypes.HWND(h_wnd), + msg, + wintypes.WPARAM(w_param), + wintypes.LPARAM(l_param), + ) + + def unload(self, is_shutdown=False): + self._remove_notify_icon() + + if not is_shutdown: + # Keep notification support as long as we're running + return + + self._destroy_h_icon() + self._destroy_window() + self._unregister_class() + + +class TrayIcon: + def __init__(self, application: Gtk.Application) -> None: + self.application: Gtk.Application = application + self.available: bool = True + self.implementation: Any = None + + self.watch_availability() + self.load() + + def watch_availability(self) -> None: + if sys.platform in {"win32", "darwin"}: + return + + Gio.bus_watch_name( + bus_type=Gio.BusType.SESSION, + name="org.kde.StatusNotifierWatcher", + flags=Gio.BusNameWatcherFlags.NONE, + name_appeared_closure=self.load, + name_vanished_closure=self.unload, + ) + + def load(self, *_args: Any) -> None: + self.available = True + + if sys.platform == "win32": + # Always keep tray icon loaded for Windows notification support + pass + + elif not config.sections["ui"]["trayicon"]: + # No need to have tray icon loaded now (unless this is Windows) + return + + if self.implementation is None: + if sys.platform == "win32": + self.implementation = Win32Implementation(self.application) + else: + try: + self.implementation = StatusNotifierImplementation(self.application) # type: ignore + + except ImplUnavailableError: + self.available = False + return + + self.refresh_state() + + def update_window_visibility(self) -> None: + if self.implementation: + self.implementation.update_window_visibility() + + def update_user_status(self) -> None: + if self.implementation: + self.implementation.update_user_status() + + def update_icon(self) -> None: + if self.implementation: + self.implementation.update_icon() + + def update_icon_theme(self) -> None: + if self.implementation: + self.implementation.update_icon_theme() + + def set_download_status(self, status: str) -> None: + if self.implementation: + self.implementation.set_download_status(status) + + def set_upload_status(self, status: str) -> None: + if self.implementation: + self.implementation.set_upload_status(status) + + def show_notification(self, title: str, message: str) -> None: + if self.implementation: + self.implementation.show_notification(title=title, message=message) + + def refresh_state(self) -> None: + if not self.implementation: + return + + self.implementation.is_visible = True + + self.update_icon_theme() + self.update_icon() + self.update_window_visibility() + self.update_user_status() + + def unload(self, *_args: Any, is_shutdown: bool = False) -> None: + if self.implementation: + self.implementation.unload(is_shutdown=is_shutdown) + self.implementation.is_visible = False + + if is_shutdown: + self.implementation = None + + def destroy(self) -> None: + self.unload(is_shutdown=True) + self.__dict__.clear() diff --git a/pkgs/clan-vm-manager/clan_vm_manager/views/list.py b/pkgs/clan-vm-manager/clan_vm_manager/views/list.py index 90f2edfb..ed3279b8 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/views/list.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/views/list.py @@ -100,12 +100,10 @@ class ClanList(Gtk.Box): box.set_valign(Gtk.Align.CENTER) add_button = Gtk.MenuButton() - add_button.set_icon_name("list-add") add_button.set_has_frame(False) add_button.set_menu_model(menu_model) - + add_button.set_label("Add machine") box.append(add_button) - box.append(Gtk.Label.new("Add machine")) grp.set_header_suffix(box) @@ -192,6 +190,7 @@ class ClanList(Gtk.Box): switch.connect("notify::active", partial(self.on_row_toggle, vm)) vm.connect("vm_status_changed", partial(self.vm_status_changed, switch)) + vm.connect("build_vm", self.build_vm) # suffix.append(box) row.add_suffix(box) @@ -295,6 +294,12 @@ class ClanList(Gtk.Box): row.set_state(True) vm.stop() + def build_vm(self, vm: VM, _vm: VM, building: bool) -> None: + if building: + log.info("Building VM") + else: + log.info("VM built") + def vm_status_changed(self, switch: Gtk.Switch, vm: VM, _vm: VM) -> None: switch.set_active(vm.is_running()) switch.set_state(vm.is_running()) diff --git a/pkgs/clan-vm-manager/default.nix b/pkgs/clan-vm-manager/default.nix index 8d3f2a65..3923b0a2 100644 --- a/pkgs/clan-vm-manager/default.nix +++ b/pkgs/clan-vm-manager/default.nix @@ -57,7 +57,7 @@ python3.pkgs.buildPythonApplication { ''; desktopItems = [ (makeDesktopItem { - name = "clan-vm-manager"; + name = "lol.clan.vm.manager"; exec = "clan-vm-manager %u"; icon = ./clan_vm_manager/assets/clan_white.png; desktopName = "cLAN Manager"; diff --git a/pkgs/clan-vm-manager/shell.nix b/pkgs/clan-vm-manager/shell.nix index 116c02c2..efe90b72 100644 --- a/pkgs/clan-vm-manager/shell.nix +++ b/pkgs/clan-vm-manager/shell.nix @@ -20,18 +20,18 @@ mkShell { ln -snf ${clan-vm-manager} result - # install desktop file set -eou pipefail - DESKTOP_DST=~/.local/share/applications/clan-vm-manager.desktop - DESKTOP_SRC=${clan-vm-manager}/share/applications/clan-vm-manager.desktop + DESKTOP_FILE_NAME=lol.clan.vm.manager.desktop + DESKTOP_DST=~/.local/share/applications/$DESKTOP_FILE_NAME + DESKTOP_SRC=${clan-vm-manager}/share/applications/$DESKTOP_FILE_NAME # UI_BIN="env GTK_DEBUG=interactive ${clan-vm-manager}/bin/clan-vm-manager" UI_BIN="${clan-vm-manager}/bin/clan-vm-manager" cp -f $DESKTOP_SRC $DESKTOP_DST sleep 2 sed -i "s|Exec=.*clan-vm-manager|Exec=$UI_BIN|" $DESKTOP_DST - xdg-mime default clan-vm-manager.desktop x-scheme-handler/clan + xdg-mime default $DESKTOP_FILE_NAME x-scheme-handler/clan echo "==== Validating desktop file installation ====" set -x desktop-file-validate $DESKTOP_DST