Merge pull request 'clan-config: improve and add flake-parts module for clan-config' (#152) from DavHau-clan-config into main
All checks were successful
build / test (push) Successful in 20s

Reviewed-on: #152
This commit is contained in:
DavHau 2023-08-22 21:17:47 +00:00
commit 7b2e9cc46e
6 changed files with 183 additions and 62 deletions

View File

@ -31,8 +31,9 @@
./lib/flake-module.nix
({ self, lib, ... }: {
flake.nixosModules = lib.mapAttrs (_: nix: { imports = [ nix ]; }) (self.lib.findNixFiles ./nixosModules);
flake.flakeModules = lib.mapAttrs (_: nix: { imports = [ nix ]; }) (self.lib.findNixFiles ./flakeModules);
flake.clanModules = lib.mapAttrs (_: nix: { imports = [ nix ]; }) (self.lib.findNixFiles ./clanModules);
flake.nixosModules = lib.mapAttrs (_: nix: { imports = [ nix ]; }) (self.lib.findNixFiles ./nixosModules);
})
];
});

View File

@ -0,0 +1,40 @@
{ self, inputs, ... }:
let
# take the default nixos configuration
options = self.nixosConfigurations.default.options;
# this is actually system independent as it uses toFile
docs = inputs.nixpkgs.legacyPackages.x86_64-linux.nixosOptionsDoc {
inherit options;
};
optionsJSONFile = docs.optionsJSON.options;
warnIfNoDefaultConfig = return:
if ! self ? nixosConfigurations.default
then
builtins.trace
"WARNING: .#nixosConfigurations.default could not be found. Please define it."
return
else return;
in
{
flake.clanOptions = warnIfNoDefaultConfig optionsJSONFile;
flake.clanSettings = self + /clan-settings.json;
perSystem = { pkgs, inputs', ... }: {
devShells.clan-config = pkgs.mkShell {
packages = [
inputs'.clan-core.packages.clan-cli
];
shellHook = ''
export CLAN_OPTIONS_FILE=$(nix eval --raw .#clanOptions)
export XDG_DATA_DIRS="${inputs'.clan-core.packages.clan-cli}/share''${XDG_DATA_DIRS:+:$XDG_DATA_DIRS}"
export fish_complete_path="${inputs'.clan-core.packages.clan-cli}/share/fish/vendor_completions.d''${fish_complete_path:+:$fish_complete_path}"
'';
};
};
}

View File

@ -1,9 +1,11 @@
# !/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Any, Optional, Type
from typing import Any, Optional, Type, Union
from clan_cli.errors import ClanError
@ -14,7 +16,11 @@ script_dir = Path(__file__).parent
def map_type(type: str) -> Type:
if type == "boolean":
return bool
elif type in ["integer", "signed integer"]:
elif type in [
"integer",
"signed integer",
"16 bit unsigned integer; between 0 and 65535 (both inclusive)",
]:
return int
elif type == "string":
return str
@ -28,14 +34,19 @@ def map_type(type: str) -> Type:
raise ClanError(f"Unknown type {type}")
class Kwargs:
def __init__(self) -> None:
self.type: Optional[Type] = None
self.default: Any = None
self.required: bool = False
self.help: Optional[str] = None
self.action: Optional[str] = None
self.choices: Optional[list] = None
# merge two dicts recursively
def merge(a: dict, b: dict, path: list[str] = []) -> dict:
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
merge(a[key], b[key], path + [str(key)])
elif isinstance(a[key], list) and isinstance(b[key], list):
a[key].extend(b[key])
elif a[key] != b[key]:
raise Exception("Conflict at " + ".".join(path + [str(key)]))
else:
a[key] = b[key]
return a
# A container inheriting from list, but overriding __contains__ to return True
@ -46,20 +57,58 @@ class AllContainer(list):
return True
def process_args(option: str, value: Any, options: dict) -> None:
# value is always a list, as the arg parser cannot know the type upfront
# and therefore always allows multiple arguments.
def cast(value: Any, type: Type, opt_description: str) -> Any:
try:
# handle bools
if isinstance(type(), bool):
if value[0] in ["true", "True", "yes", "y", "1"]:
return True
elif value[0] in ["false", "False", "no", "n", "0"]:
return False
else:
raise ClanError(f"Invalid value {value} for boolean")
# handle lists
elif isinstance(type(), list):
subtype = type.__args__[0]
return [cast([x], subtype, opt_description) for x in value]
# handle dicts
elif isinstance(type(), dict):
if not isinstance(value, dict):
raise ClanError(
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
)
subtype = type.__args__[1]
return {k: cast(v, subtype, opt_description) for k, v in value.items()}
else:
if len(value) > 1:
raise ClanError(f"Too many values for {opt_description}")
return type(value[0])
except ValueError:
raise ClanError(
f"Invalid type for option {opt_description} (expected {type.__name__})"
)
def process_args(
option: str, value: Any, options: dict, out_file: Path, option_description: str = ""
) -> None:
option_path = option.split(".")
# if the option cannot be found, then likely the type is attrs and we need to
# find the parent option
if option not in options:
if len(option_path) == 1:
raise ClanError(f"Option {option} not found")
raise ClanError(f"Option {option_description} not found")
option_parent = option_path[:-1]
attr = option_path[-1]
return process_args(
option=".".join(option_parent),
value={attr: value},
options=options,
out_file=out_file,
option_description=option,
)
target_type = map_type(options[option]["type"])
@ -72,52 +121,48 @@ def process_args(option: str, value: Any, options: dict) -> None:
current = current[part]
current[option_path[-1]] = value
# value is always a list, as the arg parser cannot know the type upfront
# and therefore always allows multiple arguments.
def cast(value: Any, type: Type) -> Any:
try:
# handle bools
if isinstance(type(), bool):
if value == "true":
return True
elif value == "false":
return False
else:
raise ClanError(f"Invalid value {value} for boolean")
# handle lists
elif isinstance(type(), list):
subtype = type.__args__[0]
return [cast([x], subtype) for x in value]
# handle dicts
elif isinstance(type(), dict):
if not isinstance(value, dict):
raise ClanError(
f"Cannot set {option} directly. Specify a suboption like {option}.<name>"
)
subtype = type.__args__[1]
return {k: cast(v, subtype) for k, v in value.items()}
else:
if len(value) > 1:
raise ClanError(f"Too many values for {option}")
return type(value[0])
except ValueError:
raise ClanError(
f"Invalid type for option {option} (expected {type.__name__})"
)
casted = cast(value, target_type)
casted = cast(value, target_type, option)
current[option_path[-1]] = casted
# print the result as json
print(json.dumps(result, indent=2))
# check if there is an existing config file
if os.path.exists(out_file):
with open(out_file) as f:
current_config = json.load(f)
else:
current_config = {}
# merge and save the new config file
new_config = merge(current_config, result)
with open(out_file, "w") as f:
json.dump(new_config, f, indent=2)
print("New config:")
print(json.dumps(new_config, indent=2))
def register_parser(
parser: argparse.ArgumentParser,
file: Path = Path(f"{script_dir}/jsonschema/options.json"),
optionsFile: Optional[Union[str, Path]] = os.environ.get("CLAN_OPTIONS_FILE"),
) -> None:
options = json.loads(file.read_text())
if not optionsFile:
# use nix eval to evaluate .#clanOptions
# this will give us the evaluated config with the options attribute
proc = subprocess.run(
[
"nix",
"eval",
"--raw",
".#clanOptions",
],
check=True,
capture_output=True,
text=True,
)
file = proc.stdout.strip()
with open(file) as f:
options = json.load(f)
else:
with open(optionsFile) as f:
options = json.load(f)
return _register_parser(parser, options)
@ -134,10 +179,22 @@ def _register_parser(
# inject callback function to process the input later
parser.set_defaults(
func=lambda args: process_args(
option=args.option, value=args.value, options=options
option=args.option,
value=args.value,
options=options,
out_file=args.out_file,
)
)
# add argument to pass output file
parser.add_argument(
"--out-file",
"-o",
help="Output file",
type=Path,
default=Path("clan-settings.json"),
)
# add single positional argument for the option (e.g. "foo.bar")
parser.add_argument(
"option",

View File

@ -21,6 +21,10 @@
, rsync
}:
let
# This provides dummy options for testing clan config and prevents it from
# evaluating the flake .#
CLAN_OPTIONS_FILE = ./clan_cli/config/jsonschema/options.json;
dependencies = [ argcomplete ];
testDependencies = [
@ -47,6 +51,9 @@ python3.pkgs.buildPythonPackage {
name = "clan-cli";
src = source;
format = "pyproject";
inherit CLAN_OPTIONS_FILE;
nativeBuildInputs = [
setuptools
installShellFiles
@ -55,6 +62,7 @@ python3.pkgs.buildPythonPackage {
passthru.tests = {
clan-mypy = runCommand "clan-mypy" { } ''
export CLAN_OPTIONS_FILE="${CLAN_OPTIONS_FILE}"
cp -r ${source} ./src
chmod +w -R ./src
cd ./src
@ -65,6 +73,7 @@ python3.pkgs.buildPythonPackage {
{
nativeBuildInputs = [ age zerotierone bubblewrap sops nix openssh rsync stdenv.cc ];
} ''
export CLAN_OPTIONS_FILE="${CLAN_OPTIONS_FILE}"
cp -r ${source} ./src
chmod +w -R ./src
cd ./src

View File

@ -20,6 +20,9 @@ pkgs.mkShell {
];
# sets up an editable install and add enty points to $PATH
CLAN_FLAKE = self;
# This provides dummy options for testing clan config and prevents it from
# evaluating the flake .#
CLAN_OPTIONS_FILE = ./clan_cli/config/jsonschema/options.json;
shellHook = ''
tmp_path=$(realpath ./.pythonenv)
repo_root=$(realpath .)

View File

@ -1,6 +1,7 @@
import argparse
import json
import sys
import tempfile
from pathlib import Path
from typing import Any
@ -9,7 +10,7 @@ import pytest
from clan_cli import config
from clan_cli.config import parsing
example_schema = f"{Path(config.__file__).parent}/jsonschema/options.json"
example_options = f"{Path(config.__file__).parent}/jsonschema/options.json"
# use pytest.parametrize
@ -32,15 +33,20 @@ def test_set_some_option(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# monkeypatch sys.argv
monkeypatch.setattr(sys, "argv", [""] + argv)
parser = argparse.ArgumentParser()
config.register_parser(parser=parser, file=Path(example_schema))
args = parser.parse_args()
args.func(args)
captured = capsys.readouterr()
print(captured.out)
json_out = json.loads(captured.out)
assert json_out == expected
# create temporary file for out_file
with tempfile.NamedTemporaryFile() as out_file:
with open(out_file.name, "w") as f:
json.dump({}, f)
monkeypatch.setattr(sys, "argv", ["", "--out-file", out_file.name] + argv)
parser = argparse.ArgumentParser()
config._register_parser(
parser=parser,
options=json.loads(Path(example_options).read_text()),
)
args = parser.parse_args()
args.func(args)
json_out = json.loads(open(out_file.name).read())
assert json_out == expected
def test_walk_jsonschema_all_types() -> None:
@ -148,3 +154,8 @@ def test_type_from_schema_path_dynamic_attrs() -> None:
)
assert parsing.type_from_schema_path(schema, ["age"]) == int
assert parsing.type_from_schema_path(schema, ["users", "foo"]) == str
# test the cast function with simple types
def test_cast_simple() -> None:
assert config.cast(["true"], bool, "foo-option") is True