clan_vm_manager: Add GKVStore to combat O(n2) runtimes. Add pygdb to devshell

This commit is contained in:
Luis Hebendanz 2024-02-29 22:46:09 +07:00
parent 4b3b573e8c
commit df6683a0bd
6 changed files with 283 additions and 210 deletions

View File

@ -0,0 +1,152 @@
import logging
from collections import OrderedDict
from collections.abc import Callable
from typing import Any, Generic, TypeVar
import gi
gi.require_version("Gio", "2.0")
from gi.repository import Gio, GObject
log = logging.getLogger(__name__)
# Define type variables for key and value types
K = TypeVar("K") # Key type
V = TypeVar(
"V", bound=GObject.GObject
) # Value type, bound to GObject.GObject or its subclasses
class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
"""
A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
Only use self[key] and del self[key] for accessing the items for better performance.
This class could be optimized by having the objects remember their position in the list.
"""
def __init__(self, gtype: GObject.GType, key_gen: Callable[[V], K]) -> None:
super().__init__()
self.gtype = gtype
self.key_gen = key_gen
self._items: "OrderedDict[K, V]" = OrderedDict()
# The rest of your class implementation...
@classmethod
def new(cls: Any, gtype: GObject.GType) -> "GKVStore":
return cls.__new__(cls, gtype)
def get_n_items(self) -> int:
return len(self._items)
def get_item(self, position: int) -> V | None:
if position < 0 or position >= self.get_n_items():
return None
# Access items by index since OrderedDict does not support direct indexing
key = list(self._items.keys())[position]
return self._items[key]
def get_item_type(self) -> GObject.GType:
return self.gtype
def insert(self, position: int, item: V) -> None:
key = self.key_gen(item)
if key in self._items:
raise ValueError("Key already exists in the dictionary")
if position < 0 or position > len(self._items):
raise IndexError("Index out of range")
# Temporary storage for items to be reinserted
temp_list = [(k, self._items[k]) for k in list(self._items.keys())[position:]]
# Delete items from the original dict
for k in list(self._items.keys())[position:]:
del self._items[k]
# Insert the new key-value pair
self._items[key] = item
# Reinsert the items
for i, (k, v) in enumerate(temp_list):
self._items[k] = v
# Notify the model of the changes
self.items_changed(position, 0, 1)
def append(self, item: V) -> None:
key = self.key_gen(item)
self._items[key] = item
def remove(self, position: int) -> None:
if position < 0 or position >= self.get_n_items():
return
key = list(self._items.keys())[position]
del self._items[key]
self.items_changed(position, 1, 0)
def remove_all(self) -> None:
self._items.clear()
self.items_changed(0, len(self._items), 0)
# O(n) operation
def find(self, item: V) -> tuple[bool, int]:
log.debug("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self._items.values()):
if v == item:
return True, i
return False, -1
def first(self) -> V:
res = next(iter(self._items.values()))
if res is None:
raise ValueError("The store is empty")
return res
def last(self) -> V:
res = next(reversed(self._items.values()))
if res is None:
raise ValueError("The store is empty")
return res
# O(1) operation if the key does not exist, O(n) if it does
def __setitem__(self, key: K, value: V) -> None:
# If the key already exists, remove it O(n)
if key in self._items:
position = list(self._items.keys()).index(key)
del self._items[key]
self.items_changed(position, 1, 0)
# Add the new key-value pair
self._items[key] = value
self._items.move_to_end(key)
position = len(self._items) - 1
self.items_changed(position, 0, 1)
# O(n) operation
def __delitem__(self, key: K) -> None:
position = list(self._items.keys()).index(key)
del self._items[key]
self.items_changed(position, 1, 0)
# O(1) operation
def __getitem__(self, key: K) -> V:
return self._items[key]
def sort(self) -> None:
raise NotImplementedError("Sorting is not supported")
def find_with_equal_func(self, item: V, equal_func: Callable[[V, V], bool]) -> int:
raise NotImplementedError("Finding is not supported")
def find_with_equal_func_full(
self, item: V, equal_func: Callable[[V, V], bool], user_data: object | None
) -> int:
raise NotImplementedError("Finding is not supported")
def insert_sorted(
self, item: V, compare_func: Callable[[V, V], int], user_data: object | None
) -> None:
raise NotImplementedError("Sorting is not supported")
def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
raise NotImplementedError("Splicing is not supported")

View File

@ -9,7 +9,6 @@ from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import add_history
from clan_vm_manager.errors.show_error import show_error_dialog
from clan_vm_manager.models.use_vms import Clans
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
@ -75,8 +74,6 @@ class Join:
def after_join(item: JoinValue, _: Any) -> None:
self.discard(item)
Clans.use().refresh()
# VMS.use().refresh()
print("Refreshed list after join")
on_join(item)

View File

@ -6,19 +6,16 @@ from collections.abc import Generator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import IO, Any, ClassVar
from typing import IO, Any, ClassVar, NewType
import gi
from clan_cli import vms
from clan_cli.clan_uri import ClanScheme, ClanURI
from clan_cli.errors import ClanError
from clan_cli.history.add import HistoryEntry
from clan_cli.history.list import list_history
from clan_vm_manager import assets
from clan_vm_manager.errors.show_error import show_error_dialog
from .executor import MPProcess, spawn
from .gkvstore import GKVStore
gi.require_version("Gtk", "4.0")
import logging
@ -26,77 +23,11 @@ import multiprocessing as mp
import threading
from clan_cli.machines.machines import Machine
from gi.repository import Gio, GLib, GObject, Gtk
from gi.repository import GLib, GObject, Gtk
log = logging.getLogger(__name__)
class ClanGroup(GObject.Object):
def __init__(self, url: str | Path, vms: list["VM"]) -> None:
super().__init__()
self.url = url
self.vms = vms
self.clan_name = vms[0].data.flake.clan_name
self.list_store = Gio.ListStore.new(VM)
for vm in vms:
self.list_store.append(vm)
def init_grp_store(list_store: Gio.ListStore) -> None:
groups: dict[str | Path, list["VM"]] = {}
for vm in get_saved_vms():
ll = groups.get(vm.data.flake.flake_url, [])
ll.append(vm)
groups[vm.data.flake.flake_url] = ll
for url, vm_list in groups.items():
grp = ClanGroup(url, vm_list)
list_store.append(grp)
class Clans:
list_store: Gio.ListStore
_instance: "None | ClanGroup" = None
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "ClanGroup":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(ClanGroup)
init_grp_store(cls.list_store)
return cls._instance
def filter_by_name(self, text: str) -> None:
if text:
filtered_list = self.list_store
filtered_list.remove_all()
groups: dict[str | Path, list["VM"]] = {}
for vm in get_saved_vms():
ll = groups.get(vm.data.flake.flake_url, [])
print(text, vm.data.flake.vm.machine_name)
if text.lower() in vm.data.flake.vm.machine_name.lower():
ll.append(vm)
groups[vm.data.flake.flake_url] = ll
for url, vm_list in groups.items():
grp = ClanGroup(url, vm_list)
filtered_list.append(grp)
else:
self.refresh()
def refresh(self) -> None:
self.list_store.remove_all()
init_grp_store(self.list_store)
class VM(GObject.Object):
# Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = {
@ -371,9 +302,12 @@ class VM(GObject.Object):
return self.vm_process.out_file.read_text()
class VMs:
list_store: Gio.ListStore
VMStore = NewType("VMStore", GKVStore[str, VM])
class VMs(GObject.Object):
_instance: "None | VMs" = None
_clan_store: GKVStore[str, VMStore]
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
@ -383,60 +317,37 @@ class VMs:
def use(cls: Any) -> "VMs":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(VM)
for vm in get_saved_vms():
cls.list_store.append(vm)
cls._clan_store = GKVStore(
VMStore, lambda store: store.first().data.flake.flake_url
)
return cls._instance
def filter_by_name(self, text: str) -> None:
if text:
filtered_list = self.list_store
filtered_list.remove_all()
for vm in get_saved_vms():
if text.lower() in vm.data.flake.vm.machine_name.lower():
filtered_list.append(vm)
else:
self.refresh()
@property
def clan_store(self) -> GKVStore[str, VMStore]:
return self._clan_store
def get_by_id(self, ident: str) -> None | VM:
for vm in self.list_store:
if ident == vm.get_id():
return vm
return None
def push(self, vm: VM) -> None:
url = vm.data.flake.flake_url
if url not in self.clan_store:
self.clan_store[url] = GKVStore[str, VM](
VM, lambda vm: vm.data.flake.flake_attr
)
self.clan_store[url].append(vm)
def remove(self, vm: VM) -> None:
del self.clan_store[vm.data.flake.flake_url][vm.data.flake.flake_attr]
def get_vm(self, flake_url: str, flake_attr: str) -> None | VM:
return self.clan_store.get(flake_url, {}).get(flake_attr, None)
def get_running_vms(self) -> list[VM]:
return list(filter(lambda vm: vm.is_running(), self.list_store))
return [
vm
for clan in self.clan_store.values()
for vm in clan.values()
if vm.is_running()
]
def kill_all(self) -> None:
for vm in self.get_running_vms():
vm.kill()
def refresh(self) -> None:
log.error("NEVER FUCKING DO THIS")
return
self.list_store.remove_all()
for vm in get_saved_vms():
self.list_store.append(vm)
def get_saved_vms() -> list[VM]:
vm_list = []
log.info("=====CREATING NEW VM OBJ====")
try:
# Execute `clan flakes add <path>` to democlan for this to work
for entry in list_history():
if entry.flake.icon is None:
icon = assets.loc / "placeholder.jpeg"
else:
icon = entry.flake.icon
base = VM(
icon=Path(icon),
data=entry,
)
vm_list.append(base)
except ClanError as e:
show_error_dialog(e)
return vm_list

View File

@ -9,13 +9,11 @@ from clan_cli.clan_uri import ClanURI
from clan_vm_manager.models.interfaces import ClanConfig
from clan_vm_manager.models.use_join import Join, JoinValue
from clan_vm_manager.models.use_vms import VMs
from clan_vm_manager.models.use_vms import VM, VMs, VMStore
gi.require_version("Adw", "1")
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
from clan_vm_manager.models.use_vms import VM, ClanGroup, Clans
log = logging.getLogger(__name__)
@ -51,42 +49,29 @@ class ClanList(Gtk.Box):
self.app = Gio.Application.get_default()
self.app.connect("join_request", self.on_join_request)
groups = Clans.use()
join = Join.use()
self.log_label: Gtk.Label = Gtk.Label()
self.__init_machines = history.add.list_history()
# Add join list
self.join_boxed_list = create_boxed_list(
model=join.list_store, render_row=self.render_join_row
model=Join.use().list_store, render_row=self.render_join_row
)
self.join_boxed_list.add_css_class("join-list")
self.append(self.join_boxed_list)
self.group_list = create_boxed_list(
model=groups.list_store, render_row=self.render_group_row
model=VMs.use().clan_store, render_row=self.render_group_row
)
self.group_list.add_css_class("group-list")
# disable search bar because of unsound handling of VM objects
# search_bar = Gtk.SearchBar()
# # This widget will typically be the top-level window
# search_bar.set_key_capture_widget(Views.use().main_window)
# entry = Gtk.SearchEntry()
# entry.set_placeholder_text("Search cLan")
# entry.connect("search-changed", self.on_search_changed)
# entry.add_css_class("search-entry")
# search_bar.set_child(entry)
# self.append(search_bar)
self.append(self.join_boxed_list)
self.append(self.group_list)
def render_group_row(self, boxed_list: Gtk.ListBox, group: ClanGroup) -> Gtk.Widget:
# if boxed_list.has_css_class("no-shadow"):
# boxed_list.remove_css_class("no-shadow")
def render_group_row(
self, boxed_list: Gtk.ListBox, vm_store: VMStore
) -> Gtk.Widget:
vm = vm_store.first()
grp = Adw.PreferencesGroup()
grp.set_title(group.clan_name)
grp.set_description(group.url)
grp.set_title(vm.data.flake.clan_name)
grp.set_description(vm.data.flake.flake_url)
add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s"))
add_action.connect("activate", self.on_add)
@ -94,8 +79,8 @@ class ClanList(Gtk.Box):
app.add_action(add_action)
menu_model = Gio.Menu()
for vm in machines.list.list_machines(flake_url=group.url):
if vm not in [item.data.flake.flake_attr for item in group.list_store]:
for vm in machines.list.list_machines(flake_url=vm.data.flake.flake_url):
if vm not in [item.data.flake.flake_attr for item in VMs.use().list_store]:
menu_model.append(vm, f"app.add::{vm}")
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
@ -109,11 +94,11 @@ class ClanList(Gtk.Box):
grp.set_header_suffix(box)
vm_list = create_boxed_list(
model=group.list_store, render_row=self.render_vm_row
)
# vm_list = create_boxed_list(
# model=group, render_row=self.render_vm_row
# )
grp.add(vm_list)
# grp.add(vm_list)
return grp
@ -121,12 +106,6 @@ class ClanList(Gtk.Box):
target = parameter.get_string()
print("Adding new machine", target)
def on_search_changed(self, entry: Gtk.SearchEntry) -> None:
Clans.use().filter_by_name(entry.get_text())
# Disable the shadow if the list is empty
if not VMs.use().list_store.get_n_items():
self.group_list.add_css_class("no-shadow")
def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VM) -> Gtk.Widget:
# Remove no-shadow class if attached
if boxed_list.has_css_class("no-shadow"):

