clan-app: Remove vm-manager from codebase

This commit is contained in:
Luis Hebendanz 2024-07-09 17:35:00 +02:00
parent aa286e4e63
commit bb9058f5ef
16 changed files with 116 additions and 2922 deletions

View File

@ -14,6 +14,9 @@
},
{
"path": "../../lib/build-clan"
},
{
"path": "../webview-ui"
}
],
"settings": {
@ -25,6 +28,7 @@
"**/.mypy_cache": true,
"**/.reports": true,
"**/.ruff_cache": true,
"**/.webui": true,
"**/result/**": true,
"/nix/store/**": true
},

View File

@ -0,0 +1,103 @@
import gi
gi.require_version("WebKit", "6.0")
from gi.repository import Gio, GLib, Gtk, WebKit
from clan_cli.api.directory import FileRequest
# Implement the abstract open_file function
def open_file(file_request: FileRequest) -> str | None:
# Function to handle the response and stop the loop
selected_path = None
def on_file_select(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.open_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected file or directory: {e}")
finally:
main_loop.quit()
def on_folder_select(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.select_folder_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected directory: {e}")
finally:
main_loop.quit()
def on_save_finish(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.save_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected file: {e}")
finally:
main_loop.quit()
dialog = Gtk.FileDialog()
if file_request.title:
dialog.set_title(file_request.title)
if file_request.filters:
filters = Gio.ListStore.new(Gtk.FileFilter)
file_filters = Gtk.FileFilter()
if file_request.filters.title:
file_filters.set_name(file_request.filters.title)
# Create and configure a filter for image files
if file_request.filters.mime_types:
for mime in file_request.filters.mime_types:
file_filters.add_mime_type(mime)
filters.append(file_filters)
if file_request.filters.patterns:
for pattern in file_request.filters.patterns:
file_filters.add_pattern(pattern)
if file_request.filters.suffixes:
for suffix in file_request.filters.suffixes:
file_filters.add_suffix(suffix)
filters.append(file_filters)
dialog.set_filters(filters)
main_loop = GLib.MainLoop()
# if select_folder
if file_request.mode == "select_folder":
dialog.select_folder(
callback=lambda dialog, task: on_folder_select(dialog, task, main_loop),
)
elif file_request.mode == "open_file":
dialog.open(
callback=lambda dialog, task: on_file_select(dialog, task, main_loop)
)
elif file_request.mode == "save":
dialog.save(
callback=lambda dialog, task: on_save_finish(dialog, task, main_loop)
)
# Wait for the user to select a file or directory
main_loop.run() # type: ignore
return selected_path

View File

@ -13,10 +13,9 @@ gi.require_version("Adw", "1")
from pathlib import Path
from clan_cli.custom_logger import setup_logging
from gi.repository import Adw, Gdk, Gio, Gtk
from gi.repository import Adw, Gdk, Gio, Gtk, GLib, GObject
from clan_app.components.interfaces import ClanConfig
from clan_app.singletons.use_join import GLib, GObject
from .windows.main_window import MainWindow
@ -75,8 +74,7 @@ class MainApplication(Adw.Application):
# TODO: Doesn't seem to raise the destroy signal. Need to investigate
# self.get_windows() returns an empty list. Desync between window and application?
self.window.close()
# Killing vms directly. This is dirty
self.window.kill_vms()
else:
log.error("No window to destroy")

View File

@ -1,132 +0,0 @@
import logging
import os
import signal
import sys
import traceback
from pathlib import Path
from typing import Any
import gi
gi.require_version("GdkPixbuf", "2.0")
import dataclasses
import multiprocessing as mp
from collections.abc import Callable
log = logging.getLogger(__name__)
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def _kill_group(proc: mp.Process) -> None:
pid = proc.pid
if proc.is_alive() and pid:
os.killpg(pid, signal.SIGTERM)
else:
log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")
@dataclasses.dataclass(frozen=True)
class MPProcess:
name: str
proc: mp.Process
out_file: Path
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None:
_kill_group(proc=self.proc)
def _set_proc_name(name: str) -> None:
if sys.platform != "linux":
return
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to "my_process"
prctl(15, name.encode(), 0, 0, 0)
def _init_proc(
func: Callable,
out_file: Path,
proc_name: str,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
**kwargs: Any,
) -> None:
# Create a new process group
os.setsid()
# Open stdout and stderr
with open(out_file, "w") as out_fd:
os.dup2(out_fd.fileno(), sys.stdout.fileno())
os.dup2(out_fd.fileno(), sys.stderr.fileno())
# Print some information
pid = os.getpid()
gpid = os.getpgid(pid=pid)
# Set the process name
_set_proc_name(proc_name)
# Close stdin
sys.stdin.close()
linebreak = "=" * 5
# Execute the main function
print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
try:
func(**kwargs)
except Exception as ex:
traceback.print_exc()
if on_except is not None:
on_except(ex, mp.current_process())
# Kill the new process and all its children by sending a SIGTERM signal to the process group
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
os.killpg(gpid, signal.SIGTERM)
sys.exit(1)
# Don't use a finally block here, because we want the exitcode to be set to
# 0 if the function returns normally
def spawn(
*,
out_file: Path,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
func: Callable,
**kwargs: Any,
) -> MPProcess:
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="forkserver")
# Set names
proc_name = f"MPExec:{func.__name__}"
# Start the process
proc = mp.Process(
target=_init_proc,
args=(func, out_file, proc_name, on_except),
name=proc_name,
kwargs=kwargs,
)
proc.start()
# Return the process
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
return mp_proc

View File

@ -1,220 +0,0 @@
import logging
from collections.abc import Callable
from typing import Any, Generic, TypeVar
import gi
gi.require_version("Gio", "2.0")
from gi.repository import Gio, GObject
log = logging.getLogger(__name__)
# Define type variables for key and value types
K = TypeVar("K") # Key type
V = TypeVar(
"V", bound=GObject.Object
) # Value type, bound to GObject.GObject or its subclasses
class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
"""
A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
Only use self[key] and del self[key] for accessing the items for better performance.
This class could be optimized by having the objects remember their position in the list.
"""
def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
super().__init__()
self.gtype = gtype
self.key_gen = key_gen
# From Python 3.7 onwards dictionaries are ordered by default
self._items: dict[K, V] = dict()
##################################
# #
# Gio.ListStore Interface #
# #
##################################
@classmethod
def new(cls: Any, gtype: type[V]) -> "GKVStore":
return cls.__new__(cls, gtype)
def append(self, item: V) -> None:
key = self.key_gen(item)
self[key] = item
def find(self, item: V) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if v == item:
return True, i
return False, -1
def find_with_equal_func(
self, item: V, equal_func: Callable[[V, V], bool]
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item):
return True, i
return False, -1
def find_with_equal_func_full(
self, item: V, equal_func: Callable[[V, V, Any], bool], user_data: Any
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item, user_data):
return True, i
return False, -1
def insert(self, position: int, item: V) -> None:
log.warning("Inserting is O(n) in GKVStore. Better use append")
log.warning(
"This functions may have incorrect items_changed signal behavior. Please test it"
)
key = self.key_gen(item)
if key in self._items:
raise ValueError("Key already exists in the dictionary")
if position < 0 or position > len(self._items):
raise IndexError("Index out of range")
# Temporary storage for items to be reinserted
temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]
# Delete items from the original dict
for k in list(self.keys())[position:]:
del self._items[k]
# Insert the new key-value pair
self._items[key] = item
# Reinsert the items
for i, (k, v) in enumerate(temp_list):
self._items[k] = v
# Notify the model of the changes
self.items_changed(position, 0, 1)
def insert_sorted(
self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any
) -> None:
raise NotImplementedError("insert_sorted is not implemented in GKVStore")
def remove(self, position: int) -> None:
if position < 0 or position >= self.get_n_items():
return
key = self.keys()[position]
del self[key]
self.items_changed(position, 1, 0)
def remove_all(self) -> None:
self._items.clear()
self.items_changed(0, len(self._items), 0)
def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None:
raise NotImplementedError("sort is not implemented in GKVStore")
def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
raise NotImplementedError("splice is not implemented in GKVStore")
##################################
# #
# Gio.ListModel Interface #
# #
##################################
def get_item(self, position: int) -> V | None:
if position < 0 or position >= self.get_n_items():
return None
# Access items by index since OrderedDict does not support direct indexing
key = list(self._items.keys())[position]
return self._items[key]
def do_get_item(self, position: int) -> V | None:
return self.get_item(position)
def get_item_type(self) -> Any:
return self.gtype.__gtype__ # type: ignore[attr-defined]
def do_get_item_type(self) -> GObject.GType:
return self.get_item_type()
def get_n_items(self) -> int:
return len(self._items)
def do_get_n_items(self) -> int:
return self.get_n_items()
##################################
# #
# Dict Interface #
# #
##################################
def keys(self) -> list[K]:
return list(self._items.keys())
def values(self) -> list[V]:
return list(self._items.values())
def items(self) -> list[tuple[K, V]]:
return list(self._items.items())
def get(self, key: K, default: V | None = None) -> V | None:
return self._items.get(key, default)
# O(1) operation if the key does not exist, O(n) if it does
def __setitem__(self, key: K, value: V) -> None:
# If the key already exists, remove it O(n)
if key in self._items:
log.debug("Updating an existing key in GKVStore is O(n)")
position = self.keys().index(key)
self._items[key] = value
self.items_changed(position, 1, 1)
else:
# Add the new key-value pair
self._items[key] = value
position = max(len(self._items) - 1, 0)
self.items_changed(position, 0, 1)
# O(n) operation
def __delitem__(self, key: K) -> None:
position = self.keys().index(key)
del self._items[key]
self.items_changed(position, 1, 0)
def __len__(self) -> int:
return len(self._items)
# O(1) operation
def __getitem__(self, key: K) -> V: # type: ignore[override]
return self._items[key]
def __contains__(self, key: K) -> bool: # type: ignore[override]
return key in self._items
def __str__(self) -> str:
resp = "GKVStore(\n"
for k, v in self._items.items():
resp += f"{k}: {v}\n"
resp += ")"
return resp
def __repr__(self) -> str:
return self._items.__str__()
##################################
# #
# Custom Methods #
# #
##################################
def first(self) -> V:
return self.values()[0]
def last(self) -> V:
return self.values()[-1]
def register_on_change(
self, callback: Callable[["GKVStore[K,V]", int, int, int], None]
) -> None:
self.connect("items-changed", callback)

View File

@ -1,74 +0,0 @@
import logging
from collections.abc import Callable
from typing import TypeVar
import gi
from clan_app import assets
gi.require_version("Adw", "1")
from gi.repository import Adw, GdkPixbuf, Gio, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
class EmptySplash(Gtk.Box):
def __init__(self, on_join: Callable[[str], None]) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
self.on_join = on_join
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
clan_icon = self.load_image(str(assets.get_asset("clan_black_notext.png")))
if clan_icon:
image = Gtk.Image.new_from_pixbuf(clan_icon)
else:
image = Gtk.Image.new_from_icon_name("image-missing")
# same as the clamp
image.set_pixel_size(400)
image.set_opacity(0.5)
image.set_margin_top(20)
image.set_margin_bottom(10)
vbox.append(image)
empty_label = Gtk.Label(label="Welcome to Clan! Join your first clan.")
join_entry = Gtk.Entry()
join_entry.set_placeholder_text("clan://<url>")
join_entry.set_hexpand(True)
join_button = Gtk.Button(label="Join")
join_button.connect("clicked", self._on_join, join_entry)
join_entry.connect("activate", lambda e: self._on_join(join_button, e))
clamp = Adw.Clamp()
clamp.set_maximum_size(400)
clamp.set_margin_bottom(40)
vbox.append(empty_label)
hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
hbox.append(join_entry)
hbox.append(join_button)
vbox.append(hbox)
clamp.set_child(vbox)
self.append(clamp)
def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None:
try:
pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path)
return pixbuf
except Exception as e:
log.error(f"Failed to load image: {e}")
return None
def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None:
"""
Callback for the join button
Extracts the text from the entry and calls the on_join callback
"""
log.info(f"Splash screen: Joining {entry.get_text()}")
self.on_join(entry.get_text())

