clan-app: Add Webview to python async backend

This commit is contained in:
Luis Hebendanz 2024-07-15 15:13:08 +02:00
parent bb9058f5ef
commit 25fea331d0
8 changed files with 352 additions and 262 deletions

View File

@ -0,0 +1,122 @@
import importlib
import inspect
import logging
import pkgutil
from collections.abc import Callable
from types import ModuleType
from typing import Any, ClassVar, Generic, ParamSpec, TypeVar
from gi.repository import GLib, GObject
log = logging.getLogger(__name__)
class GResult(GObject.Object):
result: Any
op_key: str
method_name: str
def __init__(self, result: Any, method_name: str, op_key: str) -> None:
super().__init__()
self.op_key = op_key
self.result = result
self.method_name = method_name
B = TypeVar('B')
P = ParamSpec('P')
class ImplFunc(GObject.Object, Generic[P, B]):
op_key: str | None = None
__gsignals__: ClassVar = {
"returns": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object]),
}
def returns(self, result: B) -> None:
self.emit("returns", GResult(result, self.__class__.__name__, self.op_key))
def await_result(self, fn: Callable[[GObject.Object, B], None]) -> None:
self.connect("returns", fn)
def async_run(self, *args: P.args, **kwargs: P.kwargs) -> bool:
raise NotImplementedError("Method 'async_run' must be implemented")
def _async_run(self, data: Any, op_key: str) -> bool:
self.op_key = op_key
result = GLib.SOURCE_REMOVE
try:
result = self.async_run(**data)
except Exception as e:
log.exception(e)
# TODO: send error to js
finally:
return result
def is_gobject_subclass(obj: object) -> bool:
return inspect.isclass(obj) and issubclass(obj, ImplFunc) and obj is not ImplFunc
def check_module_for_gobject_classes(module: ModuleType, found_classes: list[type[GObject.Object]] | None = None) -> list[type[GObject.Object]]:
if found_classes is None:
found_classes = []
for name, obj in inspect.getmembers(module):
if is_gobject_subclass(obj):
found_classes.append(obj)
if hasattr(module, '__path__'): # Check if the module has submodules
for _, submodule_name, _ in pkgutil.iter_modules(module.__path__, module.__name__ + '.'):
submodule = importlib.import_module(submodule_name)
check_module_for_gobject_classes(submodule, found_classes)
return found_classes
class ImplApi:
def __init__(self) -> None:
self._obj_registry: dict[str, type[ImplFunc]] = {}
def register_all(self, module: ModuleType) -> None:
objects = check_module_for_gobject_classes(module)
for obj in objects:
self.register(obj)
def register(self, obj: type[ImplFunc]) -> None:
fn_name = obj.__name__
if fn_name in self._obj_registry:
raise ValueError(f"Function '{fn_name}' already registered")
self._obj_registry[fn_name] = obj
def validate(self,
abstr_methods: dict[str, dict[str, Any]]
) -> None:
impl_fns = self._obj_registry
# iterate over the methods and check if all are implemented
for abstr_name, abstr_annotations in abstr_methods.items():
if abstr_name not in impl_fns:
raise NotImplementedError(
f"Abstract method '{abstr_name}' is not implemented"
)
else:
# check if the signature of the abstract method matches the implementation
# abstract signature
values = list(abstr_annotations.values())
expected_signature = (tuple(values[:-1]), values[-1:][0])
# implementation signature
obj = dict(impl_fns[abstr_name].__dict__)
obj_type = obj["__orig_bases__"][0]
got_signature = obj_type.__args__
if expected_signature != got_signature:
log.error(f"Expected signature: {expected_signature}")
log.error(f"Actual signature: {got_signature}")
raise ValueError(
f"Abstract method '{abstr_name}' has different signature than the implementation"
)
def get_obj(self, name: str) -> type[ImplFunc] | None:
return self._obj_registry.get(name)

View File

