Merge pull request 'clan-app: Remove vm-manager from codebase' (#1754) from Qubasa/clan-core:Qubasa-fix_clan_app into main
Some checks failed
deploy / deploy-docs (push) Successful in 20s
buildbot/nix-build .#checks.aarch64-darwin.nixos-test_install_machine Build done.
buildbot/nix-build .#checks.aarch64-darwin.nixos-test-backup Build done.
buildbot/nix-build .#checks.aarch64-darwin.nixos-minimal-inventory-machine Build done.
buildbot/nix-build .#checks.aarch64-darwin.nixos-flash-installer Build done.
buildbot/nix-build .#checks.aarch64-linux.nixos-test_install_machine Build done.
buildbot/nix-build .#checks.aarch64-linux.nixos-test-backup Build done.
buildbot/nix-build .#checks.aarch64-linux.nixos-minimal-inventory-machine Build done.
buildbot/nix-build .#checks.aarch64-linux.nixos-flash-installer Build done.
buildbot/nix-build .#checks.x86_64-linux.check-for-breakpoints Build done.
buildbot/nix-build .#checks.x86_64-linux.package-gui-installer-apk Build done.
buildbot/nix-build .#checks.x86_64-linux.package-gui-installer-archlinux Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-app-no-breakpoints Build done.
buildbot/nix-build .#checks.x86_64-linux.package-gui-installer-deb Build done.
buildbot/nix-build .#checks.x86_64-linux.package-gui-installer-rpm Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-bash Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-e2fsprogs Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-git Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-sops Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-sshpass Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-tor Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-zbar Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-qemu Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-pytest-with-core Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-openssh Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-app-pytest Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-age Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-mypy Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-nix Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-dep-rsync Build done.
buildbot/nix-build .#checks.x86_64-linux.devShell-clan-cli Build done.
buildbot/nix-build .#checks.x86_64-linux.clan-pytest-without-core Build done.
buildbot/nix-build .#checks.x86_64-linux.devShell-clan-app Build done.
buildbot/nix-build .#checks.x86_64-linux.container Build done.
buildbot/nix-build .#checks.x86_64-linux.devShell-default Build done.
buildbot/nix-build .#checks.x86_64-linux.lib-jsonschema-example-valid Build done.
buildbot/nix-build .#checks.x86_64-linux.lib-inventory-eval Build done.
buildbot/nix-build .#checks.x86_64-linux.borgbackup Build done.
buildbot/nix-build .#checks.x86_64-linux.module-clan-vars-eval Build done.
buildbot/nix-build .#checks.x86_64-linux.deltachat Build done.
buildbot/nix-build .#checks.x86_64-linux.devShell-webview-ui Build done.
buildbot/nix-build .#checks.x86_64-linux.devShell-inventory-schema Build done.
buildbot/nix-build .#checks.x86_64-linux.devShell-docs Build done.
buildbot/nix-build .#checks.x86_64-linux.lib-inventory-examples-cue Build done.
buildbot/nix-build .#checks.x86_64-linux.package-clan-cli Build done.
buildbot/nix-build .#checks.x86_64-linux.package-clan-cli-docs Build done.
buildbot/nix-build .#checks.x86_64-linux.package-clan-cli-full Build done.
buildbot/nix-build .#checks.x86_64-linux.package-clan-ts-api Build done.
buildbot/nix-build .#checks.x86_64-linux.package-default Build done.
buildbot/nix-build .#checks.x86_64-linux.matrix-synapse Build done.
buildbot/nix-build .#checks.x86_64-linux.nixos-test_install_machine Build done.
buildbot/nix-build .#checks.x86_64-linux.package-editor Build done.
buildbot/nix-build .#checks.x86_64-linux.nixos-minimal-inventory-machine Build done.
buildbot/nix-build .#checks.x86_64-linux.package-deploy-docs Build done.
buildbot/nix-build .#checks.x86_64-linux.package-docs Build done.
buildbot/nix-build .#checks.x86_64-linux.package-function-schema Build done.
buildbot/nix-build .#checks.x86_64-linux.postgresql Build done.
buildbot/nix-build .#checks.x86_64-linux.secrets Build done.
buildbot/nix-build .#checks.x86_64-linux.package-module-docs Build done.
buildbot/nix-build .#checks.x86_64-linux.module-schema Build done.
buildbot/nix-build .#checks.x86_64-linux.flash Build done.
buildbot/nix-build .#checks.x86_64-linux.package-clan-app Build done.
buildbot/nix-build .#checks.x86_64-linux.package-impure-checks Build done.
buildbot/nix-build .#checks.x86_64-linux.package-merge-after-ci Build done.
buildbot/nix-build .#checks.x86_64-linux.lib-jsonschema-nix-unit-tests Build done.
buildbot/nix-build .#checks.x86_64-linux.package-moonlight-sunshine-accept Build done.
buildbot/nix-build .#checks.x86_64-linux.package-pending-reviews Build done.
buildbot/nix-build .#checks.x86_64-linux.package-tea-create-pr Build done.
buildbot/nix-build .#checks.x86_64-linux.package-inventory-schema Build done.
buildbot/nix-build .#checks.x86_64-linux.package-zerotier-members Build done.
buildbot/nix-build .#checks.x86_64-linux.package-webview-ui Build done.
buildbot/nix-build .#checks.x86_64-linux.package-zt-tcp-relay Build done.
buildbot/nix-build .#checks.x86_64-linux.package-zerotierone Build done.
buildbot/nix-build .#checks.x86_64-linux.nixos-test-backup Build done.
buildbot/nix-build .#checks.x86_64-linux.package-inventory-schema-pretty Build done.
buildbot/nix-build .#checks.x86_64-linux.renderClanOptions Build done.
buildbot/nix-build .#checks.x86_64-linux.nixos-flash-installer Build done.
buildbot/nix-build .#checks.x86_64-linux.template-minimal Build done.
buildbot/nix-build .#checks.x86_64-linux.package-module-schema Build done.
buildbot/nix-build .#checks.x86_64-linux.test-backups Build done.
buildbot/nix-build .#checks.x86_64-linux.treefmt Build done.
buildbot/nix-build .#checks.x86_64-linux.zt-tcp-relay Build done.
buildbot/nix-build .#checks.x86_64-linux.wayland-proxy-virtwl Build done.
buildbot/nix-build .#checks.x86_64-linux.syncthing Build done.
buildbot/nix-build .#checks.x86_64-linux.test-installation Build done.
buildbot/nix-build .#checks.x86_64-linux.package-gui-install-test-ubuntu-22-04 Build done.
buildbot/nix-eval Build done.
checks / checks-impure (push) Has been cancelled

This commit is contained in:
clan-bot 2024-07-15 17:52:14 +00:00
commit 4afad03fe9
21 changed files with 389 additions and 3107 deletions

View File

@ -14,6 +14,9 @@
},
{
"path": "../../lib/build-clan"
},
{
"path": "../webview-ui"
}
],
"settings": {
@ -25,6 +28,7 @@
"**/.mypy_cache": true,
"**/.reports": true,
"**/.ruff_cache": true,
"**/.webui": true,
"**/result/**": true,
"/nix/store/**": true
},

View File