File diff suppressed because it is too large Load Diff

View File

@ -1,375 +0,0 @@
import logging
import multiprocessing as mp
import os
import tempfile
import threading
import time
import weakref
from collections.abc import Callable, Generator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import IO, ClassVar
import gi
from clan_cli import vms
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry
from clan_cli.machines.machines import Machine
from clan_app.components.executor import MPProcess, spawn
from clan_app.singletons.toast import (
InfoToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
class VMObject(GObject.Object):
# Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = {
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, []),
"vm_build_notify": (GObject.SignalFlags.RUN_FIRST, None, [bool, bool]),
}
def __init__(
self,
icon: Path,
data: HistoryEntry,
build_log_cb: Callable[[Gio.File], None],
) -> None:
super().__init__()
# Store the data from the history entry
self.data: HistoryEntry = data
self.build_log_cb = build_log_cb
# Create a process object to store the VM process
self.vm_process: MPProcess = MPProcess(
"vm_dummy", mp.Process(), Path("./dummy")
)
self.build_process: MPProcess = MPProcess(
"build_dummy", mp.Process(), Path("./dummy")
)
self._start_thread: threading.Thread = threading.Thread()
self.machine: Machine | None = None
# Watcher to stop the VM
self.KILL_TIMEOUT: int = 20 # seconds
self._stop_thread: threading.Thread = threading.Thread()
# Build progress bar vars
self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar()
self.progress_bar.hide()
self.progress_bar.set_hexpand(True) # Horizontally expand
self.prog_bar_id: int = 0
# Create a temporary directory to store the logs
self.log_dir: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory(
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
)
self._logs_id: int = 0
self._log_file: IO[str] | None = None
# To be able to set the switch state programmatically
# we need to store the handler id returned by the connect method
# and block the signal while we change the state. This is cursed.
self.switch: Gtk.Switch = Gtk.Switch()
self.switch_handler_id: int = self.switch.connect(
"notify::active", self._on_switch_toggle
)
self.connect("vm_status_changed", self._on_vm_status_changed)
# Make sure the VM is killed when the reference to this object is dropped
self._finalizer: weakref.finalize = weakref.finalize(self, self._kill_ref_drop)
def _vm_status_changed_task(self) -> bool:
self.emit("vm_status_changed")
return GLib.SOURCE_REMOVE
def update(self, data: HistoryEntry) -> None:
self.data = data
def _on_vm_status_changed(self, source: "VMObject") -> None:
# Signal may be emitted multiple times
self.emit("vm_build_notify", self.is_building(), self.is_running())
prev_state = self.switch.get_state()
next_state = self.is_running() and not self.is_building()
self.switch.set_state(next_state)
if prev_state is False and next_state is True:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"{source.data.flake.flake_attr} started").toast,
"success.vm.start",
)
if self.switch.get_sensitive() is False and not self.is_building():
self.switch.set_sensitive(True)
exit_vm = self.vm_process.proc.exitcode
exit_build = self.build_process.proc.exitcode
exitc = exit_vm or exit_build
if not self.is_running() and exitc != 0:
with self.switch.handler_block(self.switch_handler_id):
self.switch.set_active(False)
log.error(f"VM exited with error. Exitcode: {exitc}")
ToastOverlay.use().add_toast_unique(
WarningToast(f"VM exited with error. Exitcode: {exitc}").toast,
"warning.vm.exit",
)
def _on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None:
if switch.get_active():
switch.set_state(False)
switch.set_sensitive(False)
self.start()
else:
switch.set_state(True)
self.shutdown()
switch.set_sensitive(False)
# We use a context manager to create the machine object
# and make sure it is destroyed when the context is exited
@contextmanager
def _create_machine(self) -> Generator[Machine, None, None]:
uri = ClanURI.from_str(
url=str(self.data.flake.flake_url), machine_name=self.data.flake.flake_attr
)
if uri.flake_id.is_local():
self.machine = Machine(
name=self.data.flake.flake_attr,
flake=uri.flake_id.path,
)
if uri.flake_id.is_remote():
self.machine = Machine(
name=self.data.flake.flake_attr,
flake=uri.flake_id.url,
)
assert self.machine is not None
yield self.machine
self.machine = None
def _pulse_progress_bar_task(self) -> bool:
if self.progress_bar.is_visible():
self.progress_bar.pulse()
return GLib.SOURCE_CONTINUE
else:
return GLib.SOURCE_REMOVE
def __start(self) -> None:
with self._create_machine() as machine:
# Start building VM
tstart = datetime.now()
log.info(f"Building VM {self.get_id()}")
log_dir = Path(str(self.log_dir.name))
# Start the build process
self.build_process = spawn(
on_except=None,
out_file=log_dir / "build.log",
func=vms.run.build_vm,
machine=machine,
tmpdir=log_dir,
)
gfile = Gio.File.new_for_path(str(log_dir / "build.log"))
# Gio documentation:
# Obtains a file monitor for the given file.
# If no file notification mechanism exists, then regular polling of the file is used.
g_monitor = gfile.monitor_file(Gio.FileMonitorFlags.NONE, None)
g_monitor.connect("changed", self.on_logs_changed)
GLib.idle_add(self._vm_status_changed_task)
self.switch.set_sensitive(True)
# Start the logs watcher
self._logs_id = GLib.timeout_add(
50, self._get_logs_task, self.build_process
)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.build_process.out_file}")
# Start the progress bar and show it
self.progress_bar.show()
self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar_task)
if self.prog_bar_id == 0:
log.error("Couldn't spawn a progress bar task")
# Wait for the build to finish then hide the progress bar
self.build_process.proc.join()
tend = datetime.now()
log.info(f"VM {self.get_id()} build took {tend - tstart}s")
self.progress_bar.hide()
# Check if the VM was built successfully
if self.build_process.proc.exitcode != 0:
log.error(f"Failed to build VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
return
log.info(f"Successfully built VM {self.get_id()}")
# Start the VM
self.vm_process = spawn(
on_except=None,
out_file=Path(str(self.log_dir.name)) / "vm.log",
func=vms.run.run_vm,
vm=self.data.flake.vm,
cachedir=log_dir,
socketdir=log_dir,
)
log.debug(f"Started VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
# Start the logs watcher
self._logs_id = GLib.timeout_add(50, self._get_logs_task, self.vm_process)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}")
# Wait for the VM to stop
self.vm_process.proc.join()
log.debug(f"VM {self.get_id()} has stopped")
GLib.idle_add(self._vm_status_changed_task)
def on_logs_changed(
self,
monitor: Gio.FileMonitor,
file: Gio.File,
other_file: Gio.File,
event_type: Gio.FileMonitorEvent,
) -> None:
if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
# File was changed and the changes were written to disk
# wire up the callback for setting the logs
self.build_log_cb(file)
def start(self) -> None:
if self.is_running():
log.warn("VM is already running. Ignoring start request")
self.emit("vm_status_changed", self)
return
log.debug(f"VM state dir {self.log_dir.name}")
self._start_thread = threading.Thread(target=self.__start)
self._start_thread.start()
def _get_logs_task(self, proc: MPProcess) -> bool:
if not proc.out_file.exists():
return GLib.SOURCE_CONTINUE
if not self._log_file:
try:
self._log_file = open(proc.out_file)
except Exception as ex:
log.exception(ex)
self._log_file = None
return GLib.SOURCE_REMOVE
line = os.read(self._log_file.fileno(), 4096)
if len(line) != 0:
print(line.decode("utf-8"), end="", flush=True)
if not proc.proc.is_alive():
log.debug("Removing logs watcher")
self._log_file = None
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
def is_running(self) -> bool:
return self._start_thread.is_alive()
def is_building(self) -> bool:
return self.build_process.proc.is_alive()
def is_shutting_down(self) -> bool:
return self._stop_thread.is_alive()
def get_id(self) -> str:
return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"
def __stop(self) -> None:
log.info(f"Stopping VM {self.get_id()}")
start_time = datetime.now()
while self.is_running():
diff = datetime.now() - start_time
if diff.seconds > self.KILL_TIMEOUT:
log.error(
f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it"
)
self.vm_process.kill_group()
break
if self.is_building():
log.info(f"VM {self.get_id()} is still building. Killing it")
self.build_process.kill_group()
break
if not self.machine:
log.error(f"Machine object is None. Killing VM {self.get_id()}")
self.vm_process.kill_group()
break
# Try to shutdown the VM gracefully using QMP
try:
with self.machine.vm.qmp_ctx() as qmp:
qmp.command("system_powerdown")
except Exception as ex:
log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}")
# Try 20 times to stop the VM
time.sleep(self.KILL_TIMEOUT / 20)
GLib.idle_add(self._vm_status_changed_task)
log.debug(f"VM {self.get_id()} has stopped")
ToastOverlay.use().add_toast_unique(
InfoToast(f"Stopped {self.get_id()}").toast, "info.vm.exit"
)
def shutdown(self) -> None:
if not self.is_running():
log.warning("VM not running. Ignoring shutdown request.")
self.emit("vm_status_changed", self)
return
if self.is_shutting_down():
log.warning("Shutdown already in progress")
self.emit("vm_status_changed", self)
return
self._stop_thread = threading.Thread(target=self.__stop)
self._stop_thread.start()
def _kill_ref_drop(self) -> None:
if self.is_running():
log.warning("Killing VM due to reference drop")
self.kill()
def kill(self) -> None:
if not self.is_running():
log.warning(f"Tried to kill VM {self.get_id()} is not running")
return
log.info(f"Killing VM {self.get_id()} now")
if self.vm_process.proc.is_alive():
self.vm_process.kill_group()
if self.build_process.proc.is_alive():
self.build_process.kill_group()
def read_whole_log(self) -> str:
if not self.vm_process.out_file.exists():
log.error(f"Log file {self.vm_process.out_file} does not exist")
return ""
return self.vm_process.out_file.read_text()
def __str__(self) -> str:
return f"VM({self.get_id()})"
def __repr__(self) -> str:
return self.__str__()