@ -1,103 +1,88 @@
# ruff: noqa: N801
import gi
gi.require_version("WebKit", "6.0")
from gi.repository import Gio, GLib, Gtk, WebKit
gi.require_version("Gtk", "4.0")
import logging
from clan_cli.api.directory import FileRequest
from gi.repository import Gio, GLib, Gtk
from clan_app.api import ImplFunc
log = logging.getLogger(__name__)
# Implement the abstract open_file function
def open_file(file_request: FileRequest) -> str | None:
# Function to handle the response and stop the loop
selected_path = None
# This implements the abstract function open_file with one argument, file_request,
# which is a FileRequest object and returns a string or None.
class open_file(ImplFunc[[FileRequest], str | None]):
def __init__(self) -> None:
super().__init__()
def on_file_select(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.open_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected file or directory: {e}")
finally:
main_loop.quit()
def async_run(self, file_request: FileRequest) -> bool:
def on_file_select(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
try:
gfile = file_dialog.open_finish(task)
if gfile:
selected_path = gfile.get_path()
self.returns(selected_path)
except Exception as e:
print(f"Error getting selected file or directory: {e}")
def on_folder_select(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.select_folder_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected directory: {e}")
finally:
main_loop.quit()
def on_folder_select(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
try:
gfile = file_dialog.select_folder_finish(task)
if gfile:
selected_path = gfile.get_path()
self.returns(selected_path)
except Exception as e:
print(f"Error getting selected directory: {e}")
def on_save_finish(
file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop
) -> None:
try:
gfile = file_dialog.save_finish(task)
if gfile:
nonlocal selected_path
selected_path = gfile.get_path()
except Exception as e:
print(f"Error getting selected file: {e}")
finally:
main_loop.quit()
def on_save_finish(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
try:
gfile = file_dialog.save_finish(task)
if gfile:
selected_path = gfile.get_path()
self.returns(selected_path)
except Exception as e:
print(f"Error getting selected file: {e}")
dialog = Gtk.FileDialog()
dialog = Gtk.FileDialog()
if file_request.title:
dialog.set_title(file_request.title)
if file_request.title:
dialog.set_title(file_request.title)
if file_request.filters:
filters = Gio.ListStore.new(Gtk.FileFilter)
file_filters = Gtk.FileFilter()
if file_request.filters:
filters = Gio.ListStore.new(Gtk.FileFilter)
file_filters = Gtk.FileFilter()
if file_request.filters.title:
file_filters.set_name(file_request.filters.title)
if file_request.filters.title:
file_filters.set_name(file_request.filters.title)
# Create and configure a filter for image files
if file_request.filters.mime_types:
for mime in file_request.filters.mime_types:
file_filters.add_mime_type(mime)
filters.append(file_filters)
if file_request.filters.mime_types:
for mime in file_request.filters.mime_types:
file_filters.add_mime_type(mime)
filters.append(file_filters)
if file_request.filters.patterns:
for pattern in file_request.filters.patterns:
file_filters.add_pattern(pattern)
if file_request.filters.patterns:
for pattern in file_request.filters.patterns:
file_filters.add_pattern(pattern)
if file_request.filters.suffixes:
for suffix in file_request.filters.suffixes:
file_filters.add_suffix(suffix)
if file_request.filters.suffixes:
for suffix in file_request.filters.suffixes:
file_filters.add_suffix(suffix)
filters.append(file_filters)
dialog.set_filters(filters)
filters.append(file_filters)
dialog.set_filters(filters)
main_loop = GLib.MainLoop()
# if select_folder
if file_request.mode == "select_folder":
dialog.select_folder(callback=on_folder_select)
elif file_request.mode == "open_file":
dialog.open(callback=on_file_select)
elif file_request.mode == "save":
dialog.save(callback=on_save_finish)
# if select_folder
if file_request.mode == "select_folder":
dialog.select_folder(
callback=lambda dialog, task: on_folder_select(dialog, task, main_loop),
)
elif file_request.mode == "open_file":
dialog.open(
callback=lambda dialog, task: on_file_select(dialog, task, main_loop)
)
elif file_request.mode == "save":
dialog.save(
callback=lambda dialog, task: on_save_finish(dialog, task, main_loop)
)
return GLib.SOURCE_REMOVE
# Wait for the user to select a file or directory
main_loop.run() # type: ignore
return selected_path

View File

@ -13,7 +13,7 @@ gi.require_version("Adw", "1")
from pathlib import Path
from clan_cli.custom_logger import setup_logging
from gi.repository import Adw, Gdk, Gio, Gtk, GLib, GObject
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
from clan_app.components.interfaces import ClanConfig
@ -69,7 +69,7 @@ class MainApplication(Adw.Application):
log.debug("Shutting down Adw.Application")
if self.get_windows() == []:
log.warning("No windows to destroy")
log.debug("No windows to destroy")
if self.window:
# TODO: Doesn't seem to raise the destroy signal. Need to investigate
# self.get_windows() returns an empty list. Desync between window and application?

View File

@ -0,0 +1,93 @@
import dataclasses
import logging
from dataclasses import fields, is_dataclass
from pathlib import Path
from types import UnionType
from typing import Any, get_args
import gi
gi.require_version("WebKit", "6.0")
log = logging.getLogger(__name__)
def sanitize_string(s: str) -> str:
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
def dataclass_to_dict(obj: Any) -> Any:
"""
Utility function to convert dataclasses to dictionaries
It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries
It does NOT convert member functions.
"""
if dataclasses.is_dataclass(obj):
return {
sanitize_string(k): dataclass_to_dict(v)
for k, v in dataclasses.asdict(obj).items()
}
elif isinstance(obj, list | tuple):
return [dataclass_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()}
elif isinstance(obj, Path):
return str(obj)
elif isinstance(obj, str):
return sanitize_string(obj)
else:
return obj
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
"""
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if not data:
return None
try:
# Attempt to create an instance of the data_class
field_values = {}
for field in fields(t):
field_value = data.get(field.name)
field_type = get_inner_type(field.type)
if field_value is not None:
# If the field is another dataclass, recursively instantiate it
if is_dataclass(field_type):
field_value = from_dict(field_type, field_value)
elif isinstance(field_type, Path | str) and isinstance(
field_value, str
):
field_value = (
Path(field_value) if field_type == Path else field_value
)
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
# Field has a default value. We cannot set the value to None
if field_value is not None:
field_values[field.name] = field_value
else:
field_values[field.name] = field_value
return t(**field_values)
except (TypeError, ValueError) as e:
print(f"Failed to instantiate {t.__name__}: {e}")
return None

View File

@ -9,8 +9,6 @@ gi.require_version("Adw", "1")
from gi.repository import Adw
from clan_app.singletons.use_views import ViewStack
log = logging.getLogger(__name__)
@ -47,7 +45,6 @@ class ToastOverlay:
toast.connect("dismissed", lambda toast: self.active_toasts.remove(key))
class WarningToast:
toast: Adw.Toast

View File

@ -1,108 +1,25 @@
import dataclasses
import json
import gi
import logging
import threading
from collections.abc import Callable
from dataclasses import fields, is_dataclass
from pathlib import Path
from threading import Lock
from types import UnionType
from typing import Any, get_args
from typing import Any
import gi
from clan_cli.api import API
from clan_app.api.file import open_file
import clan_app
from clan_app.api import GResult, ImplApi, ImplFunc
from clan_app.components.serializer import dataclass_to_dict, from_dict
gi.require_version("WebKit", "6.0")
from gi.repository import Gio, GLib, Gtk, WebKit
from gi.repository import GLib, GObject, WebKit
log = logging.getLogger(__name__)
def sanitize_string(s: str) -> str:
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
def dataclass_to_dict(obj: Any) -> Any:
"""
Utility function to convert dataclasses to dictionaries
It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries
It does NOT convert member functions.
"""
if dataclasses.is_dataclass(obj):
return {
sanitize_string(k): dataclass_to_dict(v)
for k, v in dataclasses.asdict(obj).items()
}
elif isinstance(obj, list | tuple):
return [dataclass_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()}
elif isinstance(obj, Path):
return str(obj)
elif isinstance(obj, str):
return sanitize_string(obj)
else:
return obj
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
"""
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if not data:
return None
try:
# Attempt to create an instance of the data_class
field_values = {}
for field in fields(t):
field_value = data.get(field.name)
field_type = get_inner_type(field.type)
if field_value is not None:
# If the field is another dataclass, recursively instantiate it
if is_dataclass(field_type):
field_value = from_dict(field_type, field_value)
elif isinstance(field_type, Path | str) and isinstance(
field_value, str
):
field_value = (
Path(field_value) if field_type == Path else field_value
)
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
# Field has a default value. We cannot set the value to None
if field_value is not None:
field_values[field.name] = field_value
else:
field_values[field.name] = field_value
return t(**field_values)
except (TypeError, ValueError) as e:
print(f"Failed to instantiate {t.__name__}: {e}")
return None
class WebView:
def __init__(self, content_uri: str, methods: dict[str, Callable]) -> None:
self.method_registry: dict[str, Callable] = methods
class WebExecutor(GObject.Object):
def __init__(self, content_uri: str, abstr_methods: dict[str, Callable]) -> None:
super().__init__()
self.webview = WebKit.WebView()
@ -122,9 +39,10 @@ class WebView:
self.webview.load_uri(content_uri)
self.content_uri = content_uri
# global mutex lock to ensure functions run sequentially
self.mutex_lock = Lock()
self.queue_size = 0
self.api = ImplApi()
self.api.register_all(clan_app.api)
#self.api.validate(abstr_methods)
def on_decide_policy(
self,
@ -155,85 +73,59 @@ class WebView:
def on_message_received(
self, user_content_manager: WebKit.UserContentManager, message: Any
) -> None:
payload = json.loads(message.to_json(0))
json_msg = message.to_json(4)
log.debug(f"Webview Request: {json_msg}")
payload = json.loads(json_msg)
method_name = payload["method"]
handler_fn = self.method_registry[method_name]
log.debug(f"Received message: {payload}")
log.debug(f"Queue size: {self.queue_size} (Wait)")
# Get the function gobject from the api
function_obj = self.api.get_obj(method_name)
if function_obj is None:
log.error(f"Method '{method_name}' not found in api")
return
def threaded_wrapper() -> bool:
"""
Ensures only one function is executed at a time
# Create an instance of the function gobject
fn_instance = function_obj()
fn_instance.await_result(self.on_result)
Wait until there is no other function acquiring the global lock.
# Extract the data from the payload
data = payload.get("data")
if data is None:
log.error(f"Method '{method_name}' has no data field. Skipping execution.")
return
Starts a thread with the potentially long running API function within.
"""
if not self.mutex_lock.locked():
thread = threading.Thread(
target=self.threaded_handler,
args=(
handler_fn,
payload.get("data"),
method_name,
),
)
thread.start()
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
# Initialize dataclasses from the payload
reconciled_arguments = {}
op_key = data.pop("op_key", None)
for k, v in data.items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = API.get_method_argtype(method_name, k)
if dataclasses.is_dataclass(arg_class):
reconciled_arguments[k] = from_dict(arg_class, v)
else:
reconciled_arguments[k] = v
GLib.idle_add(
threaded_wrapper,
fn_instance._async_run,
reconciled_arguments,
op_key,
)
self.queue_size += 1
def threaded_handler(
self,
handler_fn: Callable[
...,
Any,
],
data: dict[str, Any] | None,
method_name: str,
) -> None:
with self.mutex_lock:
log.debug(f"Executing... {method_name}")
log.debug(f"{data}")
if data is None:
result = handler_fn()
else:
reconciled_arguments = {}
op_key = data.pop("op_key", None)
for k, v in data.items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = API.get_method_argtype(method_name, k)
if dataclasses.is_dataclass(arg_class):
reconciled_arguments[k] = from_dict(arg_class, v)
else:
reconciled_arguments[k] = v
r = handler_fn(**reconciled_arguments)
# Parse the result to a serializable dictionary
# Echo back the "op_key" to the js api
result = dataclass_to_dict(r)
result["op_key"] = op_key
def on_result(self, source: ImplFunc, data: GResult) -> None:
result = dict()
result["result"] = dataclass_to_dict(data.result)
result["op_key"] = data.op_key
serialized = json.dumps(result)
# Use idle_add to queue the response call to js on the main GTK thread
GLib.idle_add(self.return_data_to_js, method_name, serialized)
self.queue_size -= 1
log.debug(f"Done: Remaining queue size: {self.queue_size}")
serialized = json.dumps(result, indent=4)
log.debug(f"Result: {serialized}")
# Use idle_add to queue the response call to js on the main GTK thread
self.return_data_to_js(data.method_name, serialized)
def return_data_to_js(self, method_name: str, serialized: str) -> bool:
# This function must be run on the main GTK thread to interact with the webview
# result = method_fn(data) # takes very long
# serialized = result
self.webview.evaluate_javascript(
f"""
window.clan.{method_name}(`{serialized}`);

View File

@ -1,21 +1,16 @@
import logging
import threading
import gi
from clan_cli.api import API
from clan_cli.history.list import list_history
from clan_app.components.interfaces import ClanConfig
from clan_app.singletons.toast import ToastOverlay
from clan_app.singletons.use_views import ViewStack
from clan_app.views.webview import WebView, open_file
from clan_app.views.webview import WebExecutor
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GLib
from gi.repository import Adw, Gio
log = logging.getLogger(__name__)
@ -41,10 +36,10 @@ class MainWindow(Adw.ApplicationWindow):
stack_view = ViewStack.use().view
# Override platform specific functions
API.register(open_file)
webview = WebView(methods=API._registry, content_uri=config.content_uri)
webview = WebExecutor(
abstr_methods=API._orig_annotations, content_uri=config.content_uri
)
stack_view.add_named(webview.get_webview(), "webview")
stack_view.set_visible_child_name(config.initial_view)
@ -54,5 +49,4 @@ class MainWindow(Adw.ApplicationWindow):
self.connect("destroy", self.on_destroy)
def on_destroy(self, source: "Adw.ApplicationWindow") -> None:
log.debug("====Destroying Adw.ApplicationWindow===")
log.debug("Destroying Adw.ApplicationWindow")

View File

@ -52,7 +52,7 @@ def update_wrapper_signature(wrapper: Callable, wrapped: Callable) -> None:
class _MethodRegistry:
def __init__(self) -> None:
self._orig: dict[str, Callable[[Any], Any]] = {}
self._orig_annotations: dict[str, Callable[[Any], Any]] = {}
self._registry: dict[str, Callable[[Any], Any]] = {}
def register_abstract(self, fn: Callable[..., T]) -> Callable[..., T]:
@ -84,7 +84,13 @@ API.register(open_file)
return fn
def register(self, fn: Callable[..., T]) -> Callable[..., T]:
self._orig[fn.__name__] = fn
if fn.__name__ in self._registry:
raise ValueError(f"Function {fn.__name__} already registered")
if fn.__name__ in self._orig_annotations:
raise ValueError(f"Function {fn.__name__} already registered")
# make copy of original function
self._orig_annotations[fn.__name__] = fn.__annotations__.copy()
@wraps(fn)
def wrapper(
@ -118,6 +124,7 @@ API.register(open_file)
update_wrapper_signature(wrapper, fn)
self._registry[fn.__name__] = wrapper
return fn
def to_json_schema(self) -> dict[str, Any]: