diff --git a/formatter.nix b/formatter.nix index bee296b9..8998825e 100644 --- a/formatter.nix +++ b/formatter.nix @@ -23,6 +23,14 @@ (self'.packages.clan-app.externalTestDeps or [ ]) ++ self'.packages.clan-cli.testDependencies; modules = [ "clan_app" ]; }; + "pkgs/clan-vm-manager" = { + extraPythonPackages = + # clan-app currently only exists on linux + (self'.packages.clan-vm-manager.externalTestDeps or [ ]) + ++ self'.packages.clan-vm-manager.testDependencies + ++ self'.packages.clan-cli.testDependencies; + modules = [ "clan_vm_manager" ]; + }; }; treefmt.programs.ruff.check = true; treefmt.programs.ruff.format = true; diff --git a/pkgs/clan-cli/clan_cli/clan_uri.py b/pkgs/clan-cli/clan_cli/clan_uri.py index c1d5aaf0..5f5b7eb8 100644 --- a/pkgs/clan-cli/clan_cli/clan_uri.py +++ b/pkgs/clan-cli/clan_cli/clan_uri.py @@ -18,9 +18,7 @@ class FlakeId: ), f"Flake {self._value} has an invalid type: {type(self._value)}" def __str__(self) -> str: - return str( - self._value - ) # The __str__ method returns a custom string representation + return str(self._value) @property def path(self) -> Path: diff --git a/pkgs/clan-cli/clan_cli/machines/machines.py b/pkgs/clan-cli/clan_cli/machines/machines.py index 57c85c7a..427e9da5 100644 --- a/pkgs/clan-cli/clan_cli/machines/machines.py +++ b/pkgs/clan-cli/clan_cli/machines/machines.py @@ -25,6 +25,9 @@ class Machine: _eval_cache: dict[str, str] = field(default_factory=dict) _build_cache: dict[str, Path] = field(default_factory=dict) + def get_id(self) -> str: + return f"{self.flake}#{self.name}" + def flush_caches(self) -> None: self.cached_deployment = None self._build_cache.clear() diff --git a/pkgs/clan-cli/clan_cli/vms/inspect.py b/pkgs/clan-cli/clan_cli/vms/inspect.py index 3c00198a..71f017bf 100644 --- a/pkgs/clan-cli/clan_cli/vms/inspect.py +++ b/pkgs/clan-cli/clan_cli/vms/inspect.py @@ -21,6 +21,10 @@ class VmConfig: graphics: bool waypipe: bool = False + def __post_init__(self) -> None: + if isinstance(self.flake_url, str): + self.flake_url = FlakeId(self.flake_url) + def inspect_vm(machine: Machine) -> VmConfig: data = json.loads(machine.eval_nix("config.clan.core.vm.inspect")) diff --git a/pkgs/clan-cli/clan_cli/vms/qemu.py b/pkgs/clan-cli/clan_cli/vms/qemu.py index d21df202..9f5969ff 100644 --- a/pkgs/clan-cli/clan_cli/vms/qemu.py +++ b/pkgs/clan-cli/clan_cli/vms/qemu.py @@ -1,8 +1,13 @@ import os import random +from collections.abc import Generator +from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path +from clan_cli.qemu.qmp import QEMUMonitorProtocol + +from ..errors import ClanError from .inspect import VmConfig @@ -145,3 +150,25 @@ def qemu_command( else: command.append("-nographic") return QemuCommand(command, vsock_cid=vsock_cid) + + +class QMPWrapper: + def __init__(self, state_dir: Path) -> None: + # These sockets here are just symlinks to the real sockets which + # are created by the run.py file. The reason being that we run into + # file path length issues on Linux. If no qemu process is running + # the symlink will be dangling. + self._qmp_socket: Path = state_dir / "qmp.sock" + self._qga_socket: Path = state_dir / "qga.sock" + + @contextmanager + def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]: + rpath = self._qmp_socket.resolve() + if not rpath.exists(): + raise ClanError(f"qmp socket {rpath} does not exist. Is the VM running?") + qmp = QEMUMonitorProtocol(str(rpath)) + qmp.connect() + try: + yield qmp + finally: + qmp.close() diff --git a/pkgs/clan-cli/clan_cli/vms/run.py b/pkgs/clan-cli/clan_cli/vms/run.py index a0fdbde8..6f2082d7 100644 --- a/pkgs/clan-cli/clan_cli/vms/run.py +++ b/pkgs/clan-cli/clan_cli/vms/run.py @@ -110,7 +110,7 @@ def run_vm( nix_options: list[str] = [], ) -> None: with ExitStack() as stack: - machine = Machine(vm.machine_name, vm.flake_url) + machine = Machine(name=vm.machine_name, flake=vm.flake_url) log.debug(f"Creating VM for {machine}") # store the temporary rootfs inside XDG_CACHE_HOME on the host diff --git a/pkgs/clan-vm-manager/.envrc b/pkgs/clan-vm-manager/.envrc new file mode 100644 index 00000000..7c18dfe0 --- /dev/null +++ b/pkgs/clan-vm-manager/.envrc @@ -0,0 +1,7 @@ +# shellcheck shell=bash +source_up + +watch_file flake-module.nix shell.nix default.nix + +# Because we depend on nixpkgs sources, uploading to builders takes a long time +use flake .#clan-vm-manager --builders '' diff --git a/pkgs/clan-vm-manager/.gitignore b/pkgs/clan-vm-manager/.gitignore new file mode 100644 index 00000000..6537faba --- /dev/null +++ b/pkgs/clan-vm-manager/.gitignore @@ -0,0 +1 @@ +**/.vscode \ No newline at end of file diff --git a/pkgs/clan-vm-manager/README.md b/pkgs/clan-vm-manager/README.md new file mode 100644 index 00000000..8a31c44e --- /dev/null +++ b/pkgs/clan-vm-manager/README.md @@ -0,0 +1,94 @@ +# Clan VM Manager + +Provides users with the simple functionality to manage their locally registered clans. + +![app-preview](screenshots/image.png) + +## Available commands + +Run this application + +```bash +./bin/clan-vm-manager +``` + +Join the default machine of a clan + +```bash +./bin/clan-vm-manager [clan-uri] +``` + +Join a specific machine of a clan + +```bash +./bin/clan-vm-manager [clan-uri]#[machine] +``` + +For more available commands see the developer section below. + +## Developing this Application + +### Debugging Style and Layout + +```bash +# Enable the GTK debugger +gsettings set org.gtk.Settings.Debug enable-inspector-keybinding true + +# Start the application with the debugger attached +GTK_DEBUG=interactive ./bin/clan-vm-manager --debug +``` + +Appending `--debug` flag enables debug logging printed into the console. + +### Profiling + +To activate profiling you can run + +```bash +PERF=1 ./bin/clan-vm-manager +``` + +### Library Components + +> Note: +> +> we recognized bugs when starting some cli-commands through the integrated vs-code terminal. +> If encountering issues make sure to run commands in a regular os-shell. + +lib-Adw has a demo application showing all widgets. You can run it by executing + +```bash +adwaita-1-demo +``` + +GTK4 has a demo application showing all widgets. You can run it by executing + +```bash +gtk4-widget-factory +``` + +To find available icons execute + +```bash +gtk4-icon-browser +``` + +### Links + +Here are some important documentation links related to the Clan VM Manager: + +- [Adw PyGobject Reference](http://lazka.github.io/pgi-docs/index.html#Adw-1): This link provides the PyGObject reference documentation for the Adw library, which is used in the Clan VM Manager. It contains detailed information about the Adw widgets and their usage. + +- [GTK4 PyGobject Reference](http://lazka.github.io/pgi-docs/index.html#Gtk-4.0): This link provides the PyGObject reference documentation for GTK4, the toolkit used for building the user interface of the Clan VM Manager. It includes information about GTK4 widgets, signals, and other features. + +- [Adw Widget Gallery](https://gnome.pages.gitlab.gnome.org/libadwaita/doc/main/widget-gallery.html): This link showcases a widget gallery for Adw, allowing you to see the available widgets and their visual appearance. It can be helpful for designing the user interface of the Clan VM Manager. + +- [Python + GTK3 Tutorial](https://python-gtk-3-tutorial.readthedocs.io/en/latest/textview.html): Although the Clan VM Manager uses GTK4, this tutorial for GTK3 can still be useful as it covers the basics of building GTK-based applications with Python. It includes examples and explanations for various GTK widgets, including text views. + +- [GNOME Human Interface Guidelines](https://developer.gnome.org/hig/): This link provides the GNOME Human Interface Guidelines, which offer design and usability recommendations for creating GNOME applications. It covers topics such as layout, navigation, and interaction patterns. + +## Error handling + +> Error dialogs should be avoided where possible, since they are disruptive. +> +> For simple non-critical errors, toasts can be a good alternative. \ No newline at end of file diff --git a/pkgs/clan-vm-manager/bin/clan-vm-manager b/pkgs/clan-vm-manager/bin/clan-vm-manager new file mode 100755 index 00000000..a6c6a17b --- /dev/null +++ b/pkgs/clan-vm-manager/bin/clan-vm-manager @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 +import sys +from pathlib import Path + +module_path = Path(__file__).parent.parent.absolute() + +sys.path.insert(0, str(module_path)) +sys.path.insert(0, str(module_path.parent / "clan_cli")) + +from clan_vm_manager import main # NOQA + +if __name__ == "__main__": + main() diff --git a/pkgs/clan-vm-manager/clan-vm-manager.code-workspace b/pkgs/clan-vm-manager/clan-vm-manager.code-workspace new file mode 100644 index 00000000..1a108d5c --- /dev/null +++ b/pkgs/clan-vm-manager/clan-vm-manager.code-workspace @@ -0,0 +1,43 @@ +{ + "folders": [ + { + "path": "." + }, + { + "path": "../clan-cli/clan_cli" + }, + { + "path": "../clan-cli/tests" + }, + { + "path": "../../nixosModules" + }, + { + "path": "../../lib/build-clan" + } + ], + "settings": { + "python.linting.mypyEnabled": true, + "files.exclude": { + "**/__pycache__": true, + "**/.direnv": true, + "**/.hypothesis": true, + "**/.mypy_cache": true, + "**/.reports": true, + "**/.ruff_cache": true, + "**/result/**": true, + "/nix/store/**": true + }, + "search.exclude": { + "**/__pycache__": true, + "**/.direnv": true, + "**/.hypothesis": true, + "**/.mypy_cache": true, + "**/.reports": true, + "**/.ruff_cache": true, + "**/result/": true, + "/nix/store/**": true + }, + "files.autoSave": "off" + } +} diff --git a/pkgs/clan-vm-manager/clan_vm_manager/__init__.py b/pkgs/clan-vm-manager/clan_vm_manager/__init__.py new file mode 100644 index 00000000..d1cc2a96 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/__init__.py @@ -0,0 +1,14 @@ +import logging +import sys + +from clan_cli.profiler import profile + +from clan_vm_manager.app import MainApplication + +log = logging.getLogger(__name__) + + +@profile +def main(argv: list[str] = sys.argv) -> int: + app = MainApplication() + return app.run(argv) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/__main__.py b/pkgs/clan-vm-manager/clan_vm_manager/__main__.py new file mode 100644 index 00000000..daf509ab --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/__main__.py @@ -0,0 +1,6 @@ +import sys + +from . import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/app.py b/pkgs/clan-vm-manager/clan_vm_manager/app.py new file mode 100644 index 00000000..d247f7f6 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/app.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +import logging +from typing import Any, ClassVar + +import gi + +from clan_vm_manager import assets +from clan_vm_manager.singletons.toast import InfoToast, ToastOverlay + +gi.require_version("Gtk", "4.0") +gi.require_version("Adw", "1") + +from clan_cli.custom_logger import setup_logging +from gi.repository import Adw, Gdk, Gio, Gtk + +from clan_vm_manager.components.interfaces import ClanConfig +from clan_vm_manager.singletons.use_join import GLib, GObject + +from .windows.main_window import MainWindow + +log = logging.getLogger(__name__) + + +class MainApplication(Adw.Application): + """ + This class is initialized every time the app is started + Only the Adw.ApplicationWindow is a singleton. + So don't use any singletons in the Adw.Application class. + """ + + __gsignals__: ClassVar = { + "join_request": (GObject.SignalFlags.RUN_FIRST, None, [str]), + } + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__( + application_id="org.clan.vm-manager", + flags=Gio.ApplicationFlags.HANDLES_COMMAND_LINE, + ) + + self.add_main_option( + "debug", + ord("d"), + GLib.OptionFlags.NONE, + GLib.OptionArg.NONE, + "enable debug mode", + None, + ) + + self.window: MainWindow | None = None + self.connect("activate", self.on_activate) + self.connect("shutdown", self.on_shutdown) + + def on_shutdown(self, source: "MainApplication") -> None: + log.debug("Shutting down Adw.Application") + + if self.get_windows() == []: + log.warning("No windows to destroy") + if self.window: + # TODO: Doesn't seem to raise the destroy signal. Need to investigate + # self.get_windows() returns an empty list. Desync between window and application? + self.window.close() + # Killing vms directly. This is dirty + self.window.kill_vms() + else: + log.error("No window to destroy") + + def do_command_line(self, command_line: Any) -> int: + options = command_line.get_options_dict() + # convert GVariantDict -> GVariant -> dict + options = options.end().unpack() + + if "debug" in options and self.window is None: + setup_logging(logging.DEBUG, root_log_name=__name__.split(".")[0]) + setup_logging(logging.DEBUG, root_log_name="clan_cli") + elif self.window is None: + setup_logging(logging.INFO, root_log_name=__name__.split(".")[0]) + log.debug("Debug logging enabled") + + if "debug" in options: + ToastOverlay.use().add_toast_unique( + InfoToast("Debug logging enabled").toast, "info.debugging.enabled" + ) + + args = command_line.get_arguments() + + self.activate() + + if len(args) > 1: + uri = args[1] + self.emit("join_request", uri) + return 0 + + def on_window_hide_unhide(self, *_args: Any) -> None: + if not self.window: + log.error("No window to hide/unhide") + return + if self.window.is_visible(): + self.window.hide() + else: + self.window.present() + + def dummy_menu_entry(self) -> None: + log.info("Dummy menu entry called") + + def on_activate(self, source: "MainApplication") -> None: + if not self.window: + self.init_style() + self.window = MainWindow(config=ClanConfig(initial_view="list")) + self.window.set_application(self) + + self.window.show() + + # TODO: For css styling + def init_style(self) -> None: + resource_path = assets.loc / "style.css" + + log.debug(f"Style css path: {resource_path}") + css_provider = Gtk.CssProvider() + css_provider.load_from_path(str(resource_path)) + display = Gdk.Display.get_default() + assert display is not None + Gtk.StyleContext.add_provider_for_display( + display, + css_provider, + Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION, + ) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/__init__.py b/pkgs/clan-vm-manager/clan_vm_manager/assets/__init__.py new file mode 100644 index 00000000..bfd4588e --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/assets/__init__.py @@ -0,0 +1,7 @@ +from pathlib import Path + +loc: Path = Path(__file__).parent + + +def get_asset(name: str | Path) -> Path: + return loc / name diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_black.png b/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_black.png new file mode 100644 index 00000000..370d0f75 Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_black.png differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_black_notext.png b/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_black_notext.png new file mode 100644 index 00000000..edfb5a90 Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_black_notext.png differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_white.png b/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_white.png new file mode 100644 index 00000000..b5bcb145 Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_white.png differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_white_notext.png b/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_white_notext.png new file mode 100644 index 00000000..45361f1a Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/clan_white_notext.png differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/cybernet.jpeg b/pkgs/clan-vm-manager/clan_vm_manager/assets/cybernet.jpeg new file mode 100644 index 00000000..f2840c42 Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/cybernet.jpeg differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/cybernet_no_text.jpeg b/pkgs/clan-vm-manager/clan_vm_manager/assets/cybernet_no_text.jpeg new file mode 100644 index 00000000..53d4f30c Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/cybernet_no_text.jpeg differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/firestorm.jpeg b/pkgs/clan-vm-manager/clan_vm_manager/assets/firestorm.jpeg new file mode 100644 index 00000000..2c8241c6 Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/firestorm.jpeg differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/penguin.jpeg b/pkgs/clan-vm-manager/clan_vm_manager/assets/penguin.jpeg new file mode 100644 index 00000000..72f8e503 Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/penguin.jpeg differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/placeholder.jpeg b/pkgs/clan-vm-manager/clan_vm_manager/assets/placeholder.jpeg new file mode 100644 index 00000000..f3fa915b Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/placeholder.jpeg differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/placeholder2.jpeg b/pkgs/clan-vm-manager/clan_vm_manager/assets/placeholder2.jpeg new file mode 100644 index 00000000..5c251a0d Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/placeholder2.jpeg differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/style.css b/pkgs/clan-vm-manager/clan_vm_manager/assets/style.css new file mode 100644 index 00000000..c179744d --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/assets/style.css @@ -0,0 +1,66 @@ +/* Insert custom styles here */ + +navigation-view { + padding: 5px; + /* padding-left: 5px; + padding-right: 5px; + padding-bottom: 5px; */ +} + +avatar { + margin: 2px; +} + +.trust { + padding-top: 25px; + padding-bottom: 25px; +} + +.join-list { + margin-top: 1px; + margin-left: 2px; + margin-right: 2px; + +} + +.progress-bar { + margin-right: 25px; + min-width: 200px; +} + +.group-list { + background-color: inherit; +} +.group-list > .activatable:hover { + background-color: unset; +} + +.group-list > row { + margin-top: 12px; + border-bottom: unset; +} + + +.vm-list { + margin-top: 25px; + margin-bottom: 25px; +} + +.no-shadow { + box-shadow: none; +} + +.search-entry { + margin-bottom: 12px; +} + +searchbar { + margin-bottom: 25px; +} + + +.log-view { + margin-top: 12px; + font-family: monospace; + padding: 8px; +} diff --git a/pkgs/clan-vm-manager/clan_vm_manager/assets/zenith.jpeg b/pkgs/clan-vm-manager/clan_vm_manager/assets/zenith.jpeg new file mode 100644 index 00000000..904182b5 Binary files /dev/null and b/pkgs/clan-vm-manager/clan_vm_manager/assets/zenith.jpeg differ diff --git a/pkgs/clan-vm-manager/clan_vm_manager/components/__init__.py b/pkgs/clan-vm-manager/clan_vm_manager/components/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkgs/clan-vm-manager/clan_vm_manager/components/executor.py b/pkgs/clan-vm-manager/clan_vm_manager/components/executor.py new file mode 100644 index 00000000..7ca59d52 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/components/executor.py @@ -0,0 +1,132 @@ +import logging +import os +import signal +import sys +import traceback +from pathlib import Path +from typing import Any + +import gi + +gi.require_version("GdkPixbuf", "2.0") + +import dataclasses +import multiprocessing as mp +from collections.abc import Callable + +log = logging.getLogger(__name__) + + +# Kill the new process and all its children by sending a SIGTERM signal to the process group +def _kill_group(proc: mp.Process) -> None: + pid = proc.pid + if proc.is_alive() and pid: + os.killpg(pid, signal.SIGTERM) + else: + log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead") + + +@dataclasses.dataclass(frozen=True) +class MPProcess: + name: str + proc: mp.Process + out_file: Path + + # Kill the new process and all its children by sending a SIGTERM signal to the process group + def kill_group(self) -> None: + _kill_group(proc=self.proc) + + +def _set_proc_name(name: str) -> None: + if sys.platform != "linux": + return + import ctypes + + # Define the prctl function with the appropriate arguments and return type + libc = ctypes.CDLL("libc.so.6") + prctl = libc.prctl + prctl.argtypes = [ + ctypes.c_int, + ctypes.c_char_p, + ctypes.c_ulong, + ctypes.c_ulong, + ctypes.c_ulong, + ] + prctl.restype = ctypes.c_int + + # Set the process name to "my_process" + prctl(15, name.encode(), 0, 0, 0) + + +def _init_proc( + func: Callable, + out_file: Path, + proc_name: str, + on_except: Callable[[Exception, mp.process.BaseProcess], None] | None, + **kwargs: Any, +) -> None: + # Create a new process group + os.setsid() + + # Open stdout and stderr + with open(out_file, "w") as out_fd: + os.dup2(out_fd.fileno(), sys.stdout.fileno()) + os.dup2(out_fd.fileno(), sys.stderr.fileno()) + + # Print some information + pid = os.getpid() + gpid = os.getpgid(pid=pid) + + # Set the process name + _set_proc_name(proc_name) + + # Close stdin + sys.stdin.close() + + linebreak = "=" * 5 + # Execute the main function + print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr) + try: + func(**kwargs) + except Exception as ex: + traceback.print_exc() + if on_except is not None: + on_except(ex, mp.current_process()) + + # Kill the new process and all its children by sending a SIGTERM signal to the process group + pid = os.getpid() + gpid = os.getpgid(pid=pid) + print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr) + os.killpg(gpid, signal.SIGTERM) + sys.exit(1) + # Don't use a finally block here, because we want the exitcode to be set to + # 0 if the function returns normally + + +def spawn( + *, + out_file: Path, + on_except: Callable[[Exception, mp.process.BaseProcess], None] | None, + func: Callable, + **kwargs: Any, +) -> MPProcess: + # Decouple the process from the parent + if mp.get_start_method(allow_none=True) is None: + mp.set_start_method(method="forkserver") + + # Set names + proc_name = f"MPExec:{func.__name__}" + + # Start the process + proc = mp.Process( + target=_init_proc, + args=(func, out_file, proc_name, on_except), + name=proc_name, + kwargs=kwargs, + ) + proc.start() + + # Return the process + mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file) + + return mp_proc diff --git a/pkgs/clan-vm-manager/clan_vm_manager/components/gkvstore.py b/pkgs/clan-vm-manager/clan_vm_manager/components/gkvstore.py new file mode 100644 index 00000000..ea96b005 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/components/gkvstore.py @@ -0,0 +1,220 @@ +import logging +from collections.abc import Callable +from typing import Any, Generic, TypeVar + +import gi + +gi.require_version("Gio", "2.0") +from gi.repository import Gio, GObject + +log = logging.getLogger(__name__) + + +# Define type variables for key and value types +K = TypeVar("K") # Key type +V = TypeVar( + "V", bound=GObject.Object +) # Value type, bound to GObject.GObject or its subclasses + + +class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]): + """ + A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values. + Only use self[key] and del self[key] for accessing the items for better performance. + This class could be optimized by having the objects remember their position in the list. + """ + + def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None: + super().__init__() + self.gtype = gtype + self.key_gen = key_gen + # From Python 3.7 onwards dictionaries are ordered by default + self._items: dict[K, V] = dict() + + ################################## + # # + # Gio.ListStore Interface # + # # + ################################## + @classmethod + def new(cls: Any, gtype: type[V]) -> "GKVStore": + return cls.__new__(cls, gtype) + + def append(self, item: V) -> None: + key = self.key_gen(item) + self[key] = item + + def find(self, item: V) -> tuple[bool, int]: + log.warning("Finding is O(n) in GKVStore. Better use indexing") + for i, v in enumerate(self.values()): + if v == item: + return True, i + return False, -1 + + def find_with_equal_func( + self, item: V, equal_func: Callable[[V, V], bool] + ) -> tuple[bool, int]: + log.warning("Finding is O(n) in GKVStore. Better use indexing") + for i, v in enumerate(self.values()): + if equal_func(v, item): + return True, i + return False, -1 + + def find_with_equal_func_full( + self, item: V, equal_func: Callable[[V, V, Any], bool], user_data: Any + ) -> tuple[bool, int]: + log.warning("Finding is O(n) in GKVStore. Better use indexing") + for i, v in enumerate(self.values()): + if equal_func(v, item, user_data): + return True, i + return False, -1 + + def insert(self, position: int, item: V) -> None: + log.warning("Inserting is O(n) in GKVStore. Better use append") + log.warning( + "This functions may have incorrect items_changed signal behavior. Please test it" + ) + key = self.key_gen(item) + if key in self._items: + raise ValueError("Key already exists in the dictionary") + if position < 0 or position > len(self._items): + raise IndexError("Index out of range") + + # Temporary storage for items to be reinserted + temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]] + + # Delete items from the original dict + for k in list(self.keys())[position:]: + del self._items[k] + + # Insert the new key-value pair + self._items[key] = item + + # Reinsert the items + for i, (k, v) in enumerate(temp_list): + self._items[k] = v + + # Notify the model of the changes + self.items_changed(position, 0, 1) + + def insert_sorted( + self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any + ) -> None: + raise NotImplementedError("insert_sorted is not implemented in GKVStore") + + def remove(self, position: int) -> None: + if position < 0 or position >= self.get_n_items(): + return + key = self.keys()[position] + del self[key] + self.items_changed(position, 1, 0) + + def remove_all(self) -> None: + self._items.clear() + self.items_changed(0, len(self._items), 0) + + def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None: + raise NotImplementedError("sort is not implemented in GKVStore") + + def splice(self, position: int, n_removals: int, additions: list[V]) -> None: + raise NotImplementedError("splice is not implemented in GKVStore") + + ################################## + # # + # Gio.ListModel Interface # + # # + ################################## + def get_item(self, position: int) -> V | None: + if position < 0 or position >= self.get_n_items(): + return None + # Access items by index since OrderedDict does not support direct indexing + key = list(self._items.keys())[position] + return self._items[key] + + def do_get_item(self, position: int) -> V | None: + return self.get_item(position) + + def get_item_type(self) -> Any: + return self.gtype.__gtype__ # type: ignore[attr-defined] + + def do_get_item_type(self) -> GObject.GType: + return self.get_item_type() + + def get_n_items(self) -> int: + return len(self._items) + + def do_get_n_items(self) -> int: + return self.get_n_items() + + ################################## + # # + # Dict Interface # + # # + ################################## + def keys(self) -> list[K]: + return list(self._items.keys()) + + def values(self) -> list[V]: + return list(self._items.values()) + + def items(self) -> list[tuple[K, V]]: + return list(self._items.items()) + + def get(self, key: K, default: V | None = None) -> V | None: + return self._items.get(key, default) + + # O(1) operation if the key does not exist, O(n) if it does + def __setitem__(self, key: K, value: V) -> None: + # If the key already exists, remove it O(n) + if key in self._items: + log.debug("Updating an existing key in GKVStore is O(n)") + position = self.keys().index(key) + self._items[key] = value + self.items_changed(position, 1, 1) + else: + # Add the new key-value pair + self._items[key] = value + position = max(len(self._items) - 1, 0) + self.items_changed(position, 0, 1) + + # O(n) operation + def __delitem__(self, key: K) -> None: + position = self.keys().index(key) + del self._items[key] + self.items_changed(position, 1, 0) + + def __len__(self) -> int: + return len(self._items) + + # O(1) operation + def __getitem__(self, key: K) -> V: # type: ignore[override] + return self._items[key] + + def __contains__(self, key: K) -> bool: # type: ignore[override] + return key in self._items + + def __str__(self) -> str: + resp = "GKVStore(\n" + for k, v in self._items.items(): + resp += f"{k}: {v}\n" + resp += ")" + return resp + + def __repr__(self) -> str: + return self._items.__str__() + + ################################## + # # + # Custom Methods # + # # + ################################## + def first(self) -> V: + return self.values()[0] + + def last(self) -> V: + return self.values()[-1] + + def register_on_change( + self, callback: Callable[["GKVStore[K,V]", int, int, int], None] + ) -> None: + self.connect("items-changed", callback) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/components/interfaces.py b/pkgs/clan-vm-manager/clan_vm_manager/components/interfaces.py new file mode 100644 index 00000000..28bd4649 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/components/interfaces.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + +import gi + +gi.require_version("Gtk", "4.0") + + +@dataclass +class ClanConfig: + initial_view: str diff --git a/pkgs/clan-vm-manager/clan_vm_manager/components/list_splash.py b/pkgs/clan-vm-manager/clan_vm_manager/components/list_splash.py new file mode 100644 index 00000000..73a5b73e --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/components/list_splash.py @@ -0,0 +1,74 @@ +import logging +from collections.abc import Callable +from typing import TypeVar + +import gi + +from clan_vm_manager import assets + +gi.require_version("Adw", "1") +from gi.repository import Adw, GdkPixbuf, Gio, GObject, Gtk + +log = logging.getLogger(__name__) + +ListItem = TypeVar("ListItem", bound=GObject.Object) +CustomStore = TypeVar("CustomStore", bound=Gio.ListModel) + + +class EmptySplash(Gtk.Box): + def __init__(self, on_join: Callable[[str], None]) -> None: + super().__init__(orientation=Gtk.Orientation.VERTICAL) + self.on_join = on_join + + vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) + clan_icon = self.load_image(str(assets.get_asset("clan_black_notext.png"))) + + if clan_icon: + image = Gtk.Image.new_from_pixbuf(clan_icon) + else: + image = Gtk.Image.new_from_icon_name("image-missing") + # same as the clamp + image.set_pixel_size(400) + image.set_opacity(0.5) + image.set_margin_top(20) + image.set_margin_bottom(10) + + vbox.append(image) + + empty_label = Gtk.Label(label="Welcome to Clan! Join your first clan.") + join_entry = Gtk.Entry() + join_entry.set_placeholder_text("clan://") + join_entry.set_hexpand(True) + + join_button = Gtk.Button(label="Join") + join_button.connect("clicked", self._on_join, join_entry) + + join_entry.connect("activate", lambda e: self._on_join(join_button, e)) + + clamp = Adw.Clamp() + clamp.set_maximum_size(400) + clamp.set_margin_bottom(40) + vbox.append(empty_label) + hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6) + hbox.append(join_entry) + hbox.append(join_button) + vbox.append(hbox) + clamp.set_child(vbox) + + self.append(clamp) + + def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None: + try: + pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path) + return pixbuf + except Exception as e: + log.error(f"Failed to load image: {e}") + return None + + def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None: + """ + Callback for the join button + Extracts the text from the entry and calls the on_join callback + """ + log.info(f"Splash screen: Joining {entry.get_text()}") + self.on_join(entry.get_text()) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/components/trayicon.py b/pkgs/clan-vm-manager/clan_vm_manager/components/trayicon.py new file mode 100644 index 00000000..4bfd5a23 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/components/trayicon.py @@ -0,0 +1,1189 @@ +# mypy: allow-untyped-defs +# ruff: noqa: ANN201, ANN001, ANN202 + +# COPYRIGHT (C) 2020-2024 Nicotine+ Contributors +# +# GNU GENERAL PUBLIC LICENSE +# Version 3, 29 June 2007 +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +import sys +from collections.abc import Callable +from typing import Any, ClassVar + +import gi + +gi.require_version("Gtk", "4.0") +from gi.repository import GdkPixbuf, Gio, GLib, Gtk + + +# DUMMY IMPLEMENTATION +################################################ +### import pynicotine +class Pynicotine: + __application_id__ = "nicotine-plus" + __application_name__ = "Nicotine+" + __version__ = "3.0.0" + + +pynicotine = Pynicotine() + + +### from pynicotine import slskmessages +class UserStatus: + OFFLINE = 0 + ONLINE = 1 + AWAY = 2 + + +class Slskmessages: + UserStatus: Any = UserStatus + + +slskmessages = Slskmessages() + + +### from pynicotine.config import config +class Config: + sections: ClassVar = { + "notifications": {"notification_popup_sound": False}, + "ui": {"trayicon": True}, + } + data_folder_path: Any = "data_folder_path" + + +config = Config() + + +### from pynicotine.core import core +class User: + login_status: Any = UserStatus.OFFLINE + + +class Core: + users = User() + + +core = Core() +### from pynicotine.gtkgui.application import GTK_API_VERSION +GTK_API_VERSION = 4 + +## from pynicotine.gtkgui.application import GTK_GUI_FOLDER_PATH +GTK_GUI_FOLDER_PATH = "assets" +LONG_PATH_PREFIX = "\\\\?\\" + + +# from pynicotine.gtkgui.widgets.theme import ICON_THEME +class IconTheme: + def lookup_icon(self, icon_name: str, **kwargs: Any) -> None: + return None + + +ICON_THEME = IconTheme() + + +# from pynicotine.gtkgui.widgets.window import Window +class CWindow: + activation_token: Any = None + + +Window = CWindow() + +### from pynicotine.logfacility import log + +import logging + + +class MyLog: + def __init__(self) -> None: + self.log = logging.getLogger(__name__) + + def add_debug(self, *args: Any, **kwargs: Any) -> None: + return + self.log.debug(*args, **kwargs) + + +log = MyLog() + + +### from pynicotine.utils import encode_path + + +def encode_path(path: str, prefix: bool = True) -> bytes: + """Converts a file path to bytes for processing by the system. + + On Windows, also append prefix to enable extended-length path. + """ + + if sys.platform == "win32" and prefix: + path = path.replace("/", "\\") + + if path.startswith("\\\\"): + path = "UNC" + path[1:] + + path = LONG_PATH_PREFIX + path + + return path.encode("utf-8") + + +# from pynicotine.utils import truncate_string_byte +def truncate_string_byte( + string: str, byte_limit: int, encoding: str = "utf-8", ellipsize: bool = False +) -> str: + """Truncates a string to fit inside a byte limit.""" + + string_bytes = string.encode(encoding) + + if len(string_bytes) <= byte_limit: + # Nothing to do, return original string + return string + + if ellipsize: + ellipsis_char = "…".encode(encoding) + string_bytes = ( + string_bytes[: max(byte_limit - len(ellipsis_char), 0)].rstrip() + + ellipsis_char + ) + else: + string_bytes = string_bytes[:byte_limit] + + return string_bytes.decode(encoding, "ignore") + + +################################################ + + +class ImplUnavailableError(Exception): + pass + + +class BaseImplementation: + def __init__(self, application: Any) -> None: + self.application = application + self.menu_items: dict[int, Any] = {} + self.menu_item_id: int = 1 + self.activate_callback: Callable = lambda a, b: self.update_window_visibility + self.is_visible: bool = True + + self.create_menu() + + def create_item( + self, + text: str | None = None, + callback: Callable | None = None, + check: bool = False, + ) -> dict[str, Any]: + item: dict[str, Any] = {"id": self.menu_item_id, "sensitive": True} + + if text is not None: + item["text"] = text + + if callback is not None: + item["callback"] = callback + + if check: + item["toggled"] = False + + self.menu_items[self.menu_item_id] = item + self.menu_item_id += 1 + + return item + + @staticmethod + def set_item_text(item: dict[str, Any], text: str | None) -> None: + item["text"] = text + + @staticmethod + def set_item_sensitive(item: dict[str, Any], sensitive: bool) -> None: + item["sensitive"] = sensitive + + @staticmethod + def set_item_toggled(item: dict[str, Any], toggled: bool) -> None: + item["toggled"] = toggled + + def create_menu(self) -> None: + self.show_hide_item = self.create_item( + "default", self.application.on_window_hide_unhide + ) + + # self.create_item() + + # self.create_item("_Quit", self.application.on_shutdown) + + def update_window_visibility(self) -> None: + if self.application.window is None: + return + + if self.application.window.is_visible(): + label = "Hide VM Manager" + else: + label = "Show VM Manager" + + self.set_item_text(self.show_hide_item, label) + self.update_menu() + + def update_user_status(self) -> None: + self.update_icon() + self.update_menu() + + def update_icon(self) -> None: + pass + # # Check for highlights, and display highlight icon if there is a highlighted room or private chat + # if (self.application.window + # and (self.application.window.chatrooms.highlighted_rooms + # or self.application.window.privatechat.highlighted_users)): + # icon_name = "msg" + + # elif core.users.login_status == slskmessages.UserStatus.ONLINE: + # icon_name = "connect" + + # elif core.users.login_status == slskmessages.UserStatus.AWAY: + # icon_name = "away" + + # else: + # icon_name = "disconnect" + + # icon_name = f"{pynicotine.__application_id__}-{icon_name}" + # self.set_icon(icon_name) + + def set_icon(self, icon_name: str) -> None: + # Implemented in subclasses + pass + + def update_icon_theme(self) -> None: + # Implemented in subclasses + pass + + def update_menu(self) -> None: + # Implemented in subclasses + pass + + def set_download_status(self, status: str) -> None: + self.update_menu() + + def set_upload_status(self, status) -> None: + self.update_menu() + + def show_notification(self, title, message) -> None: + # Implemented in subclasses + pass + + def unload(self, is_shutdown=False) -> None: + # Implemented in subclasses + pass + + +class StatusNotifierImplementation(BaseImplementation): + class DBusProperty: + def __init__(self, name, signature, value) -> None: + self.name = name + self.signature = signature + self.value = value + + class DBusSignal: + def __init__(self, name, args) -> None: + self.name = name + self.args = args + + class DBusMethod: + def __init__(self, name, in_args, out_args, callback) -> None: + self.name = name + self.in_args = in_args + self.out_args = out_args + self.callback = callback + + class DBusService: + def __init__(self, interface_name, object_path, bus_type) -> None: + self._interface_name = interface_name + self._object_path = object_path + + self._bus = Gio.bus_get_sync(bus_type) + self._registration_id = None + self.properties: Any = {} + self.signals: Any = {} + self.methods: Any = {} + + def register(self): + xml_output = f"" + + for property_name, prop in self.properties.items(): + xml_output += f"" + + for method_name, method in self.methods.items(): + xml_output += f"" + + for in_signature in method.in_args: + xml_output += f"" + for out_signature in method.out_args: + xml_output += f"" + + xml_output += "" + + for signal_name, signal in self.signals.items(): + xml_output += f"" + + for signature in signal.args: + xml_output += f"" + + xml_output += "" + + xml_output += "" + + registration_id = self._bus.register_object( + object_path=self._object_path, + interface_info=Gio.DBusNodeInfo.new_for_xml(xml_output).interfaces[0], + method_call_closure=self.on_method_call, + get_property_closure=self.on_get_property, + ) + + if not registration_id: + raise GLib.Error( + f"Failed to register object with path {self._object_path}" + ) + + self._registration_id = registration_id + + def unregister(self) -> None: + if self._registration_id is None: + return + + self._bus.unregister_object(self._registration_id) + self._registration_id = None + + def add_property(self, name: str, signature: Any, value: Any) -> None: + self.properties[name] = StatusNotifierImplementation.DBusProperty( + name, signature, value + ) + + def add_signal(self, name: str, args: Any) -> None: + self.signals[name] = StatusNotifierImplementation.DBusSignal(name, args) + + def add_method( + self, name: str, in_args: Any, out_args: Any, callback: Any + ) -> None: + self.methods[name] = StatusNotifierImplementation.DBusMethod( + name, in_args, out_args, callback + ) + + def emit_signal(self, name: str, *args: Any) -> None: + arg_types = "".join(self.signals[name].args) + + self._bus.emit_signal( + destination_bus_name=None, + object_path=self._object_path, + interface_name=self._interface_name, + signal_name=name, + parameters=GLib.Variant(f"({arg_types})", args), + ) + + def on_method_call( + self, + _connection, + _sender, + _path, + _interface_name, + method_name, + parameters, + invocation, + ): + method = self.methods[method_name] + result = method.callback(*parameters.unpack()) + + out_arg_types = "".join(method.out_args) + return_value = None + + if method.out_args: + return_value = GLib.Variant(f"({out_arg_types})", result) + + invocation.return_value(return_value) + + def on_get_property( + self, _connection, _sender, _path, _interface_name, property_name + ): + prop = self.properties[property_name] + return GLib.Variant(prop.signature, prop.value) + + class DBusMenuService(DBusService): + def __init__(self) -> None: + super().__init__( + interface_name="com.canonical.dbusmenu", + object_path="/org/ayatana/NotificationItem/Nicotine/Menu", + bus_type=Gio.BusType.SESSION, + ) + + self._items: Any = {} + self._revision: Any = 0 + + for method_name, in_args, out_args, callback in ( + ( + "GetGroupProperties", + ("ai", "as"), + ("a(ia{sv})",), + self.on_get_group_properties, + ), + ( + "GetLayout", + ("i", "i", "as"), + ("u", "(ia{sv}av)"), + self.on_get_layout, + ), + ("Event", ("i", "s", "v", "u"), (), self.on_event), + ): + self.add_method(method_name, in_args, out_args, callback) + + for signal_name, value in (("LayoutUpdated", ("u", "i")),): + self.add_signal(signal_name, value) + + def set_items(self, items) -> None: + self._items = items + + self._revision += 1 + self.emit_signal("LayoutUpdated", self._revision, 0) + + @staticmethod + def _serialize_item(item) -> dict[str, Any]: + if "text" in item: + props = { + "label": GLib.Variant("s", item["text"]), + "enabled": GLib.Variant("b", item["sensitive"]), + } + + if item.get("toggled") is not None: + props["toggle-type"] = GLib.Variant("s", "checkmark") + props["toggle-state"] = GLib.Variant("i", int(item["toggled"])) + + return props + + return {"type": GLib.Variant("s", "separator")} + + def on_get_group_properties(self, ids, _properties): + item_properties = [] + + for idx, item in self._items.items(): + if idx in ids: + item_properties.append((idx, self._serialize_item(item))) + + return (item_properties,) + + def on_get_layout(self, _parent_id, _recursion_depth, _property_names): + serialized_items = [] + + for idx, item in self._items.items(): + serialized_item = GLib.Variant( + "(ia{sv}av)", (idx, self._serialize_item(item), []) + ) + serialized_items.append(serialized_item) + + return self._revision, (0, {}, serialized_items) + + def on_event(self, idx, event_id, _data, _timestamp) -> None: + if event_id == "clicked": + self._items[idx]["callback"]() + + class StatusNotifierItemService(DBusService): + def __init__(self, activate_callback) -> None: + super().__init__( + interface_name="org.kde.StatusNotifierItem", + object_path="/org/ayatana/NotificationItem/Nicotine", + bus_type=Gio.BusType.SESSION, + ) + + self.menu = StatusNotifierImplementation.DBusMenuService() + + for property_name, signature, value in ( + ("Category", "s", "Communications"), + ("Id", "s", pynicotine.__application_id__), + ("Title", "s", pynicotine.__application_name__), + ( + "ToolTip", + "(sa(iiay)ss)", + ("", [], pynicotine.__application_name__, ""), + ), + ("Menu", "o", "/org/ayatana/NotificationItem/Nicotine/Menu"), + ("ItemIsMenu", "b", False), + ("IconName", "s", ""), + ("IconThemePath", "s", ""), + ("Status", "s", "Active"), + ): + self.add_property(property_name, signature, value) + + for method_name, in_args, out_args, callback in ( + ("Activate", ("i", "i"), (), activate_callback), + ( + "ProvideXdgActivationToken", + ("s",), + (), + self.on_provide_activation_token, + ), + ): + self.add_method(method_name, in_args, out_args, callback) + + for signal_name, value in ( + ("NewIcon", ()), + ("NewIconThemePath", ("s",)), + ("NewStatus", ("s",)), + ): + self.add_signal(signal_name, value) + + def register(self): + self.menu.register() + super().register() + + def unregister(self): + super().unregister() + self.menu.unregister() + + def on_provide_activation_token(self, token): + Window.activation_token = token + + def __init__(self, application) -> None: + super().__init__(application) + + self.tray_icon: Any = None + self.custom_icons: bool = False + + try: + self.bus = Gio.bus_get_sync(bus_type=Gio.BusType.SESSION) + self.tray_icon = self.StatusNotifierItemService( + activate_callback=self.activate_callback + ) + self.tray_icon.register() + + from clan_vm_manager.assets import loc + + icon_path = str(loc / "clan_white_notext.png") + self.set_icon(icon_path) + + self.bus.call_sync( + bus_name="org.kde.StatusNotifierWatcher", + object_path="/StatusNotifierWatcher", + interface_name="org.kde.StatusNotifierWatcher", + method_name="RegisterStatusNotifierItem", + parameters=GLib.Variant( + "(s)", ("/org/ayatana/NotificationItem/Nicotine",) + ), + reply_type=None, + flags=Gio.DBusCallFlags.NONE, + timeout_msec=-1, + ) + + except GLib.Error as error: + self.unload() + raise ImplUnavailableError( + f"StatusNotifier implementation not available: {error}" + ) from error + + self.update_menu() + + @staticmethod + def check_icon_path(icon_name, icon_path) -> bool: + """Check if tray icons exist in the specified icon path.""" + + if not icon_path: + return False + + icon_scheme = f"{pynicotine.__application_id__}-{icon_name}." + + try: + with os.scandir(encode_path(icon_path)) as entries: + for entry in entries: + if entry.is_file() and entry.name.decode( + "utf-8", "replace" + ).startswith(icon_scheme): + return True + + except OSError as error: + log.add_debug(f"Error accessing tray icon path {icon_path}: {error}") + + return False + + def get_icon_path(self): + """Returns an icon path to use for tray icons, or None to fall back to + system-wide icons.""" + + # icon_path = self.application.get_application_icon_path() + + return "" + + def set_icon(self, icon_path) -> None: + self.tray_icon.properties["IconName"].value = icon_path + self.tray_icon.emit_signal("NewIcon") + + if not self.is_visible: + return + + status = "Active" + + if self.tray_icon.properties["Status"].value != status: + self.tray_icon.properties["Status"].value = status + self.tray_icon.emit_signal("NewStatus", status) + + def update_icon_theme(self): + # If custom icon path was found, use it, otherwise we fall back to system icons + icon_path = self.get_icon_path() + self.tray_icon.properties["IconThemePath"].value = icon_path + self.tray_icon.emit_signal("NewIconThemePath", icon_path) + + if icon_path: + log.add_debug("Using tray icon path %s", icon_path) + + def update_menu(self) -> None: + self.tray_icon.menu.set_items(self.menu_items) + + def unload(self, is_shutdown: bool = False) -> None: + if self.tray_icon is None: + return + + status = "Passive" + + self.tray_icon.properties["Status"].value = status + self.tray_icon.emit_signal("NewStatus", status) + + if is_shutdown: + self.tray_icon.unregister() + + +class Win32Implementation(BaseImplementation): + """Windows NotifyIcon implementation. + + https://learn.microsoft.com/en-us/windows/win32/shell/notification-area + https://learn.microsoft.com/en-us/windows/win32/shell/taskbar + """ + + WINDOW_CLASS_NAME = "NicotineTrayIcon" + + NIM_ADD = 0 + NIM_MODIFY = 1 + NIM_DELETE = 2 + + NIF_MESSAGE = 1 + NIF_ICON = 2 + NIF_TIP = 4 + NIF_INFO = 16 + NIIF_NOSOUND = 16 + + MIIM_STATE = 1 + MIIM_ID = 2 + MIIM_STRING = 64 + + MFS_ENABLED = 0 + MFS_UNCHECKED = 0 + MFS_DISABLED = 3 + MFS_CHECKED = 8 + + MFT_SEPARATOR = 2048 + + WM_NULL = 0 + WM_DESTROY = 2 + WM_CLOSE = 16 + WM_COMMAND = 273 + WM_LBUTTONUP = 514 + WM_RBUTTONUP = 517 + WM_USER = 1024 + WM_TRAYICON = WM_USER + 1 + NIN_BALLOONHIDE = WM_USER + 3 + NIN_BALLOONTIMEOUT = WM_USER + 4 + NIN_BALLOONUSERCLICK = WM_USER + 5 + + CS_VREDRAW = 1 + CS_HREDRAW = 2 + COLOR_WINDOW = 5 + IDC_ARROW = 32512 + + WS_OVERLAPPED = 0 + WS_SYSMENU = 524288 + CW_USEDEFAULT = -2147483648 + + IMAGE_ICON = 1 + LR_LOADFROMFILE = 16 + SM_CXSMICON = 49 + + if sys.platform == "win32": + from ctypes import Structure + + class WNDCLASSW(Structure): + from ctypes import CFUNCTYPE, wintypes + + LPFN_WND_PROC = CFUNCTYPE( + wintypes.INT, + wintypes.HWND, + wintypes.UINT, + wintypes.WPARAM, + wintypes.LPARAM, + ) + _fields_: ClassVar = [ + ("style", wintypes.UINT), + ("lpfn_wnd_proc", LPFN_WND_PROC), + ("cb_cls_extra", wintypes.INT), + ("cb_wnd_extra", wintypes.INT), + ("h_instance", wintypes.HINSTANCE), + ("h_icon", wintypes.HICON), + ("h_cursor", wintypes.HANDLE), + ("hbr_background", wintypes.HBRUSH), + ("lpsz_menu_name", wintypes.LPCWSTR), + ("lpsz_class_name", wintypes.LPCWSTR), + ] + + class MENUITEMINFOW(Structure): + from ctypes import wintypes + + _fields_: ClassVar = [ + ("cb_size", wintypes.UINT), + ("f_mask", wintypes.UINT), + ("f_type", wintypes.UINT), + ("f_state", wintypes.UINT), + ("w_id", wintypes.UINT), + ("h_sub_menu", wintypes.HMENU), + ("hbmp_checked", wintypes.HBITMAP), + ("hbmp_unchecked", wintypes.HBITMAP), + ("dw_item_data", wintypes.LPVOID), + ("dw_type_data", wintypes.LPWSTR), + ("cch", wintypes.UINT), + ("hbmp_item", wintypes.HBITMAP), + ] + + class NOTIFYICONDATAW(Structure): + from ctypes import wintypes + + _fields_: ClassVar = [ + ("cb_size", wintypes.DWORD), + ("h_wnd", wintypes.HWND), + ("u_id", wintypes.UINT), + ("u_flags", wintypes.UINT), + ("u_callback_message", wintypes.UINT), + ("h_icon", wintypes.HICON), + ("sz_tip", wintypes.WCHAR * 128), + ("dw_state", wintypes.DWORD), + ("dw_state_mask", wintypes.DWORD), + ("sz_info", wintypes.WCHAR * 256), + ("u_version", wintypes.UINT), + ("sz_info_title", wintypes.WCHAR * 64), + ("dw_info_flags", wintypes.DWORD), + ("guid_item", wintypes.CHAR * 16), + ("h_balloon_icon", wintypes.HICON), + ] + + def __init__(self, application: Gtk.Application) -> None: + from ctypes import windll # type: ignore + + super().__init__(application) + + self._window_class: Any = None + self._h_wnd = None + self._notify_id = None + self._h_icon = None + self._menu = None + self._wm_taskbarcreated = windll.user32.RegisterWindowMessageW("TaskbarCreated") + + self._register_class() + self._create_window() + self.update_icon() + + def _register_class(self) -> None: + from ctypes import byref, windll # type: ignore + + self._window_class = self.WNDCLASSW( # type: ignore + style=(self.CS_VREDRAW | self.CS_HREDRAW), + lpfn_wnd_proc=self.WNDCLASSW.LPFN_WND_PROC(self.on_process_window_message), # type: ignore + h_cursor=windll.user32.LoadCursorW(0, self.IDC_ARROW), + hbr_background=self.COLOR_WINDOW, + lpsz_class_name=self.WINDOW_CLASS_NAME, + ) + + windll.user32.RegisterClassW(byref(self._window_class)) + + def _unregister_class(self): + if self._window_class is None: + return + + from ctypes import windll + + windll.user32.UnregisterClassW( + self.WINDOW_CLASS_NAME, self._window_class.h_instance + ) + self._window_class = None + + def _create_window(self) -> None: + from ctypes import windll # type: ignore + + style = self.WS_OVERLAPPED | self.WS_SYSMENU + self._h_wnd = windll.user32.CreateWindowExW( + 0, + self.WINDOW_CLASS_NAME, + self.WINDOW_CLASS_NAME, + style, + 0, + 0, + self.CW_USEDEFAULT, + self.CW_USEDEFAULT, + 0, + 0, + 0, + None, + ) + + windll.user32.UpdateWindow(self._h_wnd) + + def _destroy_window(self): + if self._h_wnd is None: + return + + from ctypes import windll + + windll.user32.DestroyWindow(self._h_wnd) + self._h_wnd = None + + def _load_ico_buffer(self, icon_name, icon_size): + ico_buffer = b"" + + if GTK_API_VERSION >= 4: + icon = ICON_THEME.lookup_icon( + icon_name, fallbacks=None, size=icon_size, scale=1, direction=0, flags=0 + ) + icon_path = icon.get_file().get_path() + + if not icon_path: + return ico_buffer + + pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_size( + icon_path, icon_size, icon_size + ) + else: + icon = ICON_THEME.lookup_icon(icon_name, size=icon_size, flags=0) + + if not icon: + return ico_buffer + + pixbuf = icon.load_icon() + + _success, ico_buffer = pixbuf.save_to_bufferv("ico") + return ico_buffer + + def _load_h_icon(self, icon_name): + from ctypes import windll + + # Attempt to load custom icons first + icon_size = windll.user32.GetSystemMetrics(self.SM_CXSMICON) + ico_buffer = self._load_ico_buffer( + icon_name.replace(f"{pynicotine.__application_id__}-", "nplus-tray-"), + icon_size, + ) + + if not ico_buffer: + # No custom icons present, fall back to default icons + ico_buffer = self._load_ico_buffer(icon_name, icon_size) + + try: + import tempfile + + file_handle = tempfile.NamedTemporaryFile(delete=False) + + with file_handle: + file_handle.write(ico_buffer) + + return windll.user32.LoadImageA( + 0, + encode_path(file_handle.name), + self.IMAGE_ICON, + icon_size, + icon_size, + self.LR_LOADFROMFILE, + ) + + finally: + os.remove(file_handle.name) + + def _destroy_h_icon(self): + from ctypes import windll + + if self._h_icon: + windll.user32.DestroyIcon(self._h_icon) + self._h_icon = None + + def _update_notify_icon(self, title="", message="", icon_name=None): + # pylint: disable=attribute-defined-outside-init,no-member + + if self._h_wnd is None: + return + + if icon_name: + self._destroy_h_icon() + self._h_icon = self._load_h_icon(icon_name) + + if not self.is_visible and not (title or message): + # When disabled by user, temporarily show tray icon when displaying a notification + return + + from ctypes import byref, sizeof, windll + + action = self.NIM_MODIFY + + if self._notify_id is None: + self._notify_id = self.NOTIFYICONDATAW( + cb_size=sizeof(self.NOTIFYICONDATAW), + h_wnd=self._h_wnd, + u_id=0, + u_flags=( + self.NIF_ICON | self.NIF_MESSAGE | self.NIF_TIP | self.NIF_INFO + ), + u_callback_message=self.WM_TRAYICON, + sz_tip=truncate_string_byte( + pynicotine.__application_name__, byte_limit=127 + ), + ) + action = self.NIM_ADD + + if config.sections["notifications"]["notification_popup_sound"]: + self._notify_id.dw_info_flags &= ~self.NIIF_NOSOUND + else: + self._notify_id.dw_info_flags |= self.NIIF_NOSOUND + + self._notify_id.h_icon = self._h_icon + self._notify_id.sz_info_title = truncate_string_byte( + title, byte_limit=63, ellipsize=True + ) + self._notify_id.sz_info = truncate_string_byte( + message, byte_limit=255, ellipsize=True + ) + + windll.shell32.Shell_NotifyIconW(action, byref(self._notify_id)) + + def _remove_notify_icon(self): + from ctypes import byref, windll + + if self._notify_id: + windll.shell32.Shell_NotifyIconW(self.NIM_DELETE, byref(self._notify_id)) + self._notify_id = None + + if self._menu: + windll.user32.DestroyMenu(self._menu) + self._menu = None + + def _serialize_menu_item(self, item): + # pylint: disable=attribute-defined-outside-init,no-member + + from ctypes import sizeof + + item_info = self.MENUITEMINFOW(cb_size=sizeof(self.MENUITEMINFOW)) + w_id = item["id"] + text = item.get("text") + is_checked = item.get("toggled") + is_sensitive = item.get("sensitive") + + item_info.f_mask |= self.MIIM_ID + item_info.w_id = w_id + + if text is not None: + item_info.f_mask |= self.MIIM_STRING + item_info.dw_type_data = text.replace("_", "&") # Mnemonics use & + else: + item_info.f_type |= self.MFT_SEPARATOR + + if is_checked is not None: + item_info.f_mask |= self.MIIM_STATE + item_info.f_state |= self.MFS_CHECKED if is_checked else self.MFS_UNCHECKED + + if is_sensitive is not None: + item_info.f_mask |= self.MIIM_STATE + item_info.f_state |= self.MFS_ENABLED if is_sensitive else self.MFS_DISABLED + + return item_info + + def _show_menu(self): + from ctypes import byref, windll, wintypes + + if self._menu is None: + self.update_menu() + + pos = wintypes.POINT() + windll.user32.GetCursorPos(byref(pos)) + + # PRB: Menus for Notification Icons Do Not Work Correctly + # https://web.archive.org/web/20121015064650/http://support.microsoft.com/kb/135788 + + windll.user32.SetForegroundWindow(self._h_wnd) + windll.user32.TrackPopupMenu(self._menu, 0, pos.x, pos.y, 0, self._h_wnd, None) + windll.user32.PostMessageW(self._h_wnd, self.WM_NULL, 0, 0) + + def update_menu(self): + from ctypes import byref, windll + + if self._menu is None: + self._menu = windll.user32.CreatePopupMenu() + + for item in self.menu_items.values(): + item_id = item["id"] + item_info = self._serialize_menu_item(item) + + if not windll.user32.SetMenuItemInfoW( + self._menu, item_id, False, byref(item_info) + ): + windll.user32.InsertMenuItemW( + self._menu, item_id, False, byref(item_info) + ) + + def set_icon(self, icon_name): + self._update_notify_icon(icon_name=icon_name) + + def show_notification(self, title, message): + self._update_notify_icon(title=title, message=message) + + def on_process_window_message(self, h_wnd, msg, w_param, l_param): + from ctypes import windll, wintypes + + if msg == self.WM_TRAYICON: + if l_param == self.WM_RBUTTONUP: + # Icon pressed + self._show_menu() + + elif l_param == self.WM_LBUTTONUP: + # Icon pressed + self.activate_callback() + + elif l_param in ( + self.NIN_BALLOONHIDE, + self.NIN_BALLOONTIMEOUT, + self.NIN_BALLOONUSERCLICK, + ): + if not config.sections["ui"]["trayicon"]: + # Notification dismissed, but user has disabled tray icon + self._remove_notify_icon() + + elif msg == self.WM_COMMAND: + # Menu item pressed + menu_item_id = w_param & 0xFFFF + menu_item_callback = self.menu_items[menu_item_id]["callback"] + menu_item_callback() + + elif msg == self._wm_taskbarcreated: + # Taskbar process restarted, create new icon + self._remove_notify_icon() + self._update_notify_icon() + + return windll.user32.DefWindowProcW( + wintypes.HWND(h_wnd), + msg, + wintypes.WPARAM(w_param), + wintypes.LPARAM(l_param), + ) + + def unload(self, is_shutdown=False): + self._remove_notify_icon() + + if not is_shutdown: + # Keep notification support as long as we're running + return + + self._destroy_h_icon() + self._destroy_window() + self._unregister_class() + + +class TrayIcon: + def __init__(self, application: Gio.Application) -> None: + self.application: Gio.Application = application + self.available: bool = True + self.implementation: Any = None + + self.watch_availability() + self.load() + + def watch_availability(self) -> None: + if sys.platform in {"win32", "darwin"}: + return + + Gio.bus_watch_name( + bus_type=Gio.BusType.SESSION, + name="org.kde.StatusNotifierWatcher", + flags=Gio.BusNameWatcherFlags.NONE, + name_appeared_closure=self.load, + name_vanished_closure=self.unload, + ) + + def load(self, *_args: Any) -> None: + self.available = True + + if sys.platform == "win32": + # Always keep tray icon loaded for Windows notification support + pass + + elif not config.sections["ui"]["trayicon"]: + # No need to have tray icon loaded now (unless this is Windows) + return + + if self.implementation is None: + if sys.platform == "win32": + self.implementation = Win32Implementation(self.application) + else: + try: + self.implementation = StatusNotifierImplementation(self.application) # type: ignore + + except ImplUnavailableError: + self.available = False + return + + self.refresh_state() + + def update_window_visibility(self) -> None: + if self.implementation: + self.implementation.update_window_visibility() + + def update_user_status(self) -> None: + if self.implementation: + self.implementation.update_user_status() + + def update_icon(self) -> None: + if self.implementation: + self.implementation.update_icon() + + def update_icon_theme(self) -> None: + if self.implementation: + self.implementation.update_icon_theme() + + def set_download_status(self, status: str) -> None: + if self.implementation: + self.implementation.set_download_status(status) + + def set_upload_status(self, status: str) -> None: + if self.implementation: + self.implementation.set_upload_status(status) + + def show_notification(self, title: str, message: str) -> None: + if self.implementation: + self.implementation.show_notification(title=title, message=message) + + def refresh_state(self) -> None: + if not self.implementation: + return + + self.implementation.is_visible = True + + self.update_icon_theme() + self.update_icon() + self.update_window_visibility() + self.update_user_status() + + def unload(self, *_args: Any, is_shutdown: bool = False) -> None: + if self.implementation: + self.implementation.unload(is_shutdown=is_shutdown) + self.implementation.is_visible = False + + if is_shutdown: + self.implementation = None + + def destroy(self) -> None: + self.unload(is_shutdown=True) + self.__dict__.clear() diff --git a/pkgs/clan-vm-manager/clan_vm_manager/components/vmobj.py b/pkgs/clan-vm-manager/clan_vm_manager/components/vmobj.py new file mode 100644 index 00000000..05c237ff --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/components/vmobj.py @@ -0,0 +1,384 @@ +import logging +import multiprocessing as mp +import os +import tempfile +import threading +import time +import weakref +from collections.abc import Callable, Generator +from contextlib import contextmanager +from datetime import datetime +from pathlib import Path +from typing import IO, ClassVar + +import gi +from clan_cli import vms +from clan_cli.clan_uri import ClanURI +from clan_cli.dirs import vm_state_dir +from clan_cli.history.add import HistoryEntry +from clan_cli.machines.machines import Machine +from clan_cli.vms.qemu import QMPWrapper + +from clan_vm_manager.components.executor import MPProcess, spawn +from clan_vm_manager.singletons.toast import ( + InfoToast, + SuccessToast, + ToastOverlay, + WarningToast, +) + +gi.require_version("GObject", "2.0") +gi.require_version("Gtk", "4.0") +from gi.repository import Gio, GLib, GObject, Gtk + +log = logging.getLogger(__name__) + + +class VMObject(GObject.Object): + # Define a custom signal with the name "vm_stopped" and a string argument for the message + __gsignals__: ClassVar = { + "vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, []), + "vm_build_notify": (GObject.SignalFlags.RUN_FIRST, None, [bool, bool]), + } + + def __init__( + self, + icon: Path, + data: HistoryEntry, + build_log_cb: Callable[[Gio.File], None], + ) -> None: + super().__init__() + + # Store the data from the history entry + self.data: HistoryEntry = data + + self.build_log_cb = build_log_cb + + # Create a process object to store the VM process + self.vm_process: MPProcess = MPProcess( + "vm_dummy", mp.Process(), Path("./dummy") + ) + self.build_process: MPProcess = MPProcess( + "build_dummy", mp.Process(), Path("./dummy") + ) + self._start_thread: threading.Thread = threading.Thread() + self.machine: Machine | None = None + self.qmp_wrap: QMPWrapper | None = None + + # Watcher to stop the VM + self.KILL_TIMEOUT: int = 20 # seconds + self._stop_thread: threading.Thread = threading.Thread() + + # Build progress bar vars + self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar() + self.progress_bar.hide() + self.progress_bar.set_hexpand(True) # Horizontally expand + self.prog_bar_id: int = 0 + + # Create a temporary directory to store the logs + self.log_dir: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory( + prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}" + ) + self._logs_id: int = 0 + self._log_file: IO[str] | None = None + + # To be able to set the switch state programmatically + # we need to store the handler id returned by the connect method + # and block the signal while we change the state. This is cursed. + self.switch: Gtk.Switch = Gtk.Switch() + self.switch_handler_id: int = self.switch.connect( + "notify::active", self._on_switch_toggle + ) + self.connect("vm_status_changed", self._on_vm_status_changed) + + # Make sure the VM is killed when the reference to this object is dropped + self._finalizer: weakref.finalize = weakref.finalize(self, self._kill_ref_drop) + + def _vm_status_changed_task(self) -> bool: + self.emit("vm_status_changed") + return GLib.SOURCE_REMOVE + + def update(self, data: HistoryEntry) -> None: + self.data = data + + def _on_vm_status_changed(self, source: "VMObject") -> None: + # Signal may be emitted multiple times + self.emit("vm_build_notify", self.is_building(), self.is_running()) + + prev_state = self.switch.get_state() + next_state = self.is_running() and not self.is_building() + + self.switch.set_state(next_state) + if prev_state is False and next_state is True: + ToastOverlay.use().add_toast_unique( + SuccessToast(f"{source.data.flake.flake_attr} started").toast, + "success.vm.start", + ) + + if self.switch.get_sensitive() is False and not self.is_building(): + self.switch.set_sensitive(True) + + exit_vm = self.vm_process.proc.exitcode + exit_build = self.build_process.proc.exitcode + exitc = exit_vm or exit_build + if not self.is_running() and exitc != 0: + with self.switch.handler_block(self.switch_handler_id): + self.switch.set_active(False) + log.error(f"VM exited with error. Exitcode: {exitc}") + ToastOverlay.use().add_toast_unique( + WarningToast(f"VM exited with error. Exitcode: {exitc}").toast, + "warning.vm.exit", + ) + + def _on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None: + if switch.get_active(): + switch.set_state(False) + switch.set_sensitive(False) + self.start() + else: + switch.set_state(True) + self.shutdown() + switch.set_sensitive(False) + + # We use a context manager to create the machine object + # and make sure it is destroyed when the context is exited + @contextmanager + def _create_machine(self) -> Generator[Machine, None, None]: + uri = ClanURI.from_str( + url=str(self.data.flake.flake_url), machine_name=self.data.flake.flake_attr + ) + if uri.flake.is_local(): + self.machine = Machine( + name=self.data.flake.flake_attr, + flake=uri.flake, + ) + if uri.flake.is_remote(): + self.machine = Machine( + name=self.data.flake.flake_attr, + flake=uri.flake, + ) + assert self.machine is not None + state_dir = vm_state_dir( + flake_url=str(self.machine.flake.url), vm_name=self.machine.name + ) + self.qmp_wrap = QMPWrapper(state_dir) + assert self.machine is not None + yield self.machine + self.machine = None + + def _pulse_progress_bar_task(self) -> bool: + if self.progress_bar.is_visible(): + self.progress_bar.pulse() + return GLib.SOURCE_CONTINUE + else: + return GLib.SOURCE_REMOVE + + def __start(self) -> None: + with self._create_machine() as machine: + # Start building VM + tstart = datetime.now() + log.info(f"Building VM {self.get_id()}") + log_dir = Path(str(self.log_dir.name)) + + # Start the build process + self.build_process = spawn( + on_except=None, + out_file=log_dir / "build.log", + func=vms.run.build_vm, + machine=machine, + tmpdir=log_dir, + ) + + gfile = Gio.File.new_for_path(str(log_dir / "build.log")) + # Gio documentation: + # Obtains a file monitor for the given file. + # If no file notification mechanism exists, then regular polling of the file is used. + g_monitor = gfile.monitor_file(Gio.FileMonitorFlags.NONE, None) + g_monitor.connect("changed", self.on_logs_changed) + + GLib.idle_add(self._vm_status_changed_task) + self.switch.set_sensitive(True) + # Start the logs watcher + self._logs_id = GLib.timeout_add( + 50, self._get_logs_task, self.build_process + ) + if self._logs_id == 0: + log.error("Failed to start VM log watcher") + log.debug(f"Starting logs watcher on file: {self.build_process.out_file}") + + # Start the progress bar and show it + self.progress_bar.show() + self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar_task) + if self.prog_bar_id == 0: + log.error("Couldn't spawn a progress bar task") + + # Wait for the build to finish then hide the progress bar + self.build_process.proc.join() + tend = datetime.now() + log.info(f"VM {self.get_id()} build took {tend - tstart}s") + self.progress_bar.hide() + + # Check if the VM was built successfully + if self.build_process.proc.exitcode != 0: + log.error(f"Failed to build VM {self.get_id()}") + GLib.idle_add(self._vm_status_changed_task) + return + log.info(f"Successfully built VM {self.get_id()}") + + # Start the VM + self.vm_process = spawn( + on_except=None, + out_file=Path(str(self.log_dir.name)) / "vm.log", + func=vms.run.run_vm, + vm=self.data.flake.vm, + cachedir=log_dir, + socketdir=log_dir, + ) + log.debug(f"Started VM {self.get_id()}") + GLib.idle_add(self._vm_status_changed_task) + + # Start the logs watcher + self._logs_id = GLib.timeout_add(50, self._get_logs_task, self.vm_process) + if self._logs_id == 0: + log.error("Failed to start VM log watcher") + log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}") + + # Wait for the VM to stop + self.vm_process.proc.join() + log.debug(f"VM {self.get_id()} has stopped") + GLib.idle_add(self._vm_status_changed_task) + + def on_logs_changed( + self, + monitor: Gio.FileMonitor, + file: Gio.File, + other_file: Gio.File, + event_type: Gio.FileMonitorEvent, + ) -> None: + if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT: + # File was changed and the changes were written to disk + # wire up the callback for setting the logs + self.build_log_cb(file) + + def start(self) -> None: + if self.is_running(): + log.warn("VM is already running. Ignoring start request") + self.emit("vm_status_changed", self) + return + log.debug(f"VM state dir {self.log_dir.name}") + self._start_thread = threading.Thread(target=self.__start) + self._start_thread.start() + + def _get_logs_task(self, proc: MPProcess) -> bool: + if not proc.out_file.exists(): + return GLib.SOURCE_CONTINUE + + if not self._log_file: + try: + self._log_file = open(proc.out_file) + except Exception as ex: + log.exception(ex) + self._log_file = None + return GLib.SOURCE_REMOVE + + line = os.read(self._log_file.fileno(), 4096) + if len(line) != 0: + print(line.decode("utf-8"), end="", flush=True) + + if not proc.proc.is_alive(): + log.debug("Removing logs watcher") + self._log_file = None + return GLib.SOURCE_REMOVE + + return GLib.SOURCE_CONTINUE + + def is_running(self) -> bool: + return self._start_thread.is_alive() + + def is_building(self) -> bool: + return self.build_process.proc.is_alive() + + def is_shutting_down(self) -> bool: + return self._stop_thread.is_alive() + + def get_id(self) -> str: + return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}" + + def __stop(self) -> None: + log.info(f"Stopping VM {self.get_id()}") + + start_time = datetime.now() + while self.is_running(): + diff = datetime.now() - start_time + if diff.seconds > self.KILL_TIMEOUT: + log.error( + f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it" + ) + self.vm_process.kill_group() + break + if self.is_building(): + log.info(f"VM {self.get_id()} is still building. Killing it") + self.build_process.kill_group() + break + if not self.machine: + log.error(f"Machine object is None. Killing VM {self.get_id()}") + self.vm_process.kill_group() + break + + # Try to shutdown the VM gracefully using QMP + try: + assert self.qmp_wrap is not None + with self.qmp_wrap.qmp_ctx() as qmp: + qmp.command("system_powerdown") + except Exception as ex: + log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}") + + # Try 20 times to stop the VM + time.sleep(self.KILL_TIMEOUT / 20) + GLib.idle_add(self._vm_status_changed_task) + log.debug(f"VM {self.get_id()} has stopped") + + ToastOverlay.use().add_toast_unique( + InfoToast(f"Stopped {self.get_id()}").toast, "info.vm.exit" + ) + + def shutdown(self) -> None: + if not self.is_running(): + log.warning("VM not running. Ignoring shutdown request.") + self.emit("vm_status_changed", self) + return + if self.is_shutting_down(): + log.warning("Shutdown already in progress") + self.emit("vm_status_changed", self) + return + self._stop_thread = threading.Thread(target=self.__stop) + self._stop_thread.start() + + def _kill_ref_drop(self) -> None: + if self.is_running(): + log.warning("Killing VM due to reference drop") + self.kill() + + def kill(self) -> None: + if not self.is_running(): + log.warning(f"Tried to kill VM {self.get_id()} is not running") + return + log.info(f"Killing VM {self.get_id()} now") + + if self.vm_process.proc.is_alive(): + self.vm_process.kill_group() + + if self.build_process.proc.is_alive(): + self.build_process.kill_group() + + def read_whole_log(self) -> str: + if not self.vm_process.out_file.exists(): + log.error(f"Log file {self.vm_process.out_file} does not exist") + return "" + return self.vm_process.out_file.read_text() + + def __str__(self) -> str: + return f"VM({self.get_id()})" + + def __repr__(self) -> str: + return self.__str__() diff --git a/pkgs/clan-vm-manager/clan_vm_manager/singletons/__init__.py b/pkgs/clan-vm-manager/clan_vm_manager/singletons/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkgs/clan-vm-manager/clan_vm_manager/singletons/toast.py b/pkgs/clan-vm-manager/clan_vm_manager/singletons/toast.py new file mode 100644 index 00000000..2be797a4 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/singletons/toast.py @@ -0,0 +1,150 @@ +import logging +from collections.abc import Callable +from typing import Any + +import gi + +gi.require_version("Gtk", "4.0") +gi.require_version("Adw", "1") + +from gi.repository import Adw + +from clan_vm_manager.singletons.use_views import ViewStack +from clan_vm_manager.views.logs import Logs + +log = logging.getLogger(__name__) + + +class ToastOverlay: + """ + The ToastOverlay is a class that manages the display of toasts + It should be used as a singleton in your application to prevent duplicate toasts + Usage + """ + + # For some reason, the adw toast overlay cannot be subclassed + # Thats why it is added as a class property + overlay: Adw.ToastOverlay + active_toasts: set[str] + + _instance: "None | ToastOverlay" = None + + def __init__(self) -> None: + raise RuntimeError("Call use() instead") + + @classmethod + def use(cls: Any) -> "ToastOverlay": + if cls._instance is None: + cls._instance = cls.__new__(cls) + cls.overlay = Adw.ToastOverlay() + cls.active_toasts = set() + + return cls._instance + + def add_toast_unique(self, toast: Adw.Toast, key: str) -> None: + if key not in self.active_toasts: + self.active_toasts.add(key) + self.overlay.add_toast(toast) + toast.connect("dismissed", lambda toast: self.active_toasts.remove(key)) + + +class ErrorToast: + toast: Adw.Toast + + def __init__( + self, message: str, persistent: bool = False, details: str = "" + ) -> None: + super().__init__() + self.toast = Adw.Toast.new( + f"""❌ Error {message}""" + ) + self.toast.set_use_markup(True) + + self.toast.set_priority(Adw.ToastPriority.HIGH) + self.toast.set_button_label("Show more") + + if persistent: + self.toast.set_timeout(0) + + views = ViewStack.use().view + + # we cannot check this type, python is not smart enough + logs_view: Logs = views.get_child_by_name("logs") # type: ignore + logs_view.set_message(details) + + self.toast.connect( + "button-clicked", + lambda _: views.set_visible_child_name("logs"), + ) + + +class WarningToast: + toast: Adw.Toast + + def __init__(self, message: str, persistent: bool = False) -> None: + super().__init__() + self.toast = Adw.Toast.new( + f"⚠ Warning {message}" + ) + self.toast.set_use_markup(True) + + self.toast.set_priority(Adw.ToastPriority.NORMAL) + + if persistent: + self.toast.set_timeout(0) + + +class InfoToast: + toast: Adw.Toast + + def __init__(self, message: str, persistent: bool = False) -> None: + super().__init__() + self.toast = Adw.Toast.new(f" {message}") + self.toast.set_use_markup(True) + + self.toast.set_priority(Adw.ToastPriority.NORMAL) + + if persistent: + self.toast.set_timeout(0) + + +class SuccessToast: + toast: Adw.Toast + + def __init__(self, message: str, persistent: bool = False) -> None: + super().__init__() + self.toast = Adw.Toast.new(f" {message}") + self.toast.set_use_markup(True) + + self.toast.set_priority(Adw.ToastPriority.NORMAL) + + if persistent: + self.toast.set_timeout(0) + + +class LogToast: + toast: Adw.Toast + + def __init__( + self, + message: str, + on_button_click: Callable[[], None], + button_label: str = "More", + persistent: bool = False, + ) -> None: + super().__init__() + self.toast = Adw.Toast.new( + f"""Logs are available {message}""" + ) + self.toast.set_use_markup(True) + + self.toast.set_priority(Adw.ToastPriority.NORMAL) + + if persistent: + self.toast.set_timeout(0) + + self.toast.set_button_label(button_label) + self.toast.connect( + "button-clicked", + lambda _: on_button_click(), + ) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_join.py b/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_join.py new file mode 100644 index 00000000..9779c950 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_join.py @@ -0,0 +1,114 @@ +import logging +import threading +from collections.abc import Callable +from typing import Any, ClassVar, cast + +import gi +from clan_cli.clan_uri import ClanURI +from clan_cli.history.add import HistoryEntry, add_history +from clan_cli.machines.machines import Machine + +from clan_vm_manager.components.gkvstore import GKVStore +from clan_vm_manager.singletons.use_vms import ClanStore + +gi.require_version("Gtk", "4.0") +gi.require_version("Adw", "1") +from gi.repository import Gio, GLib, GObject + +log = logging.getLogger(__name__) + + +class JoinValue(GObject.Object): + __gsignals__: ClassVar = { + "join_finished": (GObject.SignalFlags.RUN_FIRST, None, []), + } + + url: ClanURI + entry: HistoryEntry | None + + def _join_finished_task(self) -> bool: + self.emit("join_finished") + return GLib.SOURCE_REMOVE + + def __init__(self, url: ClanURI) -> None: + super().__init__() + self.url: ClanURI = url + self.entry: HistoryEntry | None = None + + def __join(self) -> None: + new_entry = add_history(self.url) + self.entry = new_entry + GLib.idle_add(self._join_finished_task) + + def join(self) -> None: + threading.Thread(target=self.__join).start() + + +class JoinList: + """ + This is a singleton. + It is initialized with the first call of use() + """ + + _instance: "None | JoinList" = None + list_store: Gio.ListStore + + # Make sure the VMS class is used as a singleton + def __init__(self) -> None: + raise RuntimeError("Call use() instead") + + @classmethod + def use(cls: Any) -> "JoinList": + if cls._instance is None: + cls._instance = cls.__new__(cls) + cls.list_store = Gio.ListStore.new(JoinValue) + + ClanStore.use().register_on_deep_change(cls._instance._rerender_join_list) + + return cls._instance + + def _rerender_join_list( + self, source: GKVStore, position: int, removed: int, added: int + ) -> None: + self.list_store.items_changed( + 0, self.list_store.get_n_items(), self.list_store.get_n_items() + ) + + def is_empty(self) -> bool: + return self.list_store.get_n_items() == 0 + + def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None: + """ + Add a join request. + This method can add multiple join requests if called subsequently for each request. + """ + + value = JoinValue(uri) + + machine_id = Machine(uri.machine_name, uri.flake) + machine_id_list = [] + + for machine_obj in self.list_store: + mvalue: ClanURI = cast(JoinValue, machine_obj).url + machine = Machine(mvalue.machine_name, mvalue.flake) + machine_id_list.append(machine.get_id()) + + if machine_id in machine_id_list: + log.info(f"Join request already exists: {value.url}. Ignoring.") + return + + value.connect("join_finished", self._on_join_finished) + value.connect("join_finished", after_join) + + self.list_store.append(value) + + def _on_join_finished(self, source: JoinValue) -> None: + log.info(f"Join finished: {source.url}") + self.discard(source) + assert source.entry is not None + ClanStore.use().push_history_entry(source.entry) + + def discard(self, value: JoinValue) -> None: + (has, idx) = self.list_store.find(value) + if has: + self.list_store.remove(idx) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_views.py b/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_views.py new file mode 100644 index 00000000..af0ee4ae --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_views.py @@ -0,0 +1,36 @@ +from typing import Any + +import gi + +gi.require_version("Gtk", "4.0") +gi.require_version("Adw", "1") +from gi.repository import Adw + + +class ViewStack: + """ + This is a singleton. + It is initialized with the first call of use() + + Usage: + + ViewStack.use().set_visible() + + ViewStack.use() can also be called before the data is needed. e.g. to eliminate/reduce waiting time. + + """ + + _instance: "None | ViewStack" = None + view: Adw.ViewStack + + # Make sure the VMS class is used as a singleton + def __init__(self) -> None: + raise RuntimeError("Call use() instead") + + @classmethod + def use(cls: Any) -> "ViewStack": + if cls._instance is None: + cls._instance = cls.__new__(cls) + cls.view = Adw.ViewStack() + + return cls._instance diff --git a/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_vms.py b/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_vms.py new file mode 100644 index 00000000..d88ae7c2 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_vms.py @@ -0,0 +1,183 @@ +import logging +from collections.abc import Callable +from pathlib import Path +from typing import Any, ClassVar + +import gi +from clan_cli.clan_uri import ClanURI +from clan_cli.history.add import HistoryEntry +from clan_cli.machines.machines import Machine + +from clan_vm_manager import assets +from clan_vm_manager.components.gkvstore import GKVStore +from clan_vm_manager.components.vmobj import VMObject +from clan_vm_manager.singletons.use_views import ViewStack +from clan_vm_manager.views.logs import Logs + +gi.require_version("GObject", "2.0") +gi.require_version("Gtk", "4.0") +from gi.repository import Gio, GLib, GObject + +log = logging.getLogger(__name__) + + +class VMStore(GKVStore): + def __init__(self) -> None: + super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr) + + +class Emitter(GObject.GObject): + __gsignals__: ClassVar = { + "is_ready": (GObject.SignalFlags.RUN_FIRST, None, []), + } + + +class ClanStore: + _instance: "None | ClanStore" = None + _clan_store: GKVStore[str, VMStore] + + _emitter: Emitter + + # set the vm that is outputting logs + # build logs are automatically streamed to the logs-view + _logging_vm: VMObject | None = None + + # Make sure the VMS class is used as a singleton + def __init__(self) -> None: + raise RuntimeError("Call use() instead") + + @classmethod + def use(cls: Any) -> "ClanStore": + if cls._instance is None: + cls._instance = cls.__new__(cls) + cls._clan_store = GKVStore( + VMStore, lambda store: store.first().data.flake.flake_url + ) + cls._emitter = Emitter() + + return cls._instance + + def emit(self, signal: str) -> None: + self._emitter.emit(signal) + + def connect(self, signal: str, cb: Callable[(...), Any]) -> None: + self._emitter.connect(signal, cb) + + def set_logging_vm(self, ident: str) -> VMObject | None: + vm = self.get_vm(ClanURI(f"clan://{ident}")) + if vm is not None: + self._logging_vm = vm + + return self._logging_vm + + def register_on_deep_change( + self, callback: Callable[[GKVStore, int, int, int], None] + ) -> None: + """ + Register a callback that is called when a clan_store or one of the included VMStores changes + """ + + def on_vmstore_change( + store: VMStore, position: int, removed: int, added: int + ) -> None: + callback(store, position, removed, added) + + def on_clanstore_change( + store: "GKVStore", position: int, removed: int, added: int + ) -> None: + if added > 0: + store.values()[position].register_on_change(on_vmstore_change) + callback(store, position, removed, added) + + self.clan_store.register_on_change(on_clanstore_change) + + @property + def clan_store(self) -> GKVStore[str, VMStore]: + return self._clan_store + + def create_vm_task(self, vm: HistoryEntry) -> bool: + self.push_history_entry(vm) + return GLib.SOURCE_REMOVE + + def push_history_entry(self, entry: HistoryEntry) -> None: + # TODO: We shouldn't do this here but in the list view + if entry.flake.icon is None: + icon: Path = assets.loc / "placeholder.jpeg" + else: + icon = Path(entry.flake.icon) + + def log_details(gfile: Gio.File) -> None: + self.log_details(vm, gfile) + + vm = VMObject(icon=icon, data=entry, build_log_cb=log_details) + self.push(vm) + + def log_details(self, vm: VMObject, gfile: Gio.File) -> None: + views = ViewStack.use().view + logs_view: Logs = views.get_child_by_name("logs") # type: ignore + + def file_read_callback( + source_object: Gio.File, result: Gio.AsyncResult, _user_data: Any + ) -> None: + try: + # Finish the asynchronous read operation + res = source_object.load_contents_finish(result) + _success, contents, _etag_out = res + + # Convert the byte array to a string and print it + logs_view.set_message(contents.decode("utf-8")) + except Exception as e: + print(f"Error reading file: {e}") + + # only one vm can output logs at a time + if vm == self._logging_vm: + gfile.load_contents_async(None, file_read_callback, None) + + # we cannot check this type, python is not smart enough + + def push(self, vm: VMObject) -> None: + url = str(vm.data.flake.flake_url) + + # Only write to the store if the Clan is not already in it + # Every write to the KVStore rerenders bound widgets to the clan_store + if url not in self.clan_store: + log.debug(f"Creating new VMStore for {url}") + vm_store = VMStore() + vm_store.append(vm) + self.clan_store[url] = vm_store + else: + vm_store = self.clan_store[url] + machine = vm.data.flake.flake_attr + old_vm = vm_store.get(machine) + + if old_vm: + log.info( + f"VM {vm.data.flake.flake_attr} already exists in store. Updating data field." + ) + old_vm.update(vm.data) + else: + log.debug(f"Appending VM {vm.data.flake.flake_attr} to store") + vm_store.append(vm) + + def remove(self, vm: VMObject) -> None: + del self.clan_store[str(vm.data.flake.flake_url)][vm.data.flake.flake_attr] + + def get_vm(self, uri: ClanURI) -> None | VMObject: + flake_id = Machine(uri.machine_name, uri.flake).get_id() + vm_store = self.clan_store.get(flake_id) + if vm_store is None: + return None + machine = vm_store.get(uri.machine_name, None) + return machine + + def get_running_vms(self) -> list[VMObject]: + return [ + vm + for clan in self.clan_store.values() + for vm in clan.values() + if vm.is_running() + ] + + def kill_all(self) -> None: + for vm in self.get_running_vms(): + vm.kill() diff --git a/pkgs/clan-vm-manager/clan_vm_manager/views/__init__.py b/pkgs/clan-vm-manager/clan_vm_manager/views/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkgs/clan-vm-manager/clan_vm_manager/views/details.py b/pkgs/clan-vm-manager/clan_vm_manager/views/details.py new file mode 100644 index 00000000..c9ec2f93 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/views/details.py @@ -0,0 +1,61 @@ +import os +from collections.abc import Callable +from functools import partial +from typing import Any, Literal, TypeVar + +import gi + +gi.require_version("Adw", "1") +from gi.repository import Adw, Gio, GObject, Gtk + +# Define a TypeVar that is bound to GObject.Object +ListItem = TypeVar("ListItem", bound=GObject.Object) + + +def create_details_list( + model: Gio.ListStore, render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget] +) -> Gtk.ListBox: + boxed_list = Gtk.ListBox() + boxed_list.set_selection_mode(Gtk.SelectionMode.NONE) + boxed_list.add_css_class("boxed-list") + boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list)) + return boxed_list + + +class PreferencesValue(GObject.Object): + variant: Literal["CPU", "MEMORY"] + editable: bool + data: Any + + def __init__( + self, variant: Literal["CPU", "MEMORY"], editable: bool, data: Any + ) -> None: + super().__init__() + self.variant = variant + self.editable = editable + self.data = data + + +class Details(Gtk.Box): + def __init__(self) -> None: + super().__init__(orientation=Gtk.Orientation.VERTICAL) + + preferences_store = Gio.ListStore.new(PreferencesValue) + preferences_store.append(PreferencesValue("CPU", True, 1)) + + self.details_list = create_details_list( + model=preferences_store, render_row=self.render_entry_row + ) + + self.append(self.details_list) + + def render_entry_row( + self, boxed_list: Gtk.ListBox, item: PreferencesValue + ) -> Gtk.Widget: + cores: int | None = os.cpu_count() + fcores = float(cores) if cores else 1.0 + + row = Adw.SpinRow.new_with_range(0, fcores, 1) + row.set_value(item.data) + + return row diff --git a/pkgs/clan-vm-manager/clan_vm_manager/views/list.py b/pkgs/clan-vm-manager/clan_vm_manager/views/list.py new file mode 100644 index 00000000..65444fb4 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/views/list.py @@ -0,0 +1,356 @@ +import base64 +import logging +from collections.abc import Callable +from functools import partial +from typing import Any, TypeVar + +import gi +from clan_cli.clan_uri import ClanURI + +from clan_vm_manager.components.gkvstore import GKVStore +from clan_vm_manager.components.interfaces import ClanConfig +from clan_vm_manager.components.list_splash import EmptySplash +from clan_vm_manager.components.vmobj import VMObject +from clan_vm_manager.singletons.toast import ( + LogToast, + SuccessToast, + ToastOverlay, + WarningToast, +) +from clan_vm_manager.singletons.use_join import JoinList, JoinValue +from clan_vm_manager.singletons.use_views import ViewStack +from clan_vm_manager.singletons.use_vms import ClanStore, VMStore +from clan_vm_manager.views.logs import Logs + +gi.require_version("Adw", "1") +from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk + +log = logging.getLogger(__name__) + +ListItem = TypeVar("ListItem", bound=GObject.Object) +CustomStore = TypeVar("CustomStore", bound=Gio.ListModel) + + +def create_boxed_list( + model: CustomStore, + render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget], +) -> Gtk.ListBox: + boxed_list = Gtk.ListBox() + boxed_list.set_selection_mode(Gtk.SelectionMode.NONE) + boxed_list.add_css_class("boxed-list") + boxed_list.add_css_class("no-shadow") + + boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list)) + return boxed_list + + +class ClanList(Gtk.Box): + """ + The ClanList + Is the composition of + the ClanListToolbar + the clanListView + # ------------------------ # + # - Tools < Edit> # + # ------------------------ # + # - List Items + # - <...> + # ------------------------# + """ + + def __init__(self, config: ClanConfig) -> None: + super().__init__(orientation=Gtk.Orientation.VERTICAL) + + app = Gio.Application.get_default() + assert app is not None + app.connect("join_request", self.on_join_request) + + self.log_label: Gtk.Label = Gtk.Label() + + # Add join list + self.join_boxed_list = create_boxed_list( + model=JoinList.use().list_store, render_row=self.render_join_row + ) + self.join_boxed_list.add_css_class("join-list") + self.append(self.join_boxed_list) + + clan_store = ClanStore.use() + clan_store.connect("is_ready", self.display_splash) + + self.group_list = create_boxed_list( + model=clan_store.clan_store, render_row=self.render_group_row + ) + self.group_list.add_css_class("group-list") + self.append(self.group_list) + + self.splash = EmptySplash(on_join=lambda x: self.on_join_request(x, x)) + + def display_splash(self, source: GKVStore) -> None: + print("Displaying splash") + if ( + ClanStore.use().clan_store.get_n_items() == 0 + and JoinList.use().list_store.get_n_items() == 0 + ): + self.append(self.splash) + + def render_group_row( + self, boxed_list: Gtk.ListBox, vm_store: VMStore + ) -> Gtk.Widget: + self.remove(self.splash) + + vm = vm_store.first() + log.debug("Rendering group row for %s", vm.data.flake.flake_url) + + grp = Adw.PreferencesGroup() + grp.set_title(vm.data.flake.clan_name) + grp.set_description(vm.data.flake.flake_url) + + add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s")) + add_action.connect("activate", self.on_add) + app = Gio.Application.get_default() + assert app is not None + app.add_action(add_action) + + # menu_model = Gio.Menu() + # TODO: Make this lazy, blocks UI startup for too long + # for vm in machines.list.list_machines(flake_url=vm.data.flake.flake_url): + # if vm not in vm_store: + # menu_model.append(vm, f"app.add::{vm}") + + box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5) + box.set_valign(Gtk.Align.CENTER) + + add_button = Gtk.Button() + add_button_content = Adw.ButtonContent.new() + add_button_content.set_label("Add machine") + add_button_content.set_icon_name("list-add-symbolic") + add_button.add_css_class("flat") + add_button.set_child(add_button_content) + + # add_button.set_has_frame(False) + # add_button.set_menu_model(menu_model) + # add_button.set_label("Add machine") + box.append(add_button) + + grp.set_header_suffix(box) + + vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row) + grp.add(vm_list) + + return grp + + def on_add(self, source: Any, parameter: Any) -> None: + target = parameter.get_string() + print("Adding new machine", target) + + def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VMObject) -> Gtk.Widget: + # Remove no-shadow class if attached + if boxed_list.has_css_class("no-shadow"): + boxed_list.remove_css_class("no-shadow") + flake = vm.data.flake + row = Adw.ActionRow() + + # ====== Display Avatar ====== + avatar = Adw.Avatar() + machine_icon = flake.vm.machine_icon + + # If there is a machine icon, display it else + # display the clan icon + if machine_icon: + avatar.set_custom_image(Gdk.Texture.new_from_filename(str(machine_icon))) + elif flake.icon: + avatar.set_custom_image(Gdk.Texture.new_from_filename(str(flake.icon))) + else: + avatar.set_text(flake.clan_name + " " + flake.flake_attr) + + avatar.set_show_initials(True) + avatar.set_size(50) + row.add_prefix(avatar) + + # ====== Display Name And Url ===== + row.set_title(flake.flake_attr) + row.set_title_lines(1) + row.set_title_selectable(True) + + # If there is a machine description, display it else + # display the clan name + if flake.vm.machine_description: + row.set_subtitle(flake.vm.machine_description) + else: + row.set_subtitle(flake.clan_name) + row.set_subtitle_lines(1) + + # ==== Display build progress bar ==== + build_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5) + build_box.set_valign(Gtk.Align.CENTER) + build_box.append(vm.progress_bar) + build_box.set_homogeneous(False) + row.add_suffix(build_box) # This allows children to have different sizes + + # ==== Action buttons ==== + button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5) + button_box.set_valign(Gtk.Align.CENTER) + + ## Drop down menu + open_action = Gio.SimpleAction.new("edit", GLib.VariantType.new("s")) + open_action.connect("activate", self.on_edit) + + action_id = base64.b64encode(vm.get_id().encode("utf-8")).decode("utf-8") + + build_logs_action = Gio.SimpleAction.new( + f"logs.{action_id}", GLib.VariantType.new("s") + ) + + build_logs_action.connect("activate", self.on_show_build_logs) + build_logs_action.set_enabled(False) + + app = Gio.Application.get_default() + assert app is not None + + app.add_action(open_action) + app.add_action(build_logs_action) + + # set a callback function for conditionally enabling the build_logs action + def on_vm_build_notify( + vm: VMObject, is_building: bool, is_running: bool + ) -> None: + build_logs_action.set_enabled(is_building or is_running) + app.add_action(build_logs_action) + if is_building: + ToastOverlay.use().add_toast_unique( + LogToast( + """Build process running ...""", + on_button_click=lambda: self.show_vm_build_logs(vm.get_id()), + ).toast, + f"info.build.running.{vm}", + ) + + vm.connect("vm_build_notify", on_vm_build_notify) + + menu_model = Gio.Menu() + menu_model.append("Edit", f"app.edit::{vm.get_id()}") + menu_model.append("Show Logs", f"app.logs.{action_id}::{vm.get_id()}") + + pref_button = Gtk.MenuButton() + pref_button.set_icon_name("open-menu-symbolic") + pref_button.set_menu_model(menu_model) + + button_box.append(pref_button) + + ## VM switch button + switch_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) + switch_box.set_valign(Gtk.Align.CENTER) + switch_box.append(vm.switch) + button_box.append(switch_box) + + row.add_suffix(button_box) + + return row + + def on_edit(self, source: Any, parameter: Any) -> None: + target = parameter.get_string() + print("Editing settings for machine", target) + + def on_show_build_logs(self, _: Any, parameter: Any) -> None: + target = parameter.get_string() + self.show_vm_build_logs(target) + + def show_vm_build_logs(self, target: str) -> None: + vm = ClanStore.use().set_logging_vm(target) + if vm is None: + raise ValueError(f"VM {target} not found") + + views = ViewStack.use().view + # Reset the logs view + logs: Logs = views.get_child_by_name("logs") # type: ignore + + if logs is None: + raise ValueError("Logs view not found") + + name = vm.machine.name if vm.machine else "Unknown" + + logs.set_title(f"""📄 {name}""") + # initial message. Streaming happens automatically when the file is changed by the build process + with open(vm.build_process.out_file) as f: + logs.set_message(f.read()) + + views.set_visible_child_name("logs") + + def render_join_row( + self, boxed_list: Gtk.ListBox, join_val: JoinValue + ) -> Gtk.Widget: + if boxed_list.has_css_class("no-shadow"): + boxed_list.remove_css_class("no-shadow") + + log.debug("Rendering join row for %s", join_val.url) + + row = Adw.ActionRow() + row.set_title(join_val.url.machine_name) + row.set_subtitle(str(join_val.url)) + row.add_css_class("trust") + + vm = ClanStore.use().get_vm(join_val.url) + + # Can't do this here because clan store is empty at this point + if vm is not None: + sub = row.get_subtitle() + assert sub is not None + + ToastOverlay.use().add_toast_unique( + WarningToast( + f"""{join_val.url.machine_name!s} Already exists. Joining again will update it""" + ).toast, + "warning.duplicate.join", + ) + + row.set_subtitle( + sub + "\nClan already exists. Joining again will update it" + ) + + avatar = Adw.Avatar() + avatar.set_text(str(join_val.url.machine_name)) + avatar.set_show_initials(True) + avatar.set_size(50) + row.add_prefix(avatar) + + cancel_button = Gtk.Button(label="Cancel") + cancel_button.add_css_class("error") + cancel_button.connect("clicked", partial(self.on_discard_clicked, join_val)) + self.cancel_button = cancel_button + + trust_button = Gtk.Button(label="Join") + trust_button.add_css_class("success") + trust_button.connect("clicked", partial(self.on_trust_clicked, join_val)) + + box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5) + box.set_valign(Gtk.Align.CENTER) + box.append(cancel_button) + box.append(trust_button) + + row.add_suffix(box) + + return row + + def on_join_request(self, source: Any, url: str) -> None: + log.debug("Join request: %s", url) + clan_uri = ClanURI(url) + JoinList.use().push(clan_uri, self.on_after_join) + + def on_after_join(self, source: JoinValue) -> None: + ToastOverlay.use().add_toast_unique( + SuccessToast(f"Updated {source.url.machine_name}").toast, + "success.join", + ) + # If the join request list is empty disable the shadow artefact + if JoinList.use().is_empty(): + self.join_boxed_list.add_css_class("no-shadow") + + def on_trust_clicked(self, value: JoinValue, source: Gtk.Widget) -> None: + source.set_sensitive(False) + self.cancel_button.set_sensitive(False) + value.join() + + def on_discard_clicked(self, value: JoinValue, source: Gtk.Widget) -> None: + JoinList.use().discard(value) + if JoinList.use().is_empty(): + self.join_boxed_list.add_css_class("no-shadow") diff --git a/pkgs/clan-vm-manager/clan_vm_manager/views/logs.py b/pkgs/clan-vm-manager/clan_vm_manager/views/logs.py new file mode 100644 index 00000000..f7fb804f --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/views/logs.py @@ -0,0 +1,65 @@ +import logging + +import gi + +gi.require_version("Adw", "1") +from gi.repository import Adw, Gio, Gtk + +from clan_vm_manager.singletons.use_views import ViewStack + +log = logging.getLogger(__name__) + + +class Logs(Gtk.Box): + """ + Simple log view + This includes a banner and a text view and a button to close the log and navigate back to the overview + """ + + def __init__(self) -> None: + super().__init__(orientation=Gtk.Orientation.VERTICAL) + + app = Gio.Application.get_default() + assert app is not None + + self.banner = Adw.Banner.new("") + self.banner.set_use_markup(True) + self.banner.set_revealed(True) + self.banner.set_button_label("Close") + + self.banner.connect( + "button-clicked", + lambda _: ViewStack.use().view.set_visible_child_name("list"), + ) + + self.text_view = Gtk.TextView() + self.text_view.set_editable(False) + self.text_view.set_wrap_mode(Gtk.WrapMode.WORD) + self.text_view.add_css_class("log-view") + + self.append(self.banner) + self.append(self.text_view) + + def set_title(self, title: str) -> None: + self.banner.set_title(title) + + def set_message(self, message: str) -> None: + """ + Set the log message. This will delete any previous message + """ + buffer = self.text_view.get_buffer() + buffer.set_text(message) + + mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore + self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0) + + def append_message(self, message: str) -> None: + """ + Append to the end of a potentially existent log message + """ + buffer = self.text_view.get_buffer() + end_iter = buffer.get_end_iter() + buffer.insert(end_iter, message) # type: ignore + + mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore + self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/views/webview.py b/pkgs/clan-vm-manager/clan_vm_manager/views/webview.py new file mode 100644 index 00000000..4d8e9cd6 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/views/webview.py @@ -0,0 +1,156 @@ +import dataclasses +import json +import logging +import sys +import threading +from collections.abc import Callable +from pathlib import Path +from threading import Lock +from typing import Any + +import gi +from clan_cli.api import API + +gi.require_version("WebKit", "6.0") + +from gi.repository import GLib, WebKit + +site_index: Path = ( + Path(sys.argv[0]).absolute() + / Path("../..") + / Path("clan_vm_manager/.webui/index.html") +).resolve() + +log = logging.getLogger(__name__) + + +def dataclass_to_dict(obj: Any) -> Any: + """ + Utility function to convert dataclasses to dictionaries + It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries + + It does NOT convert member functions. + """ + if dataclasses.is_dataclass(obj): + return {k: dataclass_to_dict(v) for k, v in dataclasses.asdict(obj).items()} + elif isinstance(obj, list | tuple): + return [dataclass_to_dict(item) for item in obj] + elif isinstance(obj, dict): + return {k: dataclass_to_dict(v) for k, v in obj.items()} + else: + return obj + + +class WebView: + def __init__(self, methods: dict[str, Callable]) -> None: + self.method_registry: dict[str, Callable] = methods + + self.webview = WebKit.WebView() + + settings = self.webview.get_settings() + # settings. + settings.set_property("enable-developer-extras", True) + self.webview.set_settings(settings) + + self.manager = self.webview.get_user_content_manager() + # Can be called with: window.webkit.messageHandlers.gtk.postMessage("...") + # Important: it seems postMessage must be given some payload, otherwise it won't trigger the event + self.manager.register_script_message_handler("gtk") + self.manager.connect("script-message-received", self.on_message_received) + + self.webview.load_uri(f"file://{site_index}") + + # global mutex lock to ensure functions run sequentially + self.mutex_lock = Lock() + self.queue_size = 0 + + def on_message_received( + self, user_content_manager: WebKit.UserContentManager, message: Any + ) -> None: + payload = json.loads(message.to_json(0)) + method_name = payload["method"] + handler_fn = self.method_registry[method_name] + + log.debug(f"Received message: {payload}") + log.debug(f"Queue size: {self.queue_size} (Wait)") + + def threaded_wrapper() -> bool: + """ + Ensures only one function is executed at a time + + Wait until there is no other function acquiring the global lock. + + Starts a thread with the potentially long running API function within. + """ + if not self.mutex_lock.locked(): + thread = threading.Thread( + target=self.threaded_handler, + args=( + handler_fn, + payload.get("data"), + method_name, + ), + ) + thread.start() + return GLib.SOURCE_REMOVE + + return GLib.SOURCE_CONTINUE + + GLib.idle_add( + threaded_wrapper, + ) + self.queue_size += 1 + + def threaded_handler( + self, + handler_fn: Callable[ + ..., + Any, + ], + data: dict[str, Any] | None, + method_name: str, + ) -> None: + with self.mutex_lock: + log.debug("Executing... ", method_name) + log.debug(f"{data}") + if data is None: + result = handler_fn() + else: + reconciled_arguments = {} + for k, v in data.items(): + # Some functions expect to be called with dataclass instances + # But the js api returns dictionaries. + # Introspect the function and create the expected dataclass from dict dynamically + # Depending on the introspected argument_type + arg_type = API.get_method_argtype(method_name, k) + if dataclasses.is_dataclass(arg_type): + reconciled_arguments[k] = arg_type(**v) + else: + reconciled_arguments[k] = v + + result = handler_fn(**reconciled_arguments) + + serialized = json.dumps(dataclass_to_dict(result)) + + # Use idle_add to queue the response call to js on the main GTK thread + GLib.idle_add(self.return_data_to_js, method_name, serialized) + self.queue_size -= 1 + log.debug(f"Done: Remaining queue size: {self.queue_size}") + + def return_data_to_js(self, method_name: str, serialized: str) -> bool: + # This function must be run on the main GTK thread to interact with the webview + # result = method_fn(data) # takes very long + # serialized = result + self.webview.evaluate_javascript( + f""" + window.clan.{method_name}(`{serialized}`); + """, + -1, + None, + None, + None, + ) + return GLib.SOURCE_REMOVE + + def get_webview(self) -> WebKit.WebView: + return self.webview diff --git a/pkgs/clan-vm-manager/clan_vm_manager/windows/__init__.py b/pkgs/clan-vm-manager/clan_vm_manager/windows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkgs/clan-vm-manager/clan_vm_manager/windows/main_window.py b/pkgs/clan-vm-manager/clan_vm_manager/windows/main_window.py new file mode 100644 index 00000000..92dcf279 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/windows/main_window.py @@ -0,0 +1,92 @@ +import logging +import threading + +import gi +from clan_cli.api import API +from clan_cli.history.list import list_history + +from clan_vm_manager.components.interfaces import ClanConfig +from clan_vm_manager.singletons.toast import ToastOverlay +from clan_vm_manager.singletons.use_views import ViewStack +from clan_vm_manager.singletons.use_vms import ClanStore +from clan_vm_manager.views.details import Details +from clan_vm_manager.views.list import ClanList +from clan_vm_manager.views.logs import Logs +from clan_vm_manager.views.webview import WebView + +gi.require_version("Adw", "1") + +from gi.repository import Adw, Gio, GLib, Gtk + +from clan_vm_manager.components.trayicon import TrayIcon + +log = logging.getLogger(__name__) + + +class MainWindow(Adw.ApplicationWindow): + def __init__(self, config: ClanConfig) -> None: + super().__init__() + self.set_title("Clan Manager") + self.set_default_size(980, 850) + + overlay = ToastOverlay.use().overlay + view = Adw.ToolbarView() + overlay.set_child(view) + + self.set_content(overlay) + + header = Adw.HeaderBar() + view.add_top_bar(header) + + app = Gio.Application.get_default() + assert app is not None + self.tray_icon: TrayIcon = TrayIcon(app) + + # Initialize all ClanStore + threading.Thread(target=self._populate_vms).start() + + # Initialize all views + stack_view = ViewStack.use().view + + clamp = Adw.Clamp() + clamp.set_child(stack_view) + clamp.set_maximum_size(1000) + + scroll = Gtk.ScrolledWindow() + scroll.set_propagate_natural_height(True) + scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) + scroll.set_child(clamp) + + stack_view.add_named(ClanList(config), "list") + stack_view.add_named(Details(), "details") + stack_view.add_named(Logs(), "logs") + + webview = WebView(methods=API._registry) + stack_view.add_named(webview.get_webview(), "webview") + + stack_view.set_visible_child_name(config.initial_view) + + view.set_content(scroll) + + self.connect("destroy", self.on_destroy) + + def _set_clan_store_ready(self) -> bool: + ClanStore.use().emit("is_ready") + return GLib.SOURCE_REMOVE + + def _populate_vms(self) -> None: + # Execute `clan flakes add ` to democlan for this to work + # TODO: Make list_history a generator function + for entry in list_history(): + GLib.idle_add(ClanStore.use().create_vm_task, entry) + + GLib.idle_add(self._set_clan_store_ready) + + def kill_vms(self) -> None: + log.debug("Killing all VMs") + ClanStore.use().kill_all() + + def on_destroy(self, source: "Adw.ApplicationWindow") -> None: + log.info("====Destroying Adw.ApplicationWindow===") + ClanStore.use().kill_all() + self.tray_icon.destroy() diff --git a/pkgs/clan-vm-manager/default.nix b/pkgs/clan-vm-manager/default.nix new file mode 100644 index 00000000..2bf0a501 --- /dev/null +++ b/pkgs/clan-vm-manager/default.nix @@ -0,0 +1,172 @@ +{ + python3, + runCommand, + setuptools, + copyDesktopItems, + pygobject3, + wrapGAppsHook, + gtk4, + gnome, + pygobject-stubs, + gobject-introspection, + clan-cli, + makeDesktopItem, + libadwaita, + webkitgtk_6_0, + pytest, # Testing framework + pytest-cov, # Generate coverage reports + pytest-subprocess, # fake the real subprocess behavior to make your tests more independent. + pytest-xdist, # Run tests in parallel on multiple cores + pytest-timeout, # Add timeouts to your tests + webview-ui, + fontconfig, +}: +let + source = ./.; + desktop-file = makeDesktopItem { + name = "org.clan.vm-manager"; + exec = "clan-vm-manager %u"; + icon = ./clan_vm_manager/assets/clan_white.png; + desktopName = "Clan Manager"; + startupWMClass = "clan"; + mimeTypes = [ "x-scheme-handler/clan" ]; + }; + + # Dependencies that are directly used in the project but nor from internal python packages + externalPythonDeps = [ + pygobject3 + pygobject-stubs + gtk4 + libadwaita + webkitgtk_6_0 + gnome.adwaita-icon-theme + ]; + + # Deps including python packages from the local project + allPythonDeps = [ (python3.pkgs.toPythonModule clan-cli) ] ++ externalPythonDeps; + + # Runtime binary dependencies required by the application + runtimeDependencies = [ + + ]; + + # Dependencies required for running tests + externalTestDeps = + externalPythonDeps + ++ runtimeDependencies + ++ [ + pytest # Testing framework + pytest-cov # Generate coverage reports + pytest-subprocess # fake the real subprocess behavior to make your tests more independent. + pytest-xdist # Run tests in parallel on multiple cores + pytest-timeout # Add timeouts to your tests + ]; + + # Dependencies required for running tests + testDependencies = runtimeDependencies ++ allPythonDeps ++ externalTestDeps; + + # Setup Python environment with all dependencies for running tests + pythonWithTestDeps = python3.withPackages (_ps: testDependencies); +in +python3.pkgs.buildPythonApplication rec { + name = "clan-vm-manager"; + src = source; + format = "pyproject"; + + makeWrapperArgs = [ + "--set FONTCONFIG_FILE ${fontconfig.out}/etc/fonts/fonts.conf" + # This prevents problems with mixed glibc versions that might occur when the + # cli is called through a browser built against another glibc + "--unset LD_LIBRARY_PATH" + ]; + + # Deps needed only at build time + nativeBuildInputs = [ + setuptools + copyDesktopItems + wrapGAppsHook + + gobject-introspection + ]; + + # The necessity of setting buildInputs and propagatedBuildInputs to the + # same values for your Python package within Nix largely stems from ensuring + # that all necessary dependencies are consistently available both + # at build time and runtime, + buildInputs = allPythonDeps ++ runtimeDependencies; + propagatedBuildInputs = allPythonDeps ++ runtimeDependencies; + + # also re-expose dependencies so we test them in CI + passthru = { + tests = { + clan-vm-manager-pytest = + runCommand "clan-vm-manager-pytest" { inherit buildInputs propagatedBuildInputs nativeBuildInputs; } + '' + cp -r ${source} ./src + chmod +w -R ./src + cd ./src + + export FONTCONFIG_FILE=${fontconfig.out}/etc/fonts/fonts.conf + export FONTCONFIG_PATH=${fontconfig.out}/etc/fonts + + mkdir -p .home/.local/share/fonts + export HOME=.home + + fc-cache --verbose + # > fc-cache succeded + + echo "Loaded the following fonts ..." + fc-list + + echo "STARTING ..." + export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1 + ${pythonWithTestDeps}/bin/python -m pytest -s -m "not impure" ./tests + touch $out + ''; + + clan-vm-manager-no-breakpoints = runCommand "clan-vm-manager-no-breakpoints" { } '' + if grep --include \*.py -Rq "breakpoint()" ${source}; then + echo "breakpoint() found in ${source}:" + grep --include \*.py -Rn "breakpoint()" ${source} + exit 1 + fi + touch $out + ''; + }; + }; + + # Additional pass-through attributes + passthru.desktop-file = desktop-file; + passthru.externalPythonDeps = externalPythonDeps; + passthru.externalTestDeps = externalTestDeps; + passthru.runtimeDependencies = runtimeDependencies; + passthru.testDependencies = testDependencies; + + # TODO: place webui in lib/python3.11/site-packages/clan_vm_manager + postInstall = '' + mkdir -p $out/clan_vm_manager/.webui + cp -r ${webview-ui}/lib/node_modules/@clan/webview-ui/dist/* $out/clan_vm_manager/.webui + ''; + + # Don't leak python packages into a devshell. + # It can be very confusing if you `nix run` than load the cli from the devshell instead. + postFixup = '' + rm $out/nix-support/propagated-build-inputs + ''; + checkPhase = '' + export FONTCONFIG_FILE=${fontconfig.out}/etc/fonts/fonts.conf + export FONTCONFIG_PATH=${fontconfig.out}/etc/fonts + + mkdir -p .home/.local/share/fonts + export HOME=.home + + fc-cache --verbose + # > fc-cache succeded + + echo "Loaded the following fonts ..." + fc-list + + PYTHONPATH= $out/bin/clan-vm-manager --help + ''; + desktopItems = [ desktop-file ]; +} diff --git a/pkgs/clan-vm-manager/demo.sh b/pkgs/clan-vm-manager/demo.sh new file mode 100755 index 00000000..4574f840 --- /dev/null +++ b/pkgs/clan-vm-manager/demo.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash + +set -e -o pipefail + +check_git_tag() { + local repo_path="$1" + local target_tag="$2" + + # Change directory to the specified Git repository + pushd "$repo_path" > /dev/null 2>&1 + # shellcheck disable=SC2181 + if [ $? -ne 0 ]; then + echo "Error: Failed to change directory to $repo_path" + return 1 + fi + + # Get the current Git tag + local current_tag + current_tag=$(git describe --tags --exact-match 2>/dev/null) + + # Restore the original directory + popd > /dev/null 2>&1 + + # Check if the current tag is 2.0 + if [ "$current_tag" = "$target_tag" ]; then + echo "Current Git tag in $repo_path is $target_tag" + else + echo "Error: Current Git tag in $repo_path is not $target_tag" + exit 1 + fi +} + + + +if [ -z "$1" ]; then + echo "Usage: $0 " + exit 1 +fi + +democlan="$1" + +check_git_tag "$democlan" "v2.2" + +check_git_tag "." "demo-v2.3" + +rm -rf ~/.config/clan + +clan history add "clan://$democlan#localsend-wayland1" + +clear +cat << EOF +Open up this link in a browser: +"clan://$democlan#localsend-wayland2" +EOF diff --git a/pkgs/clan-vm-manager/flake-module.nix b/pkgs/clan-vm-manager/flake-module.nix new file mode 100644 index 00000000..f374a95f --- /dev/null +++ b/pkgs/clan-vm-manager/flake-module.nix @@ -0,0 +1,24 @@ +{ ... }: +{ + perSystem = + { + config, + pkgs, + lib, + system, + ... + }: + if lib.elem system lib.platforms.darwin then + { } + else + { + devShells.clan-vm-manager = pkgs.callPackage ./shell.nix { + inherit (config.packages) clan-vm-manager webview-ui; + }; + packages.clan-vm-manager = pkgs.python3.pkgs.callPackage ./default.nix { + inherit (config.packages) clan-cli webview-ui; + }; + + checks = config.packages.clan-vm-manager.tests; + }; +} diff --git a/pkgs/clan-vm-manager/install-desktop.sh b/pkgs/clan-vm-manager/install-desktop.sh new file mode 100755 index 00000000..44583978 --- /dev/null +++ b/pkgs/clan-vm-manager/install-desktop.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +CLAN=$(nix build .#clan-vm-manager --print-out-paths) + +if ! command -v xdg-mime &> /dev/null; then + echo "Warning: 'xdg-mime' is not available. The desktop file cannot be installed." +fi + +# install desktop file +set -eou pipefail +DESKTOP_FILE_NAME=org.clan.vm-manager.desktop +DESKTOP_DST=~/.local/share/applications/"$DESKTOP_FILE_NAME" +DESKTOP_SRC="$CLAN/share/applications/$DESKTOP_FILE_NAME" +UI_BIN="$CLAN/bin/clan-vm-manager" + +cp -f "$DESKTOP_SRC" "$DESKTOP_DST" +sleep 2 +sed -i "s|Exec=.*clan-vm-manager|Exec=$UI_BIN|" "$DESKTOP_DST" +xdg-mime default "$DESKTOP_FILE_NAME" x-scheme-handler/clan +echo "==== Validating desktop file installation ====" +set -x +desktop-file-validate "$DESKTOP_DST" +set +xeou pipefail diff --git a/pkgs/clan-vm-manager/notes.md b/pkgs/clan-vm-manager/notes.md new file mode 100644 index 00000000..4c512057 --- /dev/null +++ b/pkgs/clan-vm-manager/notes.md @@ -0,0 +1,7 @@ +# Webkit GTK doesn't interop flawless with Solid.js build result + +1. Webkit expects script tag to be in `body` only solid.js puts the in the head. +2. script and css files are loaded with type="module" and crossorigin tags beeing set. WebKit silently fails to load then. +3. Paths to resiources are not allowed to start with "/" because webkit interprets them relative to the system and not the base url. +4. webkit doesn't support native features such as directly handling external urls (i.e opening them in the default browser) +6. Other problems to be found? \ No newline at end of file diff --git a/pkgs/clan-vm-manager/pyproject.toml b/pkgs/clan-vm-manager/pyproject.toml new file mode 100644 index 00000000..3a7e63dc --- /dev/null +++ b/pkgs/clan-vm-manager/pyproject.toml @@ -0,0 +1,48 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + + +[project] +name = "clan-vm-manager" +description = "clan vm manager" +dynamic = ["version"] +scripts = { clan-vm-manager = "clan_vm_manager:main" } + +[project.urls] +Homepage = "https://clan.lol/" +Documentation = "https://docs.clan.lol/" +Repository = "https://git.clan.lol/clan/clan-core" + +[tool.setuptools.packages.find] +exclude = ["result"] + +[tool.setuptools.package-data] +clan_vm_manager = ["**/assets/*"] + +[tool.pytest.ini_options] +testpaths = "tests" +faulthandler_timeout = 60 +log_level = "DEBUG" +log_format = "%(levelname)s: %(message)s\n %(pathname)s:%(lineno)d::%(funcName)s" +addopts = "--cov . --cov-report term --cov-report html:.reports/html --no-cov-on-fail --durations 5 --color=yes --new-first" # Add --pdb for debugging +norecursedirs = "tests/helpers" +markers = ["impure"] + +[tool.mypy] +python_version = "3.11" +warn_redundant_casts = true +disallow_untyped_calls = true +disallow_untyped_defs = true +no_implicit_optional = true + +[[tool.mypy.overrides]] +module = "argcomplete.*" +ignore_missing_imports = true + + +[tool.ruff] +target-version = "py311" +line-length = 88 +lint.select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ] +lint.ignore = ["E501", "E402", "N802", "ANN101", "ANN401", "A003"] diff --git a/pkgs/clan-vm-manager/screenshots/image.png b/pkgs/clan-vm-manager/screenshots/image.png new file mode 100644 index 00000000..a6f7f4a2 Binary files /dev/null and b/pkgs/clan-vm-manager/screenshots/image.png differ diff --git a/pkgs/clan-vm-manager/shell.nix b/pkgs/clan-vm-manager/shell.nix new file mode 100644 index 00000000..4de1f494 --- /dev/null +++ b/pkgs/clan-vm-manager/shell.nix @@ -0,0 +1,62 @@ +{ + lib, + stdenv, + clan-vm-manager, + mkShell, + ruff, + desktop-file-utils, + xdg-utils, + mypy, + python3, + gtk4, + libadwaita, + webview-ui, +}: + +let + devshellTestDeps = + clan-vm-manager.externalTestDeps + ++ (with python3.pkgs; [ + rope + mypy + ipdb + setuptools + wheel + pip + ]); +in +mkShell { + inherit (clan-vm-manager) nativeBuildInputs; + buildInputs = + [ + ruff + gtk4.dev # has the demo called 'gtk4-widget-factory' + libadwaita.devdoc # has the demo called 'adwaita-1-demo' + ] + ++ devshellTestDeps + + # Dependencies for testing for linux hosts + ++ (lib.optionals stdenv.isLinux [ + xdg-utils # install desktop files + desktop-file-utils # verify desktop files + ]); + + PYTHONBREAKPOINT = "ipdb.set_trace"; + + shellHook = '' + export GIT_ROOT=$(git rev-parse --show-toplevel) + export PKG_ROOT=$GIT_ROOT/pkgs/clan-vm-manager + + # Add clan-vm-manager command to PATH + export PATH="$PKG_ROOT/bin":"$PATH" + + # Add clan-cli to the python path so that we can import it without building it in nix first + export PYTHONPATH="$GIT_ROOT/pkgs/clan-cli":"$PYTHONPATH" + + # Add the webview-ui to the .webui directory + rm -rf ./clan_vm_manager/.webui/* + mkdir -p ./clan_vm_manager/.webui + cp -a ${webview-ui}/lib/node_modules/@clan/webview-ui/dist/* ./clan_vm_manager/.webui + chmod -R +w ./clan_vm_manager/.webui + ''; +} diff --git a/pkgs/clan-vm-manager/tests/command.py b/pkgs/clan-vm-manager/tests/command.py new file mode 100644 index 00000000..f951c8dd --- /dev/null +++ b/pkgs/clan-vm-manager/tests/command.py @@ -0,0 +1,64 @@ +import os +import signal +import subprocess +from collections.abc import Iterator +from pathlib import Path +from typing import IO, Any + +import pytest + +_FILE = None | int | IO[Any] + + +class Command: + def __init__(self) -> None: + self.processes: list[subprocess.Popen[str]] = [] + + def run( + self, + command: list[str], + extra_env: dict[str, str] = {}, + stdin: _FILE = None, + stdout: _FILE = None, + stderr: _FILE = None, + workdir: Path | None = None, + ) -> subprocess.Popen[str]: + env = os.environ.copy() + env.update(extra_env) + # We start a new session here so that we can than more reliably kill all childs as well + p = subprocess.Popen( + command, + env=env, + start_new_session=True, + stdout=stdout, + stderr=stderr, + stdin=stdin, + text=True, + cwd=workdir, + ) + self.processes.append(p) + return p + + def terminate(self) -> None: + # Stop in reverse order in case there are dependencies. + # We just kill all processes as quickly as possible because we don't + # care about corrupted state and want to make tests fasts. + for p in reversed(self.processes): + try: + os.killpg(os.getpgid(p.pid), signal.SIGKILL) + except OSError: + pass + + +@pytest.fixture +def command() -> Iterator[Command]: + """ + Starts a background command. The process is automatically terminated in the end. + >>> p = command.run(["some", "daemon"]) + >>> print(p.pid) + """ + c = Command() + try: + yield c + finally: + c.terminate() diff --git a/pkgs/clan-vm-manager/tests/conftest.py b/pkgs/clan-vm-manager/tests/conftest.py new file mode 100644 index 00000000..1841dc81 --- /dev/null +++ b/pkgs/clan-vm-manager/tests/conftest.py @@ -0,0 +1,44 @@ +import subprocess +import sys +from pathlib import Path + +import pytest +from clan_cli.custom_logger import setup_logging +from clan_cli.nix import nix_shell + +sys.path.append(str(Path(__file__).parent / "helpers")) +sys.path.append( + str(Path(__file__).parent.parent) +) # Also add clan vm manager to PYTHONPATH + +pytest_plugins = [ + "temporary_dir", + "root", + "command", + "wayland", +] + + +# Executed on pytest session start +def pytest_sessionstart(session: pytest.Session) -> None: + # This function will be called once at the beginning of the test session + print("Starting pytest session") + # You can access the session config, items, testsfailed, etc. + print(f"Session config: {session.config}") + + setup_logging(level="DEBUG") + + +# fixture for git_repo +@pytest.fixture +def git_repo(tmp_path: Path) -> Path: + # initialize a git repository + cmd = nix_shell(["nixpkgs#git"], ["git", "init"]) + subprocess.run(cmd, cwd=tmp_path, check=True) + # set user.name and user.email + cmd = nix_shell(["nixpkgs#git"], ["git", "config", "user.name", "test"]) + subprocess.run(cmd, cwd=tmp_path, check=True) + cmd = nix_shell(["nixpkgs#git"], ["git", "config", "user.email", "test@test.test"]) + subprocess.run(cmd, cwd=tmp_path, check=True) + # return the path to the git repository + return tmp_path diff --git a/pkgs/clan-vm-manager/tests/helpers/cli.py b/pkgs/clan-vm-manager/tests/helpers/cli.py new file mode 100644 index 00000000..1c2532d5 --- /dev/null +++ b/pkgs/clan-vm-manager/tests/helpers/cli.py @@ -0,0 +1,15 @@ +import logging +import shlex + +from clan_cli.custom_logger import get_caller + +from clan_vm_manager import main + +log = logging.getLogger(__name__) + + +class Cli: + def run(self, args: list[str]) -> None: + cmd = shlex.join(["clan", *args]) + log.debug(f"$ {cmd} \nCaller: {get_caller()}") + main(args) diff --git a/pkgs/clan-vm-manager/tests/root.py b/pkgs/clan-vm-manager/tests/root.py new file mode 100644 index 00000000..0cac067b --- /dev/null +++ b/pkgs/clan-vm-manager/tests/root.py @@ -0,0 +1,35 @@ +import os +from pathlib import Path + +import pytest + +TEST_ROOT = Path(__file__).parent.resolve() +PROJECT_ROOT = TEST_ROOT.parent +if CLAN_CORE_ := os.environ.get("CLAN_CORE"): + CLAN_CORE = Path(CLAN_CORE_) +else: + CLAN_CORE = PROJECT_ROOT.parent.parent + + +@pytest.fixture(scope="session") +def project_root() -> Path: + """ + Root directory the clan-cli + """ + return PROJECT_ROOT + + +@pytest.fixture(scope="session") +def test_root() -> Path: + """ + Root directory of the tests + """ + return TEST_ROOT + + +@pytest.fixture(scope="session") +def clan_core() -> Path: + """ + Directory of the clan-core flake + """ + return CLAN_CORE diff --git a/pkgs/clan-vm-manager/tests/temporary_dir.py b/pkgs/clan-vm-manager/tests/temporary_dir.py new file mode 100644 index 00000000..aaa54ca2 --- /dev/null +++ b/pkgs/clan-vm-manager/tests/temporary_dir.py @@ -0,0 +1,27 @@ +import logging +import os +import tempfile +from collections.abc import Iterator +from pathlib import Path + +import pytest + +log = logging.getLogger(__name__) + + +@pytest.fixture +def temporary_home(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: + env_dir = os.getenv("TEST_TEMPORARY_DIR") + if env_dir is not None: + path = Path(env_dir).resolve() + log.debug("Temp HOME directory: %s", str(path)) + monkeypatch.setenv("HOME", str(path)) + monkeypatch.chdir(str(path)) + yield path + else: + with tempfile.TemporaryDirectory(prefix="pytest-") as dirpath: + monkeypatch.setenv("HOME", str(dirpath)) + monkeypatch.setenv("XDG_CONFIG_HOME", str(Path(dirpath) / ".config")) + monkeypatch.chdir(str(dirpath)) + log.debug("Temp HOME directory: %s", str(dirpath)) + yield Path(dirpath) diff --git a/pkgs/clan-vm-manager/tests/test_cli.py b/pkgs/clan-vm-manager/tests/test_cli.py new file mode 100644 index 00000000..654fa82a --- /dev/null +++ b/pkgs/clan-vm-manager/tests/test_cli.py @@ -0,0 +1,8 @@ +import pytest +from cli import Cli + + +def test_help(capfd: pytest.CaptureFixture) -> None: + cli = Cli() + with pytest.raises(SystemExit): + cli.run(["clan-vm-manager", "--help"]) diff --git a/pkgs/clan-vm-manager/tests/test_join.py b/pkgs/clan-vm-manager/tests/test_join.py new file mode 100644 index 00000000..fff6de20 --- /dev/null +++ b/pkgs/clan-vm-manager/tests/test_join.py @@ -0,0 +1,8 @@ +import time + +from wayland import GtkProc + + +def test_open(app: GtkProc) -> None: + time.sleep(0.5) + assert app.poll() is None diff --git a/pkgs/clan-vm-manager/tests/wayland.py b/pkgs/clan-vm-manager/tests/wayland.py new file mode 100644 index 00000000..1156b666 --- /dev/null +++ b/pkgs/clan-vm-manager/tests/wayland.py @@ -0,0 +1,27 @@ +import sys +from collections.abc import Generator +from subprocess import Popen +from typing import NewType + +import pytest + + +@pytest.fixture(scope="session") +def wayland_compositor() -> Generator[Popen, None, None]: + # Start the Wayland compositor (e.g., Weston) + # compositor = Popen(["weston", "--backend=headless-backend.so"]) + compositor = Popen(["weston"]) + yield compositor + # Cleanup: Terminate the compositor + compositor.terminate() + + +GtkProc = NewType("GtkProc", Popen) + + +@pytest.fixture(scope="function") +def app() -> Generator[GtkProc, None, None]: + rapp = Popen([sys.executable, "-m", "clan_vm_manager"], text=True) + yield GtkProc(rapp) + # Cleanup: Terminate your application + rapp.terminate() diff --git a/pkgs/flake-module.nix b/pkgs/flake-module.nix index ec2e3fd3..1909c9c3 100644 --- a/pkgs/flake-module.nix +++ b/pkgs/flake-module.nix @@ -4,6 +4,7 @@ imports = [ ./clan-cli/flake-module.nix ./clan-app/flake-module.nix + ./clan-vm-manager/flake-module.nix ./installer/flake-module.nix ./schemas/flake-module.nix ./webview-ui/flake-module.nix