View File

@ -10,7 +10,6 @@ gi.require_version("Adw", "1")
from gi.repository import Adw
from clan_app.singletons.use_views import ViewStack
from clan_app.views.logs import Logs
log = logging.getLogger(__name__)
@ -48,35 +47,6 @@ class ToastOverlay:
toast.connect("dismissed", lambda toast: self.active_toasts.remove(key))
class ErrorToast:
toast: Adw.Toast
def __init__(
self, message: str, persistent: bool = False, details: str = ""
) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"""<span foreground='red'>❌ Error </span> {message}"""
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.HIGH)
self.toast.set_button_label("Show more")
if persistent:
self.toast.set_timeout(0)
views = ViewStack.use().view
# we cannot check this type, python is not smart enough
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
logs_view.set_message(details)
self.toast.connect(
"button-clicked",
lambda _: views.set_visible_child_name("logs"),
)
class WarningToast:
toast: Adw.Toast

View File

@ -1,106 +0,0 @@
import logging
import threading
from collections.abc import Callable
from typing import Any, ClassVar, cast
import gi
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry, add_history
from clan_app.components.gkvstore import GKVStore
from clan_app.singletons.use_vms import ClanStore
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class JoinValue(GObject.Object):
__gsignals__: ClassVar = {
"join_finished": (GObject.SignalFlags.RUN_FIRST, None, []),
}
url: ClanURI
entry: HistoryEntry | None
def _join_finished_task(self) -> bool:
self.emit("join_finished")
return GLib.SOURCE_REMOVE
def __init__(self, url: ClanURI) -> None:
super().__init__()
self.url: ClanURI = url
self.entry: HistoryEntry | None = None
def __join(self) -> None:
new_entry = add_history(self.url)
self.entry = new_entry
GLib.idle_add(self._join_finished_task)
def join(self) -> None:
threading.Thread(target=self.__join).start()
class JoinList:
"""
This is a singleton.
It is initialized with the first call of use()
"""
_instance: "None | JoinList" = None
list_store: Gio.ListStore
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "JoinList":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(JoinValue)
ClanStore.use().register_on_deep_change(cls._instance._rerender_join_list)
return cls._instance
def _rerender_join_list(
self, source: GKVStore, position: int, removed: int, added: int
) -> None:
self.list_store.items_changed(
0, self.list_store.get_n_items(), self.list_store.get_n_items()
)
def is_empty(self) -> bool:
return self.list_store.get_n_items() == 0
def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None:
"""
Add a join request.
This method can add multiple join requests if called subsequently for each request.
"""
value = JoinValue(uri)
if value.url.machine.get_id() in [
cast(JoinValue, item).url.machine.get_id() for item in self.list_store
]:
log.info(f"Join request already exists: {value.url}. Ignoring.")
return
value.connect("join_finished", self._on_join_finished)
value.connect("join_finished", after_join)
self.list_store.append(value)
def _on_join_finished(self, source: JoinValue) -> None:
log.info(f"Join finished: {source.url}")
self.discard(source)
assert source.entry is not None
ClanStore.use().push_history_entry(source.entry)
def discard(self, value: JoinValue) -> None:
(has, idx) = self.list_store.find(value)
if has:
self.list_store.remove(idx)