View File

@ -1,16 +1,21 @@
import threading
import time
from pathlib import Path
from typing import Any
import gi
from clan_cli.history.list import list_history
from clan_vm_manager import assets
from clan_vm_manager.models.interfaces import ClanConfig
from clan_vm_manager.models.use_views import Views
from clan_vm_manager.models.use_vms import VMs
from clan_vm_manager.models.use_vms import VM, VMs
from clan_vm_manager.views.details import Details
from clan_vm_manager.views.list import ClanList
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, Gtk
from gi.repository import Adw, Gio, GLib, Gtk
from ..trayicon import TrayIcon
@ -27,10 +32,12 @@ class MainWindow(Adw.ApplicationWindow):
header = Adw.HeaderBar()
view.add_top_bar(header)
self.vms = VMs.use()
app = Gio.Application.get_default()
self.tray_icon: TrayIcon = TrayIcon(app)
# Initialize all VMs
threading.Thread(target=self._populate_vms).start()
# Initialize all views
stack_view = Views.use().view
@ -52,6 +59,22 @@ class MainWindow(Adw.ApplicationWindow):
self.connect("destroy", self.on_destroy)
def _populate_vms(self) -> None:
# Execute `clan flakes add <path>` to democlan for this to work
# TODO: Make list_history a generator function
for entry in list_history():
if entry.flake.icon is None:
icon = assets.loc / "placeholder.jpeg"
else:
icon = entry.flake.icon
vm = VM(
icon=Path(icon),
data=entry,
)
GLib.idle_add(lambda: VMs.use().push(vm))
time.sleep(0.5) # Add sleep for testing purposes
def on_destroy(self, *_args: Any) -> None:
self.tray_icon.destroy()
self.vms.kill_all()
VMs.use().kill_all()