@ -0,0 +1,115 @@
import logging
from collections.abc import Callable
from typing import Any, ClassVar, Generic, ParamSpec, TypeVar, cast
from gi.repository import GLib, GObject
log = logging.getLogger(__name__)
class GResult(GObject.Object):
result: Any
op_key: str
method_name: str
def __init__(self, result: Any, method_name: str, op_key: str) -> None:
super().__init__()
self.op_key = op_key
self.result = result
self.method_name = method_name
B = TypeVar("B")
P = ParamSpec("P")
class ImplFunc(GObject.Object, Generic[P, B]):
op_key: str | None = None
__gsignals__: ClassVar = {
"returns": (GObject.SignalFlags.RUN_FIRST, None, [GResult]),
}
def returns(self, result: B) -> None:
method_name = self.__class__.__name__
if self.op_key is None:
raise ValueError(f"op_key is not set for the function {method_name}")
self.emit("returns", GResult(result, method_name, self.op_key))
def await_result(self, fn: Callable[["ImplFunc[..., Any]", B], None]) -> None:
self.connect("returns", fn)
def async_run(self, *args: P.args, **kwargs: P.kwargs) -> bool:
raise NotImplementedError("Method 'async_run' must be implemented")
def _async_run(self, data: Any, op_key: str) -> bool:
self.op_key = op_key
result = GLib.SOURCE_REMOVE
try:
result = self.async_run(**data)
except Exception as e:
log.exception(e)
# TODO: send error to js
finally:
return result
class GObjApi:
def __init__(self, methods: dict[str, Callable[..., Any]]) -> None:
self._methods: dict[str, Callable[..., Any]] = methods
self._obj_registry: dict[str, type[ImplFunc]] = {}
def register_overwrite(self, obj: type[ImplFunc]) -> None:
fn_name = obj.__name__
if not isinstance(obj, type(ImplFunc)):
raise ValueError(f"Object '{fn_name}' is not an instance of ImplFunc")
if fn_name in self._obj_registry:
raise ValueError(f"Function '{fn_name}' already registered")
self._obj_registry[fn_name] = obj
def check_signature(self, method_annotations: dict[str, dict[str, Any]]) -> None:
overwrite_fns = self._obj_registry
# iterate over the methods and check if all are implemented
for m_name, m_annotations in method_annotations.items():
if m_name not in overwrite_fns:
continue
else:
# check if the signature of the abstract method matches the implementation
# abstract signature
values = list(m_annotations.values())
expected_signature = (tuple(values[:-1]), values[-1:][0])
# implementation signature
obj = dict(overwrite_fns[m_name].__dict__)
obj_type = obj["__orig_bases__"][0]
got_signature = obj_type.__args__
if expected_signature != got_signature:
log.error(f"Expected signature: {expected_signature}")
log.error(f"Actual signature: {got_signature}")
raise ValueError(
f"Overwritten method '{m_name}' has different signature than the implementation"
)
def get_obj(self, name: str) -> type[ImplFunc]:
result = self._obj_registry.get(name, None)
if result is not None:
return result
plain_method = self._methods.get(name, None)
if plain_method is None:
raise ValueError(f"Method '{name}' not found in Api")
class GenericFnRuntime(ImplFunc[..., Any]):
def __init__(self) -> None:
super().__init__()
def async_run(self, *args: Any, **kwargs: dict[str, Any]) -> bool:
assert plain_method is not None
result = plain_method(*args, **kwargs)
self.returns(result)
return GLib.SOURCE_REMOVE
return cast(type[ImplFunc], GenericFnRuntime)

View File

@ -0,0 +1,86 @@
# ruff: noqa: N801
import gi
gi.require_version("Gtk", "4.0")
import logging
from clan_cli.api.directory import FileRequest
from gi.repository import Gio, GLib, Gtk
from clan_app.api import ImplFunc
log = logging.getLogger(__name__)
# This implements the abstract function open_file with one argument, file_request,
# which is a FileRequest object and returns a string or None.
class open_file(ImplFunc[[FileRequest], str | None]):
def __init__(self) -> None:
super().__init__()
def async_run(self, file_request: FileRequest) -> bool:
def on_file_select(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
try:
gfile = file_dialog.open_finish(task)
if gfile:
selected_path = gfile.get_path()
self.returns(selected_path)
except Exception as e:
print(f"Error getting selected file or directory: {e}")
def on_folder_select(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
try:
gfile = file_dialog.select_folder_finish(task)
if gfile:
selected_path = gfile.get_path()
self.returns(selected_path)
except Exception as e:
print(f"Error getting selected directory: {e}")
def on_save_finish(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
try:
gfile = file_dialog.save_finish(task)
if gfile:
selected_path = gfile.get_path()
self.returns(selected_path)
except Exception as e:
print(f"Error getting selected file: {e}")
dialog = Gtk.FileDialog()
if file_request.title:
dialog.set_title(file_request.title)
if file_request.filters:
filters = Gio.ListStore.new(Gtk.FileFilter)
file_filters = Gtk.FileFilter()
if file_request.filters.title:
file_filters.set_name(file_request.filters.title)
if file_request.filters.mime_types:
for mime in file_request.filters.mime_types:
file_filters.add_mime_type(mime)
filters.append(file_filters)
if file_request.filters.patterns:
for pattern in file_request.filters.patterns:
file_filters.add_pattern(pattern)
if file_request.filters.suffixes:
for suffix in file_request.filters.suffixes:
file_filters.add_suffix(suffix)
filters.append(file_filters)
dialog.set_filters(filters)
# if select_folder
if file_request.mode == "select_folder":
dialog.select_folder(callback=on_folder_select)
elif file_request.mode == "open_file":
dialog.open(callback=on_file_select)
elif file_request.mode == "save":
dialog.save(callback=on_save_finish)
return GLib.SOURCE_REMOVE

View File

@ -13,10 +13,9 @@ gi.require_version("Adw", "1")
from pathlib import Path
from clan_cli.custom_logger import setup_logging
from gi.repository import Adw, Gdk, Gio, Gtk
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
from clan_app.components.interfaces import ClanConfig
from clan_app.singletons.use_join import GLib, GObject
from .windows.main_window import MainWindow
@ -70,13 +69,12 @@ class MainApplication(Adw.Application):
log.debug("Shutting down Adw.Application")
if self.get_windows() == []:
log.warning("No windows to destroy")
log.debug("No windows to destroy")
if self.window:
# TODO: Doesn't seem to raise the destroy signal. Need to investigate
# self.get_windows() returns an empty list. Desync between window and application?
self.window.close()
# Killing vms directly. This is dirty
self.window.kill_vms()
else:
log.error("No window to destroy")

View File

@ -1,132 +0,0 @@
import logging
import os
import signal
import sys
import traceback
from pathlib import Path
from typing import Any
import gi
gi.require_version("GdkPixbuf", "2.0")
import dataclasses
import multiprocessing as mp
from collections.abc import Callable
log = logging.getLogger(__name__)
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def _kill_group(proc: mp.Process) -> None:
pid = proc.pid
if proc.is_alive() and pid:
os.killpg(pid, signal.SIGTERM)
else:
log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")
@dataclasses.dataclass(frozen=True)
class MPProcess:
name: str
proc: mp.Process
out_file: Path
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None:
_kill_group(proc=self.proc)
def _set_proc_name(name: str) -> None:
if sys.platform != "linux":
return
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to "my_process"
prctl(15, name.encode(), 0, 0, 0)
def _init_proc(
func: Callable,
out_file: Path,
proc_name: str,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
**kwargs: Any,
) -> None:
# Create a new process group
os.setsid()
# Open stdout and stderr
with open(out_file, "w") as out_fd:
os.dup2(out_fd.fileno(), sys.stdout.fileno())
os.dup2(out_fd.fileno(), sys.stderr.fileno())
# Print some information
pid = os.getpid()
gpid = os.getpgid(pid=pid)
# Set the process name
_set_proc_name(proc_name)
# Close stdin
sys.stdin.close()
linebreak = "=" * 5
# Execute the main function
print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
try:
func(**kwargs)
except Exception as ex:
traceback.print_exc()
if on_except is not None:
on_except(ex, mp.current_process())
# Kill the new process and all its children by sending a SIGTERM signal to the process group
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
os.killpg(gpid, signal.SIGTERM)
sys.exit(1)
# Don't use a finally block here, because we want the exitcode to be set to
# 0 if the function returns normally
def spawn(
*,
out_file: Path,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
func: Callable,
**kwargs: Any,
) -> MPProcess:
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="forkserver")
# Set names
proc_name = f"MPExec:{func.__name__}"
# Start the process
proc = mp.Process(
target=_init_proc,
args=(func, out_file, proc_name, on_except),
name=proc_name,
kwargs=kwargs,
)
proc.start()
# Return the process
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
return mp_proc

View File

@ -1,220 +0,0 @@
import logging
from collections.abc import Callable
from typing import Any, Generic, TypeVar
import gi
gi.require_version("Gio", "2.0")
from gi.repository import Gio, GObject
log = logging.getLogger(__name__)
# Define type variables for key and value types
K = TypeVar("K") # Key type
V = TypeVar(
"V", bound=GObject.Object
) # Value type, bound to GObject.GObject or its subclasses
class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
"""
A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
Only use self[key] and del self[key] for accessing the items for better performance.
This class could be optimized by having the objects remember their position in the list.
"""
def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
super().__init__()
self.gtype = gtype
self.key_gen = key_gen
# From Python 3.7 onwards dictionaries are ordered by default
self._items: dict[K, V] = dict()
##################################
# #
# Gio.ListStore Interface #
# #
##################################
@classmethod
def new(cls: Any, gtype: type[V]) -> "GKVStore":
return cls.__new__(cls, gtype)
def append(self, item: V) -> None:
key = self.key_gen(item)
self[key] = item
def find(self, item: V) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if v == item:
return True, i
return False, -1
def find_with_equal_func(
self, item: V, equal_func: Callable[[V, V], bool]
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item):
return True, i
return False, -1
def find_with_equal_func_full(
self, item: V, equal_func: Callable[[V, V, Any], bool], user_data: Any
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item, user_data):
return True, i
return False, -1
def insert(self, position: int, item: V) -> None:
log.warning("Inserting is O(n) in GKVStore. Better use append")
log.warning(
"This functions may have incorrect items_changed signal behavior. Please test it"
)
key = self.key_gen(item)
if key in self._items:
raise ValueError("Key already exists in the dictionary")
if position < 0 or position > len(self._items):
raise IndexError("Index out of range")
# Temporary storage for items to be reinserted
temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]
# Delete items from the original dict
for k in list(self.keys())[position:]:
del self._items[k]
# Insert the new key-value pair
self._items[key] = item
# Reinsert the items
for i, (k, v) in enumerate(temp_list):
self._items[k] = v
# Notify the model of the changes
self.items_changed(position, 0, 1)
def insert_sorted(
self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any
) -> None:
raise NotImplementedError("insert_sorted is not implemented in GKVStore")
def remove(self, position: int) -> None:
if position < 0 or position >= self.get_n_items():
return
key = self.keys()[position]
del self[key]
self.items_changed(position, 1, 0)
def remove_all(self) -> None:
self._items.clear()
self.items_changed(0, len(self._items), 0)
def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None:
raise NotImplementedError("sort is not implemented in GKVStore")
def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
raise NotImplementedError("splice is not implemented in GKVStore")
##################################
# #
# Gio.ListModel Interface #
# #
##################################
def get_item(self, position: int) -> V | None:
if position < 0 or position >= self.get_n_items():
return None
# Access items by index since OrderedDict does not support direct indexing
key = list(self._items.keys())[position]
return self._items[key]
def do_get_item(self, position: int) -> V | None:
return self.get_item(position)
def get_item_type(self) -> Any:
return self.gtype.__gtype__ # type: ignore[attr-defined]
def do_get_item_type(self) -> GObject.GType:
return self.get_item_type()
def get_n_items(self) -> int:
return len(self._items)
def do_get_n_items(self) -> int:
return self.get_n_items()
##################################
# #
# Dict Interface #
# #
##################################
def keys(self) -> list[K]:
return list(self._items.keys())
def values(self) -> list[V]:
return list(self._items.values())
def items(self) -> list[tuple[K, V]]:
return list(self._items.items())
def get(self, key: K, default: V | None = None) -> V | None:
return self._items.get(key, default)
# O(1) operation if the key does not exist, O(n) if it does
def __setitem__(self, key: K, value: V) -> None:
# If the key already exists, remove it O(n)
if key in self._items:
log.debug("Updating an existing key in GKVStore is O(n)")
position = self.keys().index(key)
self._items[key] = value
self.items_changed(position, 1, 1)
else:
# Add the new key-value pair
self._items[key] = value
position = max(len(self._items) - 1, 0)
self.items_changed(position, 0, 1)
# O(n) operation
def __delitem__(self, key: K) -> None:
position = self.keys().index(key)
del self._items[key]
self.items_changed(position, 1, 0)
def __len__(self) -> int:
return len(self._items)
# O(1) operation
def __getitem__(self, key: K) -> V: # type: ignore[override]
return self._items[key]
def __contains__(self, key: K) -> bool: # type: ignore[override]
return key in self._items
def __str__(self) -> str:
resp = "GKVStore(\n"
for k, v in self._items.items():
resp += f"{k}: {v}\n"
resp += ")"
return resp
def __repr__(self) -> str:
return self._items.__str__()
##################################
# #
# Custom Methods #
# #
##################################
def first(self) -> V:
return self.values()[0]
def last(self) -> V:
return self.values()[-1]
def register_on_change(
self, callback: Callable[["GKVStore[K,V]", int, int, int], None]
) -> None:
self.connect("items-changed", callback)

View File

@ -1,74 +0,0 @@
import logging
from collections.abc import Callable
from typing import TypeVar
import gi
from clan_app import assets
gi.require_version("Adw", "1")
from gi.repository import Adw, GdkPixbuf, Gio, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
class EmptySplash(Gtk.Box):
def __init__(self, on_join: Callable[[str], None]) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
self.on_join = on_join
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
clan_icon = self.load_image(str(assets.get_asset("clan_black_notext.png")))
if clan_icon:
image = Gtk.Image.new_from_pixbuf(clan_icon)
else:
image = Gtk.Image.new_from_icon_name("image-missing")
# same as the clamp
image.set_pixel_size(400)
image.set_opacity(0.5)
image.set_margin_top(20)
image.set_margin_bottom(10)
vbox.append(image)
empty_label = Gtk.Label(label="Welcome to Clan! Join your first clan.")
join_entry = Gtk.Entry()
join_entry.set_placeholder_text("clan://<url>")
join_entry.set_hexpand(True)
join_button = Gtk.Button(label="Join")
join_button.connect("clicked", self._on_join, join_entry)
join_entry.connect("activate", lambda e: self._on_join(join_button, e))
clamp = Adw.Clamp()
clamp.set_maximum_size(400)
clamp.set_margin_bottom(40)
vbox.append(empty_label)
hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
hbox.append(join_entry)
hbox.append(join_button)
vbox.append(hbox)
clamp.set_child(vbox)
self.append(clamp)
def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None:
try:
pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path)
return pixbuf
except Exception as e:
log.error(f"Failed to load image: {e}")
return None
def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None:
"""
Callback for the join button
Extracts the text from the entry and calls the on_join callback
"""
log.info(f"Splash screen: Joining {entry.get_text()}")
self.on_join(entry.get_text())