View File

@ -1,181 +0,0 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, ClassVar
import gi
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry
from clan_app import assets
from clan_app.components.gkvstore import GKVStore
from clan_app.components.vmobj import VMObject
from clan_app.singletons.use_views import ViewStack
from clan_app.views.logs import Logs
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class VMStore(GKVStore):
def __init__(self) -> None:
super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr)
class Emitter(GObject.GObject):
__gsignals__: ClassVar = {
"is_ready": (GObject.SignalFlags.RUN_FIRST, None, []),
}
class ClanStore:
_instance: "None | ClanStore" = None
_clan_store: GKVStore[str, VMStore]
_emitter: Emitter
# set the vm that is outputting logs
# build logs are automatically streamed to the logs-view
_logging_vm: VMObject | None = None
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "ClanStore":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls._clan_store = GKVStore(
VMStore, lambda store: store.first().data.flake.flake_url
)
cls._emitter = Emitter()
return cls._instance
def emit(self, signal: str) -> None:
self._emitter.emit(signal)
def connect(self, signal: str, cb: Callable[(...), Any]) -> None:
self._emitter.connect(signal, cb)
def set_logging_vm(self, ident: str) -> VMObject | None:
vm = self.get_vm(ClanURI(f"clan://{ident}"))
if vm is not None:
self._logging_vm = vm
return self._logging_vm
def register_on_deep_change(
self, callback: Callable[[GKVStore, int, int, int], None]
) -> None:
"""
Register a callback that is called when a clan_store or one of the included VMStores changes
"""
def on_vmstore_change(
store: VMStore, position: int, removed: int, added: int
) -> None:
callback(store, position, removed, added)
def on_clanstore_change(
store: "GKVStore", position: int, removed: int, added: int
) -> None:
if added > 0:
store.values()[position].register_on_change(on_vmstore_change)
callback(store, position, removed, added)
self.clan_store.register_on_change(on_clanstore_change)
@property
def clan_store(self) -> GKVStore[str, VMStore]:
return self._clan_store
def create_vm_task(self, vm: HistoryEntry) -> bool:
self.push_history_entry(vm)
return GLib.SOURCE_REMOVE
def push_history_entry(self, entry: HistoryEntry) -> None:
# TODO: We shouldn't do this here but in the list view
if entry.flake.icon is None:
icon: Path = assets.loc / "placeholder.jpeg"
else:
icon = Path(entry.flake.icon)
def log_details(gfile: Gio.File) -> None:
self.log_details(vm, gfile)
vm = VMObject(icon=icon, data=entry, build_log_cb=log_details)
self.push(vm)
def log_details(self, vm: VMObject, gfile: Gio.File) -> None:
views = ViewStack.use().view
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
def file_read_callback(
source_object: Gio.File, result: Gio.AsyncResult, _user_data: Any
) -> None:
try:
# Finish the asynchronous read operation
res = source_object.load_contents_finish(result)
_success, contents, _etag_out = res
# Convert the byte array to a string and print it
logs_view.set_message(contents.decode("utf-8"))
except Exception as e:
print(f"Error reading file: {e}")
# only one vm can output logs at a time
if vm == self._logging_vm:
gfile.load_contents_async(None, file_read_callback, None)
# we cannot check this type, python is not smart enough
def push(self, vm: VMObject) -> None:
url = str(vm.data.flake.flake_url)
# Only write to the store if the Clan is not already in it
# Every write to the KVStore rerenders bound widgets to the clan_store
if url not in self.clan_store:
log.debug(f"Creating new VMStore for {url}")
vm_store = VMStore()
vm_store.append(vm)
self.clan_store[url] = vm_store
else:
vm_store = self.clan_store[url]
machine = vm.data.flake.flake_attr
old_vm = vm_store.get(machine)
if old_vm:
log.info(
f"VM {vm.data.flake.flake_attr} already exists in store. Updating data field."
)
old_vm.update(vm.data)
else:
log.debug(f"Appending VM {vm.data.flake.flake_attr} to store")
vm_store.append(vm)
def remove(self, vm: VMObject) -> None:
del self.clan_store[str(vm.data.flake.flake_url)][vm.data.flake.flake_attr]
def get_vm(self, uri: ClanURI) -> None | VMObject:
vm_store = self.clan_store.get(str(uri.flake_id))
if vm_store is None:
return None
machine = vm_store.get(uri.machine.name, None)
return machine
def get_running_vms(self) -> list[VMObject]:
return [
vm
for clan in self.clan_store.values()
for vm in clan.values()
if vm.is_running()
]
def kill_all(self) -> None:
for vm in self.get_running_vms():
vm.kill()