View File

@ -1,47 +1,58 @@
{ lib, stdenv, clan-vm-manager, gtk4, libadwaita, clan-cli, mkShell, ruff, desktop-file-utils, xdg-utils, mypy, python3Packages }:
mkShell {
inherit (clan-vm-manager) propagatedBuildInputs buildInputs;
{ lib, runCommand, makeWrapper, stdenv, clan-vm-manager, gdb, gtk4, libadwaita, clan-cli, mkShell, ruff, desktop-file-utils, xdg-utils, mypy, python3, python3Packages }:
mkShell (
let
pygdb = runCommand "pygdb" { buildInputs = [ gdb python3 makeWrapper ]; } ''
mkdir -p "$out/bin"
makeWrapper "${gdb}/bin/gdb" "$out/bin/gdb" \
--add-flags '-ex "source ${python3}/share/gdb/libpython.py"'
'';
in
{
inherit (clan-vm-manager) propagatedBuildInputs buildInputs;
linuxOnlyPackages = lib.optionals stdenv.isLinux [
xdg-utils
];
nativeBuildInputs = [
ruff
desktop-file-utils
mypy
python3Packages.ipdb
gtk4.dev
libadwaita.devdoc # has the demo called 'adwaita-1-demo'
] ++ clan-vm-manager.nativeBuildInputs;
PYTHONBREAKPOINT = "ipdb.set_trace";
shellHook = ''
ln -sfT ${clan-cli.nixpkgs} ../clan-cli/clan_cli/nixpkgs
# prepend clan-cli for development
export PYTHONPATH=../clan-cli:$PYTHONPATH
linuxOnlyPackages = lib.optionals stdenv.isLinux [
xdg-utils
];
if ! command -v xdg-mime &> /dev/null; then
echo "Warning: 'xdg-mime' is not available. The desktop file cannot be installed."
fi
nativeBuildInputs = [
pygdb
ruff
desktop-file-utils
mypy
python3Packages.ipdb
gtk4.dev
libadwaita.devdoc # has the demo called 'adwaita-1-demo'
] ++ clan-vm-manager.nativeBuildInputs;
# install desktop file
set -eou pipefail
DESKTOP_FILE_NAME=lol.clan.vm.manager.desktop
DESKTOP_DST=~/.local/share/applications/$DESKTOP_FILE_NAME
DESKTOP_SRC=${clan-vm-manager}/share/applications/$DESKTOP_FILE_NAME
UI_BIN="${clan-vm-manager}/bin/clan-vm-manager"
PYTHONBREAKPOINT = "ipdb.set_trace";
cp -f $DESKTOP_SRC $DESKTOP_DST
sleep 2
sed -i "s|Exec=.*clan-vm-manager|Exec=$UI_BIN|" $DESKTOP_DST
xdg-mime default $DESKTOP_FILE_NAME x-scheme-handler/clan
echo "==== Validating desktop file installation ===="
set -x
desktop-file-validate $DESKTOP_DST
set +xeou pipefail
'';
}
shellHook = ''
ln -sfT ${clan-cli.nixpkgs} ../clan-cli/clan_cli/nixpkgs
# prepend clan-cli for development
export PYTHONPATH=../clan-cli:$PYTHONPATH
if ! command -v xdg-mime &> /dev/null; then
echo "Warning: 'xdg-mime' is not available. The desktop file cannot be installed."
fi
# install desktop file
set -eou pipefail
DESKTOP_FILE_NAME=lol.clan.vm.manager.desktop
DESKTOP_DST=~/.local/share/applications/$DESKTOP_FILE_NAME
DESKTOP_SRC=${clan-vm-manager}/share/applications/$DESKTOP_FILE_NAME
UI_BIN="${clan-vm-manager}/bin/clan-vm-manager"
cp -f $DESKTOP_SRC $DESKTOP_DST
sleep 2
sed -i "s|Exec=.*clan-vm-manager|Exec=$UI_BIN|" $DESKTOP_DST
xdg-mime default $DESKTOP_FILE_NAME x-scheme-handler/clan
echo "==== Validating desktop file installation ===="
set -x
desktop-file-validate $DESKTOP_DST
set +xeou pipefail
'';
}
)