Init: Autogenerate classes from nix interfaces

This commit is contained in:
Johannes Kirschbauer 2024-07-17 23:11:15 +02:00
parent fd0ebc7ec0
commit 7e84eaa4b3
Signed by: hsjobeki
SSH Key Fingerprint: SHA256:vX3utDqig7Ph5L0JPv87ZTPb/w7cMzREKVZzzLFg9qU
8 changed files with 416 additions and 192 deletions

View File

@ -26,9 +26,6 @@
},
"roles": {
"default": {
"config": {
"packages": ["vim"]
},
"imports": [],
"machines": ["test-inventory-machine"],
"tags": []

View File

@ -41,6 +41,7 @@ in
options = {
inherit (metaOptions) name description icon;
tags = lib.mkOption {
default = [ ];
apply = lib.unique;
type = types.listOf types.str;
@ -49,16 +50,10 @@ in
default = null;
type = types.nullOr types.str;
};
deploy = lib.mkOption {
default = { };
type = types.submodule {
options = {
targetHost = lib.mkOption {
default = null;
type = types.nullOr types.str;
};
};
};
deploy.targetHost = lib.mkOption {
description = "Configuration for the deployment of the machine";
default = null;
type = types.nullOr types.str;
};
};
}

View File

@ -55,6 +55,7 @@ let
inventorySchema.properties.services.additionalProperties.additionalProperties.properties.meta;
config = {
title = "${moduleName}-config";
default = { };
} // moduleSchema;
roles = {
type = "object";
@ -69,6 +70,7 @@ let
{
properties.config = {
title = "${moduleName}-config";
default = { };
} // moduleSchema;
};
}) (rolesOf moduleName)
@ -80,6 +82,7 @@ let
{
additionalProperties.properties.config = {
title = "${moduleName}-config";
default = { };
} // moduleSchema;
};
};

View File