View File

@ -1,61 +0,0 @@
import os
from collections.abc import Callable
from functools import partial
from typing import Any, Literal, TypeVar
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GObject, Gtk
# Define a TypeVar that is bound to GObject.Object
ListItem = TypeVar("ListItem", bound=GObject.Object)
def create_details_list(
model: Gio.ListStore, render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget]
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class PreferencesValue(GObject.Object):
variant: Literal["CPU", "MEMORY"]
editable: bool
data: Any
def __init__(
self, variant: Literal["CPU", "MEMORY"], editable: bool, data: Any
) -> None:
super().__init__()
self.variant = variant
self.editable = editable
self.data = data
class Details(Gtk.Box):
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
preferences_store = Gio.ListStore.new(PreferencesValue)
preferences_store.append(PreferencesValue("CPU", True, 1))
self.details_list = create_details_list(
model=preferences_store, render_row=self.render_entry_row
)
self.append(self.details_list)
def render_entry_row(
self, boxed_list: Gtk.ListBox, item: PreferencesValue
) -> Gtk.Widget:
cores: int | None = os.cpu_count()
fcores = float(cores) if cores else 1.0
row = Adw.SpinRow.new_with_range(0, fcores, 1)
row.set_value(item.data)
return row

View File

@ -1,356 +0,0 @@
import base64
import logging
from collections.abc import Callable
from functools import partial
from typing import Any, TypeVar
import gi
from clan_cli.clan_uri import ClanURI
from clan_app.components.gkvstore import GKVStore
from clan_app.components.interfaces import ClanConfig
from clan_app.components.list_splash import EmptySplash
from clan_app.components.vmobj import VMObject
from clan_app.singletons.toast import (
LogToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
from clan_app.singletons.use_join import JoinList, JoinValue
from clan_app.singletons.use_views import ViewStack
from clan_app.singletons.use_vms import ClanStore, VMStore
from clan_app.views.logs import Logs
gi.require_version("Adw", "1")
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
def create_boxed_list(
model: CustomStore,
render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget],
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.add_css_class("no-shadow")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class ClanList(Gtk.Box):
"""
The ClanList
Is the composition of
the ClanListToolbar
the clanListView
# ------------------------ #
# - Tools <Start> <Stop> < Edit> #
# ------------------------ #
# - List Items
# - <...>
# ------------------------#
"""
def __init__(self, config: ClanConfig) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
app.connect("join_request", self.on_join_request)
self.log_label: Gtk.Label = Gtk.Label()
# Add join list
self.join_boxed_list = create_boxed_list(
model=JoinList.use().list_store, render_row=self.render_join_row
)
self.join_boxed_list.add_css_class("join-list")
self.append(self.join_boxed_list)
clan_store = ClanStore.use()
clan_store.connect("is_ready", self.display_splash)
self.group_list = create_boxed_list(
model=clan_store.clan_store, render_row=self.render_group_row
)
self.group_list.add_css_class("group-list")
self.append(self.group_list)
self.splash = EmptySplash(on_join=lambda x: self.on_join_request(x, x))
def display_splash(self, source: GKVStore) -> None:
print("Displaying splash")
if (
ClanStore.use().clan_store.get_n_items() == 0
and JoinList.use().list_store.get_n_items() == 0
):
self.append(self.splash)
def render_group_row(
self, boxed_list: Gtk.ListBox, vm_store: VMStore
) -> Gtk.Widget:
self.remove(self.splash)
vm = vm_store.first()
log.debug("Rendering group row for %s", vm.data.flake.flake_url)
grp = Adw.PreferencesGroup()
grp.set_title(vm.data.flake.clan_name)
grp.set_description(vm.data.flake.flake_url)
add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s"))
add_action.connect("activate", self.on_add)
app = Gio.Application.get_default()
assert app is not None
app.add_action(add_action)
# menu_model = Gio.Menu()
# TODO: Make this lazy, blocks UI startup for too long
# for vm in machines.list.list_machines(flake_url=vm.data.flake.flake_url):
# if vm not in vm_store:
# menu_model.append(vm, f"app.add::{vm}")
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
add_button = Gtk.Button()
add_button_content = Adw.ButtonContent.new()
add_button_content.set_label("Add machine")
add_button_content.set_icon_name("list-add-symbolic")
add_button.add_css_class("flat")
add_button.set_child(add_button_content)
# add_button.set_has_frame(False)
# add_button.set_menu_model(menu_model)
# add_button.set_label("Add machine")
box.append(add_button)
grp.set_header_suffix(box)
vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row)
grp.add(vm_list)
return grp
def on_add(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Adding new machine", target)
def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VMObject) -> Gtk.Widget:
# Remove no-shadow class if attached
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
flake = vm.data.flake
row = Adw.ActionRow()
# ====== Display Avatar ======
avatar = Adw.Avatar()
machine_icon = flake.vm.machine_icon
# If there is a machine icon, display it else
# display the clan icon
if machine_icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(machine_icon)))
elif flake.icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(flake.icon)))
else:
avatar.set_text(flake.clan_name + " " + flake.flake_attr)
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
# ====== Display Name And Url =====
row.set_title(flake.flake_attr)
row.set_title_lines(1)
row.set_title_selectable(True)
# If there is a machine description, display it else
# display the clan name
if flake.vm.machine_description:
row.set_subtitle(flake.vm.machine_description)
else:
row.set_subtitle(flake.clan_name)
row.set_subtitle_lines(1)
# ==== Display build progress bar ====
build_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
build_box.set_valign(Gtk.Align.CENTER)
build_box.append(vm.progress_bar)
build_box.set_homogeneous(False)
row.add_suffix(build_box) # This allows children to have different sizes
# ==== Action buttons ====
button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
button_box.set_valign(Gtk.Align.CENTER)
## Drop down menu
open_action = Gio.SimpleAction.new("edit", GLib.VariantType.new("s"))
open_action.connect("activate", self.on_edit)
action_id = base64.b64encode(vm.get_id().encode("utf-8")).decode("utf-8")
build_logs_action = Gio.SimpleAction.new(
f"logs.{action_id}", GLib.VariantType.new("s")
)
build_logs_action.connect("activate", self.on_show_build_logs)
build_logs_action.set_enabled(False)
app = Gio.Application.get_default()
assert app is not None
app.add_action(open_action)
app.add_action(build_logs_action)
# set a callback function for conditionally enabling the build_logs action
def on_vm_build_notify(
vm: VMObject, is_building: bool, is_running: bool
) -> None:
build_logs_action.set_enabled(is_building or is_running)
app.add_action(build_logs_action)
if is_building:
ToastOverlay.use().add_toast_unique(
LogToast(
"""Build process running ...""",
on_button_click=lambda: self.show_vm_build_logs(vm.get_id()),
).toast,
f"info.build.running.{vm}",
)
vm.connect("vm_build_notify", on_vm_build_notify)
menu_model = Gio.Menu()
menu_model.append("Edit", f"app.edit::{vm.get_id()}")
menu_model.append("Show Logs", f"app.logs.{action_id}::{vm.get_id()}")
pref_button = Gtk.MenuButton()
pref_button.set_icon_name("open-menu-symbolic")
pref_button.set_menu_model(menu_model)
button_box.append(pref_button)
## VM switch button
switch_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
switch_box.set_valign(Gtk.Align.CENTER)
switch_box.append(vm.switch)
button_box.append(switch_box)
row.add_suffix(button_box)
return row
def on_edit(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Editing settings for machine", target)
def on_show_build_logs(self, _: Any, parameter: Any) -> None:
target = parameter.get_string()
self.show_vm_build_logs(target)
def show_vm_build_logs(self, target: str) -> None:
vm = ClanStore.use().set_logging_vm(target)
if vm is None:
raise ValueError(f"VM {target} not found")
views = ViewStack.use().view
# Reset the logs view
logs: Logs = views.get_child_by_name("logs") # type: ignore
if logs is None:
raise ValueError("Logs view not found")
name = vm.machine.name if vm.machine else "Unknown"
logs.set_title(f"""📄<span weight="normal"> {name}</span>""")
# initial message. Streaming happens automatically when the file is changed by the build process
with open(vm.build_process.out_file) as f:
logs.set_message(f.read())
views.set_visible_child_name("logs")
def render_join_row(
self, boxed_list: Gtk.ListBox, join_val: JoinValue
) -> Gtk.Widget:
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
log.debug("Rendering join row for %s", join_val.url)
row = Adw.ActionRow()
row.set_title(join_val.url.machine.name)
row.set_subtitle(str(join_val.url))
row.add_css_class("trust")
vm = ClanStore.use().get_vm(join_val.url)
# Can't do this here because clan store is empty at this point
if vm is not None:
sub = row.get_subtitle()
assert sub is not None
ToastOverlay.use().add_toast_unique(
WarningToast(
f"""<span weight="regular">{join_val.url.machine.name!s}</span> Already exists. Joining again will update it"""
).toast,
"warning.duplicate.join",
)
row.set_subtitle(
sub + "\nClan already exists. Joining again will update it"
)
avatar = Adw.Avatar()
avatar.set_text(str(join_val.url.machine.name))
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
cancel_button = Gtk.Button(label="Cancel")
cancel_button.add_css_class("error")
cancel_button.connect("clicked", partial(self.on_discard_clicked, join_val))
self.cancel_button = cancel_button
trust_button = Gtk.Button(label="Join")
trust_button.add_css_class("success")
trust_button.connect("clicked", partial(self.on_trust_clicked, join_val))
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
box.append(cancel_button)
box.append(trust_button)
row.add_suffix(box)
return row
def on_join_request(self, source: Any, url: str) -> None:
log.debug("Join request: %s", url)
clan_uri = ClanURI(url)
JoinList.use().push(clan_uri, self.on_after_join)
def on_after_join(self, source: JoinValue) -> None:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"Updated {source.url.machine.name}").toast,
"success.join",
)
# If the join request list is empty disable the shadow artefact
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")
def on_trust_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
source.set_sensitive(False)
self.cancel_button.set_sensitive(False)
value.join()
def on_discard_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
JoinList.use().discard(value)
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")

