forked from clan/clan-core
Merge pull request 'hsjobeki-main' (#1562) from hsjobeki-main into main
This commit is contained in:
commit
836754d7ad
@ -1,10 +1,12 @@
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Generic, Literal, TypeVar
|
||||
from functools import wraps
|
||||
from typing import Annotated, Any, Generic, Literal, TypeVar, get_type_hints
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
ResponseDataType = TypeVar("ResponseDataType")
|
||||
|
||||
|
||||
@ -16,22 +18,55 @@ class ApiError:
|
||||
|
||||
|
||||
@dataclass
|
||||
class ApiResponse(Generic[ResponseDataType]):
|
||||
status: Literal["success", "error"]
|
||||
errors: list[ApiError] | None
|
||||
data: ResponseDataType | None
|
||||
class SuccessDataClass(Generic[ResponseDataType]):
|
||||
status: Annotated[Literal["success"], "The status of the response."]
|
||||
data: ResponseDataType
|
||||
|
||||
|
||||
@dataclass
|
||||
class ErrorDataClass:
|
||||
status: Literal["error"]
|
||||
errors: list[ApiError]
|
||||
|
||||
|
||||
ApiResponse = SuccessDataClass[ResponseDataType] | ErrorDataClass
|
||||
|
||||
|
||||
class _MethodRegistry:
|
||||
def __init__(self) -> None:
|
||||
self._orig: dict[str, Callable[[Any], Any]] = {}
|
||||
self._registry: dict[str, Callable[[Any], Any]] = {}
|
||||
|
||||
def register(self, fn: Callable[..., T]) -> Callable[..., T]:
|
||||
self._registry[fn.__name__] = fn
|
||||
self._orig[fn.__name__] = fn
|
||||
|
||||
@wraps(fn)
|
||||
def wrapper(*args: Any, **kwargs: Any) -> ApiResponse[T]:
|
||||
try:
|
||||
data: T = fn(*args, **kwargs)
|
||||
return SuccessDataClass(status="success", data=data)
|
||||
except ClanError as e:
|
||||
return ErrorDataClass(
|
||||
status="error",
|
||||
errors=[
|
||||
ApiError(
|
||||
message=e.msg,
|
||||
description=e.description,
|
||||
location=[fn.__name__, e.location],
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
# @wraps preserves all metadata of fn
|
||||
# we need to update the annotation, because our wrapper changes the return type
|
||||
# This overrides the new return type annotation with the generic typeVar filled in
|
||||
orig_return_type = get_type_hints(fn).get("return")
|
||||
wrapper.__annotations__["return"] = ApiResponse[orig_return_type] # type: ignore
|
||||
|
||||
self._registry[fn.__name__] = wrapper
|
||||
return fn
|
||||
|
||||
def to_json_schema(self) -> dict[str, Any]:
|
||||
# Import only when needed
|
||||
from typing import get_type_hints
|
||||
|
||||
from clan_cli.api.util import type_to_dict
|
||||
|
@ -1,20 +1,92 @@
|
||||
import copy
|
||||
import dataclasses
|
||||
import pathlib
|
||||
from types import NoneType, UnionType
|
||||
from typing import Any, Union
|
||||
from typing import (
|
||||
Annotated,
|
||||
Any,
|
||||
Literal,
|
||||
TypeVar,
|
||||
Union,
|
||||
get_args,
|
||||
get_origin,
|
||||
)
|
||||
|
||||
|
||||
def type_to_dict(t: Any, scope: str = "") -> dict:
|
||||
class JSchemaTypeError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
# Inspect the fields of the parameterized type
|
||||
def inspect_dataclass_fields(t: type) -> dict[TypeVar, type]:
|
||||
"""
|
||||
Returns a map of type variables to actual types for a parameterized type.
|
||||
"""
|
||||
origin = get_origin(t)
|
||||
type_args = get_args(t)
|
||||
if origin is None:
|
||||
return {}
|
||||
|
||||
type_params = origin.__parameters__
|
||||
# Create a map from type parameters to actual type arguments
|
||||
type_map = dict(zip(type_params, type_args))
|
||||
|
||||
return type_map
|
||||
|
||||
|
||||
def apply_annotations(schema: dict[str, Any], annotations: list[Any]) -> dict[str, Any]:
|
||||
"""
|
||||
Add metadata from typing.annotations to the json Schema.
|
||||
The annotations can be a dict, a tuple, or a string and is directly applied to the schema as shown below.
|
||||
No further validation is done, the caller is responsible for following json-schema.
|
||||
|
||||
Examples
|
||||
|
||||
```python
|
||||
# String annotation
|
||||
Annotated[int, "This is an int"] -> {"type": "integer", "description": "This is an int"}
|
||||
|
||||
# Dict annotation
|
||||
Annotated[int, {"minimum": 0, "maximum": 10}] -> {"type": "integer", "minimum": 0, "maximum": 10}
|
||||
|
||||
# Tuple annotation
|
||||
Annotated[int, ("minimum", 0)] -> {"type": "integer", "minimum": 0}
|
||||
```
|
||||
"""
|
||||
for annotation in annotations:
|
||||
if isinstance(annotation, dict):
|
||||
# Assuming annotation is a dict that can directly apply to the schema
|
||||
schema.update(annotation)
|
||||
elif isinstance(annotation, tuple) and len(annotation) == 2:
|
||||
# Assuming a tuple where first element is a keyword (like 'minLength') and the second is the value
|
||||
schema[annotation[0]] = annotation[1]
|
||||
elif isinstance(annotation, str):
|
||||
# String annotations can be used for description
|
||||
schema.update({"description": f"{annotation}"})
|
||||
return schema
|
||||
|
||||
|
||||
def type_to_dict(t: Any, scope: str = "", type_map: dict[TypeVar, type] = {}) -> dict:
|
||||
if t is None:
|
||||
return {"type": "null"}
|
||||
|
||||
if dataclasses.is_dataclass(t):
|
||||
fields = dataclasses.fields(t)
|
||||
properties = {
|
||||
f.name: type_to_dict(f.type, f"{scope} {t.__name__}.{f.name}")
|
||||
f.name: type_to_dict(f.type, f"{scope} {t.__name__}.{f.name}", type_map)
|
||||
for f in fields
|
||||
}
|
||||
required = [pn for pn, pv in properties.items() if "null" not in pv["type"]]
|
||||
|
||||
required = []
|
||||
for pn, pv in properties.items():
|
||||
if pv.get("type") is not None:
|
||||
if "null" not in pv["type"]:
|
||||
required.append(pn)
|
||||
|
||||
elif pv.get("oneOf") is not None:
|
||||
if "null" not in [i["type"] for i in pv.get("oneOf", [])]:
|
||||
required.append(pn)
|
||||
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": properties,
|
||||
@ -22,24 +94,54 @@ def type_to_dict(t: Any, scope: str = "") -> dict:
|
||||
# Dataclasses can only have the specified properties
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
elif type(t) is UnionType:
|
||||
return {
|
||||
"type": [type_to_dict(arg, scope)["type"] for arg in t.__args__],
|
||||
"oneOf": [type_to_dict(arg, scope, type_map) for arg in t.__args__],
|
||||
}
|
||||
|
||||
if isinstance(t, TypeVar):
|
||||
# if t is a TypeVar, look up the type in the type_map
|
||||
# And return the resolved type instead of the TypeVar
|
||||
resolved = type_map.get(t)
|
||||
if not resolved:
|
||||
raise JSchemaTypeError(
|
||||
f"{scope} - TypeVar {t} not found in type_map, map: {type_map}"
|
||||
)
|
||||
return type_to_dict(type_map.get(t), scope, type_map)
|
||||
|
||||
elif hasattr(t, "__origin__"): # Check if it's a generic type
|
||||
origin = getattr(t, "__origin__", None)
|
||||
origin = get_origin(t)
|
||||
args = get_args(t)
|
||||
|
||||
if origin is None:
|
||||
# Non-generic user-defined or built-in type
|
||||
# TODO: handle custom types
|
||||
raise BaseException("Unhandled Type: ", origin)
|
||||
raise JSchemaTypeError("Unhandled Type: ", origin)
|
||||
|
||||
elif origin is Literal:
|
||||
# Handle Literal values for enums in JSON Schema
|
||||
return {
|
||||
"type": "string",
|
||||
"enum": list(args), # assumes all args are strings
|
||||
}
|
||||
|
||||
elif origin is Annotated:
|
||||
base_type, *metadata = get_args(t)
|
||||
schema = type_to_dict(base_type, scope) # Generate schema for the base type
|
||||
return apply_annotations(schema, metadata)
|
||||
|
||||
elif origin is Union:
|
||||
return {"type": [type_to_dict(arg, scope)["type"] for arg in t.__args__]}
|
||||
union_types = [type_to_dict(arg, scope, type_map) for arg in t.__args__]
|
||||
return {
|
||||
"oneOf": union_types,
|
||||
}
|
||||
|
||||
elif issubclass(origin, list):
|
||||
return {"type": "array", "items": type_to_dict(t.__args__[0], scope)}
|
||||
elif origin in {list, set, frozenset}:
|
||||
return {
|
||||
"type": "array",
|
||||
"items": type_to_dict(t.__args__[0], scope, type_map),
|
||||
}
|
||||
|
||||
elif issubclass(origin, dict):
|
||||
value_type = t.__args__[1]
|
||||
@ -48,10 +150,19 @@ def type_to_dict(t: Any, scope: str = "") -> dict:
|
||||
else:
|
||||
return {
|
||||
"type": "object",
|
||||
"additionalProperties": type_to_dict(value_type, scope),
|
||||
"additionalProperties": type_to_dict(value_type, scope, type_map),
|
||||
}
|
||||
# Generic dataclass with type parameters
|
||||
elif dataclasses.is_dataclass(origin):
|
||||
# This behavior should mimic the scoping of typeVars in dataclasses
|
||||
# Once type_to_dict() encounters a TypeVar, it will look up the type in the type_map
|
||||
# When type_to_dict() returns the map goes out of scope.
|
||||
# This behaves like a stack, where the type_map is pushed and popped as we traverse the dataclass fields
|
||||
new_map = copy.deepcopy(type_map)
|
||||
new_map.update(inspect_dataclass_fields(t))
|
||||
return type_to_dict(origin, scope, new_map)
|
||||
|
||||
raise BaseException(f"Error api type not yet supported {t!s}")
|
||||
raise JSchemaTypeError(f"Error api type not yet supported {t!s}")
|
||||
|
||||
elif isinstance(t, type):
|
||||
if t is str:
|
||||
@ -65,7 +176,7 @@ def type_to_dict(t: Any, scope: str = "") -> dict:
|
||||
if t is object:
|
||||
return {"type": "object"}
|
||||
if t is Any:
|
||||
raise BaseException(
|
||||
raise JSchemaTypeError(
|
||||
f"Usage of the Any type is not supported for API functions. In: {scope}"
|
||||
)
|
||||
|
||||
@ -79,6 +190,6 @@ def type_to_dict(t: Any, scope: str = "") -> dict:
|
||||
if t is NoneType:
|
||||
return {"type": "null"}
|
||||
|
||||
raise BaseException(f"Error primitive type not supported {t!s}")
|
||||
raise JSchemaTypeError(f"Error primitive type not supported {t!s}")
|
||||
else:
|
||||
raise BaseException(f"Error type not supported {t!s}")
|
||||
raise JSchemaTypeError(f"Error type not supported {t!s}")
|
||||
|
@ -1,4 +1,5 @@
|
||||
import shutil
|
||||
from dataclasses import dataclass
|
||||
from math import floor
|
||||
from pathlib import Path
|
||||
|
||||
@ -15,25 +16,17 @@ def text_heading(heading: str) -> str:
|
||||
return f"{'=' * filler} {heading} {'=' * filler}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class CmdOut:
|
||||
def __init__(
|
||||
self,
|
||||
stdout: str,
|
||||
stderr: str,
|
||||
cwd: Path,
|
||||
command: str,
|
||||
returncode: int,
|
||||
msg: str | None,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.stdout = stdout
|
||||
self.stderr = stderr
|
||||
self.cwd = cwd
|
||||
self.command = command
|
||||
self.returncode = returncode
|
||||
self.msg = msg
|
||||
stdout: str
|
||||
stderr: str
|
||||
cwd: Path
|
||||
command: str
|
||||
returncode: int
|
||||
msg: str | None
|
||||
|
||||
self.error_str = f"""
|
||||
def __str__(self) -> str:
|
||||
error_str = f"""
|
||||
{text_heading(heading="Command")}
|
||||
{self.command}
|
||||
{text_heading(heading="Stderr")}
|
||||
@ -45,15 +38,30 @@ Message: {self.msg}
|
||||
Working Directory: '{self.cwd}'
|
||||
Return Code: {self.returncode}
|
||||
"""
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.error_str
|
||||
return error_str
|
||||
|
||||
|
||||
class ClanError(Exception):
|
||||
"""Base class for exceptions in this module."""
|
||||
|
||||
pass
|
||||
description: str | None
|
||||
location: str
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
msg: str | None = None,
|
||||
*,
|
||||
description: str | None = None,
|
||||
location: str | None = None,
|
||||
) -> None:
|
||||
self.description = description
|
||||
self.location = location or "Unknown location"
|
||||
self.msg = msg or ""
|
||||
if self.description:
|
||||
exception_msg = f"{self.location}: {self.msg} - {self.description}"
|
||||
else:
|
||||
exception_msg = f"{self.location}: {self.msg}"
|
||||
super().__init__(exception_msg)
|
||||
|
||||
|
||||
class ClanHttpError(ClanError):
|
||||
|
@ -1,66 +1,90 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.api import API
|
||||
|
||||
from ..cmd import CmdOut, run
|
||||
from ..errors import ClanError
|
||||
from ..nix import nix_command, nix_shell
|
||||
|
||||
DEFAULT_URL: str = "git+https://git.clan.lol/clan/clan-core"
|
||||
DEFAULT_TEMPLATE_URL: str = "git+https://git.clan.lol/clan/clan-core"
|
||||
|
||||
|
||||
def create_flake(directory: Path, url: str) -> dict[str, CmdOut]:
|
||||
@dataclass
|
||||
class CreateClanResponse:
|
||||
git_init: CmdOut
|
||||
git_add: CmdOut
|
||||
git_config: CmdOut
|
||||
flake_update: CmdOut
|
||||
|
||||
|
||||
@API.register
|
||||
def create_clan(directory: Path, template_url: str) -> CreateClanResponse:
|
||||
if not directory.exists():
|
||||
directory.mkdir()
|
||||
else:
|
||||
raise ClanError(f"Flake at '{directory}' already exists")
|
||||
response = {}
|
||||
raise ClanError(
|
||||
location=f"{directory.resolve()}",
|
||||
msg="Cannot create clan",
|
||||
description="Directory already exists",
|
||||
)
|
||||
|
||||
cmd_responses = {}
|
||||
command = nix_command(
|
||||
[
|
||||
"flake",
|
||||
"init",
|
||||
"-t",
|
||||
url,
|
||||
template_url,
|
||||
]
|
||||
)
|
||||
out = run(command, cwd=directory)
|
||||
|
||||
command = nix_shell(["nixpkgs#git"], ["git", "init"])
|
||||
out = run(command, cwd=directory)
|
||||
response["git init"] = out
|
||||
cmd_responses["git init"] = out
|
||||
|
||||
command = nix_shell(["nixpkgs#git"], ["git", "add", "."])
|
||||
out = run(command, cwd=directory)
|
||||
response["git add"] = out
|
||||
cmd_responses["git add"] = out
|
||||
|
||||
command = nix_shell(["nixpkgs#git"], ["git", "config", "user.name", "clan-tool"])
|
||||
out = run(command, cwd=directory)
|
||||
response["git config"] = out
|
||||
cmd_responses["git config"] = out
|
||||
|
||||
command = nix_shell(
|
||||
["nixpkgs#git"], ["git", "config", "user.email", "clan@example.com"]
|
||||
)
|
||||
out = run(command, cwd=directory)
|
||||
response["git config"] = out
|
||||
cmd_responses["git config"] = out
|
||||
|
||||
command = ["nix", "flake", "update"]
|
||||
out = run(command, cwd=directory)
|
||||
response["flake update"] = out
|
||||
cmd_responses["flake update"] = out
|
||||
|
||||
response = CreateClanResponse(
|
||||
git_init=cmd_responses["git init"],
|
||||
git_add=cmd_responses["git add"],
|
||||
git_config=cmd_responses["git config"],
|
||||
flake_update=cmd_responses["flake update"],
|
||||
)
|
||||
return response
|
||||
|
||||
|
||||
def create_flake_command(args: argparse.Namespace) -> None:
|
||||
create_flake(args.path, args.url)
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_create_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument(
|
||||
"--url",
|
||||
type=str,
|
||||
help="url for the flake",
|
||||
default=DEFAULT_URL,
|
||||
help="url to the clan template",
|
||||
default=DEFAULT_TEMPLATE_URL,
|
||||
)
|
||||
parser.add_argument("path", type=Path, help="Path to the flake", default=Path("."))
|
||||
parser.add_argument(
|
||||
"path", type=Path, help="Path to the clan directory", default=Path(".")
|
||||
)
|
||||
|
||||
def create_flake_command(args: argparse.Namespace) -> None:
|
||||
create_clan(args.path, args.url)
|
||||
|
||||
parser.set_defaults(func=create_flake_command)
|
||||
|
@ -8,9 +8,8 @@ import {
|
||||
import { OperationResponse, pyApi } from "./message";
|
||||
|
||||
export const makeCountContext = () => {
|
||||
const [machines, setMachines] = createSignal<
|
||||
OperationResponse<"list_machines">
|
||||
>([]);
|
||||
const [machines, setMachines] =
|
||||
createSignal<OperationResponse<"list_machines">>();
|
||||
const [loading, setLoading] = createSignal(false);
|
||||
|
||||
pyApi.list_machines.receive((machines) => {
|
||||
@ -41,7 +40,7 @@ export const CountContext = createContext<CountContextType>([
|
||||
loading: () => false,
|
||||
|
||||
// eslint-disable-next-line
|
||||
machines: () => ([]),
|
||||
machines: () => undefined,
|
||||
},
|
||||
{
|
||||
// eslint-disable-next-line
|
||||
|
@ -72,6 +72,12 @@ const deserialize =
|
||||
// Create the API object
|
||||
|
||||
const pyApi: PyApi = {} as PyApi;
|
||||
|
||||
pyApi.create_clan.receive((r) => {
|
||||
if (r.status === "success") {
|
||||
r.status;
|
||||
}
|
||||
});
|
||||
operationNames.forEach((opName) => {
|
||||
const name = opName as OperationNames;
|
||||
// @ts-expect-error - TODO: Fix this. Typescript is not recognizing the receive function correctly
|
||||
|
@ -1,13 +1,30 @@
|
||||
import { For, Match, Switch, createEffect, type Component } from "solid-js";
|
||||
import {
|
||||
For,
|
||||
Match,
|
||||
Switch,
|
||||
createEffect,
|
||||
createSignal,
|
||||
type Component,
|
||||
} from "solid-js";
|
||||
import { useCountContext } from "../../Config";
|
||||
import { route } from "@/src/App";
|
||||
|
||||
export const MachineListView: Component = () => {
|
||||
const [{ machines, loading }, { getMachines }] = useCountContext();
|
||||
|
||||
const [data, setData] = createSignal<string[]>([]);
|
||||
createEffect(() => {
|
||||
if (route() === "machines") getMachines();
|
||||
});
|
||||
|
||||
createEffect(() => {
|
||||
const response = machines();
|
||||
if (response?.status === "success") {
|
||||
console.log(response.data);
|
||||
setData(response.data);
|
||||
}
|
||||
});
|
||||
|
||||
return (
|
||||
<div class="max-w-screen-lg">
|
||||
<div class="tooltip" data-tip="Refresh ">
|
||||
@ -32,12 +49,12 @@ export const MachineListView: Component = () => {
|
||||
</div>
|
||||
</div>
|
||||
</Match>
|
||||
<Match when={!loading() && machines().length === 0}>
|
||||
<Match when={!loading() && data().length === 0}>
|
||||
No machines found
|
||||
</Match>
|
||||
<Match when={!loading()}>
|
||||
<ul>
|
||||
<For each={machines()}>
|
||||
<For each={data()}>
|
||||
{(entry) => (
|
||||
<li>
|
||||
<div class="card card-side m-2 bg-base-100 shadow-lg">
|
||||
|
Loading…
Reference in New Issue
Block a user