@ -318,7 +318,7 @@ rec {
# return jsonschema property definition for submodule
# then (lib.attrNames (option.type.getSubOptions option.loc).opt)
then
parseOptions' (option.type.getSubOptions option.loc)
example // description // parseOptions' (option.type.getSubOptions option.loc)
# throw error if option type is not supported
else
notSupported option;

View File

@ -1,16 +1,22 @@
# ruff: noqa: N815
# ruff: noqa: N806
import dataclasses
import json
from dataclasses import asdict, dataclass, field, is_dataclass
from dataclasses import asdict, fields, is_dataclass
from pathlib import Path
from typing import Any, Literal
from types import UnionType
from typing import Any, get_args, get_origin
from clan_cli.errors import ClanError
from clan_cli.git import commit_file
from .classes import Inventory as NixInventory
from .classes import Machine, Service
from .classes import Meta as InventoryMeta
__all__ = ["Service", "Machine", "InventoryMeta"]
def sanitize_string(s: str) -> str:
return s.replace("\\", "\\\\").replace('"', '\\"')
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
def dataclass_to_dict(obj: Any) -> Any:
@ -37,149 +43,132 @@ def dataclass_to_dict(obj: Any) -> Any:
return obj
@dataclass
class DeploymentInfo:
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint
def get_second_type(type_hint: type[dict]) -> type:
"""
Deployment information for a machine.
Get the value type of a dictionary type hint
"""
args = get_args(type_hint)
if len(args) == 2:
# Return the second argument, which should be the value type (Machine)
return args[1]
targetHost: str | None = None
raise ValueError(f"Invalid type hint for dict: {type_hint}")
@dataclass
class Machine:
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
"""
Inventory machine model.
DO NOT EDIT THIS CLASS.
Any changes here must be reflected in the inventory interface file and potentially other nix files.
- Persisted to the inventory.json file
- Source of truth to generate each clan machine.
- For hardware deployment, the machine must declare the host system.
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if data is None:
return None
name: str
deploy: DeploymentInfo = field(default_factory=DeploymentInfo)
description: str | None = None
icon: str | None = None
tags: list[str] = field(default_factory=list)
system: Literal["x86_64-linux"] | str | None = None
try:
# Attempt to create an instance of the data_class
field_values = {}
for field in fields(t):
field_value = data.get(field.name)
field_type = get_inner_type(field.type) # type: ignore
@staticmethod
def from_dict(data: dict[str, Any]) -> "Machine":
targetHost = data.get("deploy", {}).get("targetHost", None)
return Machine(
name=data["name"],
description=data.get("description", None),
icon=data.get("icon", None),
tags=data.get("tags", []),
system=data.get("system", None),
deploy=DeploymentInfo(targetHost),
)
if field.name in data:
# The field is present
# If the field is another dataclass, recursively instantiate it
if is_dataclass(field_type):
field_value = from_dict(field_type, field_value)
elif isinstance(field_type, Path | str) and isinstance(
field_value, str
):
field_value = (
Path(field_value) if field_type == Path else field_value
)
elif get_origin(field_type) is dict and isinstance(field_value, dict):
# The field is a dictionary with a specific type
inner_type = get_second_type(field_type)
field_value = {
k: from_dict(inner_type, v) for k, v in field_value.items()
}
elif get_origin is list and isinstance(field_value, list):
# The field is a list with a specific type
inner_type = get_args(field_type)[0]
field_value = [from_dict(inner_type, v) for v in field_value]
# Set the value
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
# Fields with default value
# a: Int = 1
# b: list = Field(default_factory=list)
if field.name in data or field_value is not None:
field_values[field.name] = field_value
else:
# Fields without default value
# a: Int
field_values[field.name] = field_value
return t(**field_values)
except (TypeError, ValueError) as e:
print(f"Failed to instantiate {t.__name__}: {e}")
return None
@dataclass
class MachineServiceConfig:
config: dict[str, Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
@dataclass
class ServiceMeta:
name: str
description: str | None = None
icon: str | None = None
@dataclass
class Role:
config: dict[str, Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
machines: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
@dataclass
class Service:
meta: ServiceMeta
roles: dict[str, Role]
config: dict[str, Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
machines: dict[str, MachineServiceConfig] = field(default_factory=dict)
@staticmethod
def from_dict(d: dict[str, Any]) -> "Service":
return Service(
meta=ServiceMeta(**d.get("meta", {})),
roles={name: Role(**role) for name, role in d.get("roles", {}).items()},
machines=(
{
name: MachineServiceConfig(**machine)
for name, machine in d.get("machines", {}).items()
}
if d.get("machines")
else {}
),
config=d.get("config", {}),
imports=d.get("imports", []),
)
@dataclass
class InventoryMeta:
name: str
description: str | None = None
icon: str | None = None
@dataclass
class Inventory:
meta: InventoryMeta
machines: dict[str, Machine]
services: dict[str, dict[str, Service]]
nix_inventory: NixInventory
@staticmethod
def from_dict(d: dict[str, Any]) -> "Inventory":
return Inventory(
meta=InventoryMeta(**d.get("meta", {})),
machines={
name: Machine.from_dict(machine)
for name, machine in d.get("machines", {}).items()
},
services={
name: {
role: Service.from_dict(service)
for role, service in services.items()
}
for name, services in d.get("services", {}).items()
},
def __init__(self) -> None:
self.nix_inventory = NixInventory(
meta=InventoryMeta(name="New Clan"), machines={}, services=Service()
)
@staticmethod
def get_path(flake_dir: str | Path) -> Path:
return Path(flake_dir) / "inventory.json"
return (Path(flake_dir) / "inventory.json").resolve()
@staticmethod
def load_file(flake_dir: str | Path) -> "Inventory":
inventory = Inventory(
machines={}, services={}, meta=InventoryMeta(name="New Clan")
inventory = from_dict(
NixInventory,
{
"meta": {"name": "New Clan"},
"machines": {},
"services": {},
},
)
NixInventory(
meta=InventoryMeta(name="New Clan"), machines={}, services=Service()
)
inventory_file = Inventory.get_path(flake_dir)
if inventory_file.exists():
with open(inventory_file) as f:
try:
res = json.load(f)
inventory = Inventory.from_dict(res)
inventory = from_dict(NixInventory, res)
except json.JSONDecodeError as e:
raise ClanError(f"Error decoding inventory file: {e}")
return inventory
res = Inventory()
res.nix_inventory = inventory
return Inventory()
def persist(self, flake_dir: str | Path, message: str) -> None:
inventory_file = Inventory.get_path(flake_dir)
with open(inventory_file, "w") as f:
json.dump(dataclass_to_dict(self), f, indent=2)
json.dump(dataclass_to_dict(self.nix_inventory), f, indent=2)
commit_file(inventory_file, Path(flake_dir), commit_message=message)

View File

@ -0,0 +1,175 @@
# DON NOT EDIT THIS FILE MANUALLY. IT IS GENERATED.
# UPDATE
# ruff: noqa: N815
# ruff: noqa: N806
from dataclasses import dataclass, field
from typing import Any
@dataclass
class MachineDeploy:
targetHost: str | None = None
@dataclass
class Machine:
deploy: MachineDeploy
name: str
description: str | None = None
icon: str | None = None
system: str | None = None
tags: list[str] = field(default_factory=list)
@dataclass
class Meta:
name: str
description: str | None = None
icon: str | None = None
@dataclass
class BorgbackupConfigDestination:
repo: str
name: str
@dataclass
class BorgbackupConfig:
destinations: dict[str, BorgbackupConfigDestination] | dict[str,Any] = field(default_factory=dict)
@dataclass
class ServiceBorgbackupMachine:
config: BorgbackupConfig | dict[str,Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
@dataclass
class ServiceBorgbackupMeta:
name: str
description: str | None = None
icon: str | None = None
@dataclass
class ServiceBorgbackupRoleClient:
config: BorgbackupConfig | dict[str,Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
machines: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
@dataclass
class ServiceBorgbackupRoleServer:
config: BorgbackupConfig | dict[str,Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
machines: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
@dataclass
class ServiceBorgbackupRole:
client: ServiceBorgbackupRoleClient
server: ServiceBorgbackupRoleServer
@dataclass
class ServiceBorgbackup:
meta: ServiceBorgbackupMeta
roles: ServiceBorgbackupRole
config: BorgbackupConfig | dict[str,Any] = field(default_factory=dict)
machines: dict[str, ServiceBorgbackupMachine] | dict[str,Any] = field(default_factory=dict)
@dataclass
class PackagesConfig:
packages: list[str] = field(default_factory=list)
@dataclass
class ServicePackageMachine:
config: dict[str,Any] | PackagesConfig = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
@dataclass
class ServicePackageMeta:
name: str
description: str | None = None
icon: str | None = None
@dataclass
class ServicePackageRoleDefault:
config: dict[str,Any] | PackagesConfig = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
machines: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
@dataclass
class ServicePackageRole:
default: ServicePackageRoleDefault
@dataclass
class ServicePackage:
meta: ServicePackageMeta
roles: ServicePackageRole
config: dict[str,Any] | PackagesConfig = field(default_factory=dict)
machines: dict[str, ServicePackageMachine] | dict[str,Any] = field(default_factory=dict)
@dataclass
class SingleDiskConfig:
device: str
@dataclass
class ServiceSingleDiskMachine:
config: SingleDiskConfig | dict[str,Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
@dataclass
class ServiceSingleDiskMeta:
name: str
description: str | None = None
icon: str | None = None
@dataclass
class ServiceSingleDiskRoleDefault:
config: SingleDiskConfig | dict[str,Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
machines: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
@dataclass
class ServiceSingleDiskRole:
default: ServiceSingleDiskRoleDefault
@dataclass
class ServiceSingleDisk:
meta: ServiceSingleDiskMeta
roles: ServiceSingleDiskRole
config: SingleDiskConfig | dict[str,Any] = field(default_factory=dict)
machines: dict[str, ServiceSingleDiskMachine] | dict[str,Any] = field(default_factory=dict)
@dataclass
class Service:
borgbackup: dict[str, ServiceBorgbackup] = field(default_factory=dict)
packages: dict[str, ServicePackage] = field(default_factory=dict)
single_disk: dict[str, ServiceSingleDisk] = field(default_factory=dict)
@dataclass
class Inventory:
meta: Meta
services: Service
machines: dict[str, Machine] | dict[str,Any] = field(default_factory=dict)

View File

@ -9,7 +9,7 @@
{ self', pkgs, ... }:
let
flakeLock = lib.importJSON (self + /flake.lock);
flakeInputs = (builtins.removeAttrs inputs [ "self" ]);
flakeInputs = builtins.removeAttrs inputs [ "self" ];
flakeLockVendoredDeps = flakeLock // {
nodes =
flakeLock.nodes
@ -38,7 +38,6 @@
'';
in
{
devShells.clan-cli = pkgs.callPackage ./shell.nix {
inherit (self'.packages) clan-cli clan-cli-full;
inherit self';
@ -84,6 +83,35 @@
default = self'.packages.clan-cli;
};
checks = self'.packages.clan-cli.tests;
checks = self'.packages.clan-cli.tests // {
inventory-classes-up-to-date = pkgs.stdenv.mkDerivation {
name = "inventory-classes-up-to-date";
src = ./clan_cli/inventory;
env = {
classFile = "classes.py";
};
installPhase = ''
${self'.packages.classgen}/bin/classgen ${self'.packages.inventory-schema-pretty}/schema.json b_classes.py
file1=$classFile
file2=b_classes.py
echo "Comparing $file1 and $file2"
if cmp -s "$file1" "$file2"; then
echo "Files are identical"
echo "Classes file is up to date"
else
echo "Classes file is out of date or has been modified"
echo "run ./update.sh in the inventory directory to update the classes file"
echo "--------------------------------\n"
diff "$file1" "$file2"
echo "--------------------------------\n\n"
exit 1
fi
touch $out
'';
};
};
};
}

View File

@ -3,26 +3,33 @@ import json
from typing import Any
# Function to map JSON schema types to Python types
def map_json_type(json_type: Any, nested_type: str = "Any") -> str:
# Function to map JSON schemas and types to Python types
def map_json_type(
json_type: Any, nested_types: set[str] = {"Any"}, parent: Any = None
) -> set[str]:
if isinstance(json_type, list):
return " | ".join(map(map_json_type, json_type))
res = set()
for t in json_type:
res |= map_json_type(t)
return res
if isinstance(json_type, dict):
return map_json_type(json_type.get("type"))
elif json_type == "string":
return "str"
return {"str"}
elif json_type == "integer":
return "int"
return {"int"}
elif json_type == "boolean":
return "bool"
return {"bool"}
elif json_type == "array":
return f"list[{nested_type}]" # Further specification can be handled if needed
assert nested_types, f"Array type not found for {parent}"
return {f"""list[{" | ".join(nested_types)}]"""}
elif json_type == "object":
return f"dict[str, {nested_type}]"
assert nested_types, f"dict type not found for {parent}"
return {f"""dict[str, {" | ".join(nested_types)}]"""}
elif json_type == "null":
return "None"
return {"None"}
else:
return "Any"
raise ValueError(f"Python type not found for {json_type}")
known_classes = set()
@ -32,9 +39,9 @@ root_class = "Inventory"
# Recursive function to generate dataclasses from JSON schema
def generate_dataclass(schema: dict[str, Any], class_name: str = root_class) -> str:
properties = schema.get("properties", {})
required = schema.get("required", [])
fields = []
required_fields = []
fields_with_default = []
nested_classes = []
for prop, prop_info in properties.items():
@ -42,77 +49,107 @@ def generate_dataclass(schema: dict[str, Any], class_name: str = root_class) ->
prop_type = prop_info.get("type", None)
union_variants = prop_info.get("oneOf", [])
# Collect all types
field_types = set()
title = prop_info.get("title", prop.removesuffix("s"))
title_sanitized = "".join([p.capitalize() for p in title.split("-")])
nested_class_name = f"""{class_name if class_name != root_class and not prop_info.get("title") else ""}{title_sanitized}"""
# if nested_class_name == "ServiceBorgbackupRoleServerConfig":
# breakpoint()
if (prop_type is None) and (not union_variants):
raise ValueError(f"Type not found for property {prop} {prop_info}")
# Unions fields (oneOf)
# str | int | None
python_type = None
if union_variants:
python_type = map_json_type(union_variants)
field_types = map_json_type(union_variants)
elif prop_type == "array":
item_schema = prop_info.get("items")
if isinstance(item_schema, dict):
python_type = map_json_type(
prop_type,
map_json_type(item_schema),
field_types = map_json_type(
prop_type, map_json_type(item_schema), field_name
)
else:
python_type = map_json_type(
prop_type,
map_json_type([i for i in prop_info.get("items", [])]),
)
assert python_type, f"Python type not found for {prop} {prop_info}"
elif prop_type == "object":
inner_type = prop_info.get("additionalProperties")
if inner_type and inner_type.get("type") == "object":
# Inner type is a class
field_types = map_json_type(prop_type, {nested_class_name}, field_name)
if prop in required:
field_def = f"{prop}: {python_type}"
else:
field_def = f"{prop}: {python_type} | None = None"
#
if nested_class_name not in known_classes:
nested_classes.append(
generate_dataclass(inner_type, nested_class_name)
)
known_classes.add(nested_class_name)
if prop_type == "object":
map_type = prop_info.get("additionalProperties")
if map_type:
# breakpoint()
if map_type.get("type") == "object":
# Non trivial type
if nested_class_name not in known_classes:
nested_classes.append(
generate_dataclass(map_type, nested_class_name)
)
known_classes.add(nested_class_name)
field_def = f"{field_name}: dict[str, {nested_class_name}]"
else:
# Trivial type
field_def = f"{field_name}: dict[str, {map_json_type(map_type)}]"
else:
elif inner_type and inner_type.get("type") != "object":
# Trivial type
field_types = map_json_type(inner_type)
elif not inner_type:
# The type is a class
field_types = {nested_class_name}
if nested_class_name not in known_classes:
nested_classes.append(
generate_dataclass(prop_info, nested_class_name)
)
known_classes.add(nested_class_name)
else:
field_types = map_json_type(
prop_type,
nested_types=set(),
parent=field_name,
)
field_def = f"{field_name}: {nested_class_name}"
assert field_types, f"Python type not found for {prop} {prop_info}"
elif prop_type == "array":
items = prop_info.get("items", {})
if items.get("type") == "object":
nested_class_name = prop.capitalize()
nested_classes.append(generate_dataclass(items, nested_class_name))
field_def = f"{field_name}: List[{nested_class_name}]"
serialised_types = " | ".join(field_types)
field_def = f"{field_name}: {serialised_types}"
fields.append(field_def)
if "default" in prop_info or field_name not in prop_info.get("required", []):
if "default" in prop_info:
default_value = prop_info.get("default")
if default_value is None:
field_types |= {"None"}
serialised_types = " | ".join(field_types)
field_def = f"{field_name}: {serialised_types} = None"
elif isinstance(default_value, list):
field_def = f"{field_def} = field(default_factory=list)"
elif isinstance(default_value, dict):
field_types |= {"dict[str,Any]"}
serialised_types = " | ".join(field_types)
field_def = f"{field_name}: {serialised_types} = field(default_factory=dict)"
elif default_value == "name":
# Special case for nix submodules
pass
else:
# Other default values unhandled yet.
raise ValueError(
f"Unhandled default value for field '{field_name}' - default value: {default_value}"
)
fields_str = "\n ".join(fields)
fields_with_default.append(field_def)
if "default" not in prop_info:
# Field is not required and but also specifies no default value
# Trying to infer default value from type
if "dict" in str(serialised_types):
field_def = f"{field_name}: {serialised_types} = field(default_factory=dict)"
fields_with_default.append(field_def)
elif "list" in str(serialised_types):
field_def = f"{field_name}: {serialised_types} = field(default_factory=list)"
fields_with_default.append(field_def)
elif "None" in str(serialised_types):
field_def = f"{field_name}: {serialised_types} = None"
fields_with_default.append(field_def)
else:
# Field is not required and but also specifies no default value
required_fields.append(field_def)
else:
required_fields.append(field_def)
fields_str = "\n ".join(required_fields + fields_with_default)
nested_classes_str = "\n\n".join(nested_classes)
class_def = f"@dataclass\nclass {class_name}:\n {fields_str}\n"
@ -130,10 +167,10 @@ def run_gen(args: argparse.Namespace) -> None:
f.write(
"""
# DON NOT EDIT THIS FILE MANUALLY. IT IS GENERATED.
# UPDATE:
# UPDATE
# ruff: noqa: N815
# ruff: noqa: N806
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Any\n\n
"""
)