View File

@ -0,0 +1,92 @@
import dataclasses
import logging
from dataclasses import fields, is_dataclass
from pathlib import Path
from types import UnionType
from typing import Any, get_args
import gi
gi.require_version("WebKit", "6.0")
log = logging.getLogger(__name__)
def sanitize_string(s: str) -> str:
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
def dataclass_to_dict(obj: Any) -> Any:
"""
Utility function to convert dataclasses to dictionaries
It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries
It does NOT convert member functions.
"""
if dataclasses.is_dataclass(obj):
return {
sanitize_string(k): dataclass_to_dict(v)
for k, v in dataclasses.asdict(obj).items()
}
elif isinstance(obj, list | tuple):
return [dataclass_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()}
elif isinstance(obj, Path):
return str(obj)
elif isinstance(obj, str):
return sanitize_string(obj)
else:
return obj
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
"""
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if not data:
return None
try:
# Attempt to create an instance of the data_class
field_values = {}
for field in fields(t):
field_value = data.get(field.name)
field_type = get_inner_type(field.type)
if field_value is not None:
# If the field is another dataclass, recursively instantiate it
if is_dataclass(field_type):
field_value = from_dict(field_type, field_value)
elif isinstance(field_type, Path | str) and isinstance(
field_value, str
):
field_value = (
Path(field_value) if field_type == Path else field_value
)
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
# Field has a default value. We cannot set the value to None
if field_value is not None:
field_values[field.name] = field_value
else:
field_values[field.name] = field_value
return t(**field_values)
except (TypeError, ValueError) as e:
print(f"Failed to instantiate {t.__name__}: {e}")
return None

File diff suppressed because it is too large Load Diff

View File

@ -1,375 +0,0 @@
import logging
import multiprocessing as mp
import os
import tempfile
import threading
import time
import weakref
from collections.abc import Callable, Generator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import IO, ClassVar
import gi
from clan_cli import vms
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry
from clan_cli.machines.machines import Machine
from clan_app.components.executor import MPProcess, spawn
from clan_app.singletons.toast import (
InfoToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
class VMObject(GObject.Object):
# Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = {
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, []),
"vm_build_notify": (GObject.SignalFlags.RUN_FIRST, None, [bool, bool]),
}
def __init__(
self,
icon: Path,
data: HistoryEntry,
build_log_cb: Callable[[Gio.File], None],
) -> None:
super().__init__()
# Store the data from the history entry
self.data: HistoryEntry = data
self.build_log_cb = build_log_cb
# Create a process object to store the VM process
self.vm_process: MPProcess = MPProcess(
"vm_dummy", mp.Process(), Path("./dummy")
)
self.build_process: MPProcess = MPProcess(
"build_dummy", mp.Process(), Path("./dummy")
)
self._start_thread: threading.Thread = threading.Thread()
self.machine: Machine | None = None
# Watcher to stop the VM
self.KILL_TIMEOUT: int = 20 # seconds
self._stop_thread: threading.Thread = threading.Thread()
# Build progress bar vars
self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar()
self.progress_bar.hide()
self.progress_bar.set_hexpand(True) # Horizontally expand
self.prog_bar_id: int = 0
# Create a temporary directory to store the logs
self.log_dir: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory(
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
)
self._logs_id: int = 0
self._log_file: IO[str] | None = None
# To be able to set the switch state programmatically
# we need to store the handler id returned by the connect method
# and block the signal while we change the state. This is cursed.
self.switch: Gtk.Switch = Gtk.Switch()
self.switch_handler_id: int = self.switch.connect(
"notify::active", self._on_switch_toggle
)
self.connect("vm_status_changed", self._on_vm_status_changed)
# Make sure the VM is killed when the reference to this object is dropped
self._finalizer: weakref.finalize = weakref.finalize(self, self._kill_ref_drop)
def _vm_status_changed_task(self) -> bool:
self.emit("vm_status_changed")
return GLib.SOURCE_REMOVE
def update(self, data: HistoryEntry) -> None:
self.data = data
def _on_vm_status_changed(self, source: "VMObject") -> None:
# Signal may be emitted multiple times
self.emit("vm_build_notify", self.is_building(), self.is_running())
prev_state = self.switch.get_state()
next_state = self.is_running() and not self.is_building()
self.switch.set_state(next_state)
if prev_state is False and next_state is True:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"{source.data.flake.flake_attr} started").toast,
"success.vm.start",
)
if self.switch.get_sensitive() is False and not self.is_building():
self.switch.set_sensitive(True)
exit_vm = self.vm_process.proc.exitcode
exit_build = self.build_process.proc.exitcode
exitc = exit_vm or exit_build
if not self.is_running() and exitc != 0:
with self.switch.handler_block(self.switch_handler_id):
self.switch.set_active(False)
log.error(f"VM exited with error. Exitcode: {exitc}")
ToastOverlay.use().add_toast_unique(
WarningToast(f"VM exited with error. Exitcode: {exitc}").toast,
"warning.vm.exit",
)
def _on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None:
if switch.get_active():
switch.set_state(False)
switch.set_sensitive(False)
self.start()
else:
switch.set_state(True)
self.shutdown()
switch.set_sensitive(False)
# We use a context manager to create the machine object
# and make sure it is destroyed when the context is exited
@contextmanager
def _create_machine(self) -> Generator[Machine, None, None]:
uri = ClanURI.from_str(
url=str(self.data.flake.flake_url), machine_name=self.data.flake.flake_attr
)
if uri.flake_id.is_local():
self.machine = Machine(
name=self.data.flake.flake_attr,
flake=uri.flake_id.path,
)
if uri.flake_id.is_remote():
self.machine = Machine(
name=self.data.flake.flake_attr,
flake=uri.flake_id.url,
)
assert self.machine is not None
yield self.machine
self.machine = None
def _pulse_progress_bar_task(self) -> bool:
if self.progress_bar.is_visible():
self.progress_bar.pulse()
return GLib.SOURCE_CONTINUE
else:
return GLib.SOURCE_REMOVE
def __start(self) -> None:
with self._create_machine() as machine:
# Start building VM
tstart = datetime.now()
log.info(f"Building VM {self.get_id()}")
log_dir = Path(str(self.log_dir.name))
# Start the build process
self.build_process = spawn(
on_except=None,
out_file=log_dir / "build.log",
func=vms.run.build_vm,
machine=machine,
tmpdir=log_dir,
)
gfile = Gio.File.new_for_path(str(log_dir / "build.log"))
# Gio documentation:
# Obtains a file monitor for the given file.
# If no file notification mechanism exists, then regular polling of the file is used.
g_monitor = gfile.monitor_file(Gio.FileMonitorFlags.NONE, None)
g_monitor.connect("changed", self.on_logs_changed)
GLib.idle_add(self._vm_status_changed_task)
self.switch.set_sensitive(True)
# Start the logs watcher
self._logs_id = GLib.timeout_add(
50, self._get_logs_task, self.build_process
)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.build_process.out_file}")
# Start the progress bar and show it
self.progress_bar.show()
self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar_task)
if self.prog_bar_id == 0:
log.error("Couldn't spawn a progress bar task")
# Wait for the build to finish then hide the progress bar
self.build_process.proc.join()
tend = datetime.now()
log.info(f"VM {self.get_id()} build took {tend - tstart}s")
self.progress_bar.hide()
# Check if the VM was built successfully
if self.build_process.proc.exitcode != 0:
log.error(f"Failed to build VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
return
log.info(f"Successfully built VM {self.get_id()}")
# Start the VM
self.vm_process = spawn(
on_except=None,
out_file=Path(str(self.log_dir.name)) / "vm.log",
func=vms.run.run_vm,
vm=self.data.flake.vm,
cachedir=log_dir,
socketdir=log_dir,
)
log.debug(f"Started VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
# Start the logs watcher
self._logs_id = GLib.timeout_add(50, self._get_logs_task, self.vm_process)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}")
# Wait for the VM to stop
self.vm_process.proc.join()
log.debug(f"VM {self.get_id()} has stopped")
GLib.idle_add(self._vm_status_changed_task)
def on_logs_changed(
self,
monitor: Gio.FileMonitor,
file: Gio.File,
other_file: Gio.File,
event_type: Gio.FileMonitorEvent,
) -> None:
if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
# File was changed and the changes were written to disk
# wire up the callback for setting the logs
self.build_log_cb(file)
def start(self) -> None:
if self.is_running():
log.warn("VM is already running. Ignoring start request")
self.emit("vm_status_changed", self)
return
log.debug(f"VM state dir {self.log_dir.name}")
self._start_thread = threading.Thread(target=self.__start)
self._start_thread.start()
def _get_logs_task(self, proc: MPProcess) -> bool:
if not proc.out_file.exists():
return GLib.SOURCE_CONTINUE
if not self._log_file:
try:
self._log_file = open(proc.out_file)
except Exception as ex:
log.exception(ex)
self._log_file = None
return GLib.SOURCE_REMOVE
line = os.read(self._log_file.fileno(), 4096)
if len(line) != 0:
print(line.decode("utf-8"), end="", flush=True)
if not proc.proc.is_alive():
log.debug("Removing logs watcher")
self._log_file = None
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
def is_running(self) -> bool:
return self._start_thread.is_alive()
def is_building(self) -> bool:
return self.build_process.proc.is_alive()
def is_shutting_down(self) -> bool:
return self._stop_thread.is_alive()
def get_id(self) -> str:
return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"
def __stop(self) -> None:
log.info(f"Stopping VM {self.get_id()}")
start_time = datetime.now()
while self.is_running():
diff = datetime.now() - start_time
if diff.seconds > self.KILL_TIMEOUT:
log.error(
f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it"
)
self.vm_process.kill_group()
break
if self.is_building():
log.info(f"VM {self.get_id()} is still building. Killing it")
self.build_process.kill_group()
break
if not self.machine:
log.error(f"Machine object is None. Killing VM {self.get_id()}")
self.vm_process.kill_group()
break
# Try to shutdown the VM gracefully using QMP
try:
with self.machine.vm.qmp_ctx() as qmp:
qmp.command("system_powerdown")
except Exception as ex:
log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}")
# Try 20 times to stop the VM
time.sleep(self.KILL_TIMEOUT / 20)
GLib.idle_add(self._vm_status_changed_task)
log.debug(f"VM {self.get_id()} has stopped")
ToastOverlay.use().add_toast_unique(
InfoToast(f"Stopped {self.get_id()}").toast, "info.vm.exit"
)
def shutdown(self) -> None:
if not self.is_running():
log.warning("VM not running. Ignoring shutdown request.")
self.emit("vm_status_changed", self)
return
if self.is_shutting_down():
log.warning("Shutdown already in progress")
self.emit("vm_status_changed", self)
return
self._stop_thread = threading.Thread(target=self.__stop)
self._stop_thread.start()
def _kill_ref_drop(self) -> None:
if self.is_running():
log.warning("Killing VM due to reference drop")
self.kill()
def kill(self) -> None:
if not self.is_running():
log.warning(f"Tried to kill VM {self.get_id()} is not running")
return
log.info(f"Killing VM {self.get_id()} now")
if self.vm_process.proc.is_alive():
self.vm_process.kill_group()
if self.build_process.proc.is_alive():
self.build_process.kill_group()
def read_whole_log(self) -> str:
if not self.vm_process.out_file.exists():
log.error(f"Log file {self.vm_process.out_file} does not exist")
return ""
return self.vm_process.out_file.read_text()
def __str__(self) -> str:
return f"VM({self.get_id()})"
def __repr__(self) -> str:
return self.__str__()

View File

@ -9,9 +9,6 @@ gi.require_version("Adw", "1")
from gi.repository import Adw
from clan_app.singletons.use_views import ViewStack
from clan_app.views.logs import Logs
log = logging.getLogger(__name__)
@ -48,36 +45,6 @@ class ToastOverlay:
toast.connect("dismissed", lambda toast: self.active_toasts.remove(key))
class ErrorToast:
toast: Adw.Toast
def __init__(
self, message: str, persistent: bool = False, details: str = ""
) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"""<span foreground='red'>❌ Error </span> {message}"""
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.HIGH)
self.toast.set_button_label("Show more")
if persistent:
self.toast.set_timeout(0)
views = ViewStack.use().view
# we cannot check this type, python is not smart enough
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
logs_view.set_message(details)
self.toast.connect(
"button-clicked",
lambda _: views.set_visible_child_name("logs"),
)
class WarningToast:
toast: Adw.Toast

View File

@ -1,106 +0,0 @@
import logging
import threading
from collections.abc import Callable
from typing import Any, ClassVar, cast
import gi
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry, add_history
from clan_app.components.gkvstore import GKVStore
from clan_app.singletons.use_vms import ClanStore
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class JoinValue(GObject.Object):
__gsignals__: ClassVar = {
"join_finished": (GObject.SignalFlags.RUN_FIRST, None, []),
}
url: ClanURI
entry: HistoryEntry | None
def _join_finished_task(self) -> bool:
self.emit("join_finished")
return GLib.SOURCE_REMOVE
def __init__(self, url: ClanURI) -> None:
super().__init__()
self.url: ClanURI = url
self.entry: HistoryEntry | None = None
def __join(self) -> None:
new_entry = add_history(self.url)
self.entry = new_entry
GLib.idle_add(self._join_finished_task)
def join(self) -> None:
threading.Thread(target=self.__join).start()
class JoinList:
"""
This is a singleton.
It is initialized with the first call of use()
"""
_instance: "None | JoinList" = None
list_store: Gio.ListStore
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "JoinList":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(JoinValue)
ClanStore.use().register_on_deep_change(cls._instance._rerender_join_list)
return cls._instance
def _rerender_join_list(
self, source: GKVStore, position: int, removed: int, added: int
) -> None:
self.list_store.items_changed(
0, self.list_store.get_n_items(), self.list_store.get_n_items()
)
def is_empty(self) -> bool:
return self.list_store.get_n_items() == 0
def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None:
"""
Add a join request.
This method can add multiple join requests if called subsequently for each request.
"""
value = JoinValue(uri)
if value.url.machine.get_id() in [
cast(JoinValue, item).url.machine.get_id() for item in self.list_store
]:
log.info(f"Join request already exists: {value.url}. Ignoring.")
return
value.connect("join_finished", self._on_join_finished)
value.connect("join_finished", after_join)
self.list_store.append(value)
def _on_join_finished(self, source: JoinValue) -> None:
log.info(f"Join finished: {source.url}")
self.discard(source)
assert source.entry is not None
ClanStore.use().push_history_entry(source.entry)
def discard(self, value: JoinValue) -> None:
(has, idx) = self.list_store.find(value)
if has:
self.list_store.remove(idx)

View File

@ -1,181 +0,0 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, ClassVar
import gi
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry
from clan_app import assets
from clan_app.components.gkvstore import GKVStore
from clan_app.components.vmobj import VMObject
from clan_app.singletons.use_views import ViewStack
from clan_app.views.logs import Logs
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class VMStore(GKVStore):
def __init__(self) -> None:
super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr)
class Emitter(GObject.GObject):
__gsignals__: ClassVar = {
"is_ready": (GObject.SignalFlags.RUN_FIRST, None, []),
}
class ClanStore:
_instance: "None | ClanStore" = None
_clan_store: GKVStore[str, VMStore]
_emitter: Emitter
# set the vm that is outputting logs
# build logs are automatically streamed to the logs-view
_logging_vm: VMObject | None = None
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "ClanStore":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls._clan_store = GKVStore(
VMStore, lambda store: store.first().data.flake.flake_url
)
cls._emitter = Emitter()
return cls._instance
def emit(self, signal: str) -> None:
self._emitter.emit(signal)
def connect(self, signal: str, cb: Callable[(...), Any]) -> None:
self._emitter.connect(signal, cb)
def set_logging_vm(self, ident: str) -> VMObject | None:
vm = self.get_vm(ClanURI(f"clan://{ident}"))
if vm is not None:
self._logging_vm = vm
return self._logging_vm
def register_on_deep_change(
self, callback: Callable[[GKVStore, int, int, int], None]
) -> None:
"""
Register a callback that is called when a clan_store or one of the included VMStores changes
"""
def on_vmstore_change(
store: VMStore, position: int, removed: int, added: int
) -> None:
callback(store, position, removed, added)
def on_clanstore_change(
store: "GKVStore", position: int, removed: int, added: int
) -> None:
if added > 0:
store.values()[position].register_on_change(on_vmstore_change)
callback(store, position, removed, added)
self.clan_store.register_on_change(on_clanstore_change)
@property
def clan_store(self) -> GKVStore[str, VMStore]:
return self._clan_store
def create_vm_task(self, vm: HistoryEntry) -> bool:
self.push_history_entry(vm)
return GLib.SOURCE_REMOVE
def push_history_entry(self, entry: HistoryEntry) -> None:
# TODO: We shouldn't do this here but in the list view
if entry.flake.icon is None:
icon: Path = assets.loc / "placeholder.jpeg"
else:
icon = Path(entry.flake.icon)
def log_details(gfile: Gio.File) -> None:
self.log_details(vm, gfile)
vm = VMObject(icon=icon, data=entry, build_log_cb=log_details)
self.push(vm)
def log_details(self, vm: VMObject, gfile: Gio.File) -> None:
views = ViewStack.use().view
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
def file_read_callback(
source_object: Gio.File, result: Gio.AsyncResult, _user_data: Any
) -> None:
try:
# Finish the asynchronous read operation
res = source_object.load_contents_finish(result)
_success, contents, _etag_out = res
# Convert the byte array to a string and print it
logs_view.set_message(contents.decode("utf-8"))
except Exception as e:
print(f"Error reading file: {e}")
# only one vm can output logs at a time
if vm == self._logging_vm:
gfile.load_contents_async(None, file_read_callback, None)
# we cannot check this type, python is not smart enough
def push(self, vm: VMObject) -> None:
url = str(vm.data.flake.flake_url)
# Only write to the store if the Clan is not already in it
# Every write to the KVStore rerenders bound widgets to the clan_store
if url not in self.clan_store:
log.debug(f"Creating new VMStore for {url}")
vm_store = VMStore()
vm_store.append(vm)
self.clan_store[url] = vm_store
else:
vm_store = self.clan_store[url]
machine = vm.data.flake.flake_attr
old_vm = vm_store.get(machine)
if old_vm:
log.info(
f"VM {vm.data.flake.flake_attr} already exists in store. Updating data field."
)
old_vm.update(vm.data)
else:
log.debug(f"Appending VM {vm.data.flake.flake_attr} to store")
vm_store.append(vm)
def remove(self, vm: VMObject) -> None:
del self.clan_store[str(vm.data.flake.flake_url)][vm.data.flake.flake_attr]
def get_vm(self, uri: ClanURI) -> None | VMObject:
vm_store = self.clan_store.get(str(uri.flake_id))
if vm_store is None:
return None
machine = vm_store.get(uri.machine.name, None)
return machine
def get_running_vms(self) -> list[VMObject]:
return [
vm
for clan in self.clan_store.values()
for vm in clan.values()
if vm.is_running()
]
def kill_all(self) -> None:
for vm in self.get_running_vms():
vm.kill()

View File

@ -1,61 +0,0 @@
import os
from collections.abc import Callable
from functools import partial
from typing import Any, Literal, TypeVar
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GObject, Gtk
# Define a TypeVar that is bound to GObject.Object
ListItem = TypeVar("ListItem", bound=GObject.Object)
def create_details_list(
model: Gio.ListStore, render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget]
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class PreferencesValue(GObject.Object):
variant: Literal["CPU", "MEMORY"]
editable: bool
data: Any
def __init__(
self, variant: Literal["CPU", "MEMORY"], editable: bool, data: Any
) -> None:
super().__init__()
self.variant = variant
self.editable = editable
self.data = data
class Details(Gtk.Box):
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
preferences_store = Gio.ListStore.new(PreferencesValue)
preferences_store.append(PreferencesValue("CPU", True, 1))
self.details_list = create_details_list(
model=preferences_store, render_row=self.render_entry_row
)
self.append(self.details_list)
def render_entry_row(
self, boxed_list: Gtk.ListBox, item: PreferencesValue
) -> Gtk.Widget:
cores: int | None = os.cpu_count()
fcores = float(cores) if cores else 1.0
row = Adw.SpinRow.new_with_range(0, fcores, 1)
row.set_value(item.data)
return row

View File

@ -1,356 +0,0 @@
import base64
import logging
from collections.abc import Callable
from functools import partial
from typing import Any, TypeVar
import gi
from clan_cli.clan_uri import ClanURI
from clan_app.components.gkvstore import GKVStore
from clan_app.components.interfaces import ClanConfig
from clan_app.components.list_splash import EmptySplash
from clan_app.components.vmobj import VMObject
from clan_app.singletons.toast import (
LogToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
from clan_app.singletons.use_join import JoinList, JoinValue
from clan_app.singletons.use_views import ViewStack
from clan_app.singletons.use_vms import ClanStore, VMStore
from clan_app.views.logs import Logs
gi.require_version("Adw", "1")
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
def create_boxed_list(
model: CustomStore,
render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget],
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.add_css_class("no-shadow")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class ClanList(Gtk.Box):
"""
The ClanList
Is the composition of
the ClanListToolbar
the clanListView
# ------------------------ #
# - Tools <Start> <Stop> < Edit> #
# ------------------------ #
# - List Items
# - <...>
# ------------------------#
"""
def __init__(self, config: ClanConfig) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
app.connect("join_request", self.on_join_request)
self.log_label: Gtk.Label = Gtk.Label()
# Add join list
self.join_boxed_list = create_boxed_list(
model=JoinList.use().list_store, render_row=self.render_join_row
)
self.join_boxed_list.add_css_class("join-list")
self.append(self.join_boxed_list)
clan_store = ClanStore.use()
clan_store.connect("is_ready", self.display_splash)
self.group_list = create_boxed_list(
model=clan_store.clan_store, render_row=self.render_group_row
)
self.group_list.add_css_class("group-list")
self.append(self.group_list)
self.splash = EmptySplash(on_join=lambda x: self.on_join_request(x, x))
def display_splash(self, source: GKVStore) -> None:
print("Displaying splash")
if (
ClanStore.use().clan_store.get_n_items() == 0
and JoinList.use().list_store.get_n_items() == 0
):
self.append(self.splash)
def render_group_row(
self, boxed_list: Gtk.ListBox, vm_store: VMStore
) -> Gtk.Widget:
self.remove(self.splash)
vm = vm_store.first()
log.debug("Rendering group row for %s", vm.data.flake.flake_url)
grp = Adw.PreferencesGroup()
grp.set_title(vm.data.flake.clan_name)
grp.set_description(vm.data.flake.flake_url)
add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s"))
add_action.connect("activate", self.on_add)
app = Gio.Application.get_default()
assert app is not None
app.add_action(add_action)
# menu_model = Gio.Menu()
# TODO: Make this lazy, blocks UI startup for too long
# for vm in machines.list.list_machines(flake_url=vm.data.flake.flake_url):
# if vm not in vm_store:
# menu_model.append(vm, f"app.add::{vm}")
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
add_button = Gtk.Button()
add_button_content = Adw.ButtonContent.new()
add_button_content.set_label("Add machine")
add_button_content.set_icon_name("list-add-symbolic")
add_button.add_css_class("flat")
add_button.set_child(add_button_content)
# add_button.set_has_frame(False)
# add_button.set_menu_model(menu_model)
# add_button.set_label("Add machine")
box.append(add_button)
grp.set_header_suffix(box)
vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row)
grp.add(vm_list)
return grp
def on_add(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Adding new machine", target)
def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VMObject) -> Gtk.Widget:
# Remove no-shadow class if attached
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
flake = vm.data.flake
row = Adw.ActionRow()
# ====== Display Avatar ======
avatar = Adw.Avatar()
machine_icon = flake.vm.machine_icon
# If there is a machine icon, display it else
# display the clan icon
if machine_icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(machine_icon)))
elif flake.icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(flake.icon)))
else:
avatar.set_text(flake.clan_name + " " + flake.flake_attr)
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
# ====== Display Name And Url =====
row.set_title(flake.flake_attr)
row.set_title_lines(1)
row.set_title_selectable(True)
# If there is a machine description, display it else
# display the clan name
if flake.vm.machine_description:
row.set_subtitle(flake.vm.machine_description)
else:
row.set_subtitle(flake.clan_name)
row.set_subtitle_lines(1)
# ==== Display build progress bar ====
build_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
build_box.set_valign(Gtk.Align.CENTER)
build_box.append(vm.progress_bar)
build_box.set_homogeneous(False)
row.add_suffix(build_box) # This allows children to have different sizes
# ==== Action buttons ====
button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
button_box.set_valign(Gtk.Align.CENTER)
## Drop down menu
open_action = Gio.SimpleAction.new("edit", GLib.VariantType.new("s"))
open_action.connect("activate", self.on_edit)
action_id = base64.b64encode(vm.get_id().encode("utf-8")).decode("utf-8")
build_logs_action = Gio.SimpleAction.new(
f"logs.{action_id}", GLib.VariantType.new("s")
)
build_logs_action.connect("activate", self.on_show_build_logs)
build_logs_action.set_enabled(False)
app = Gio.Application.get_default()
assert app is not None
app.add_action(open_action)
app.add_action(build_logs_action)
# set a callback function for conditionally enabling the build_logs action
def on_vm_build_notify(
vm: VMObject, is_building: bool, is_running: bool
) -> None:
build_logs_action.set_enabled(is_building or is_running)
app.add_action(build_logs_action)
if is_building:
ToastOverlay.use().add_toast_unique(
LogToast(
"""Build process running ...""",
on_button_click=lambda: self.show_vm_build_logs(vm.get_id()),
).toast,
f"info.build.running.{vm}",
)
vm.connect("vm_build_notify", on_vm_build_notify)
menu_model = Gio.Menu()
menu_model.append("Edit", f"app.edit::{vm.get_id()}")
menu_model.append("Show Logs", f"app.logs.{action_id}::{vm.get_id()}")
pref_button = Gtk.MenuButton()
pref_button.set_icon_name("open-menu-symbolic")
pref_button.set_menu_model(menu_model)
button_box.append(pref_button)
## VM switch button
switch_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
switch_box.set_valign(Gtk.Align.CENTER)
switch_box.append(vm.switch)
button_box.append(switch_box)
row.add_suffix(button_box)
return row
def on_edit(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Editing settings for machine", target)
def on_show_build_logs(self, _: Any, parameter: Any) -> None:
target = parameter.get_string()
self.show_vm_build_logs(target)
def show_vm_build_logs(self, target: str) -> None:
vm = ClanStore.use().set_logging_vm(target)
if vm is None:
raise ValueError(f"VM {target} not found")
views = ViewStack.use().view
# Reset the logs view
logs: Logs = views.get_child_by_name("logs") # type: ignore
if logs is None:
raise ValueError("Logs view not found")
name = vm.machine.name if vm.machine else "Unknown"
logs.set_title(f"""📄<span weight="normal"> {name}</span>""")
# initial message. Streaming happens automatically when the file is changed by the build process
with open(vm.build_process.out_file) as f:
logs.set_message(f.read())
views.set_visible_child_name("logs")
def render_join_row(
self, boxed_list: Gtk.ListBox, join_val: JoinValue
) -> Gtk.Widget:
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
log.debug("Rendering join row for %s", join_val.url)
row = Adw.ActionRow()
row.set_title(join_val.url.machine.name)
row.set_subtitle(str(join_val.url))
row.add_css_class("trust")
vm = ClanStore.use().get_vm(join_val.url)
# Can't do this here because clan store is empty at this point
if vm is not None:
sub = row.get_subtitle()
assert sub is not None
ToastOverlay.use().add_toast_unique(
WarningToast(
f"""<span weight="regular">{join_val.url.machine.name!s}</span> Already exists. Joining again will update it"""
).toast,
"warning.duplicate.join",
)
row.set_subtitle(
sub + "\nClan already exists. Joining again will update it"
)
avatar = Adw.Avatar()
avatar.set_text(str(join_val.url.machine.name))
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
cancel_button = Gtk.Button(label="Cancel")
cancel_button.add_css_class("error")
cancel_button.connect("clicked", partial(self.on_discard_clicked, join_val))
self.cancel_button = cancel_button
trust_button = Gtk.Button(label="Join")
trust_button.add_css_class("success")
trust_button.connect("clicked", partial(self.on_trust_clicked, join_val))
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
box.append(cancel_button)
box.append(trust_button)
row.add_suffix(box)
return row
def on_join_request(self, source: Any, url: str) -> None:
log.debug("Join request: %s", url)
clan_uri = ClanURI(url)
JoinList.use().push(clan_uri, self.on_after_join)
def on_after_join(self, source: JoinValue) -> None:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"Updated {source.url.machine.name}").toast,
"success.join",
)
# If the join request list is empty disable the shadow artefact
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")
def on_trust_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
source.set_sensitive(False)
self.cancel_button.set_sensitive(False)
value.join()
def on_discard_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
JoinList.use().discard(value)
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")

View File

@ -1,65 +0,0 @@
import logging
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, Gtk
from clan_app.singletons.use_views import ViewStack
log = logging.getLogger(__name__)
class Logs(Gtk.Box):
"""
Simple log view
This includes a banner and a text view and a button to close the log and navigate back to the overview
"""
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
self.banner = Adw.Banner.new("")
self.banner.set_use_markup(True)
self.banner.set_revealed(True)
self.banner.set_button_label("Close")
self.banner.connect(
"button-clicked",
lambda _: ViewStack.use().view.set_visible_child_name("list"),
)
self.text_view = Gtk.TextView()
self.text_view.set_editable(False)
self.text_view.set_wrap_mode(Gtk.WrapMode.WORD)
self.text_view.add_css_class("log-view")
self.append(self.banner)
self.append(self.text_view)
def set_title(self, title: str) -> None:
self.banner.set_title(title)
def set_message(self, message: str) -> None:
"""
Set the log message. This will delete any previous message
"""
buffer = self.text_view.get_buffer()
buffer.set_text(message)
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)
def append_message(self, message: str) -> None:
"""
Append to the end of a potentially existent log message
"""
buffer = self.text_view.get_buffer()
end_iter = buffer.get_end_iter()
buffer.insert(end_iter, message) # type: ignore
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)

View File

@ -1,214 +1,37 @@
import dataclasses
import json
import logging
import threading
from collections.abc import Callable
from dataclasses import fields, is_dataclass
from pathlib import Path
from threading import Lock
from types import UnionType
from typing import Any, get_args
from typing import Any
import gi
from clan_cli.api import API
from clan_cli.api.directory import FileRequest
from clan_cli.api import MethodRegistry
from clan_app.api import GObjApi, GResult, ImplFunc
from clan_app.api.file import open_file
from clan_app.components.serializer import dataclass_to_dict, from_dict
gi.require_version("WebKit", "6.0")
from gi.repository import Gio, GLib, Gtk, WebKit
from gi.repository import GLib, GObject, WebKit
log = logging.getLogger(__name__)
def sanitize_string(s: str) -> str:
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
class WebExecutor(GObject.Object):
def __init__(self, content_uri: str, plain_api: MethodRegistry) -> None:
super().__init__()
self.plain_api: MethodRegistry = plain_api
self.webview: WebKit.WebView = WebKit.WebView()
def dataclass_to_dict(obj: Any) -> Any:
"""
Utility function to convert dataclasses to dictionaries
It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries
It does NOT convert member functions.
"""
if dataclasses.is_dataclass(obj):
return {
sanitize_string(k): dataclass_to_dict(v)
for k, v in dataclasses.asdict(obj).items()
}
elif isinstance(obj, list | tuple):
return [dataclass_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()}
elif isinstance(obj, Path):
return str(obj)
elif isinstance(obj, str):
return sanitize_string(obj)
else:
return obj
# Implement the abstract open_file function
def open_file(file_request: FileRequest) -> str | None:
# Function to handle the response and stop the loop
selected_path = None
def on_file_select(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.open_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected file or directory: {e}")
finally:
main_loop.quit()
def on_folder_select(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.select_folder_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected directory: {e}")
finally:
main_loop.quit()
def on_save_finish(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.save_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected file: {e}")
finally:
main_loop.quit()
dialog = Gtk.FileDialog()
if file_request.title:
dialog.set_title(file_request.title)
if file_request.filters:
filters = Gio.ListStore.new(Gtk.FileFilter)
file_filters = Gtk.FileFilter()
if file_request.filters.title:
file_filters.set_name(file_request.filters.title)
# Create and configure a filter for image files
if file_request.filters.mime_types:
for mime in file_request.filters.mime_types:
file_filters.add_mime_type(mime)
filters.append(file_filters)
if file_request.filters.patterns:
for pattern in file_request.filters.patterns:
file_filters.add_pattern(pattern)
if file_request.filters.suffixes:
for suffix in file_request.filters.suffixes:
file_filters.add_suffix(suffix)
filters.append(file_filters)
dialog.set_filters(filters)
main_loop = GLib.MainLoop()
# if select_folder
if file_request.mode == "select_folder":
dialog.select_folder(
callback=lambda dialog, task: on_folder_select(dialog, task, main_loop),
)
elif file_request.mode == "open_file":
dialog.open(
callback=lambda dialog, task: on_file_select(dialog, task, main_loop)
)
elif file_request.mode == "save":
dialog.save(
callback=lambda dialog, task: on_save_finish(dialog, task, main_loop)
)
# Wait for the user to select a file or directory
main_loop.run() # type: ignore
return selected_path
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
"""
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if not data:
return None
try:
# Attempt to create an instance of the data_class
field_values = {}
for field in fields(t):
field_value = data.get(field.name)
field_type = get_inner_type(field.type)
if field_value is not None:
# If the field is another dataclass, recursively instantiate it
if is_dataclass(field_type):
field_value = from_dict(field_type, field_value)
elif isinstance(field_type, Path | str) and isinstance(
field_value, str
):
field_value = (
Path(field_value) if field_type == Path else field_value
)
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
# Field has a default value. We cannot set the value to None
if field_value is not None:
field_values[field.name] = field_value
else:
field_values[field.name] = field_value
return t(**field_values)
except (TypeError, ValueError) as e:
print(f"Failed to instantiate {t.__name__}: {e}")
return None
class WebView:
def __init__(self, content_uri: str, methods: dict[str, Callable]) -> None:
self.method_registry: dict[str, Callable] = methods
self.webview = WebKit.WebView()
settings = self.webview.get_settings()
settings: WebKit.Settings = self.webview.get_settings()
# settings.
settings.set_property("enable-developer-extras", True)
self.webview.set_settings(settings)
# Fixme. This filtering is incomplete, it only triggers if a user clicks a link
self.webview.connect("decide-policy", self.on_decide_policy)
self.manager = self.webview.get_user_content_manager()
self.manager: WebKit.UserContentManager = (
self.webview.get_user_content_manager()
)
# Can be called with: window.webkit.messageHandlers.gtk.postMessage("...")
# Important: it seems postMessage must be given some payload, otherwise it won't trigger the event
self.manager.register_script_message_handler("gtk")
@ -217,9 +40,10 @@ class WebView:
self.webview.load_uri(content_uri)
self.content_uri = content_uri
# global mutex lock to ensure functions run sequentially
self.mutex_lock = Lock()
self.queue_size = 0
self.api: GObjApi = GObjApi(self.plain_api.functions)
self.api.register_overwrite(open_file)
self.api.check_signature(self.plain_api.annotations)
def on_decide_policy(
self,
@ -250,85 +74,55 @@ class WebView:
def on_message_received(
self, user_content_manager: WebKit.UserContentManager, message: Any
) -> None:
payload = json.loads(message.to_json(0))
json_msg = message.to_json(4) # 4 is num of indents
log.debug(f"Webview Request: {json_msg}")
payload = json.loads(json_msg)
method_name = payload["method"]
handler_fn = self.method_registry[method_name]
log.debug(f"Received message: {payload}")
log.debug(f"Queue size: {self.queue_size} (Wait)")
# Get the function gobject from the api
function_obj = self.api.get_obj(method_name)
def threaded_wrapper() -> bool:
"""
Ensures only one function is executed at a time
# Create an instance of the function gobject
fn_instance = function_obj()
fn_instance.await_result(self.on_result)
Wait until there is no other function acquiring the global lock.
# Extract the data from the payload
data = payload.get("data")
if data is None:
log.error(f"Method '{method_name}' has no data field. Skipping execution.")
return
Starts a thread with the potentially long running API function within.
"""
if not self.mutex_lock.locked():
thread = threading.Thread(
target=self.threaded_handler,
args=(
handler_fn,
payload.get("data"),
method_name,
),
)
thread.start()
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
# Initialize dataclasses from the payload
reconciled_arguments = {}
op_key = data.pop("op_key", None)
for k, v in data.items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = self.plain_api.get_method_argtype(method_name, k)
if dataclasses.is_dataclass(arg_class):
reconciled_arguments[k] = from_dict(arg_class, v)
else:
reconciled_arguments[k] = v
GLib.idle_add(
threaded_wrapper,
fn_instance._async_run,
reconciled_arguments,
op_key,
)
self.queue_size += 1
def threaded_handler(
self,
handler_fn: Callable[
...,
Any,
],
data: dict[str, Any] | None,
method_name: str,
) -> None:
with self.mutex_lock:
log.debug("Executing... ", method_name)
log.debug(f"{data}")
if data is None:
result = handler_fn()
else:
reconciled_arguments = {}
op_key = data.pop("op_key", None)
for k, v in data.items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = API.get_method_argtype(method_name, k)
if dataclasses.is_dataclass(arg_class):
reconciled_arguments[k] = from_dict(arg_class, v)
else:
reconciled_arguments[k] = v
def on_result(self, source: ImplFunc, data: GResult) -> None:
result = dict()
result["result"] = dataclass_to_dict(data.result)
result["op_key"] = data.op_key
r = handler_fn(**reconciled_arguments)
# Parse the result to a serializable dictionary
# Echo back the "op_key" to the js api
result = dataclass_to_dict(r)
result["op_key"] = op_key
serialized = json.dumps(result)
# Use idle_add to queue the response call to js on the main GTK thread
GLib.idle_add(self.return_data_to_js, method_name, serialized)
self.queue_size -= 1
log.debug(f"Done: Remaining queue size: {self.queue_size}")
serialized = json.dumps(result, indent=4)
log.debug(f"Result: {serialized}")
# Use idle_add to queue the response call to js on the main GTK thread
self.return_data_to_js(data.method_name, serialized)
def return_data_to_js(self, method_name: str, serialized: str) -> bool:
# This function must be run on the main GTK thread to interact with the webview
# result = method_fn(data) # takes very long
# serialized = result
self.webview.evaluate_javascript(
f"""
window.clan.{method_name}(`{serialized}`);

View File

@ -1,24 +1,16 @@
import logging
import threading
import gi
from clan_cli.api import API
from clan_cli.history.list import list_history
from clan_app.components.interfaces import ClanConfig
from clan_app.singletons.toast import ToastOverlay
from clan_app.singletons.use_views import ViewStack
from clan_app.singletons.use_vms import ClanStore
from clan_app.views.details import Details
from clan_app.views.list import ClanList
from clan_app.views.logs import Logs
from clan_app.views.webview import WebView, open_file
from clan_app.views.webview import WebExecutor
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GLib
from clan_app.components.trayicon import TrayIcon
from gi.repository import Adw, Gio
log = logging.getLogger(__name__)
@ -41,45 +33,17 @@ class MainWindow(Adw.ApplicationWindow):
app = Gio.Application.get_default()
assert app is not None
self.tray_icon: TrayIcon = TrayIcon(app)
# Initialize all ClanStore
threading.Thread(target=self._populate_vms).start()
stack_view = ViewStack.use().view
stack_view.add_named(ClanList(config), "list")
stack_view.add_named(Details(), "details")
stack_view.add_named(Logs(), "logs")
# Override platform specific functions
API.register(open_file)
webexec = WebExecutor(plain_api=API, content_uri=config.content_uri)
webview = WebView(methods=API._registry, content_uri=config.content_uri)
stack_view.add_named(webview.get_webview(), "webview")
stack_view.add_named(webexec.get_webview(), "webview")
stack_view.set_visible_child_name(config.initial_view)
view.set_content(stack_view)
self.connect("destroy", self.on_destroy)
def _set_clan_store_ready(self) -> bool:
ClanStore.use().emit("is_ready")
return GLib.SOURCE_REMOVE
def _populate_vms(self) -> None:
# Execute `clan flakes add <path>` to democlan for this to work
# TODO: Make list_history a generator function
for entry in list_history():
GLib.idle_add(ClanStore.use().create_vm_task, entry)
GLib.idle_add(self._set_clan_store_ready)
def kill_vms(self) -> None:
log.debug("Killing all VMs")
ClanStore.use().kill_all()
def on_destroy(self, source: "Adw.ApplicationWindow") -> None:
log.info("====Destroying Adw.ApplicationWindow===")
ClanStore.use().kill_all()
self.tray_icon.destroy()
log.debug("Destroying Adw.ApplicationWindow")

View File

@ -37,9 +37,10 @@ disallow_untyped_defs = true
no_implicit_optional = true
[[tool.mypy.overrides]]
module = "clan_cli.*"
module = "argcomplete.*"
ignore_missing_imports = true
[tool.ruff]
target-version = "py311"
line-length = 88

View File

@ -50,10 +50,22 @@ def update_wrapper_signature(wrapper: Callable, wrapped: Callable) -> None:
wrapper.__signature__ = new_sig # type: ignore
class _MethodRegistry:
class MethodRegistry:
def __init__(self) -> None:
self._orig: dict[str, Callable[[Any], Any]] = {}
self._registry: dict[str, Callable[[Any], Any]] = {}
self._orig_annotations: dict[str, dict[str, Any]] = {}
self._registry: dict[str, Callable[..., Any]] = {}
@property
def annotations(self) -> dict[str, dict[str, Any]]:
return self._orig_annotations
@property
def functions(self) -> dict[str, Callable[..., Any]]:
return self._registry
def reset(self) -> None:
self._orig_annotations.clear()
self._registry.clear()
def register_abstract(self, fn: Callable[..., T]) -> Callable[..., T]:
@wraps(fn)
@ -84,7 +96,12 @@ API.register(open_file)
return fn
def register(self, fn: Callable[..., T]) -> Callable[..., T]:
self._orig[fn.__name__] = fn
if fn.__name__ in self._registry:
raise ValueError(f"Function {fn.__name__} already registered")
if fn.__name__ in self._orig_annotations:
raise ValueError(f"Function {fn.__name__} already registered")
# make copy of original function
self._orig_annotations[fn.__name__] = fn.__annotations__.copy()
@wraps(fn)
def wrapper(
@ -118,6 +135,7 @@ API.register(open_file)
update_wrapper_signature(wrapper, fn)
self._registry[fn.__name__] = wrapper
return fn
def to_json_schema(self) -> dict[str, Any]:
@ -182,4 +200,4 @@ API.register(open_file)
return None
API = _MethodRegistry()
API = MethodRegistry()

View File

@ -5,6 +5,7 @@ import sys
from dataclasses import is_dataclass
from pathlib import Path
from clan_cli.api import API
from clan_cli.api.util import JSchemaTypeError, type_to_dict
from clan_cli.errors import ClanError
@ -121,6 +122,7 @@ def test_all_dataclasses() -> None:
for file, dataclass in dataclasses:
print(f"checking dataclass {dataclass} in file: {file}")
try:
API.reset()
dclass = load_dataclass_from_file(file, dataclass, str(cli_path.parent))
type_to_dict(dclass)
except JSchemaTypeError as e: