diff --git a/pkgs/clan-cli/clan_cli/config/__init__.py b/pkgs/clan-cli/clan_cli/config/__init__.py index 090526ab..3e9c36dd 100644 --- a/pkgs/clan-cli/clan_cli/config/__init__.py +++ b/pkgs/clan-cli/clan_cli/config/__init__.py @@ -6,11 +6,22 @@ import sys from pathlib import Path from typing import Any, Optional, Type, Union +import jsonschema + from clan_cli.errors import ClanError script_dir = Path(__file__).parent +type_map: dict[str, type] = { + "array": list, + "boolean": bool, + "integer": int, + "number": float, + "string": str, +} + + class Kwargs: def __init__(self) -> None: self.type: Optional[Type] = None @@ -21,6 +32,14 @@ class Kwargs: self.choices: Optional[list] = None +# A container inheriting from list, but overriding __contains__ to return True +# for all values. +# This is used to allow any value for the "choices" field of argparse +class AllContainer(list): + def __contains__(self, item: Any) -> bool: + return True + + def schema_from_module_file( file: Union[str, Path] = f"{script_dir}/jsonschema/example-schema.json", ) -> dict[str, Any]: @@ -41,6 +60,75 @@ def schema_from_module_file( ) +def options_types_from_schema(schema: dict[str, Any]) -> dict[str, Type]: + result: dict[str, Type] = {} + for name, value in schema.get("properties", {}).items(): + assert isinstance(value, dict) + type_ = value["type"] + if type_ == "object": + # handle additionalProperties + if "additionalProperties" in value: + sub_type = value["additionalProperties"].get("type") + if sub_type not in type_map: + raise ClanError( + f"Unsupported object type {sub_type} (field {name})" + ) + result[f"{name}."] = type_map[sub_type] + continue + # handle properties + sub_result = options_types_from_schema(value) + for sub_name, sub_type in sub_result.items(): + result[f"{name}.{sub_name}"] = sub_type + continue + elif type_ == "array": + if "items" not in value: + raise ClanError(f"Untyped arrays are not supported (field: {name})") + sub_type = value["items"].get("type") + if sub_type not in type_map: + raise ClanError(f"Unsupported list type {sub_type} (field {name})") + sub_type_: type = type_map[sub_type] + result[name] = list[sub_type_] # type: ignore + continue + result[name] = type_map[type_] + return result + + +def process_args(args: argparse.Namespace, schema: dict) -> None: + option = args.option + value_arg = args.value + + option_path = option.split(".") + # construct a nested dict from the option path and set the value + result: dict[str, Any] = {} + current = result + for part in option_path[:-1]: + current[part] = {} + current = current[part] + current[option_path[-1]] = value_arg + + # validate the result against the schema and cast the value to the expected type + try: + jsonschema.validate(result, schema) + except jsonschema.ValidationError as e: + schema_type = type_map[e.schema["type"]] + # we use nargs="+", so we need to unwrap non-list values + if isinstance(e.instance, list) and schema_type != list: + instance_unwrapped = e.instance[0] + else: + instance_unwrapped = e.instance + # try casting the value to the expected type + try: + value_casted = schema_type(instance_unwrapped) + except TypeError: + raise ClanError( + f"Invalid value for {'.'.join(e.relative_path)}: {instance_unwrapped} (expected type: {schema_type})" + ) from e + current[option_path[-1]] = value_casted + + # print the result as json + print(json.dumps(result, indent=2)) + + def register_parser( parser: argparse.ArgumentParser, file: Path = Path(f"{script_dir}/jsonschema/example-schema.json"), @@ -63,87 +151,46 @@ def _register_parser( if schema["type"] != "object": raise ClanError("Schema is not an object") - required_set = set(schema.get("required", [])) - - type_map: dict[str, Type] = { - "array": list, - "boolean": bool, - "integer": int, - "number": float, - "string": str, - } - if parser is None: parser = argparse.ArgumentParser(description=schema.get("description")) - subparsers = parser.add_subparsers( - title="more options", - description="Other options to configure", - help="the option to configure", - required=True, + # get all possible options from the schema + options = options_types_from_schema(schema) + + # inject callback function to process the input later + parser.set_defaults(func=lambda args: process_args(args, schema=schema)) + + # add single positional argument for the option (e.g. "foo.bar") + parser.add_argument( + "option", + # force this arg to be set + nargs="?", + help="Option to configure", + type=str, + choices=AllContainer(list(options.keys())), ) - for name, value in schema.get("properties", {}).items(): - assert isinstance(value, dict) - type_ = value.get("type") - - # TODO: add support for nested objects - if type_ == "object": - subparser = subparsers.add_parser(name, help=value.get("description")) - _register_parser(parser=subparser, schema=value) - continue - # elif value.get("type") == "array": - # subparser = parser.add_subparsers(dest=name) - # register_parser(subparser, value) - # continue - kwargs = Kwargs() - kwargs.default = value.get("default") - kwargs.help = value.get("description") - kwargs.required = name in required_set - - if kwargs.default is not None: - kwargs.help = f"{kwargs.help}, [{kwargs.default}] in default" - - if "enum" in value: - enum_list = value["enum"] - if len(enum_list) == 0: - raise ClanError("Enum List is Empty") - arg_type = type(enum_list[0]) - if not all(arg_type is type(item) for item in enum_list): - raise ClanError(f"Items in [{enum_list}] with Different Types") - - kwargs.type = arg_type - kwargs.choices = enum_list - elif type_ in type_map: - kwargs.type = type_map[type_] - del kwargs.choices - else: - raise ClanError(f"Unsupported Type '{type_}' in schema") - - name = f"--{name}" - - if kwargs.type is bool: - if kwargs.default: - raise ClanError("Boolean have to be False in default") - kwargs.default = False - kwargs.action = "store_true" - del kwargs.type - else: - del kwargs.action - - parser.add_argument(name, **vars(kwargs)) + # add a single optional argument for the value + parser.add_argument( + "value", + # force this arg to be set + nargs="+", + help="Value to set", + ) -def main() -> None: +def main(argv: Optional[list[str]] = None) -> None: + if argv is None: + argv = sys.argv parser = argparse.ArgumentParser() parser.add_argument( "schema", help="The schema to use for the configuration", - type=str, + type=Path, ) - args = parser.parse_args(sys.argv[1:2]) + args = parser.parse_args(argv[1:2]) register_parser(parser, args.schema) - parser.parse_args(sys.argv[2:]) + parser.parse_args(argv[2:]) if __name__ == "__main__": diff --git a/pkgs/clan-cli/default.nix b/pkgs/clan-cli/default.nix index cb76ab46..562256c8 100644 --- a/pkgs/clan-cli/default.nix +++ b/pkgs/clan-cli/default.nix @@ -1,26 +1,27 @@ -{ python3 -, ruff -, runCommand -, installShellFiles -, zerotierone -, bubblewrap -, sops -, age -, black -, nix -, mypy -, setuptools -, self +{ age , argcomplete +, black +, bubblewrap +, installShellFiles +, jsonschema +, mypy +, nix +, openssh , pytest , pytest-cov , pytest-subprocess -, openssh +, python3 +, ruff +, runCommand +, self +, setuptools +, sops , stdenv , wheel +, zerotierone }: let - dependencies = [ argcomplete ]; + dependencies = [ argcomplete jsonschema ]; testDependencies = [ pytest diff --git a/pkgs/clan-cli/pyproject.toml b/pkgs/clan-cli/pyproject.toml index f2d65021..e404e6af 100644 --- a/pkgs/clan-cli/pyproject.toml +++ b/pkgs/clan-cli/pyproject.toml @@ -27,7 +27,11 @@ disallow_untyped_defs = true no_implicit_optional = true [[tool.mypy.overrides]] -module = "setuptools.*" +module = "argcomplete.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "jsonschema.*" ignore_missing_imports = true [[tool.mypy.overrides]] @@ -35,7 +39,7 @@ module = "pytest.*" ignore_missing_imports = true [[tool.mypy.overrides]] -module = "argcomplete.*" +module = "setuptools.*" ignore_missing_imports = true [tool.ruff] diff --git a/pkgs/clan-cli/tests/test_config.py b/pkgs/clan-cli/tests/test_config.py index 345a3fa2..c18bf014 100644 --- a/pkgs/clan-cli/tests/test_config.py +++ b/pkgs/clan-cli/tests/test_config.py @@ -1,4 +1,6 @@ +import argparse import json +import sys from pathlib import Path from typing import Any @@ -6,15 +8,12 @@ import pytest from clan_cli import config -base_args = [ - "", - f"{Path(config.__file__).parent}/jsonschema/example-schema.json", -] +example_schema = f"{Path(config.__file__).parent}/jsonschema/example-schema.json" # use pytest.parametrize @pytest.mark.parametrize( - "args,expected", + "argv,expected", [ (["name", "DavHau"], {"name": "DavHau"}), ( @@ -26,11 +25,85 @@ base_args = [ ], ) def test_set_some_option( - args: list[str], + argv: list[str], expected: dict[str, Any], capsys: pytest.CaptureFixture, + monkeypatch: pytest.MonkeyPatch, ) -> None: - config.main(base_args + args) - captured = capsys.readout() + # monkeypatch sys.argv + monkeypatch.setattr(sys, "argv", [""] + argv) + parser = argparse.ArgumentParser() + config.register_parser(parser=parser, file=Path(example_schema)) + args = parser.parse_args() + args.func(args) + captured = capsys.readouterr() + print(captured.out) json_out = json.loads(captured.out) assert json_out == expected + + +def test_walk_jsonschema_all_types() -> None: + schema = dict( + type="object", + properties=dict( + array=dict( + type="array", + items=dict( + type="string", + ), + ), + boolean=dict(type="boolean"), + integer=dict(type="integer"), + number=dict(type="number"), + string=dict(type="string"), + ), + ) + expected = { + "array": list[str], + "boolean": bool, + "integer": int, + "number": float, + "string": str, + } + assert config.options_types_from_schema(schema) == expected + + +def test_walk_jsonschema_nested() -> None: + schema = dict( + type="object", + properties=dict( + name=dict( + type="object", + properties=dict( + first=dict(type="string"), + last=dict(type="string"), + ), + ), + age=dict(type="integer"), + ), + ) + expected = { + "age": int, + "name.first": str, + "name.last": str, + } + assert config.options_types_from_schema(schema) == expected + + +# test walk_jsonschema with dynamic attributes (e.g. "additionalProperties") +def test_walk_jsonschema_dynamic_attrs() -> None: + schema = dict( + type="object", + properties=dict( + age=dict(type="integer"), + users=dict( + type="object", + additionalProperties=dict(type="string"), + ), + ), + ) + expected = { + "age": int, + "users.": str, # is a placeholder for any string + } + assert config.options_types_from_schema(schema) == expected