View File

@ -1,65 +0,0 @@
import logging
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, Gtk
from clan_app.singletons.use_views import ViewStack
log = logging.getLogger(__name__)
class Logs(Gtk.Box):
"""
Simple log view
This includes a banner and a text view and a button to close the log and navigate back to the overview
"""
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
self.banner = Adw.Banner.new("")
self.banner.set_use_markup(True)
self.banner.set_revealed(True)
self.banner.set_button_label("Close")
self.banner.connect(
"button-clicked",
lambda _: ViewStack.use().view.set_visible_child_name("list"),
)
self.text_view = Gtk.TextView()
self.text_view.set_editable(False)
self.text_view.set_wrap_mode(Gtk.WrapMode.WORD)
self.text_view.add_css_class("log-view")
self.append(self.banner)
self.append(self.text_view)
def set_title(self, title: str) -> None:
self.banner.set_title(title)
def set_message(self, message: str) -> None:
"""
Set the log message. This will delete any previous message
"""
buffer = self.text_view.get_buffer()
buffer.set_text(message)
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)
def append_message(self, message: str) -> None:
"""
Append to the end of a potentially existent log message
"""
buffer = self.text_view.get_buffer()
end_iter = buffer.get_end_iter()
buffer.insert(end_iter, message) # type: ignore
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)

