1
0
forked from clan/clan-core

Merge pull request 'clan_vm_manager: Add GKVStore to combat O(n2) runtimes. Add pygdb to devshell' (#884) from Qubasa-main into main

This commit is contained in:
clan-bot 2024-03-01 03:49:53 +00:00
commit 3acc4b4d25
6 changed files with 339 additions and 222 deletions

View File

@ -0,0 +1,176 @@
import logging
from collections import OrderedDict
from collections.abc import Callable
from typing import Any, Generic, TypeVar
import gi
gi.require_version("Gio", "2.0")
from gi.repository import Gio, GObject
log = logging.getLogger(__name__)
# Define type variables for key and value types
K = TypeVar("K") # Key type
V = TypeVar(
"V", bound=GObject.Object
) # Value type, bound to GObject.GObject or its subclasses
class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
__gtype_name__ = "MyGKVStore"
"""
A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
Only use self[key] and del self[key] for accessing the items for better performance.
This class could be optimized by having the objects remember their position in the list.
"""
def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
super().__init__()
self.gtype = gtype
self.key_gen = key_gen
self._items: "OrderedDict[K, V]" = OrderedDict()
@classmethod
def new(cls: Any, gtype: type[V]) -> "GKVStore":
return cls.__new__(cls, gtype)
#########################
# #
# READ OPERATIONS #
# #
#########################
def keys(self) -> list[K]:
return list(self._items.keys())
def values(self) -> list[V]:
return list(self._items.values())
def items(self) -> list[tuple[K, V]]:
return list(self._items.items())
def get(self, key: K, default: V | None = None) -> V | None:
return self._items.get(key, default)
def get_n_items(self) -> int:
return len(self._items)
def do_get_n_items(self) -> int:
return self.get_n_items()
def get_item(self, position: int) -> V | None:
if position < 0 or position >= self.get_n_items():
return None
# Access items by index since OrderedDict does not support direct indexing
key = list(self._items.keys())[position]
return self._items[key]
def do_get_item(self, position: int) -> V | None:
return self.get_item(position)
def get_item_type(self) -> GObject.GType:
return self.gtype.__gtype__
def do_get_item_type(self) -> GObject.GType:
return self.get_item_type()
def first(self) -> V:
return self.values()[0]
def last(self) -> V:
return self.values()[-1]
# O(n) operation
def find(self, item: V) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if v == item:
return True, i
return False, -1
#########################
# #
# WRITE OPERATIONS #
# #
#########################
def insert(self, position: int, item: V) -> None:
key = self.key_gen(item)
if key in self._items:
raise ValueError("Key already exists in the dictionary")
if position < 0 or position > len(self._items):
raise IndexError("Index out of range")
# Temporary storage for items to be reinserted
temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]
# Delete items from the original dict
for k in list(self.keys())[position:]:
del self._items[k]
# Insert the new key-value pair
self._items[key] = item
# Reinsert the items
for i, (k, v) in enumerate(temp_list):
self._items[k] = v
# Notify the model of the changes
self.items_changed(position, 0, 1)
def append(self, item: V) -> None:
key = self.key_gen(item)
self[key] = item
def remove(self, position: int) -> None:
if position < 0 or position >= self.get_n_items():
return
key = self.keys()[position]
del self[key]
self.items_changed(position, 1, 0)
def remove_all(self) -> None:
self._items.clear()
self.items_changed(0, len(self._items), 0)
# O(1) operation if the key does not exist, O(n) if it does
def __setitem__(self, key: K, value: V) -> None:
# If the key already exists, remove it O(n)
# TODO: We have to check if updating an existing key is working correctly
if key in self._items:
log.warning("Updating an existing key in GKVStore is O(n)")
position = self.keys().index(key)
self._items[key] = value
self.items_changed(position, 0, 1)
else:
# Add the new key-value pair
position = max(len(self._items) - 1, 0)
self._items[key] = value
self._items.move_to_end(key)
self.items_changed(position, 0, 1)
# O(n) operation
def __delitem__(self, key: K) -> None:
position = self.keys().index(key)
del self._items[key]
self.items_changed(position, 1, 0)
def __len__(self) -> int:
return len(self._items)
# O(1) operation
def __getitem__(self, key: K) -> V:
return self._items[key]
def __contains__(self, key: K) -> bool:
return key in self._items
def __str__(self) -> str:
resp = "GKVStore(\n"
for k, v in self._items.items():
resp += f"{k}: {v}\n"
resp += ")"
return resp
def __repr__(self) -> str:
return self._items.__str__()

