clan-config: improve jsonschema arg parsing
All checks were successful
build / test (pull_request) Successful in 21s

- output json to stdout after success
- expect args in the style: `foo.bar = baz`
- handle different input types
- cast input types on best effort basis
- throw meaningful errors
This commit is contained in:
DavHau 2023-08-11 00:05:52 +02:00
parent d04278e9b1
commit 0a9b914ad5
4 changed files with 218 additions and 93 deletions

View File

@ -6,11 +6,22 @@ import sys
from pathlib import Path
from typing import Any, Optional, Type, Union
import jsonschema
from clan_cli.errors import ClanError
script_dir = Path(__file__).parent
type_map: dict[str, type] = {
"array": list,
"boolean": bool,
"integer": int,
"number": float,
"string": str,
}
class Kwargs:
def __init__(self) -> None:
self.type: Optional[Type] = None
@ -21,6 +32,14 @@ class Kwargs:
self.choices: Optional[list] = None
# A container inheriting from list, but overriding __contains__ to return True
# for all values.
# This is used to allow any value for the "choices" field of argparse
class AllContainer(list):
def __contains__(self, item: Any) -> bool:
return True
def schema_from_module_file(
file: Union[str, Path] = f"{script_dir}/jsonschema/example-schema.json",
) -> dict[str, Any]:
@ -41,6 +60,75 @@ def schema_from_module_file(
)
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, Type]:
result: dict[str, Type] = {}
for name, value in schema.get("properties", {}).items():
assert isinstance(value, dict)
type_ = value["type"]
if type_ == "object":
# handle additionalProperties
if "additionalProperties" in value:
sub_type = value["additionalProperties"].get("type")
if sub_type not in type_map:
raise ClanError(
f"Unsupported object type {sub_type} (field {name})"
)
result[f"{name}.<name>"] = type_map[sub_type]
continue
# handle properties
sub_result = options_types_from_schema(value)
for sub_name, sub_type in sub_result.items():
result[f"{name}.{sub_name}"] = sub_type
continue
elif type_ == "array":
if "items" not in value:
raise ClanError(f"Untyped arrays are not supported (field: {name})")
sub_type = value["items"].get("type")
if sub_type not in type_map:
raise ClanError(f"Unsupported list type {sub_type} (field {name})")
sub_type_: type = type_map[sub_type]
result[name] = list[sub_type_] # type: ignore
continue
result[name] = type_map[type_]
return result
def process_args(args: argparse.Namespace, schema: dict) -> None:
option = args.option
value_arg = args.value
option_path = option.split(".")
# construct a nested dict from the option path and set the value
result: dict[str, Any] = {}
current = result
for part in option_path[:-1]:
current[part] = {}
current = current[part]
current[option_path[-1]] = value_arg
# validate the result against the schema and cast the value to the expected type
try:
jsonschema.validate(result, schema)
except jsonschema.ValidationError as e:
schema_type = type_map[e.schema["type"]]
# we use nargs="+", so we need to unwrap non-list values
if isinstance(e.instance, list) and schema_type != list:
instance_unwrapped = e.instance[0]
else:
instance_unwrapped = e.instance
# try casting the value to the expected type
try:
value_casted = schema_type(instance_unwrapped)
except TypeError:
raise ClanError(
f"Invalid value for {'.'.join(e.relative_path)}: {instance_unwrapped} (expected type: {schema_type})"
) from e
current[option_path[-1]] = value_casted
# print the result as json
print(json.dumps(result, indent=2))
def register_parser(
parser: argparse.ArgumentParser,
file: Path = Path(f"{script_dir}/jsonschema/example-schema.json"),
@ -63,87 +151,46 @@ def _register_parser(
if schema["type"] != "object":
raise ClanError("Schema is not an object")
required_set = set(schema.get("required", []))
type_map: dict[str, Type] = {
"array": list,
"boolean": bool,
"integer": int,
"number": float,
"string": str,
}
if parser is None:
parser = argparse.ArgumentParser(description=schema.get("description"))
subparsers = parser.add_subparsers(
title="more options",
description="Other options to configure",
help="the option to configure",
required=True,
# get all possible options from the schema
options = options_types_from_schema(schema)
# inject callback function to process the input later
parser.set_defaults(func=lambda args: process_args(args, schema=schema))
# add single positional argument for the option (e.g. "foo.bar")
parser.add_argument(
"option",
# force this arg to be set
nargs="?",
help="Option to configure",
type=str,
choices=AllContainer(list(options.keys())),
)
for name, value in schema.get("properties", {}).items():
assert isinstance(value, dict)
type_ = value.get("type")
# TODO: add support for nested objects
if type_ == "object":
subparser = subparsers.add_parser(name, help=value.get("description"))
_register_parser(parser=subparser, schema=value)
continue
# elif value.get("type") == "array":
# subparser = parser.add_subparsers(dest=name)
# register_parser(subparser, value)
# continue
kwargs = Kwargs()
kwargs.default = value.get("default")
kwargs.help = value.get("description")
kwargs.required = name in required_set
if kwargs.default is not None:
kwargs.help = f"{kwargs.help}, [{kwargs.default}] in default"
if "enum" in value:
enum_list = value["enum"]
if len(enum_list) == 0:
raise ClanError("Enum List is Empty")
arg_type = type(enum_list[0])
if not all(arg_type is type(item) for item in enum_list):
raise ClanError(f"Items in [{enum_list}] with Different Types")
kwargs.type = arg_type
kwargs.choices = enum_list
elif type_ in type_map:
kwargs.type = type_map[type_]
del kwargs.choices
else:
raise ClanError(f"Unsupported Type '{type_}' in schema")
name = f"--{name}"
if kwargs.type is bool:
if kwargs.default:
raise ClanError("Boolean have to be False in default")
kwargs.default = False
kwargs.action = "store_true"
del kwargs.type
else:
del kwargs.action
parser.add_argument(name, **vars(kwargs))
# add a single optional argument for the value
parser.add_argument(
"value",
# force this arg to be set
nargs="+",
help="Value to set",
)
def main() -> None:
def main(argv: Optional[list[str]] = None) -> None:
if argv is None:
argv = sys.argv
parser = argparse.ArgumentParser()
parser.add_argument(
"schema",
help="The schema to use for the configuration",
type=str,
type=Path,
)
args = parser.parse_args(sys.argv[1:2])
args = parser.parse_args(argv[1:2])
register_parser(parser, args.schema)
parser.parse_args(sys.argv[2:])
parser.parse_args(argv[2:])
if __name__ == "__main__":

View File

@ -1,26 +1,27 @@
{ python3
, ruff
, runCommand
, installShellFiles
, zerotierone
, bubblewrap
, sops
, age
, black
, nix
, mypy
, setuptools
, self
{ age
, argcomplete
, black
, bubblewrap
, installShellFiles
, jsonschema
, mypy
, nix
, openssh
, pytest
, pytest-cov
, pytest-subprocess
, openssh
, python3
, ruff
, runCommand
, self
, setuptools
, sops
, stdenv
, wheel
, zerotierone
}:
let
dependencies = [ argcomplete ];
dependencies = [ argcomplete jsonschema ];
testDependencies = [
pytest

View File

@ -27,7 +27,11 @@ disallow_untyped_defs = true
no_implicit_optional = true
[[tool.mypy.overrides]]
module = "setuptools.*"
module = "argcomplete.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "jsonschema.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
@ -35,7 +39,7 @@ module = "pytest.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "argcomplete.*"
module = "setuptools.*"
ignore_missing_imports = true
[tool.ruff]

View File

@ -1,4 +1,6 @@
import argparse
import json
import sys
from pathlib import Path
from typing import Any
@ -6,15 +8,12 @@ import pytest
from clan_cli import config
base_args = [
"",
f"{Path(config.__file__).parent}/jsonschema/example-schema.json",
]
example_schema = f"{Path(config.__file__).parent}/jsonschema/example-schema.json"
# use pytest.parametrize
@pytest.mark.parametrize(
"args,expected",
"argv,expected",
[
(["name", "DavHau"], {"name": "DavHau"}),
(
@ -26,11 +25,85 @@ base_args = [
],
)
def test_set_some_option(
args: list[str],
argv: list[str],
expected: dict[str, Any],
capsys: pytest.CaptureFixture,
monkeypatch: pytest.MonkeyPatch,
) -> None:
config.main(base_args + args)
captured = capsys.readout()
# monkeypatch sys.argv
monkeypatch.setattr(sys, "argv", [""] + argv)
parser = argparse.ArgumentParser()
config.register_parser(parser=parser, file=Path(example_schema))
args = parser.parse_args()
args.func(args)
captured = capsys.readouterr()
print(captured.out)
json_out = json.loads(captured.out)
assert json_out == expected
def test_walk_jsonschema_all_types() -> None:
schema = dict(
type="object",
properties=dict(
array=dict(
type="array",
items=dict(
type="string",
),
),
boolean=dict(type="boolean"),
integer=dict(type="integer"),
number=dict(type="number"),
string=dict(type="string"),
),
)
expected = {
"array": list[str],
"boolean": bool,
"integer": int,
"number": float,
"string": str,
}
assert config.options_types_from_schema(schema) == expected
def test_walk_jsonschema_nested() -> None:
schema = dict(
type="object",
properties=dict(
name=dict(
type="object",
properties=dict(
first=dict(type="string"),
last=dict(type="string"),
),
),
age=dict(type="integer"),
),
)
expected = {
"age": int,
"name.first": str,
"name.last": str,
}
assert config.options_types_from_schema(schema) == expected
# test walk_jsonschema with dynamic attributes (e.g. "additionalProperties")
def test_walk_jsonschema_dynamic_attrs() -> None:
schema = dict(
type="object",
properties=dict(
age=dict(type="integer"),
users=dict(
type="object",
additionalProperties=dict(type="string"),
),
),
)
expected = {
"age": int,
"users.<name>": str, # <name> is a placeholder for any string
}
assert config.options_types_from_schema(schema) == expected