forked from clan/clan-core
clan_vm_manager: Working GKVStore that emulates the ListStore Object
This commit is contained in:
parent
df6683a0bd
commit
d079bc85a8
@ -14,32 +14,51 @@ log = logging.getLogger(__name__)
|
||||
# Define type variables for key and value types
|
||||
K = TypeVar("K") # Key type
|
||||
V = TypeVar(
|
||||
"V", bound=GObject.GObject
|
||||
"V", bound=GObject.Object
|
||||
) # Value type, bound to GObject.GObject or its subclasses
|
||||
|
||||
|
||||
class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
|
||||
__gtype_name__ = "MyGKVStore"
|
||||
"""
|
||||
A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
|
||||
Only use self[key] and del self[key] for accessing the items for better performance.
|
||||
This class could be optimized by having the objects remember their position in the list.
|
||||
"""
|
||||
|
||||
def __init__(self, gtype: GObject.GType, key_gen: Callable[[V], K]) -> None:
|
||||
def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
|
||||
super().__init__()
|
||||
self.gtype = gtype
|
||||
self.key_gen = key_gen
|
||||
self._items: "OrderedDict[K, V]" = OrderedDict()
|
||||
|
||||
# The rest of your class implementation...
|
||||
|
||||
@classmethod
|
||||
def new(cls: Any, gtype: GObject.GType) -> "GKVStore":
|
||||
def new(cls: Any, gtype: type[V]) -> "GKVStore":
|
||||
return cls.__new__(cls, gtype)
|
||||
|
||||
#########################
|
||||
# #
|
||||
# READ OPERATIONS #
|
||||
# #
|
||||
#########################
|
||||
def keys(self) -> list[K]:
|
||||
return list(self._items.keys())
|
||||
|
||||
def values(self) -> list[V]:
|
||||
return list(self._items.values())
|
||||
|
||||
def items(self) -> list[tuple[K, V]]:
|
||||
return list(self._items.items())
|
||||
|
||||
def get(self, key: K, default: V | None = None) -> V | None:
|
||||
return self._items.get(key, default)
|
||||
|
||||
def get_n_items(self) -> int:
|
||||
return len(self._items)
|
||||
|
||||
def do_get_n_items(self) -> int:
|
||||
return self.get_n_items()
|
||||
|
||||
def get_item(self, position: int) -> V | None:
|
||||
if position < 0 or position >= self.get_n_items():
|
||||
return None
|
||||
@ -47,9 +66,34 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
|
||||
key = list(self._items.keys())[position]
|
||||
return self._items[key]
|
||||
|
||||
def get_item_type(self) -> GObject.GType:
|
||||
return self.gtype
|
||||
def do_get_item(self, position: int) -> V | None:
|
||||
return self.get_item(position)
|
||||
|
||||
def get_item_type(self) -> GObject.GType:
|
||||
return self.gtype.__gtype__
|
||||
|
||||
def do_get_item_type(self) -> GObject.GType:
|
||||
return self.get_item_type()
|
||||
|
||||
def first(self) -> V:
|
||||
return self.values()[0]
|
||||
|
||||
def last(self) -> V:
|
||||
return self.values()[-1]
|
||||
|
||||
# O(n) operation
|
||||
def find(self, item: V) -> tuple[bool, int]:
|
||||
log.warning("Finding is O(n) in GKVStore. Better use indexing")
|
||||
for i, v in enumerate(self.values()):
|
||||
if v == item:
|
||||
return True, i
|
||||
return False, -1
|
||||
|
||||
#########################
|
||||
# #
|
||||
# WRITE OPERATIONS #
|
||||
# #
|
||||
#########################
|
||||
def insert(self, position: int, item: V) -> None:
|
||||
key = self.key_gen(item)
|
||||
if key in self._items:
|
||||
@ -58,9 +102,10 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
|
||||
raise IndexError("Index out of range")
|
||||
|
||||
# Temporary storage for items to be reinserted
|
||||
temp_list = [(k, self._items[k]) for k in list(self._items.keys())[position:]]
|
||||
temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]
|
||||
|
||||
# Delete items from the original dict
|
||||
for k in list(self._items.keys())[position:]:
|
||||
for k in list(self.keys())[position:]:
|
||||
del self._items[k]
|
||||
|
||||
# Insert the new key-value pair
|
||||
@ -75,46 +120,25 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
|
||||
|
||||
def append(self, item: V) -> None:
|
||||
key = self.key_gen(item)
|
||||
self._items[key] = item
|
||||
self[key] = item
|
||||
|
||||
def remove(self, position: int) -> None:
|
||||
if position < 0 or position >= self.get_n_items():
|
||||
return
|
||||
key = list(self._items.keys())[position]
|
||||
del self._items[key]
|
||||
key = self.keys()[position]
|
||||
del self[key]
|
||||
self.items_changed(position, 1, 0)
|
||||
|
||||
def remove_all(self) -> None:
|
||||
self._items.clear()
|
||||
self.items_changed(0, len(self._items), 0)
|
||||
|
||||
# O(n) operation
|
||||
def find(self, item: V) -> tuple[bool, int]:
|
||||
log.debug("Finding is O(n) in GKVStore. Better use indexing")
|
||||
for i, v in enumerate(self._items.values()):
|
||||
if v == item:
|
||||
return True, i
|
||||
return False, -1
|
||||
|
||||
def first(self) -> V:
|
||||
res = next(iter(self._items.values()))
|
||||
if res is None:
|
||||
raise ValueError("The store is empty")
|
||||
return res
|
||||
|
||||
def last(self) -> V:
|
||||
res = next(reversed(self._items.values()))
|
||||
if res is None:
|
||||
raise ValueError("The store is empty")
|
||||
return res
|
||||
|
||||
# O(1) operation if the key does not exist, O(n) if it does
|
||||
def __setitem__(self, key: K, value: V) -> None:
|
||||
# If the key already exists, remove it O(n)
|
||||
if key in self._items:
|
||||
position = list(self._items.keys()).index(key)
|
||||
del self._items[key]
|
||||
self.items_changed(position, 1, 0)
|
||||
log.warning("Updating an existing key in GKVStore is O(n)")
|
||||
del self[key]
|
||||
|
||||
# Add the new key-value pair
|
||||
self._items[key] = value
|
||||
@ -124,29 +148,26 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
|
||||
|
||||
# O(n) operation
|
||||
def __delitem__(self, key: K) -> None:
|
||||
position = list(self._items.keys()).index(key)
|
||||
position = self.keys().index(key)
|
||||
del self._items[key]
|
||||
self.items_changed(position, 1, 0)
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._items)
|
||||
|
||||
# O(1) operation
|
||||
def __getitem__(self, key: K) -> V:
|
||||
return self._items[key]
|
||||
|
||||
def sort(self) -> None:
|
||||
raise NotImplementedError("Sorting is not supported")
|
||||
def __contains__(self, key: K) -> bool:
|
||||
return key in self._items
|
||||
|
||||
def find_with_equal_func(self, item: V, equal_func: Callable[[V, V], bool]) -> int:
|
||||
raise NotImplementedError("Finding is not supported")
|
||||
def __str__(self) -> str:
|
||||
resp = "GKVStore(\n"
|
||||
for k, v in self._items.items():
|
||||
resp += f"{k}: {v}\n"
|
||||
resp += ")"
|
||||
return resp
|
||||
|
||||
def find_with_equal_func_full(
|
||||
self, item: V, equal_func: Callable[[V, V], bool], user_data: object | None
|
||||
) -> int:
|
||||
raise NotImplementedError("Finding is not supported")
|
||||
|
||||
def insert_sorted(
|
||||
self, item: V, compare_func: Callable[[V, V], int], user_data: object | None
|
||||
) -> None:
|
||||
raise NotImplementedError("Sorting is not supported")
|
||||
|
||||
def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
|
||||
raise NotImplementedError("Splicing is not supported")
|
||||
def __repr__(self) -> str:
|
||||
return self._items.__str__()
|
||||
|
@ -1,34 +1,35 @@
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import weakref
|
||||
from collections.abc import Generator
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import IO, Any, ClassVar, NewType
|
||||
from typing import IO, Any, ClassVar
|
||||
|
||||
import gi
|
||||
from clan_cli import vms
|
||||
from clan_cli.clan_uri import ClanScheme, ClanURI
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.history.add import HistoryEntry
|
||||
from clan_cli.machines.machines import Machine
|
||||
|
||||
from .executor import MPProcess, spawn
|
||||
from .gkvstore import GKVStore
|
||||
|
||||
gi.require_version("GObject", "2.0")
|
||||
gi.require_version("Gtk", "4.0")
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import threading
|
||||
|
||||
from clan_cli.machines.machines import Machine
|
||||
from gi.repository import GLib, GObject, Gtk
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VM(GObject.Object):
|
||||
__gtype_name__: ClassVar = "VMGobject"
|
||||
# Define a custom signal with the name "vm_stopped" and a string argument for the message
|
||||
__gsignals__: ClassVar = {
|
||||
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object])
|
||||
@ -301,11 +302,21 @@ class VM(GObject.Object):
|
||||
return ""
|
||||
return self.vm_process.out_file.read_text()
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"VM({self.get_id()})"
|
||||
|
||||
VMStore = NewType("VMStore", GKVStore[str, VM])
|
||||
def __repr__(self) -> str:
|
||||
return self.__str__()
|
||||
|
||||
|
||||
class VMs(GObject.Object):
|
||||
class VMStore(GKVStore):
|
||||
__gtype_name__ = "MyVMStore"
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(VM, lambda vm: vm.data.flake.flake_attr)
|
||||
|
||||
|
||||
class VMs:
|
||||
_instance: "None | VMs" = None
|
||||
_clan_store: GKVStore[str, VMStore]
|
||||
|
||||
@ -320,6 +331,7 @@ class VMs(GObject.Object):
|
||||
cls._clan_store = GKVStore(
|
||||
VMStore, lambda store: store.first().data.flake.flake_url
|
||||
)
|
||||
|
||||
return cls._instance
|
||||
|
||||
@property
|
||||
@ -328,17 +340,27 @@ class VMs(GObject.Object):
|
||||
|
||||
def push(self, vm: VM) -> None:
|
||||
url = vm.data.flake.flake_url
|
||||
|
||||
# Only write to the store if the VM is not already in it
|
||||
# Every write to the KVStore rerenders bound widgets to the clan_store
|
||||
if url not in self.clan_store:
|
||||
self.clan_store[url] = GKVStore[str, VM](
|
||||
VM, lambda vm: vm.data.flake.flake_attr
|
||||
)
|
||||
self.clan_store[url].append(vm)
|
||||
log.debug(f"Creating new VMStore for {url}")
|
||||
vm_store = VMStore()
|
||||
vm_store.append(vm)
|
||||
self.clan_store[url] = vm_store
|
||||
else:
|
||||
log.debug(f"Appending VM {vm.data.flake.flake_attr} to store")
|
||||
vm_store = self.clan_store[url]
|
||||
vm_store.append(vm)
|
||||
|
||||
def remove(self, vm: VM) -> None:
|
||||
del self.clan_store[vm.data.flake.flake_url][vm.data.flake.flake_attr]
|
||||
|
||||
def get_vm(self, flake_url: str, flake_attr: str) -> None | VM:
|
||||
return self.clan_store.get(flake_url, {}).get(flake_attr, None)
|
||||
clan = self.clan_store.get(flake_url)
|
||||
if clan is None:
|
||||
return None
|
||||
return clan.get(flake_attr, None)
|
||||
|
||||
def get_running_vms(self) -> list[VM]:
|
||||
return [
|
||||
|
@ -4,7 +4,7 @@ from functools import partial
|
||||
from typing import Any
|
||||
|
||||
import gi
|
||||
from clan_cli import ClanError, history, machines
|
||||
from clan_cli import history, machines
|
||||
from clan_cli.clan_uri import ClanURI
|
||||
|
||||
from clan_vm_manager.models.interfaces import ClanConfig
|
||||
@ -69,6 +69,7 @@ class ClanList(Gtk.Box):
|
||||
self, boxed_list: Gtk.ListBox, vm_store: VMStore
|
||||
) -> Gtk.Widget:
|
||||
vm = vm_store.first()
|
||||
log.debug("Rendering group row for %s", vm.data.flake.flake_url)
|
||||
grp = Adw.PreferencesGroup()
|
||||
grp.set_title(vm.data.flake.clan_name)
|
||||
grp.set_description(vm.data.flake.flake_url)
|
||||
@ -80,7 +81,7 @@ class ClanList(Gtk.Box):
|
||||
|
||||
menu_model = Gio.Menu()
|
||||
for vm in machines.list.list_machines(flake_url=vm.data.flake.flake_url):
|
||||
if vm not in [item.data.flake.flake_attr for item in VMs.use().list_store]:
|
||||
if vm not in vm_store:
|
||||
menu_model.append(vm, f"app.add::{vm}")
|
||||
|
||||
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
|
||||
@ -94,11 +95,8 @@ class ClanList(Gtk.Box):
|
||||
|
||||
grp.set_header_suffix(box)
|
||||
|
||||
# vm_list = create_boxed_list(
|
||||
# model=group, render_row=self.render_vm_row
|
||||
# )
|
||||
|
||||
# grp.add(vm_list)
|
||||
vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row)
|
||||
grp.add(vm_list)
|
||||
|
||||
return grp
|
||||
|
||||
@ -177,12 +175,8 @@ class ClanList(Gtk.Box):
|
||||
|
||||
def on_edit(self, action: Any, parameter: Any) -> None:
|
||||
target = parameter.get_string()
|
||||
vm = VMs.use().get_by_id(target)
|
||||
|
||||
if not vm:
|
||||
raise ClanError("Something went wrong. Please restart the app.")
|
||||
|
||||
print("Editing settings for machine", vm)
|
||||
print("Editing settings for machine", target)
|
||||
|
||||
def render_join_row(self, boxed_list: Gtk.ListBox, item: JoinValue) -> Gtk.Widget:
|
||||
if boxed_list.has_css_class("no-shadow"):
|
||||
@ -194,9 +188,7 @@ class ClanList(Gtk.Box):
|
||||
row.set_subtitle(item.url.get_internal())
|
||||
row.add_css_class("trust")
|
||||
|
||||
# TODO: figure out how to detect that
|
||||
exist = VMs.use().use().get_by_id(item.url.get_id())
|
||||
if exist:
|
||||
if item.url.params.flake_attr in VMs.use().clan_store:
|
||||
sub = row.get_subtitle()
|
||||
row.set_subtitle(
|
||||
sub + "\nClan already exists. Joining again will update it"
|
||||
|
@ -73,7 +73,6 @@ class MainWindow(Adw.ApplicationWindow):
|
||||
data=entry,
|
||||
)
|
||||
GLib.idle_add(lambda: VMs.use().push(vm))
|
||||
time.sleep(0.5) # Add sleep for testing purposes
|
||||
|
||||
def on_destroy(self, *_args: Any) -> None:
|
||||
self.tray_icon.destroy()
|
||||
|
@ -3,7 +3,7 @@ mkShell (
|
||||
let
|
||||
pygdb = runCommand "pygdb" { buildInputs = [ gdb python3 makeWrapper ]; } ''
|
||||
mkdir -p "$out/bin"
|
||||
makeWrapper "${gdb}/bin/gdb" "$out/bin/gdb" \
|
||||
makeWrapper "${gdb}/bin/gdb" "$out/bin/pygdb" \
|
||||
--add-flags '-ex "source ${python3}/share/gdb/libpython.py"'
|
||||
'';
|
||||
in
|
||||
@ -15,6 +15,7 @@ mkShell (
|
||||
];
|
||||
|
||||
|
||||
# To debug clan-vm-manger execute pygdb --args python ./bin/clan-vm-manager
|
||||
nativeBuildInputs = [
|
||||
pygdb
|
||||
ruff
|
||||
|
Loading…
Reference in New Issue
Block a user