View File

@ -9,7 +9,6 @@ from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import add_history
from clan_vm_manager.errors.show_error import show_error_dialog
from clan_vm_manager.models.use_vms import Clans
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
@ -36,7 +35,7 @@ class JoinValue(GObject.Object):
def __join(self) -> None:
add_history(self.url, all_machines=False)
GLib.idle_add(lambda: self.emit("join_finished", self))
GLib.idle_add(self.emit, "join_finished", self)
def join(self) -> None:
threading.Thread(target=self.__join).start()
@ -75,8 +74,6 @@ class Join:
def after_join(item: JoinValue, _: Any) -> None:
self.discard(item)
Clans.use().refresh()
# VMS.use().refresh()
print("Refreshed list after join")
on_join(item)

View File

@ -1,5 +1,8 @@
import logging
import multiprocessing as mp
import os
import tempfile
import threading
import time
import weakref
from collections.abc import Generator
@ -13,91 +16,20 @@ from clan_cli import vms
from clan_cli.clan_uri import ClanScheme, ClanURI
from clan_cli.errors import ClanError
from clan_cli.history.add import HistoryEntry
from clan_cli.history.list import list_history
from clan_vm_manager import assets
from clan_vm_manager.errors.show_error import show_error_dialog
from clan_cli.machines.machines import Machine
from .executor import MPProcess, spawn
from .gkvstore import GKVStore
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
import logging
import multiprocessing as mp
import threading
from clan_cli.machines.machines import Machine
from gi.repository import Gio, GLib, GObject, Gtk
from gi.repository import GLib, GObject, Gtk
log = logging.getLogger(__name__)
class ClanGroup(GObject.Object):
def __init__(self, url: str | Path, vms: list["VM"]) -> None:
super().__init__()
self.url = url
self.vms = vms
self.clan_name = vms[0].data.flake.clan_name
self.list_store = Gio.ListStore.new(VM)
for vm in vms:
self.list_store.append(vm)
def init_grp_store(list_store: Gio.ListStore) -> None:
groups: dict[str | Path, list["VM"]] = {}
for vm in get_saved_vms():
ll = groups.get(vm.data.flake.flake_url, [])
ll.append(vm)
groups[vm.data.flake.flake_url] = ll
for url, vm_list in groups.items():
grp = ClanGroup(url, vm_list)
list_store.append(grp)
class Clans:
list_store: Gio.ListStore
_instance: "None | ClanGroup" = None
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "ClanGroup":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(ClanGroup)
init_grp_store(cls.list_store)
return cls._instance
def filter_by_name(self, text: str) -> None:
if text:
filtered_list = self.list_store
filtered_list.remove_all()
groups: dict[str | Path, list["VM"]] = {}
for vm in get_saved_vms():
ll = groups.get(vm.data.flake.flake_url, [])
print(text, vm.data.flake.vm.machine_name)
if text.lower() in vm.data.flake.vm.machine_name.lower():
ll.append(vm)
groups[vm.data.flake.flake_url] = ll
for url, vm_list in groups.items():
grp = ClanGroup(url, vm_list)
filtered_list.append(grp)
else:
self.refresh()
def refresh(self) -> None:
self.list_store.remove_all()
init_grp_store(self.list_store)
class VM(GObject.Object):
__gtype_name__: ClassVar = "VMGobject"
# Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = {
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object])
@ -370,10 +302,23 @@ class VM(GObject.Object):
return ""
return self.vm_process.out_file.read_text()
def __str__(self) -> str:
return f"VM({self.get_id()})"
def __repr__(self) -> str:
return self.__str__()
class VMStore(GKVStore):
__gtype_name__ = "MyVMStore"
def __init__(self) -> None:
super().__init__(VM, lambda vm: vm.data.flake.flake_attr)
class VMs:
list_store: Gio.ListStore
_instance: "None | VMs" = None
_clan_store: GKVStore[str, VMStore]
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
@ -383,60 +328,48 @@ class VMs:
def use(cls: Any) -> "VMs":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(VM)
cls._clan_store = GKVStore(
VMStore, lambda store: store.first().data.flake.flake_url
)
for vm in get_saved_vms():
cls.list_store.append(vm)
return cls._instance
def filter_by_name(self, text: str) -> None:
if text:
filtered_list = self.list_store
filtered_list.remove_all()
for vm in get_saved_vms():
if text.lower() in vm.data.flake.vm.machine_name.lower():
filtered_list.append(vm)
else:
self.refresh()
@property
def clan_store(self) -> GKVStore[str, VMStore]:
return self._clan_store
def get_by_id(self, ident: str) -> None | VM:
for vm in self.list_store:
if ident == vm.get_id():
return vm
return None
def push(self, vm: VM) -> None:
url = vm.data.flake.flake_url
# Only write to the store if the VM is not already in it
# Every write to the KVStore rerenders bound widgets to the clan_store
if url not in self.clan_store:
log.debug(f"Creating new VMStore for {url}")
vm_store = VMStore()
vm_store.append(vm)
self.clan_store[url] = vm_store
else:
log.debug(f"Appending VM {vm.data.flake.flake_attr} to store")
vm_store = self.clan_store[url]
vm_store.append(vm)
def remove(self, vm: VM) -> None:
del self.clan_store[vm.data.flake.flake_url][vm.data.flake.flake_attr]
def get_vm(self, flake_url: str, flake_attr: str) -> None | VM:
clan = self.clan_store.get(flake_url)
if clan is None:
return None
return clan.get(flake_attr, None)
def get_running_vms(self) -> list[VM]:
return list(filter(lambda vm: vm.is_running(), self.list_store))
return [
vm
for clan in self.clan_store.values()
for vm in clan.values()
if vm.is_running()
]
def kill_all(self) -> None:
for vm in self.get_running_vms():
vm.kill()
def refresh(self) -> None:
log.error("NEVER FUCKING DO THIS")
return
self.list_store.remove_all()
for vm in get_saved_vms():
self.list_store.append(vm)
def get_saved_vms() -> list[VM]:
vm_list = []
log.info("=====CREATING NEW VM OBJ====")
try:
# Execute `clan flakes add <path>` to democlan for this to work
for entry in list_history():
if entry.flake.icon is None:
icon = assets.loc / "placeholder.jpeg"
else:
icon = entry.flake.icon
base = VM(
icon=Path(icon),
data=entry,
)
vm_list.append(base)
except ClanError as e:
show_error_dialog(e)
return vm_list