View File

@ -1,5 +1,6 @@
import dataclasses
import json
import gi
import logging
import threading
from collections.abc import Callable
@ -9,12 +10,10 @@ from threading import Lock
from types import UnionType
from typing import Any, get_args
import gi
from clan_cli.api import API
from clan_cli.api.directory import FileRequest
from clan_app.api.file import open_file
gi.require_version("WebKit", "6.0")
from gi.repository import Gio, GLib, Gtk, WebKit
log = logging.getLogger(__name__)
@ -48,100 +47,6 @@ def dataclass_to_dict(obj: Any) -> Any:
return obj
# Implement the abstract open_file function
def open_file(file_request: FileRequest) -> str | None:
# Function to handle the response and stop the loop
selected_path = None
def on_file_select(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.open_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected file or directory: {e}")
finally:
main_loop.quit()
def on_folder_select(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.select_folder_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected directory: {e}")
finally:
main_loop.quit()
def on_save_finish(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.save_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected file: {e}")
finally:
main_loop.quit()
dialog = Gtk.FileDialog()
if file_request.title:
dialog.set_title(file_request.title)
if file_request.filters:
filters = Gio.ListStore.new(Gtk.FileFilter)
file_filters = Gtk.FileFilter()
if file_request.filters.title:
file_filters.set_name(file_request.filters.title)
# Create and configure a filter for image files
if file_request.filters.mime_types:
for mime in file_request.filters.mime_types:
file_filters.add_mime_type(mime)
filters.append(file_filters)
if file_request.filters.patterns:
for pattern in file_request.filters.patterns:
file_filters.add_pattern(pattern)
if file_request.filters.suffixes:
for suffix in file_request.filters.suffixes:
file_filters.add_suffix(suffix)
filters.append(file_filters)
dialog.set_filters(filters)
main_loop = GLib.MainLoop()
# if select_folder
if file_request.mode == "select_folder":
dialog.select_folder(
callback=lambda dialog, task: on_folder_select(dialog, task, main_loop),
)
elif file_request.mode == "open_file":
dialog.open(
callback=lambda dialog, task: on_file_select(dialog, task, main_loop)
)
elif file_request.mode == "save":
dialog.save(
callback=lambda dialog, task: on_save_finish(dialog, task, main_loop)
)
# Wait for the user to select a file or directory
main_loop.run() # type: ignore
return selected_path
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
@ -294,7 +199,7 @@ class WebView:
method_name: str,
) -> None:
with self.mutex_lock:
log.debug("Executing... ", method_name)
log.debug(f"Executing... {method_name}")
log.debug(f"{data}")
if data is None:
result = handler_fn()

View File

@ -8,17 +8,14 @@ from clan_cli.history.list import list_history
from clan_app.components.interfaces import ClanConfig
from clan_app.singletons.toast import ToastOverlay
from clan_app.singletons.use_views import ViewStack
from clan_app.singletons.use_vms import ClanStore
from clan_app.views.details import Details
from clan_app.views.list import ClanList
from clan_app.views.logs import Logs
from clan_app.views.webview import WebView, open_file
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GLib
from clan_app.components.trayicon import TrayIcon
log = logging.getLogger(__name__)
@ -41,15 +38,8 @@ class MainWindow(Adw.ApplicationWindow):
app = Gio.Application.get_default()
assert app is not None
self.tray_icon: TrayIcon = TrayIcon(app)
# Initialize all ClanStore
threading.Thread(target=self._populate_vms).start()
stack_view = ViewStack.use().view
stack_view.add_named(ClanList(config), "list")
stack_view.add_named(Details(), "details")
stack_view.add_named(Logs(), "logs")
# Override platform specific functions
API.register(open_file)
@ -63,23 +53,6 @@ class MainWindow(Adw.ApplicationWindow):
self.connect("destroy", self.on_destroy)
def _set_clan_store_ready(self) -> bool:
ClanStore.use().emit("is_ready")
return GLib.SOURCE_REMOVE
def _populate_vms(self) -> None:
# Execute `clan flakes add <path>` to democlan for this to work
# TODO: Make list_history a generator function
for entry in list_history():
GLib.idle_add(ClanStore.use().create_vm_task, entry)
GLib.idle_add(self._set_clan_store_ready)
def kill_vms(self) -> None:
log.debug("Killing all VMs")
ClanStore.use().kill_all()
def on_destroy(self, source: "Adw.ApplicationWindow") -> None:
log.info("====Destroying Adw.ApplicationWindow===")
ClanStore.use().kill_all()
self.tray_icon.destroy()
log.debug("====Destroying Adw.ApplicationWindow===")