View File

@ -4,18 +4,16 @@ from functools import partial
from typing import Any
import gi
from clan_cli import ClanError, history, machines
from clan_cli import history, machines
from clan_cli.clan_uri import ClanURI
from clan_vm_manager.models.interfaces import ClanConfig
from clan_vm_manager.models.use_join import Join, JoinValue
from clan_vm_manager.models.use_vms import VMs
from clan_vm_manager.models.use_vms import VM, VMs, VMStore
gi.require_version("Adw", "1")
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
from clan_vm_manager.models.use_vms import VM, ClanGroup, Clans
log = logging.getLogger(__name__)
@ -51,42 +49,30 @@ class ClanList(Gtk.Box):
self.app = Gio.Application.get_default()
self.app.connect("join_request", self.on_join_request)
groups = Clans.use()
join = Join.use()
self.log_label: Gtk.Label = Gtk.Label()
self.__init_machines = history.add.list_history()
# Add join list
self.join_boxed_list = create_boxed_list(
model=join.list_store, render_row=self.render_join_row
model=Join.use().list_store, render_row=self.render_join_row
)
self.join_boxed_list.add_css_class("join-list")
self.append(self.join_boxed_list)
self.group_list = create_boxed_list(
model=groups.list_store, render_row=self.render_group_row
model=VMs.use().clan_store, render_row=self.render_group_row
)
self.group_list.add_css_class("group-list")
# disable search bar because of unsound handling of VM objects
# search_bar = Gtk.SearchBar()
# # This widget will typically be the top-level window
# search_bar.set_key_capture_widget(Views.use().main_window)
# entry = Gtk.SearchEntry()
# entry.set_placeholder_text("Search cLan")
# entry.connect("search-changed", self.on_search_changed)
# entry.add_css_class("search-entry")
# search_bar.set_child(entry)
# self.append(search_bar)
self.append(self.join_boxed_list)
self.append(self.group_list)
def render_group_row(self, boxed_list: Gtk.ListBox, group: ClanGroup) -> Gtk.Widget:
# if boxed_list.has_css_class("no-shadow"):
# boxed_list.remove_css_class("no-shadow")
def render_group_row(
self, boxed_list: Gtk.ListBox, vm_store: VMStore
) -> Gtk.Widget:
vm = vm_store.first()
log.debug("Rendering group row for %s", vm.data.flake.flake_url)
grp = Adw.PreferencesGroup()
grp.set_title(group.clan_name)
grp.set_description(group.url)
grp.set_title(vm.data.flake.clan_name)
grp.set_description(vm.data.flake.flake_url)
add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s"))
add_action.connect("activate", self.on_add)
@ -94,8 +80,8 @@ class ClanList(Gtk.Box):
app.add_action(add_action)
menu_model = Gio.Menu()
for vm in machines.list.list_machines(flake_url=group.url):
if vm not in [item.data.flake.flake_attr for item in group.list_store]:
for vm in machines.list.list_machines(flake_url=vm.data.flake.flake_url):
if vm not in vm_store:
menu_model.append(vm, f"app.add::{vm}")
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
@ -109,10 +95,7 @@ class ClanList(Gtk.Box):
grp.set_header_suffix(box)
vm_list = create_boxed_list(
model=group.list_store, render_row=self.render_vm_row
)
vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row)
grp.add(vm_list)
return grp
@ -121,12 +104,6 @@ class ClanList(Gtk.Box):
target = parameter.get_string()
print("Adding new machine", target)
def on_search_changed(self, entry: Gtk.SearchEntry) -> None:
Clans.use().filter_by_name(entry.get_text())
# Disable the shadow if the list is empty
if not VMs.use().list_store.get_n_items():
self.group_list.add_css_class("no-shadow")
def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VM) -> Gtk.Widget:
# Remove no-shadow class if attached
if boxed_list.has_css_class("no-shadow"):
@ -198,12 +175,8 @@ class ClanList(Gtk.Box):
def on_edit(self, action: Any, parameter: Any) -> None:
target = parameter.get_string()
vm = VMs.use().get_by_id(target)
if not vm:
raise ClanError("Something went wrong. Please restart the app.")
print("Editing settings for machine", vm)
print("Editing settings for machine", target)
def render_join_row(self, boxed_list: Gtk.ListBox, item: JoinValue) -> Gtk.Widget:
if boxed_list.has_css_class("no-shadow"):
@ -215,9 +188,7 @@ class ClanList(Gtk.Box):
row.set_subtitle(item.url.get_internal())
row.add_css_class("trust")
# TODO: figure out how to detect that
exist = VMs.use().use().get_by_id(item.url.get_id())
if exist:
if item.url.params.flake_attr in VMs.use().clan_store:
sub = row.get_subtitle()
row.set_subtitle(
sub + "\nClan already exists. Joining again will update it"

View File

@ -1,19 +1,26 @@
import logging
import threading
from pathlib import Path
from typing import Any
import gi
from clan_cli.history.list import list_history
from clan_vm_manager import assets
from clan_vm_manager.models.interfaces import ClanConfig
from clan_vm_manager.models.use_views import Views
from clan_vm_manager.models.use_vms import VMs
from clan_vm_manager.models.use_vms import VM, VMs
from clan_vm_manager.views.details import Details
from clan_vm_manager.views.list import ClanList
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, Gtk
from gi.repository import Adw, Gio, GLib, Gtk
from ..trayicon import TrayIcon
log = logging.getLogger(__name__)
class MainWindow(Adw.ApplicationWindow):
def __init__(self, config: ClanConfig) -> None:
@ -27,10 +34,12 @@ class MainWindow(Adw.ApplicationWindow):
header = Adw.HeaderBar()
view.add_top_bar(header)
self.vms = VMs.use()
app = Gio.Application.get_default()
self.tray_icon: TrayIcon = TrayIcon(app)
# Initialize all VMs
threading.Thread(target=self._populate_vms).start()
# Initialize all views
stack_view = Views.use().view
@ -52,6 +61,25 @@ class MainWindow(Adw.ApplicationWindow):
self.connect("destroy", self.on_destroy)
def push_vm(self, vm: VM) -> bool:
VMs.use().push(vm)
return GLib.SOURCE_REMOVE
def _populate_vms(self) -> None:
# Execute `clan flakes add <path>` to democlan for this to work
# TODO: Make list_history a generator function
for entry in list_history():
if entry.flake.icon is None:
icon = assets.loc / "placeholder.jpeg"
else:
icon = entry.flake.icon
vm = VM(
icon=Path(icon),
data=entry,
)
GLib.idle_add(self.push_vm, vm)
def on_destroy(self, *_args: Any) -> None:
self.tray_icon.destroy()
self.vms.kill_all()
VMs.use().kill_all()

View File

@ -1,47 +1,59 @@
{ lib, stdenv, clan-vm-manager, gtk4, libadwaita, clan-cli, mkShell, ruff, desktop-file-utils, xdg-utils, mypy, python3Packages }:
mkShell {
inherit (clan-vm-manager) propagatedBuildInputs buildInputs;
{ lib, runCommand, makeWrapper, stdenv, clan-vm-manager, gdb, gtk4, libadwaita, clan-cli, mkShell, ruff, desktop-file-utils, xdg-utils, mypy, python3, python3Packages }:
mkShell (
let
pygdb = runCommand "pygdb" { buildInputs = [ gdb python3 makeWrapper ]; } ''
mkdir -p "$out/bin"
makeWrapper "${gdb}/bin/gdb" "$out/bin/pygdb" \
--add-flags '-ex "source ${python3}/share/gdb/libpython.py"'
'';
in
{
inherit (clan-vm-manager) propagatedBuildInputs buildInputs;
linuxOnlyPackages = lib.optionals stdenv.isLinux [
xdg-utils
];
nativeBuildInputs = [
ruff
desktop-file-utils
mypy
python3Packages.ipdb
gtk4.dev
libadwaita.devdoc # has the demo called 'adwaita-1-demo'
] ++ clan-vm-manager.nativeBuildInputs;
PYTHONBREAKPOINT = "ipdb.set_trace";
shellHook = ''
ln -sfT ${clan-cli.nixpkgs} ../clan-cli/clan_cli/nixpkgs
# prepend clan-cli for development
export PYTHONPATH=../clan-cli:$PYTHONPATH
linuxOnlyPackages = lib.optionals stdenv.isLinux [
xdg-utils
pygdb
];
if ! command -v xdg-mime &> /dev/null; then
echo "Warning: 'xdg-mime' is not available. The desktop file cannot be installed."
fi
# To debug clan-vm-manger execute pygdb --args python ./bin/clan-vm-manager
nativeBuildInputs = [
ruff
desktop-file-utils
mypy
python3Packages.ipdb
gtk4.dev
libadwaita.devdoc # has the demo called 'adwaita-1-demo'
] ++ clan-vm-manager.nativeBuildInputs;
# install desktop file
set -eou pipefail
DESKTOP_FILE_NAME=lol.clan.vm.manager.desktop
DESKTOP_DST=~/.local/share/applications/$DESKTOP_FILE_NAME
DESKTOP_SRC=${clan-vm-manager}/share/applications/$DESKTOP_FILE_NAME
UI_BIN="${clan-vm-manager}/bin/clan-vm-manager"
PYTHONBREAKPOINT = "ipdb.set_trace";
cp -f $DESKTOP_SRC $DESKTOP_DST
sleep 2
sed -i "s|Exec=.*clan-vm-manager|Exec=$UI_BIN|" $DESKTOP_DST
xdg-mime default $DESKTOP_FILE_NAME x-scheme-handler/clan
echo "==== Validating desktop file installation ===="
set -x
desktop-file-validate $DESKTOP_DST
set +xeou pipefail
'';
}
shellHook = ''
ln -sfT ${clan-cli.nixpkgs} ../clan-cli/clan_cli/nixpkgs
# prepend clan-cli for development
export PYTHONPATH=../clan-cli:$PYTHONPATH
if ! command -v xdg-mime &> /dev/null; then
echo "Warning: 'xdg-mime' is not available. The desktop file cannot be installed."
fi
# install desktop file
set -eou pipefail
DESKTOP_FILE_NAME=lol.clan.vm.manager.desktop
DESKTOP_DST=~/.local/share/applications/$DESKTOP_FILE_NAME
DESKTOP_SRC=${clan-vm-manager}/share/applications/$DESKTOP_FILE_NAME
UI_BIN="${clan-vm-manager}/bin/clan-vm-manager"
cp -f $DESKTOP_SRC $DESKTOP_DST
sleep 2
sed -i "s|Exec=.*clan-vm-manager|Exec=$UI_BIN|" $DESKTOP_DST
xdg-mime default $DESKTOP_FILE_NAME x-scheme-handler/clan
echo "==== Validating desktop file installation ===="
set -x
desktop-file-validate $DESKTOP_DST
set +xeou pipefail
'';
}
)