merge main
Some checks failed
checks-impure / test (pull_request) Successful in 1m42s
checks / test (pull_request) Failing after 2m1s

This commit is contained in:
Johannes Kirschbauer 2023-11-11 15:06:54 +01:00
commit 7a02483534
Signed by: hsjobeki
GPG Key ID: F62ED8B8BF204685
62 changed files with 1380 additions and 652 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
.direnv
***/.hypothesis
.coverage.*
**/qubeclan
**/testdir

View File

@ -1,6 +1,6 @@
{ ... }: {
perSystem = { pkgs, lib, ... }: {
packages = {
packages = rec {
# a script that executes all other checks
impure-checks = pkgs.writeShellScriptBin "impure-checks" ''
#!${pkgs.bash}/bin/bash
@ -13,7 +13,53 @@
]}"
ROOT=$(git rev-parse --show-toplevel)
cd "$ROOT/pkgs/clan-cli"
nix develop "$ROOT#clan-cli" -c bash -c 'TMPDIR=/tmp python -m pytest -m impure -s ./tests'
nix develop "$ROOT#clan-cli" -c bash -c "TMPDIR=/tmp python -m pytest -m impure -s ./tests $@"
'';
runMockApi = pkgs.writeShellScriptBin "run-mock-api" ''
#!${pkgs.bash}/bin/bash
set -euo pipefail
export PATH="${lib.makeBinPath [
pkgs.gitMinimal
pkgs.nix
pkgs.rsync # needed to have rsync installed on the dummy ssh server
pkgs.coreutils
pkgs.procps
]}"
ROOT=$(git rev-parse --show-toplevel)
cd "$ROOT/pkgs/clan-cli"
nix develop "$ROOT#clan-cli" -c bash -c 'TMPDIR=/tmp python -m clan_cli webui --no-open --port 2979'
'';
runSchemaTests = pkgs.writeShellScriptBin "runSchemaTests" ''
#!${pkgs.bash}/bin/bash
set -euo pipefail
${runMockApi}/bin/run-mock-api &
MOCK_API_PID=$!
echo "Started mock api with pid $MOCK_API_PID"
function cleanup {
echo "Stopping server..."
pkill -9 -f "python -m clan_cli webui"
}
trap cleanup EXIT
export PATH="${lib.makeBinPath [
pkgs.gitMinimal
pkgs.nix
pkgs.rsync # needed to have rsync installed on the dummy ssh server
pkgs.procps
pkgs.coreutils
]}"
sleep 3
ROOT=$(git rev-parse --show-toplevel)
cd "$ROOT/pkgs/clan-cli"
nix develop "$ROOT#clan-cli" -c bash -c 'TMPDIR=/tmp st auth login RHtr8nLtz77tqRP8yUGyf-Flv_9SLI'
nix develop "$ROOT#clan-cli" -c bash -c 'TMPDIR=/tmp st run http://localhost:2979/openapi.json --experimental=openapi-3.1 --report --workers 8 --max-response-time=50 --request-timeout=1000 -M GET'
'';
};
};

View File

@ -1,13 +1,11 @@
{ self, lib, ... }: {
{ inputs, ... }: {
flake.clanModules = {
diskLayouts = lib.mapAttrs'
(name: _: lib.nameValuePair (lib.removeSuffix ".nix" name) {
imports = [
self.inputs.disko.nixosModules.disko
./diskLayouts/${name}
];
})
(builtins.readDir ./diskLayouts);
diskLayouts = {
imports = [
./diskLayouts.nix
inputs.disko.nixosModules.default
];
};
deltachat = ./deltachat.nix;
xfce = ./xfce.nix;
};

View File

@ -172,6 +172,14 @@ nix build .#checks.x86_64-linux.clan-pytest --rebuild
This command will run all pure test functions.
### Running schemathesis fuzzer on GET requests
```bash
nix run .#runSchemaTests
```
If you want to test more request types edit the file `checks/impure/flake-module.nix`
### Inspecting the Nix Sandbox
If you need to inspect the Nix sandbox while running tests, follow these steps:
@ -192,6 +200,8 @@ If you need to inspect the Nix sandbox while running tests, follow these steps:
These debugging and testing methods will help you identify and fix issues in your backend code efficiently, ensuring the reliability and robustness of your application.
For more information on testing read [property and contract based testing](testing.md)
# Using this Template
To make the most of this template:

111
docs/testing.md Normal file
View File

@ -0,0 +1,111 @@
# Property vs Contract based testing
In this section, we'll explore the importance of testing the backend of your FastAPI application, specifically focusing on the advantages of using contract-based testing with property-based testing frameworks.
## Why Use Property-Based Testing?
Property-based testing is a powerful approach to test your APIs, offering several key benefits:
### 1. Scope
Instead of having to write numerous test cases for various input arguments, property-based testing enables you to test a range of arguments for each parameter using a single test. This approach significantly enhances the robustness of your test suite while reducing redundancy in your testing code. In short, your test code becomes cleaner, more DRY (Don't Repeat Yourself), and more efficient. It also becomes more effective as you can easily test numerous edge cases.
### 2. Reproducibility
Property-based testing tools retain test cases and their results, allowing you to reproduce and replay tests in case of failure. This feature is invaluable for debugging and ensuring the stability of your application over time.
## Frameworks for Property-Based Testing
To implement property-based testing in FastAPI, you can use the following framework:
- [Hypothesis: Property-Based Testing](https://hypothesis.readthedocs.io/en/latest/quickstart.html)
- [Schemathesis](https://schemathesis.readthedocs.io/en/stable/#id2)
## Example
Running schemathesis fuzzer on GET requests
```bash
nix run .#runSchemaTests
```
If you want to test more request types edit the file [flake-module.nix](../checks/impure/flake-module.nix)
After a run it will upload the results to `schemathesis.io` and give you a link to the report.
The credentials to the account are `Username: schemathesis@qube.email` and `Password:6tv4eP96WXsarF`
## Why Schemas Are Not Contracts
A schema is a description of the data structure of your API, whereas a contract defines not only the structure but also the expected behavior and constraints. The following resource explains why schemas are not contracts in more detail:
- [Why Schemas Are Not Contracts](https://pactflow.io/blog/schemas-are-not-contracts/)
In a nutshell, schemas may define the data structure but often fail to capture complex constraints and the expected interactions between different API endpoints. Contracts fill this gap by specifying both the structure and behavior of your API.
## Why Use Contract-Driven Testing?
Contract-driven testing combines the benefits of type annotations and property-based testing, providing a robust approach to ensuring the correctness of your APIs.
- Contracts become an integral part of the function signature and can be checked statically, ensuring that the API adheres to the defined contract.
- Contracts, like property-based tests, allow you to specify conditions and constraints, with the testing framework automatically generating test cases and verifying call results.
### Frameworks for Contract-Driven Testing
To implement contract-driven testing in FastAPI, consider the following framework and extension:
- [Deal: Contract Driven Development](https://deal.readthedocs.io/)
By adopting contract-driven testing, you can ensure that your FastAPI application not only has a well-defined structure but also behaves correctly, making it more robust and reliable.
- [Whitepaper: Python by contract](https://users.ece.utexas.edu/~gligoric/papers/ZhangETAL22PythonByContractDataset.pdf) This paper goes more into detail how it works
## Examples
You can annotate functions with `@deal.raises(ClanError)` to say that they can _only_ raise a ClanError Exception.
```python
import deal
@deal.raises(ClanError)
def get_task(uuid: UUID) -> BaseTask:
global POOL
return POOL[uuid]
```
To say that it can raise multiple exceptions just add after one another separated with a `,`
```python
import deal
@deal.raises(ClanError, IndexError, ZeroDivisionError)
def get_task(uuid: UUID) -> BaseTask:
global POOL
return POOL[uuid]
```
### Adding deal annotated functions to pytest
```python
from clan_cli.task_manager import get_task
import deal
@deal.cases(get_task) # <--- Add function get_task to testing corpus
def test_get_task(case: deal.TestCase) -> None:
case() # <--- Call testing framework with function
```
### Adding example input for deeper testing
You can combine hypothesis annotations with deal annotations to add example inputs to the function so that the verifier can reach deeper parts of the function.
```python
import deal
@deal.example(lambda: get_task(UUID("5c2061e0-4512-4b30-aa8e-7be4a75b8b45"))) # type: ignore
@deal.example(lambda: get_task(UUID("7c2061e6-4512-4b30-aa8e-7be4a75b8b45"))) # type: ignore
@deal.raises(ClanError)
def get_task(uuid: UUID) -> BaseTask:
global POOL
return POOL[uuid]
```
You can also add `pre` and `post` conditions. A `pre` condition must be true before the function is executed. A `post` condition must be true after the function was executed. For more information read the [Writing Contracts Section](https://deal.readthedocs.io/basic/values.html).
Or read the [API doc of Deal](https://deal.readthedocs.io/details/api.html)

View File

@ -98,16 +98,32 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1699007274,
"narHash": "sha256-m0NH2trnW8cOhona6m3hWkeDZ28BV/wAGPd/YWik23g=",
"owner": "Mic92",
"lastModified": 1699354722,
"narHash": "sha256-abmqUReg4PsyQSwv4d0zjcWpMHrd3IFJiTb2tZpfF04=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "fcb19bae00e9d3fd5ecf4a1f80cf33248bf7f714",
"rev": "cfbb29d76949ae53c457f152c52c173ea4bdd862",
"type": "github"
},
"original": {
"owner": "Mic92",
"ref": "deltachat",
"owner": "NixOS",
"ref": "nixos-unstable-small",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-for-deal": {
"locked": {
"lastModified": 1699470058,
"narHash": "sha256-//c1SEENoNFEDtp8x5lokNxsU9lZjyNkEf5k3OJADTs=",
"owner": "Luis-Hebendanz",
"repo": "nixpkgs",
"rev": "842a157b727ad9712d41a80d1e1564e4e6bbe697",
"type": "github"
},
"original": {
"owner": "Luis-Hebendanz",
"ref": "fix_python_deal",
"repo": "nixpkgs",
"type": "github"
}
@ -119,6 +135,7 @@
"floco": "floco",
"nixos-generators": "nixos-generators",
"nixpkgs": "nixpkgs",
"nixpkgs-for-deal": "nixpkgs-for-deal",
"sops-nix": "sops-nix",
"treefmt-nix": "treefmt-nix"
}

View File

@ -5,9 +5,11 @@
nixConfig.extra-trusted-public-keys = [ "cache.clan.lol-1:3KztgSAB5R1M+Dz7vzkBGzXdodizbgLXGXKXlcQLA28=" ];
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable-small";
#nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable-small";
# https://github.com/NixOS/nixpkgs/pull/265024
nixpkgs.url = "github:Mic92/nixpkgs/deltachat";
# https://github.com/NixOS/nixpkgs/pull/265872
nixpkgs-for-deal.url = "github:Luis-Hebendanz/nixpkgs/fix_python_deal";
floco.url = "github:aakropotkin/floco";
floco.inputs.nixpkgs.follows = "nixpkgs";
disko.url = "github:nix-community/disko";
@ -31,6 +33,7 @@
"aarch64-darwin"
];
imports = [
./checks/flake-module.nix
./devShell.nix
./formatter.nix

View File

@ -1,9 +0,0 @@
{ lib, ... }: {
options.clan.bloatware = lib.mkOption {
type = lib.types.submodule {
imports = [
../../../lib/jsonschema/example-interface.nix
];
};
};
}

View File

@ -7,7 +7,6 @@
./networking.nix
inputs.sops-nix.nixosModules.sops
# just some example options. Can be removed later
./bloatware
./vm.nix
./options.nix
];

View File

@ -18,7 +18,8 @@
};
options.clanCore.secretsUploadDirectory = lib.mkOption {
type = lib.types.path;
type = lib.types.nullOr lib.types.path;
default = null;
description = ''
The directory where secrets are uploaded into, This is backend specific.
'';

View File

@ -33,7 +33,6 @@ in
import sys
from clan_cli.secrets.sops_generate import generate_secrets_from_nix
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; secret_submodules = config.clanCore.secrets; })})
args["flake_name"] = sys.argv[1]
generate_secrets_from_nix(**args)
'';
uploadSecrets = pkgs.writeScript "upload-secrets" ''
@ -43,7 +42,6 @@ in
from clan_cli.secrets.sops_generate import upload_age_key_from_nix
# the second toJSON is needed to escape the string for the python
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; })})
args["flake_name"] = sys.argv[1]
upload_age_key_from_nix(**args)
'';
};

View File

@ -94,16 +94,38 @@ in
# only the controller needs to have the key in the repo, the other clients can be dynamic
# we generate the zerotier code manually for the controller, since it's part of the bootstrap command
clanCore.secrets.zerotier = {
facts.zerotier-ip = { };
facts.zerotier-meshname = { };
facts.zerotier-network-id = { };
secrets.zerotier-identity-secret = { };
generator = ''
export PATH=${lib.makeBinPath [ config.services.zerotierone.package pkgs.fakeroot ]}
${pkgs.python3.interpreter} ${./generate-network.py} "$facts/zerotier-network-id" "$secrets/zerotier-identity-secret"
${pkgs.python3.interpreter} ${./generate.py} --mode network \
--ip "$facts/zerotier-ip" \
--meshname "$facts/zerotier-meshname" \
--identity-secret "$secrets/zerotier-identity-secret" \
--network-id "$facts/zerotier-network-id"
'';
};
environment.systemPackages = [ config.clanCore.clanPkgs.zerotier-members ];
})
(lib.mkIf ((config.clanCore.secrets ? zerotier) && (facts.zerotier-network-id.value != null)) {
(lib.mkIf (config.clanCore.secretsUploadDirectory != null && !cfg.controller.enable && cfg.networkId != null) {
clanCore.secrets.zerotier = {
facts.zerotier-ip = { };
facts.zerotier-meshname = { };
secrets.zerotier-identity-secret = { };
generator = ''
export PATH=${lib.makeBinPath [ config.services.zerotierone.package ]}
${pkgs.python3.interpreter} ${./generate.py} --mode identity \
--ip "$facts/zerotier-ip" \
--meshname "$facts/zerotier-meshname" \
--identity-secret "$secrets/zerotier-identity-secret" \
--network-id ${cfg.networkId}
'';
};
})
(lib.mkIf (cfg.controller.enable && config.clanCore.secrets ? zerotier && facts.zerotier-network-id.value != null) {
clan.networking.zerotier.networkId = facts.zerotier-network-id.value;
environment.etc."zerotier/network-id".text = facts.zerotier-network-id.value;

View File

@ -1,11 +1,14 @@
import argparse
import base64
import contextlib
import ipaddress
import json
import socket
import subprocess
import time
import urllib.request
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Iterator, Optional
@ -41,12 +44,25 @@ def find_free_port() -> Optional[int]:
return sock.getsockname()[1]
class Identity:
def __init__(self, path: Path) -> None:
self.public = (path / "identity.public").read_text()
self.private = (path / "identity.secret").read_text()
def node_id(self) -> str:
nid = self.public.split(":")[0]
assert (
len(nid) == 10
), f"node_id must be 10 characters long, got {len(nid)}: {nid}"
return nid
class ZerotierController:
def __init__(self, port: int, home: Path) -> None:
self.port = port
self.home = home
self.authtoken = (home / "authtoken.secret").read_text()
self.secret = (home / "identity.secret").read_text()
self.identity = Identity(home)
def _http_request(
self,
@ -70,10 +86,10 @@ class ZerotierController:
return self._http_request("/status")
def create_network(self, data: dict[str, Any] = {}) -> dict[str, Any]:
identity = (self.home / "identity.public").read_text()
node_id = identity.split(":")[0]
return self._http_request(
f"/controller/network/{node_id}______", method="POST", data=data
f"/controller/network/{self.identity.node_id()}______",
method="POST",
data=data,
)
def get_network(self, id: str) -> dict[str, Any]:
@ -118,25 +134,91 @@ def zerotier_controller() -> Iterator[ZerotierController]:
p.wait()
@dataclass
class NetworkController:
networkid: str
identity: Identity
# TODO: allow merging more network configuration here
def create_network() -> dict:
def create_network_controller() -> NetworkController:
with zerotier_controller() as controller:
network = controller.create_network()
return {
"secret": controller.secret,
"networkid": network["nwid"],
}
return NetworkController(network["nwid"], controller.identity)
def create_identity() -> Identity:
with TemporaryDirectory() as d:
tmpdir = Path(d)
private = tmpdir / "identity.secret"
public = tmpdir / "identity.public"
subprocess.run(["zerotier-idtool", "generate", private, public])
return Identity(tmpdir)
def compute_zerotier_ip(network_id: str, identity: Identity) -> ipaddress.IPv6Address:
assert (
len(network_id) == 16
), "network_id must be 16 characters long, got {network_id}"
nwid = int(network_id, 16)
node_id = int(identity.node_id(), 16)
addr_parts = bytearray(
[
0xFD,
(nwid >> 56) & 0xFF,
(nwid >> 48) & 0xFF,
(nwid >> 40) & 0xFF,
(nwid >> 32) & 0xFF,
(nwid >> 24) & 0xFF,
(nwid >> 16) & 0xFF,
(nwid >> 8) & 0xFF,
(nwid) & 0xFF,
0x99,
0x93,
(node_id >> 32) & 0xFF,
(node_id >> 24) & 0xFF,
(node_id >> 16) & 0xFF,
(node_id >> 8) & 0xFF,
(node_id) & 0xFF,
]
)
return ipaddress.IPv6Address(bytes(addr_parts))
def compute_zerotier_meshname(ip: ipaddress.IPv6Address) -> str:
return base64.b32encode(ip.packed)[0:26].decode("ascii").lower()
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("network_id")
parser.add_argument("identity_secret")
parser.add_argument(
"--mode", choices=["network", "identity"], required=True, type=str
)
parser.add_argument("--ip", type=Path, required=True)
parser.add_argument("--meshname", type=Path, required=True)
parser.add_argument("--identity-secret", type=Path, required=True)
parser.add_argument("--network-id", type=str, required=False)
args = parser.parse_args()
zerotier = create_network()
Path(args.network_id).write_text(zerotier["networkid"])
Path(args.identity_secret).write_text(zerotier["secret"])
match args.mode:
case "network":
if args.network_id is None:
raise ValueError("network_id parameter is required")
controller = create_network_controller()
identity = controller.identity
network_id = controller.networkid
Path(args.network_id).write_text(network_id)
case "identity":
identity = create_identity()
network_id = args.network_id
case _:
raise ValueError(f"unknown mode {args.mode}")
ip = compute_zerotier_ip(network_id, identity)
meshname = compute_zerotier_meshname(ip)
args.identity_secret.write_text(identity.private)
args.ip.write_text(ip.compressed)
args.meshname.write_text(meshname)
if __name__ == "__main__":

View File

@ -6,6 +6,7 @@ from typing import Any, Optional, Sequence
from . import config, flakes, join, machines, secrets, vms, webui
from .custom_logger import setup_logging
from .dirs import get_clan_flake_toplevel
from .ssh import cli as ssh_cli
log = logging.getLogger(__name__)
@ -53,6 +54,12 @@ def create_parser(prog: Optional[str] = None) -> argparse.ArgumentParser:
default=[],
)
parser.add_argument(
"--flake",
help="path to the flake where the clan resides in",
default=None,
)
subparsers = parser.add_subparsers()
parser_flake = subparsers.add_parser(
@ -100,6 +107,9 @@ def main() -> None:
setup_logging(logging.DEBUG)
log.debug("Debug log activated")
if args.flake is None:
args.flake = get_clan_flake_toplevel()
if not hasattr(args, "func"):
return

View File

@ -1,7 +1,6 @@
import json
import os
import subprocess
import sys
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Optional
@ -10,7 +9,6 @@ from fastapi import HTTPException
from clan_cli.dirs import (
machine_settings_file,
nixpkgs_source,
specific_flake_dir,
specific_machine_dir,
)
@ -91,51 +89,3 @@ def set_config_for_machine(
if repo_dir is not None:
commit_file(settings_path, repo_dir)
def schema_for_machine(
flake_name: FlakeName, machine_name: str, config: Optional[dict] = None
) -> dict:
flake = specific_flake_dir(flake_name)
# use nix eval to lib.evalModules .#nixosConfigurations.<machine_name>.options.clan
with NamedTemporaryFile(mode="w", dir=flake) as clan_machine_settings_file:
env = os.environ.copy()
inject_config_flags = []
if config is not None:
json.dump(config, clan_machine_settings_file, indent=2)
clan_machine_settings_file.seek(0)
env["CLAN_MACHINE_SETTINGS_FILE"] = clan_machine_settings_file.name
inject_config_flags = [
"--impure", # needed to access CLAN_MACHINE_SETTINGS_FILE
]
proc = subprocess.run(
nix_eval(
flags=inject_config_flags
+ [
"--impure",
"--show-trace",
"--expr",
f"""
let
flake = builtins.getFlake (toString {flake});
lib = import {nixpkgs_source()}/lib;
options = flake.nixosConfigurations.{machine_name}.options;
clanOptions = options.clan;
jsonschemaLib = import {Path(__file__).parent / "jsonschema"} {{ inherit lib; }};
jsonschema = jsonschemaLib.parseOptions clanOptions;
in
jsonschema
""",
],
),
capture_output=True,
text=True,
cwd=flake,
env=env,
)
if proc.returncode != 0:
print(proc.stderr, file=sys.stderr)
raise Exception(
f"Failed to read schema for machine {machine_name}:\n{proc.stderr}"
)
return json.loads(proc.stdout)

View File

@ -0,0 +1,122 @@
import json
import os
import subprocess
import sys
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Optional
from fastapi import HTTPException
from clan_cli.dirs import (
nixpkgs_source,
specific_flake_dir,
)
from clan_cli.errors import ClanError
from clan_cli.nix import nix_eval
from ..types import FlakeName
def machine_schema(
flake_name: FlakeName,
config: dict,
clan_imports: Optional[list[str]] = None,
) -> dict:
flake = specific_flake_dir(flake_name)
# use nix eval to lib.evalModules .#nixosConfigurations.<machine_name>.options.clan
with NamedTemporaryFile(mode="w", dir=flake) as clan_machine_settings_file:
env = os.environ.copy()
if clan_imports is not None:
config["clanImports"] = clan_imports
# dump config to file
json.dump(config, clan_machine_settings_file, indent=2)
clan_machine_settings_file.seek(0)
env["CLAN_MACHINE_SETTINGS_FILE"] = clan_machine_settings_file.name
# ensure that the requested clanImports exist
proc = subprocess.run(
nix_eval(
flags=[
"--impure",
"--show-trace",
"--expr",
f"""
let
b = builtins;
system = b.currentSystem;
flake = b.getFlake (toString {flake});
clan-core = flake.inputs.clan-core;
config = b.fromJSON (b.readFile (b.getEnv "CLAN_MACHINE_SETTINGS_FILE"));
modules_not_found =
b.filter
(modName: ! clan-core.clanModules ? ${{modName}})
config.clanImports or [];
in
modules_not_found
""",
]
),
capture_output=True,
text=True,
cwd=flake,
env=env,
)
if proc.returncode != 0:
print(proc.stderr, file=sys.stderr)
raise ClanError(
f"Failed to check clanImports for existence:\n{proc.stderr}"
)
modules_not_found = json.loads(proc.stdout)
if len(modules_not_found) > 0:
raise HTTPException(
status_code=400,
detail={
"msg": "Some requested clan modules could not be found",
"modules_not_found": modules_not_found,
},
)
# get the schema
proc = subprocess.run(
nix_eval(
flags=[
"--impure",
"--show-trace",
"--expr",
f"""
let
system = builtins.currentSystem;
flake = builtins.getFlake (toString {flake});
clan-core = flake.inputs.clan-core;
nixpkgsSrc = flake.inputs.nixpkgs or {nixpkgs_source()};
lib = import (nixpkgsSrc + /lib);
pkgs = import nixpkgsSrc {{ inherit system; }};
config = lib.importJSON (builtins.getEnv "CLAN_MACHINE_SETTINGS_FILE");
fakeMachine = pkgs.nixos {{
imports =
[
clan-core.nixosModules.clanCore
# potentially the config might affect submodule options,
# therefore we need to import it
config
]
# add all clan modules specified via clanImports
++ (map (name: clan-core.clanModules.${{name}}) config.clanImports or []);
}};
clanOptions = fakeMachine.options.clan;
jsonschemaLib = import {Path(__file__).parent / "jsonschema"} {{ inherit lib; }};
jsonschema = jsonschemaLib.parseOptions clanOptions;
in
jsonschema
""",
],
),
capture_output=True,
text=True,
cwd=flake,
env=env,
)
if proc.returncode != 0:
print(proc.stderr, file=sys.stderr)
raise ClanError(f"Failed to read schema:\n{proc.stderr}")
return json.loads(proc.stdout)

View File

@ -0,0 +1,18 @@
from types import ModuleType
from typing import Callable
class FakeDeal:
def __getattr__(self, _name: str) -> "FakeDeal":
return FakeDeal()
def __call__(self, func: Callable) -> Callable:
return func
try:
import deal as real_deal
deal: ModuleType | FakeDeal = real_deal
except ImportError:
deal = FakeDeal()

View File

@ -51,7 +51,7 @@ def breakpoint_container(
def breakpoint_shell(
work_dir: Path,
work_dir: Path = Path(os.getcwd()),
env: Optional[Dict[str, str]] = None,
cmd: Optional[List[str]] = None,
) -> None:

View File

@ -10,7 +10,7 @@ from .types import FlakeName
log = logging.getLogger(__name__)
def get_clan_flake_toplevel() -> Path:
def get_clan_flake_toplevel() -> Optional[Path]:
return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"])
@ -21,7 +21,7 @@ def find_git_repo_root() -> Optional[Path]:
return None
def find_toplevel(top_level_files: list[str]) -> Path:
def find_toplevel(top_level_files: list[str]) -> Optional[Path]:
"""Returns the path to the toplevel of the clan flake"""
for project_file in top_level_files:
initial_path = Path(os.getcwd())
@ -30,7 +30,7 @@ def find_toplevel(top_level_files: list[str]) -> Path:
if (path / project_file).exists():
return path
path = path.parent
raise ClanError("Could not find clan flake toplevel directory")
return None
def user_config_dir() -> Path:
@ -42,43 +42,22 @@ def user_config_dir() -> Path:
return Path(os.getenv("XDG_CONFIG_HOME", os.path.expanduser("~/.config")))
def user_data_dir() -> Path:
if sys.platform == "win32":
return Path(os.getenv("APPDATA", os.path.expanduser("~\\AppData\\Roaming\\")))
elif sys.platform == "darwin":
return Path(os.path.expanduser("~/Library/Application Support/"))
else:
return Path(os.getenv("XDG_DATA_HOME", os.path.expanduser("~/.local/state")))
def clan_data_dir() -> Path:
path = user_data_dir() / "clan"
if not path.exists():
log.debug(f"Creating path with parents {path}")
path.mkdir(parents=True)
return path.resolve()
def clan_config_dir() -> Path:
path = user_config_dir() / "clan"
if not path.exists():
log.debug(f"Creating path with parents {path}")
path.mkdir(parents=True)
path.mkdir(parents=True, exist_ok=True)
return path.resolve()
def clan_flakes_dir() -> Path:
path = clan_data_dir() / "flake"
if not path.exists():
log.debug(f"Creating path with parents {path}")
path.mkdir(parents=True)
path = clan_config_dir() / "flakes"
path.mkdir(parents=True, exist_ok=True)
return path.resolve()
def specific_flake_dir(flake_name: FlakeName) -> Path:
flake_dir = clan_flakes_dir() / flake_name
if not flake_dir.exists():
raise ClanError(f"Flake '{flake_name}' does not exist in {flake_dir}")
raise ClanError(f"Flake '{flake_name}' does not exist in {clan_flakes_dir()}")
return flake_dir

View File

@ -16,7 +16,7 @@ def install_nixos(machine: Machine, flake_name: FlakeName) -> None:
flake_attr = h.meta.get("flake_attr", "")
generate_secrets(machine, flake_name)
generate_secrets(machine)
with TemporaryDirectory() as tmpdir_:
tmpdir = Path(tmpdir_)

View File

@ -71,7 +71,7 @@ class Machine:
env["SECRETS_DIR"] = str(secrets_dir)
print(f"uploading secrets... {self.upload_secrets}")
proc = subprocess.run(
[self.upload_secrets, self.flake_dir.name],
[self.upload_secrets],
env=env,
stdout=subprocess.PIPE,
text=True,

View File

@ -4,13 +4,12 @@ import os
import subprocess
from pathlib import Path
from ..dirs import get_clan_flake_toplevel
from ..errors import ClanError
from ..machines.machines import Machine
from ..nix import nix_build, nix_command, nix_config
from ..secrets.generate import generate_secrets
from ..secrets.upload import upload_secrets
from ..ssh import Host, HostGroup, HostKeyCheck, parse_deployment_address
from ..types import FlakeName
def deploy_nixos(hosts: HostGroup, clan_dir: Path) -> None:
@ -41,7 +40,7 @@ def deploy_nixos(hosts: HostGroup, clan_dir: Path) -> None:
flake_attr = h.meta.get("flake_attr", "")
generate_secrets(h.meta["machine"], FlakeName(clan_dir.name))
generate_secrets(h.meta["machine"])
upload_secrets(h.meta["machine"])
target_host = h.meta.get("target_host")
@ -117,11 +116,9 @@ def get_selected_machines(machine_names: list[str], flake_dir: Path) -> HostGrou
# FIXME: we want some kind of inventory here.
def update(args: argparse.Namespace) -> None:
if args.flake is None:
flake_dir = get_clan_flake_toplevel()
else:
flake_dir = args.flake
raise ClanError("Could not find clan flake toplevel directory")
if len(args.machines) == 1 and args.target_host is not None:
machine = Machine(name=args.machines[0], flake_dir=flake_dir)
machine = Machine(name=args.machines[0], flake_dir=args.flake)
machine.deployment_address = args.target_host
host = parse_deployment_address(
args.machines[0],
@ -135,11 +132,11 @@ def update(args: argparse.Namespace) -> None:
exit(1)
else:
if len(args.machines) == 0:
machines = get_all_machines(flake_dir)
machines = get_all_machines(args.flake)
else:
machines = get_selected_machines(args.machines, flake_dir)
machines = get_selected_machines(args.machines, args.flake)
deploy_nixos(machines, flake_dir)
deploy_nixos(machines, args.flake)
def register_update_parser(parser: argparse.ArgumentParser) -> None:

View File

@ -3,18 +3,16 @@ import shutil
from pathlib import Path
from typing import Callable
from ..dirs import specific_flake_dir
from ..errors import ClanError
from ..types import FlakeName
def get_sops_folder(flake_name: FlakeName) -> Path:
return specific_flake_dir(flake_name) / "sops"
def get_sops_folder(flake_dir: Path) -> Path:
return flake_dir / "sops"
def gen_sops_subfolder(subdir: str) -> Callable[[FlakeName], Path]:
def folder(flake_name: FlakeName) -> Path:
return specific_flake_dir(flake_name) / "sops" / subdir
def gen_sops_subfolder(subdir: str) -> Callable[[Path], Path]:
def folder(flake_dir: Path) -> Path:
return flake_dir / "sops" / subdir
return folder

View File

@ -6,21 +6,19 @@ import sys
from clan_cli.errors import ClanError
from ..dirs import specific_flake_dir
from ..machines.machines import Machine
from ..types import FlakeName
log = logging.getLogger(__name__)
def generate_secrets(machine: Machine, flake_name: FlakeName) -> None:
def generate_secrets(machine: Machine) -> None:
env = os.environ.copy()
env["CLAN_DIR"] = str(machine.flake_dir)
env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module
print(f"generating secrets... {machine.generate_secrets}")
proc = subprocess.run(
[machine.generate_secrets, flake_name],
[machine.generate_secrets],
env=env,
)
@ -31,8 +29,8 @@ def generate_secrets(machine: Machine, flake_name: FlakeName) -> None:
def generate_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
generate_secrets(machine, args.flake)
machine = Machine(name=args.machine, flake_dir=args.flake)
generate_secrets(machine)
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
@ -40,9 +38,4 @@ def register_generate_parser(parser: argparse.ArgumentParser) -> None:
"machine",
help="The machine to generate secrets for",
)
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.set_defaults(func=generate_command)

View File

@ -4,7 +4,6 @@ from pathlib import Path
from ..errors import ClanError
from ..machines.types import machine_name_type, validate_hostname
from ..types import FlakeName
from . import secrets
from .folders import (
sops_groups_folder,
@ -21,27 +20,27 @@ from .types import (
)
def machines_folder(flake_name: FlakeName, group: str) -> Path:
return sops_groups_folder(flake_name) / group / "machines"
def machines_folder(flake_dir: Path, group: str) -> Path:
return sops_groups_folder(flake_dir) / group / "machines"
def users_folder(flake_name: FlakeName, group: str) -> Path:
return sops_groups_folder(flake_name) / group / "users"
def users_folder(flake_dir: Path, group: str) -> Path:
return sops_groups_folder(flake_dir) / group / "users"
class Group:
def __init__(
self, flake_name: FlakeName, name: str, machines: list[str], users: list[str]
self, flake_dir: Path, name: str, machines: list[str], users: list[str]
) -> None:
self.name = name
self.machines = machines
self.users = users
self.flake_name = flake_name
self.flake_dir = flake_dir
def list_groups(flake_name: FlakeName) -> list[Group]:
def list_groups(flake_dir: Path) -> list[Group]:
groups: list[Group] = []
folder = sops_groups_folder(flake_name)
folder = sops_groups_folder(flake_dir)
if not folder.exists():
return groups
@ -49,24 +48,24 @@ def list_groups(flake_name: FlakeName) -> list[Group]:
group_folder = folder / name
if not group_folder.is_dir():
continue
machines_path = machines_folder(flake_name, name)
machines_path = machines_folder(flake_dir, name)
machines = []
if machines_path.is_dir():
for f in machines_path.iterdir():
if validate_hostname(f.name):
machines.append(f.name)
users_path = users_folder(flake_name, name)
users_path = users_folder(flake_dir, name)
users = []
if users_path.is_dir():
for f in users_path.iterdir():
if VALID_USER_NAME.match(f.name):
users.append(f.name)
groups.append(Group(flake_name, name, machines, users))
groups.append(Group(flake_dir, name, machines, users))
return groups
def list_command(args: argparse.Namespace) -> None:
for group in list_groups(args.flake):
for group in list_groups(Path(args.flake)):
print(group.name)
if group.machines:
print("machines:")
@ -88,9 +87,9 @@ def list_directory(directory: Path) -> str:
return msg
def update_group_keys(flake_name: FlakeName, group: str) -> None:
for secret_ in secrets.list_secrets(flake_name):
secret = sops_secrets_folder(flake_name) / secret_
def update_group_keys(flake_dir: Path, group: str) -> None:
for secret_ in secrets.list_secrets(flake_dir):
secret = sops_secrets_folder(flake_dir) / secret_
if (secret / "groups" / group).is_symlink():
update_keys(
secret,
@ -99,7 +98,7 @@ def update_group_keys(flake_name: FlakeName, group: str) -> None:
def add_member(
flake_name: FlakeName, group_folder: Path, source_folder: Path, name: str
flake_dir: Path, group_folder: Path, source_folder: Path, name: str
) -> None:
source = source_folder / name
if not source.exists():
@ -115,10 +114,10 @@ def add_member(
)
os.remove(user_target)
user_target.symlink_to(os.path.relpath(source, user_target.parent))
update_group_keys(flake_name, group_folder.parent.name)
update_group_keys(flake_dir, group_folder.parent.name)
def remove_member(flake_name: FlakeName, group_folder: Path, name: str) -> None:
def remove_member(flake_dir: Path, group_folder: Path, name: str) -> None:
target = group_folder / name
if not target.exists():
msg = f"{name} does not exist in group in {group_folder}: "
@ -127,7 +126,7 @@ def remove_member(flake_name: FlakeName, group_folder: Path, name: str) -> None:
os.remove(target)
if len(os.listdir(group_folder)) > 0:
update_group_keys(flake_name, group_folder.parent.name)
update_group_keys(flake_dir, group_folder.parent.name)
if len(os.listdir(group_folder)) == 0:
os.rmdir(group_folder)
@ -136,65 +135,65 @@ def remove_member(flake_name: FlakeName, group_folder: Path, name: str) -> None:
os.rmdir(group_folder.parent)
def add_user(flake_name: FlakeName, group: str, name: str) -> None:
def add_user(flake_dir: Path, group: str, name: str) -> None:
add_member(
flake_name, users_folder(flake_name, group), sops_users_folder(flake_name), name
flake_dir, users_folder(flake_dir, group), sops_users_folder(flake_dir), name
)
def add_user_command(args: argparse.Namespace) -> None:
add_user(args.flake, args.group, args.user)
add_user(Path(args.flake), args.group, args.user)
def remove_user(flake_name: FlakeName, group: str, name: str) -> None:
remove_member(flake_name, users_folder(flake_name, group), name)
def remove_user(flake_dir: Path, group: str, name: str) -> None:
remove_member(flake_dir, users_folder(flake_dir, group), name)
def remove_user_command(args: argparse.Namespace) -> None:
remove_user(args.flake, args.group, args.user)
remove_user(Path(args.flake), args.group, args.user)
def add_machine(flake_name: FlakeName, group: str, name: str) -> None:
def add_machine(flake_dir: Path, group: str, name: str) -> None:
add_member(
flake_name,
machines_folder(flake_name, group),
sops_machines_folder(flake_name),
flake_dir,
machines_folder(flake_dir, group),
sops_machines_folder(flake_dir),
name,
)
def add_machine_command(args: argparse.Namespace) -> None:
add_machine(args.flake, args.group, args.machine)
add_machine(Path(args.flake), args.group, args.machine)
def remove_machine(flake_name: FlakeName, group: str, name: str) -> None:
remove_member(flake_name, machines_folder(flake_name, group), name)
def remove_machine(flake_dir: Path, group: str, name: str) -> None:
remove_member(flake_dir, machines_folder(flake_dir, group), name)
def remove_machine_command(args: argparse.Namespace) -> None:
remove_machine(args.flake, args.group, args.machine)
remove_machine(Path(args.flake), args.group, args.machine)
def add_group_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument("group", help="the name of the secret", type=group_name_type)
def add_secret(flake_name: FlakeName, group: str, name: str) -> None:
def add_secret(flake_dir: Path, group: str, name: str) -> None:
secrets.allow_member(
secrets.groups_folder(flake_name, name), sops_groups_folder(flake_name), group
secrets.groups_folder(flake_dir, name), sops_groups_folder(flake_dir), group
)
def add_secret_command(args: argparse.Namespace) -> None:
add_secret(args.flake, args.group, args.secret)
add_secret(Path(args.flake), args.group, args.secret)
def remove_secret(flake_name: FlakeName, group: str, name: str) -> None:
secrets.disallow_member(secrets.groups_folder(flake_name, name), group)
def remove_secret(flake_dir: Path, group: str, name: str) -> None:
secrets.disallow_member(secrets.groups_folder(flake_dir, name), group)
def remove_secret_command(args: argparse.Namespace) -> None:
remove_secret(args.flake, args.group, args.secret)
remove_secret(Path(args.flake), args.group, args.secret)
def register_groups_parser(parser: argparse.ArgumentParser) -> None:
@ -207,11 +206,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
# List groups
list_parser = subparser.add_parser("list", help="list groups")
list_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
list_parser.set_defaults(func=list_command)
# Add user
@ -222,11 +216,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
add_machine_parser.add_argument(
"machine", help="the name of the machines to add", type=machine_name_type
)
add_machine_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_machine_parser.set_defaults(func=add_machine_command)
# Remove machine
@ -237,11 +226,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
remove_machine_parser.add_argument(
"machine", help="the name of the machines to remove", type=machine_name_type
)
remove_machine_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_machine_parser.set_defaults(func=remove_machine_command)
# Add user
@ -250,11 +234,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
add_user_parser.add_argument(
"user", help="the name of the user to add", type=user_name_type
)
add_user_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_user_parser.set_defaults(func=add_user_command)
# Remove user
@ -265,11 +244,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
remove_user_parser.add_argument(
"user", help="the name of the user to remove", type=user_name_type
)
remove_user_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_user_parser.set_defaults(func=remove_user_command)
# Add secret
@ -282,11 +256,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
add_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
add_secret_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_secret_parser.set_defaults(func=add_secret_command)
# Remove secret
@ -299,9 +268,4 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
remove_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
remove_secret_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_secret_parser.set_defaults(func=remove_secret_command)

View File

@ -36,15 +36,15 @@ def import_sops(args: argparse.Namespace) -> None:
file=sys.stderr,
)
continue
if (sops_secrets_folder(args.flake) / k / "secret").exists():
if (sops_secrets_folder(Path(args.flake)) / k / "secret").exists():
print(
f"WARNING: {k} already exists, skipping",
file=sys.stderr,
)
continue
encrypt_secret(
args.flake,
sops_secrets_folder(args.flake) / k,
Path(args.flake),
sops_secrets_folder(Path(args.flake)) / k,
v,
add_groups=args.group,
add_machines=args.machine,
@ -91,10 +91,5 @@ def register_import_sops_parser(parser: argparse.ArgumentParser) -> None:
type=str,
help="the sops file to import (- for stdin)",
)
parser.add_argument(
"flake",
type=str,
help="name of the flake",
)
parser.set_defaults(func=import_sops)

View File

@ -1,74 +1,87 @@
import argparse
from pathlib import Path
from ..errors import ClanError
from ..machines.types import machine_name_type, validate_hostname
from ..types import FlakeName
from . import secrets
from .folders import list_objects, remove_object, sops_machines_folder
from .sops import read_key, write_key
from .types import public_or_private_age_key_type, secret_name_type
def add_machine(flake_name: FlakeName, name: str, key: str, force: bool) -> None:
write_key(sops_machines_folder(flake_name) / name, key, force)
def add_machine(flake_dir: Path, name: str, key: str, force: bool) -> None:
write_key(sops_machines_folder(flake_dir) / name, key, force)
def remove_machine(flake_name: FlakeName, name: str) -> None:
remove_object(sops_machines_folder(flake_name), name)
def remove_machine(flake_dir: Path, name: str) -> None:
remove_object(sops_machines_folder(flake_dir), name)
def get_machine(flake_name: FlakeName, name: str) -> str:
return read_key(sops_machines_folder(flake_name) / name)
def get_machine(flake_dir: Path, name: str) -> str:
return read_key(sops_machines_folder(flake_dir) / name)
def has_machine(flake_name: FlakeName, name: str) -> bool:
return (sops_machines_folder(flake_name) / name / "key.json").exists()
def has_machine(flake_dir: Path, name: str) -> bool:
return (sops_machines_folder(flake_dir) / name / "key.json").exists()
def list_machines(flake_name: FlakeName) -> list[str]:
path = sops_machines_folder(flake_name)
def list_machines(flake_dir: Path) -> list[str]:
path = sops_machines_folder(flake_dir)
def validate(name: str) -> bool:
return validate_hostname(name) and has_machine(flake_name, name)
return validate_hostname(name) and has_machine(flake_dir, name)
return list_objects(path, validate)
def add_secret(flake_name: FlakeName, machine: str, secret: str) -> None:
def add_secret(flake_dir: Path, machine: str, secret: str) -> None:
secrets.allow_member(
secrets.machines_folder(flake_name, secret),
sops_machines_folder(flake_name),
secrets.machines_folder(flake_dir, secret),
sops_machines_folder(flake_dir),
machine,
)
def remove_secret(flake_name: FlakeName, machine: str, secret: str) -> None:
secrets.disallow_member(secrets.machines_folder(flake_name, secret), machine)
def remove_secret(flake_dir: Path, machine: str, secret: str) -> None:
secrets.disallow_member(secrets.machines_folder(flake_dir, secret), machine)
def list_command(args: argparse.Namespace) -> None:
lst = list_machines(args.flake)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
lst = list_machines(Path(args.flake))
if len(lst) > 0:
print("\n".join(lst))
def add_command(args: argparse.Namespace) -> None:
add_machine(args.flake, args.machine, args.key, args.force)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
add_machine(Path(args.flake), args.machine, args.key, args.force)
def get_command(args: argparse.Namespace) -> None:
print(get_machine(args.flake, args.machine))
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
print(get_machine(Path(args.flake), args.machine))
def remove_command(args: argparse.Namespace) -> None:
remove_machine(args.flake, args.machine)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
remove_machine(Path(args.flake), args.machine)
def add_secret_command(args: argparse.Namespace) -> None:
add_secret(args.flake, args.machine, args.secret)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
add_secret(Path(args.flake), args.machine, args.secret)
def remove_secret_command(args: argparse.Namespace) -> None:
remove_secret(args.flake, args.machine, args.secret)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
remove_secret(Path(args.flake), args.machine, args.secret)
def register_machines_parser(parser: argparse.ArgumentParser) -> None:
@ -80,11 +93,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
)
# Parser
list_parser = subparser.add_parser("list", help="list machines")
list_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
list_parser.set_defaults(func=list_command)
# Parser
@ -104,11 +112,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
help="public key or private key of the user",
type=public_or_private_age_key_type,
)
add_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_parser.set_defaults(func=add_command)
# Parser
@ -116,11 +119,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
get_parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type
)
get_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
get_parser.set_defaults(func=get_command)
# Parser
@ -128,11 +126,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
remove_parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type
)
remove_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_parser.set_defaults(func=remove_command)
# Parser
@ -145,11 +138,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
add_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
add_secret_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_secret_parser.set_defaults(func=add_secret_command)
# Parser
@ -162,9 +150,4 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
remove_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
remove_secret_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_secret_parser.set_defaults(func=remove_secret_command)

View File

@ -8,7 +8,6 @@ from typing import IO
from .. import tty
from ..errors import ClanError
from ..types import FlakeName
from .folders import (
list_objects,
sops_groups_folder,
@ -54,36 +53,36 @@ def collect_keys_for_path(path: Path) -> set[str]:
def encrypt_secret(
flake_name: FlakeName,
flake_dir: Path,
secret: Path,
value: IO[str] | str | None,
add_users: list[str] = [],
add_machines: list[str] = [],
add_groups: list[str] = [],
) -> None:
key = ensure_sops_key(flake_name)
key = ensure_sops_key(flake_dir)
keys = set([])
for user in add_users:
allow_member(
users_folder(flake_name, secret.name),
sops_users_folder(flake_name),
users_folder(flake_dir, secret.name),
sops_users_folder(flake_dir),
user,
False,
)
for machine in add_machines:
allow_member(
machines_folder(flake_name, secret.name),
sops_machines_folder(flake_name),
machines_folder(flake_dir, secret.name),
sops_machines_folder(flake_dir),
machine,
False,
)
for group in add_groups:
allow_member(
groups_folder(flake_name, secret.name),
sops_groups_folder(flake_name),
groups_folder(flake_dir, secret.name),
sops_groups_folder(flake_dir),
group,
False,
)
@ -93,8 +92,8 @@ def encrypt_secret(
if key.pubkey not in keys:
keys.add(key.pubkey)
allow_member(
users_folder(flake_name, secret.name),
sops_users_folder(flake_name),
users_folder(flake_dir, secret.name),
sops_users_folder(flake_dir),
key.username,
False,
)
@ -102,31 +101,31 @@ def encrypt_secret(
encrypt_file(secret / "secret", value, list(sorted(keys)))
def remove_secret(flake_name: FlakeName, secret: str) -> None:
path = sops_secrets_folder(flake_name) / secret
def remove_secret(flake_dir: Path, secret: str) -> None:
path = sops_secrets_folder(flake_dir) / secret
if not path.exists():
raise ClanError(f"Secret '{secret}' does not exist")
shutil.rmtree(path)
def remove_command(args: argparse.Namespace) -> None:
remove_secret(args.flake, args.secret)
remove_secret(Path(args.flake), args.secret)
def add_secret_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
def machines_folder(flake_name: FlakeName, group: str) -> Path:
return sops_secrets_folder(flake_name) / group / "machines"
def machines_folder(flake_dir: Path, group: str) -> Path:
return sops_secrets_folder(flake_dir) / group / "machines"
def users_folder(flake_name: FlakeName, group: str) -> Path:
return sops_secrets_folder(flake_name) / group / "users"
def users_folder(flake_dir: Path, group: str) -> Path:
return sops_secrets_folder(flake_dir) / group / "users"
def groups_folder(flake_name: FlakeName, group: str) -> Path:
return sops_secrets_folder(flake_name) / group / "groups"
def groups_folder(flake_dir: Path, group: str) -> Path:
return sops_secrets_folder(flake_dir) / group / "groups"
def list_directory(directory: Path) -> str:
@ -189,37 +188,35 @@ def disallow_member(group_folder: Path, name: str) -> None:
)
def has_secret(flake_name: FlakeName, secret: str) -> bool:
return (sops_secrets_folder(flake_name) / secret / "secret").exists()
def has_secret(flake_dir: Path, secret: str) -> bool:
return (sops_secrets_folder(flake_dir) / secret / "secret").exists()
def list_secrets(flake_name: FlakeName) -> list[str]:
path = sops_secrets_folder(flake_name)
def list_secrets(flake_dir: Path) -> list[str]:
path = sops_secrets_folder(flake_dir)
def validate(name: str) -> bool:
return VALID_SECRET_NAME.match(name) is not None and has_secret(
flake_name, name
)
return VALID_SECRET_NAME.match(name) is not None and has_secret(flake_dir, name)
return list_objects(path, validate)
def list_command(args: argparse.Namespace) -> None:
lst = list_secrets(args.flake)
lst = list_secrets(Path(args.flake))
if len(lst) > 0:
print("\n".join(lst))
def decrypt_secret(flake_name: FlakeName, secret: str) -> str:
ensure_sops_key(flake_name)
secret_path = sops_secrets_folder(flake_name) / secret / "secret"
def decrypt_secret(flake_dir: Path, secret: str) -> str:
ensure_sops_key(flake_dir)
secret_path = sops_secrets_folder(flake_dir) / secret / "secret"
if not secret_path.exists():
raise ClanError(f"Secret '{secret}' does not exist")
return decrypt_file(secret_path)
def get_command(args: argparse.Namespace) -> None:
print(decrypt_secret(args.flake, args.secret), end="")
print(decrypt_secret(Path(args.flake), args.secret), end="")
def set_command(args: argparse.Namespace) -> None:
@ -232,8 +229,8 @@ def set_command(args: argparse.Namespace) -> None:
elif tty.is_interactive():
secret_value = getpass.getpass(prompt="Paste your secret: ")
encrypt_secret(
args.flake,
sops_secrets_folder(args.flake) / args.secret,
Path(args.flake),
sops_secrets_folder(Path(args.flake)) / args.secret,
secret_value,
args.user,
args.machine,
@ -242,8 +239,8 @@ def set_command(args: argparse.Namespace) -> None:
def rename_command(args: argparse.Namespace) -> None:
old_path = sops_secrets_folder(args.flake) / args.secret
new_path = sops_secrets_folder(args.flake) / args.new_name
old_path = sops_secrets_folder(Path(args.flake)) / args.secret
new_path = sops_secrets_folder(Path(args.flake)) / args.new_name
if not old_path.exists():
raise ClanError(f"Secret '{args.secret}' does not exist")
if new_path.exists():
@ -253,20 +250,10 @@ def rename_command(args: argparse.Namespace) -> None:
def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
parser_list = subparser.add_parser("list", help="list secrets")
parser_list.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser_list.set_defaults(func=list_command)
parser_get = subparser.add_parser("get", help="get a secret")
add_secret_argument(parser_get)
parser_get.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser_get.set_defaults(func=get_command)
parser_set = subparser.add_parser("set", help="set a secret")
@ -299,28 +286,13 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
default=False,
help="edit the secret with $EDITOR instead of pasting it",
)
parser_set.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser_set.set_defaults(func=set_command)
parser_rename = subparser.add_parser("rename", help="rename a secret")
add_secret_argument(parser_rename)
parser_rename.add_argument("new_name", type=str, help="the new name of the secret")
parser_rename.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser_rename.set_defaults(func=rename_command)
parser_remove = subparser.add_parser("remove", help="remove a secret")
add_secret_argument(parser_remove)
parser_remove.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser_remove.set_defaults(func=remove_command)

View File

@ -10,7 +10,6 @@ from typing import IO, Iterator
from ..dirs import user_config_dir
from ..errors import ClanError
from ..nix import nix_shell
from ..types import FlakeName
from .folders import sops_machines_folder, sops_users_folder
@ -52,7 +51,7 @@ def generate_private_key() -> tuple[str, str]:
raise ClanError("Failed to generate private sops key") from e
def get_user_name(flake_name: FlakeName, user: str) -> str:
def get_user_name(flake_dir: Path, user: str) -> str:
"""Ask the user for their name until a unique one is provided."""
while True:
name = input(
@ -60,14 +59,14 @@ def get_user_name(flake_name: FlakeName, user: str) -> str:
)
if name:
user = name
if not (sops_users_folder(flake_name) / user).exists():
if not (flake_dir / user).exists():
return user
print(f"{sops_users_folder(flake_name) / user} already exists")
print(f"{flake_dir / user} already exists")
def ensure_user_or_machine(flake_name: FlakeName, pub_key: str) -> SopsKey:
def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey:
key = SopsKey(pub_key, username="")
folders = [sops_users_folder(flake_name), sops_machines_folder(flake_name)]
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders:
if folder.exists():
for user in folder.iterdir():
@ -91,13 +90,13 @@ def default_sops_key_path() -> Path:
return user_config_dir() / "sops" / "age" / "keys.txt"
def ensure_sops_key(flake_name: FlakeName) -> SopsKey:
def ensure_sops_key(flake_dir: Path) -> SopsKey:
key = os.environ.get("SOPS_AGE_KEY")
if key:
return ensure_user_or_machine(flake_name, get_public_key(key))
return ensure_user_or_machine(flake_dir, get_public_key(key))
path = default_sops_key_path()
if path.exists():
return ensure_user_or_machine(flake_name, get_public_key(path.read_text()))
return ensure_user_or_machine(flake_dir, get_public_key(path.read_text()))
else:
raise ClanError(
"No sops key found. Please generate one with 'clan secrets key generate'."

View File

@ -10,9 +10,7 @@ from typing import Any
from clan_cli.nix import nix_shell
from ..dirs import specific_flake_dir
from ..errors import ClanError
from ..types import FlakeName
from .folders import sops_secrets_folder
from .machines import add_machine, has_machine
from .secrets import decrypt_secret, encrypt_secret, has_secret
@ -21,29 +19,29 @@ from .sops import generate_private_key
log = logging.getLogger(__name__)
def generate_host_key(flake_name: FlakeName, machine_name: str) -> None:
if has_machine(flake_name, machine_name):
def generate_host_key(flake_dir: Path, machine_name: str) -> None:
if has_machine(flake_dir, machine_name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
flake_name,
sops_secrets_folder(flake_name) / f"{machine_name}-age.key",
flake_dir,
sops_secrets_folder(flake_dir) / f"{machine_name}-age.key",
priv_key,
)
add_machine(flake_name, machine_name, pub_key, False)
add_machine(flake_dir, machine_name, pub_key, False)
def generate_secrets_group(
flake_name: FlakeName,
flake_dir: Path,
secret_group: str,
machine_name: str,
tempdir: Path,
secret_options: dict[str, Any],
) -> None:
clan_dir = specific_flake_dir(flake_name)
clan_dir = flake_dir
secrets = secret_options["secrets"]
needs_regeneration = any(
not has_secret(flake_name, f"{machine_name}-{secret['name']}")
not has_secret(flake_dir, f"{machine_name}-{secret['name']}")
for secret in secrets.values()
)
generator = secret_options["generator"]
@ -74,8 +72,8 @@ export secrets={shlex.quote(str(secrets_dir))}
msg += text
raise ClanError(msg)
encrypt_secret(
flake_name,
sops_secrets_folder(flake_name) / f"{machine_name}-{secret['name']}",
flake_dir,
sops_secrets_folder(flake_dir) / f"{machine_name}-{secret['name']}",
secret_file.read_text(),
add_machines=[machine_name],
)
@ -92,21 +90,19 @@ export secrets={shlex.quote(str(secrets_dir))}
# this is called by the sops.nix clan core module
def generate_secrets_from_nix(
flake_name: FlakeName,
machine_name: str,
secret_submodules: dict[str, Any],
) -> None:
generate_host_key(flake_name, machine_name)
flake_dir = Path(os.environ["CLAN_DIR"])
generate_host_key(flake_dir, machine_name)
errors = {}
log.debug(
"Generating secrets for machine %s and flake %s", machine_name, flake_name
)
log.debug("Generating secrets for machine %s and flake %s", machine_name, flake_dir)
with TemporaryDirectory() as d:
# if any of the secrets are missing, we regenerate all connected facts/secrets
for secret_group, secret_options in secret_submodules.items():
try:
generate_secrets_group(
flake_name, secret_group, machine_name, Path(d), secret_options
flake_dir, secret_group, machine_name, Path(d), secret_options
)
except ClanError as e:
errors[secret_group] = e
@ -119,16 +115,16 @@ def generate_secrets_from_nix(
# this is called by the sops.nix clan core module
def upload_age_key_from_nix(
flake_name: FlakeName,
machine_name: str,
) -> None:
log.debug("Uploading secrets for machine %s and flake %s", machine_name, flake_name)
flake_dir = Path(os.environ["CLAN_DIR"])
log.debug("Uploading secrets for machine %s and flake %s", machine_name, flake_dir)
secret_name = f"{machine_name}-age.key"
if not has_secret(
flake_name, secret_name
flake_dir, secret_name
): # skip uploading the secret, not managed by us
return
secret = decrypt_secret(flake_name, secret_name)
secret = decrypt_secret(flake_dir, secret_name)
secrets_dir = Path(os.environ["SECRETS_DIR"])
(secrets_dir / "key.txt").write_text(secret)

View File

@ -4,7 +4,6 @@ import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
from ..dirs import specific_flake_dir
from ..machines.machines import Machine
from ..nix import nix_shell
@ -38,7 +37,7 @@ def upload_secrets(machine: Machine) -> None:
def upload_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
machine = Machine(name=args.machine, flake_dir=args.flake)
upload_secrets(machine)
@ -47,9 +46,4 @@ def register_upload_parser(parser: argparse.ArgumentParser) -> None:
"machine",
help="The machine to upload secrets to",
)
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.set_defaults(func=upload_command)

View File

@ -1,6 +1,7 @@
import argparse
from pathlib import Path
from ..types import FlakeName
from ..errors import ClanError
from . import secrets
from .folders import list_objects, remove_object, sops_users_folder
from .sops import read_key, write_key
@ -12,20 +13,20 @@ from .types import (
)
def add_user(flake_name: FlakeName, name: str, key: str, force: bool) -> None:
write_key(sops_users_folder(flake_name) / name, key, force)
def add_user(flake_dir: Path, name: str, key: str, force: bool) -> None:
write_key(sops_users_folder(flake_dir) / name, key, force)
def remove_user(flake_name: FlakeName, name: str) -> None:
remove_object(sops_users_folder(flake_name), name)
def remove_user(flake_dir: Path, name: str) -> None:
remove_object(sops_users_folder(flake_dir), name)
def get_user(flake_name: FlakeName, name: str) -> str:
return read_key(sops_users_folder(flake_name) / name)
def get_user(flake_dir: Path, name: str) -> str:
return read_key(sops_users_folder(flake_dir) / name)
def list_users(flake_name: FlakeName) -> list[str]:
path = sops_users_folder(flake_name)
def list_users(flake_dir: Path) -> list[str]:
path = sops_users_folder(flake_dir)
def validate(name: str) -> bool:
return (
@ -36,40 +37,52 @@ def list_users(flake_name: FlakeName) -> list[str]:
return list_objects(path, validate)
def add_secret(flake_name: FlakeName, user: str, secret: str) -> None:
def add_secret(flake_dir: Path, user: str, secret: str) -> None:
secrets.allow_member(
secrets.users_folder(flake_name, secret), sops_users_folder(flake_name), user
secrets.users_folder(flake_dir, secret), sops_users_folder(flake_dir), user
)
def remove_secret(flake_name: FlakeName, user: str, secret: str) -> None:
secrets.disallow_member(secrets.users_folder(flake_name, secret), user)
def remove_secret(flake_dir: Path, user: str, secret: str) -> None:
secrets.disallow_member(secrets.users_folder(flake_dir, secret), user)
def list_command(args: argparse.Namespace) -> None:
lst = list_users(args.flake)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
lst = list_users(Path(args.flake))
if len(lst) > 0:
print("\n".join(lst))
def add_command(args: argparse.Namespace) -> None:
add_user(args.flake, args.user, args.key, args.force)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
add_user(Path(args.flake), args.user, args.key, args.force)
def get_command(args: argparse.Namespace) -> None:
print(get_user(args.flake, args.user))
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
print(get_user(Path(args.flake), args.user))
def remove_command(args: argparse.Namespace) -> None:
remove_user(args.flake, args.user)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
remove_user(Path(args.flake), args.user)
def add_secret_command(args: argparse.Namespace) -> None:
add_secret(args.flake, args.user, args.secret)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
add_secret(Path(args.flake), args.user, args.secret)
def remove_secret_command(args: argparse.Namespace) -> None:
remove_secret(args.flake, args.user, args.secret)
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
remove_secret(Path(args.flake), args.user, args.secret)
def register_users_parser(parser: argparse.ArgumentParser) -> None:
@ -80,11 +93,6 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
required=True,
)
list_parser = subparser.add_parser("list", help="list users")
list_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
list_parser.set_defaults(func=list_command)
add_parser = subparser.add_parser("add", help="add a user")
@ -98,29 +106,14 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
type=public_or_private_age_key_type,
)
add_parser.set_defaults(func=add_command)
add_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
get_parser = subparser.add_parser("get", help="get a user public key")
get_parser.add_argument("user", help="the name of the user", type=user_name_type)
get_parser.set_defaults(func=get_command)
get_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_parser = subparser.add_parser("remove", help="remove a user")
remove_parser.add_argument("user", help="the name of the user", type=user_name_type)
remove_parser.set_defaults(func=remove_command)
remove_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_secret_parser = subparser.add_parser(
"add-secret", help="allow a user to access a secret"
@ -131,11 +124,6 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
add_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
add_secret_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_secret_parser.set_defaults(func=add_secret_command)
remove_secret_parser = subparser.add_parser(
@ -147,9 +135,4 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
remove_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
remove_secret_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_secret_parser.set_defaults(func=remove_secret_command)

View File

@ -13,6 +13,7 @@ from typing import Any, Iterator, Optional, Type, TypeVar
from uuid import UUID, uuid4
from .custom_logger import ThreadFormatter, get_caller
from .deal import deal
from .errors import ClanError
@ -161,6 +162,8 @@ class TaskPool:
def __getitem__(self, uuid: UUID) -> BaseTask:
with self.lock:
if uuid not in self.pool:
raise ClanError(f"Task with uuid {uuid} does not exist")
return self.pool[uuid]
def __setitem__(self, uuid: UUID, task: BaseTask) -> None:
@ -175,6 +178,7 @@ class TaskPool:
POOL: TaskPool = TaskPool()
@deal.raises(ClanError)
def get_task(uuid: UUID) -> BaseTask:
global POOL
return POOL[uuid]

View File

@ -101,7 +101,7 @@ class BuildVmTask(BaseTask):
cmd = next(cmds)
cmd.run(
[vm_config["uploadSecrets"], clan_name],
[vm_config["uploadSecrets"]],
env=env,
)
@ -193,7 +193,7 @@ def create_vm(vm: VmConfig, nix_options: list[str] = []) -> BuildVmTask:
def create_command(args: argparse.Namespace) -> None:
flake_url = args.flake
if not is_flake_url(args.flake):
if not is_flake_url(str(args.flake)):
flake_url = specific_flake_dir(args.flake)
vm = asyncio.run(inspect_vm(flake_url=flake_url, flake_attr=args.machine))
@ -203,10 +203,5 @@ def create_command(args: argparse.Namespace) -> None:
def register_create_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument("machine", type=str)
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.add_argument("machine", type=str, help="machine in the flake to create")
parser.set_defaults(func=create_command)

View File

@ -0,0 +1,10 @@
import logging
from pydantic import BaseModel
log = logging.getLogger(__name__)
class MissingClanImports(BaseModel):
missing_clan_imports: list[str] = []
msg: str = "Some requested clan modules could not be found"

View File

@ -2,23 +2,15 @@ import logging
from pathlib import Path
from typing import Any
from pydantic import AnyUrl, BaseModel, validator
from pydantic import AnyUrl, BaseModel, Extra, validator
from ..dirs import clan_data_dir, clan_flakes_dir
from ..dirs import clan_flakes_dir
from ..flakes.create import DEFAULT_URL
from ..types import validate_path
log = logging.getLogger(__name__)
class ClanDataPath(BaseModel):
directory: Path
@validator("directory")
def check_directory(cls: Any, v: Path) -> Path: # noqa
return validate_path(clan_data_dir(), v)
class ClanFlakePath(BaseModel):
flake_name: Path
@ -29,3 +21,12 @@ class ClanFlakePath(BaseModel):
class FlakeCreateInput(ClanFlakePath):
url: AnyUrl = DEFAULT_URL
class MachineConfig(BaseModel):
clanImports: list[str] = [] # noqa: N815
clan: dict = {}
# allow extra fields to cover the full spectrum of a nixos config
class Config:
extra = Extra.allow

View File

@ -1,7 +1,7 @@
from enum import Enum
from typing import Dict, List
from pydantic import BaseModel, Field
from pydantic import BaseModel, Extra, Field
from ..async_cmd import CmdOut
from ..task_manager import TaskStatus
@ -36,7 +36,12 @@ class MachineResponse(BaseModel):
class ConfigResponse(BaseModel):
config: dict
clanImports: list[str] = [] # noqa: N815
clan: dict = {}
# allow extra fields to cover the full spectrum of a nixos config
class Config:
extra = Extra.allow
class SchemaResponse(BaseModel):

View File

@ -4,12 +4,15 @@ from typing import Annotated
from fastapi import APIRouter, Body
from clan_cli.webui.api_errors import MissingClanImports
from clan_cli.webui.api_inputs import MachineConfig
from ...config.machine import (
config_for_machine,
schema_for_machine,
set_config_for_machine,
verify_machine_config,
)
from ...config.schema import machine_schema
from ...machines.create import create_machine as _create_machine
from ...machines.list import list_machines as _list_machines
from ...types import FlakeName
@ -55,28 +58,26 @@ async def get_machine(flake_name: FlakeName, name: str) -> MachineResponse:
@router.get("/api/{flake_name}/machines/{name}/config", tags=[Tags.machine])
async def get_machine_config(flake_name: FlakeName, name: str) -> ConfigResponse:
config = config_for_machine(flake_name, name)
return ConfigResponse(config=config)
return ConfigResponse(**config)
@router.put("/api/{flake_name}/machines/{name}/config", tags=[Tags.machine])
async def set_machine_config(
flake_name: FlakeName, name: str, config: Annotated[dict, Body()]
) -> ConfigResponse:
set_config_for_machine(flake_name, name, config)
return ConfigResponse(config=config)
flake_name: FlakeName, name: str, config: Annotated[MachineConfig, Body()]
) -> None:
conf = dict(config)
set_config_for_machine(flake_name, name, conf)
@router.get("/api/{flake_name}/machines/{name}/schema", tags=[Tags.machine])
async def get_machine_schema(flake_name: FlakeName, name: str) -> SchemaResponse:
schema = schema_for_machine(flake_name, name)
return SchemaResponse(schema=schema)
@router.put("/api/{flake_name}/machines/{name}/schema", tags=[Tags.machine])
async def set_machine_schema(
flake_name: FlakeName, name: str, config: Annotated[dict, Body()]
@router.put(
"/api/{flake_name}/schema",
tags=[Tags.machine],
responses={400: {"model": MissingClanImports}},
)
async def get_machine_schema(
flake_name: FlakeName, config: Annotated[dict, Body()]
) -> SchemaResponse:
schema = schema_for_machine(flake_name, name, config)
schema = machine_schema(flake_name, config=config)
return SchemaResponse(schema=schema)

View File

@ -34,7 +34,10 @@
, gnupg
, e2fsprogs
, mypy
, cntr
, deal
, schemathesis
, rope
, clan-core-path
}:
let
@ -50,6 +53,8 @@ let
pytest-subprocess
pytest-xdist
pytest-timeout
deal
schemathesis
remote-pdb
ipdb
openssh
@ -129,14 +134,30 @@ python3.pkgs.buildPythonApplication {
propagatedBuildInputs = dependencies;
# also re-expose dependencies so we test them in CI
passthru.tests = (lib.mapAttrs' (n: lib.nameValuePair "clan-dep-${n}") runtimeDependenciesAsSet) // {
clan-pytest = runCommand "clan-pytest" { nativeBuildInputs = [ checkPython ] ++ pytestDependencies; } ''
passthru.tests = (lib.mapAttrs' (n: lib.nameValuePair "clan-dep-${n}") runtimeDependenciesAsSet) // rec {
clan-pytest-without-core = runCommand "clan-pytest-without-core" { nativeBuildInputs = [ checkPython ] ++ pytestDependencies; } ''
cp -r ${source} ./src
chmod +w -R ./src
cd ./src
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1
${checkPython}/bin/python -m pytest -m "not impure" -s ./tests
${checkPython}/bin/python -m pytest -m "not impure and not with_core" -s ./tests
touch $out
'';
# separate the tests that can never be cached
clan-pytest-with-core = runCommand "clan-pytest-with-core" { nativeBuildInputs = [ checkPython ] ++ pytestDependencies; } ''
cp -r ${source} ./src
chmod +w -R ./src
cd ./src
export CLAN_CORE=${clan-core-path}
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1
${checkPython}/bin/python -m pytest -m "not impure and with_core" -s ./tests
touch $out
'';
clan-pytest = runCommand "clan-pytest" { } ''
echo ${clan-pytest-without-core}
echo ${clan-pytest-with-core}
touch $out
'';
check-for-breakpoints = runCommand "breakpoints" { } ''
@ -161,6 +182,7 @@ python3.pkgs.buildPythonApplication {
passthru.checkPython = checkPython;
passthru.devDependencies = [
rope
setuptools
wheel
] ++ pytestDependencies;

View File

@ -1,19 +1,49 @@
{ inputs, ... }:
{ inputs, self, lib, ... }:
{
perSystem = { self', pkgs, ... }: {
devShells.clan-cli = pkgs.callPackage ./shell.nix {
inherit (self'.packages) clan-cli ui-assets nix-unit;
};
packages = {
clan-cli = pkgs.python3.pkgs.callPackage ./default.nix {
inherit (self'.packages) ui-assets;
inherit (inputs) nixpkgs;
perSystem = { self', pkgs, system, ... }:
let
flakeLock = lib.importJSON (self + /flake.lock);
flakeInputs = (builtins.removeAttrs inputs [ "self" ]);
flakeLockVendoredDeps = flakeLock // {
nodes = flakeLock.nodes // (
lib.flip lib.mapAttrs flakeInputs (name: _: flakeLock.nodes.${name} // {
locked = {
inherit (flakeLock.nodes.${name}.locked) narHash;
lastModified =
# lol, nixpkgs has a different timestamp on the fs???
if name == "nixpkgs"
then 0
else 1;
path = "${inputs.${name}}";
type = "path";
};
})
);
};
flakeLockFile = builtins.toFile "clan-core-flake.lock"
(builtins.toJSON flakeLockVendoredDeps);
clanCoreWithVendoredDeps = lib.trace flakeLockFile pkgs.runCommand "clan-core-with-vendored-deps" { } ''
cp -r ${self} $out
chmod +w -R $out
cp ${flakeLockFile} $out/flake.lock
'';
in
{
devShells.clan-cli = pkgs.callPackage ./shell.nix {
inherit (self'.packages) clan-cli ui-assets nix-unit;
};
packages = {
clan-cli = pkgs.python3.pkgs.callPackage ./default.nix {
inherit (self'.packages) ui-assets;
inherit (inputs) nixpkgs;
inherit (inputs.nixpkgs-for-deal.legacyPackages.${system}.python3Packages) deal schemathesis;
clan-core-path = clanCoreWithVendoredDeps;
};
inherit (self'.packages.clan-cli) clan-openapi;
default = self'.packages.clan-cli;
};
inherit (self'.packages.clan-cli) clan-openapi;
default = self'.packages.clan-cli;
};
checks = self'.packages.clan-cli.tests;
};
checks = self'.packages.clan-cli.tests;
};
}

View File

@ -26,6 +26,7 @@ norecursedirs = "tests/helpers"
markers = [ "impure" ]
[tool.mypy]
plugins = ["deal.mypy"]
python_version = "3.10"
warn_redundant_casts = true
disallow_untyped_calls = true

View File

@ -55,7 +55,7 @@ def create_flake(
template = Path(__file__).parent / flake_name
# copy the template to a new temporary location
flake = temporary_home / ".local/state/clan/flake" / flake_name
flake = temporary_home / ".config/clan/flakes" / flake_name
shutil.copytree(template, flake)
# lookup the requested machines in ./test_machines and include them

View File

@ -4,6 +4,7 @@ import shlex
from clan_cli import create_parser
from clan_cli.custom_logger import get_caller
from clan_cli.dirs import get_clan_flake_toplevel
log = logging.getLogger(__name__)
@ -17,6 +18,8 @@ class Cli:
log.debug(f"$ {cmd}")
log.debug(f"Caller {get_caller()}")
parsed = self.parser.parse_args(args)
if parsed.flake is None:
parsed.flake = get_clan_flake_toplevel()
if hasattr(parsed, "func"):
parsed.func(parsed)
return parsed

View File

@ -21,6 +21,7 @@ def temporary_home(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]:
else:
with tempfile.TemporaryDirectory(prefix="pytest-") as dirpath:
monkeypatch.setenv("HOME", str(dirpath))
monkeypatch.setenv("XDG_CONFIG_HOME", str(Path(dirpath) / ".config"))
monkeypatch.chdir(str(dirpath))
log.debug("Temp HOME directory: %s", str(dirpath))
yield Path(dirpath)

View File

@ -3,7 +3,7 @@ from api import TestClient
from fixtures_flakes import FlakeForTest
@pytest.mark.impure()
@pytest.mark.with_core
def test_configure_machine(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
# retrieve the list of available clanModules
response = api.get(f"/api/{test_flake_with_core.name}/clan_modules")

View File

@ -27,7 +27,7 @@ def test_inspect_ok(api: TestClient, test_flake_with_core: FlakeForTest) -> None
assert response.status_code == 200, "Failed to inspect vm"
data = response.json()
print("Data: ", data)
assert data.get("flake_attrs") == ["vm1"]
assert data.get("flake_attrs") == ["vm1", "vm2"]
@pytest.mark.impure

View File

@ -31,6 +31,13 @@
'';
};
};
vm2 = { lib, ... }: {
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";
system.stateVersion = lib.version;
sops.age.keyFile = "__CLAN_SOPS_KEY_PATH__";
clanCore.secretsUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.networking.zerotier.networkId = "82b44b162ec6c013";
};
};
};
in

View File

@ -20,16 +20,66 @@ def test_import_sops(
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[1].privkey)
cli.run(
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
[
"--flake",
str(test_flake.path),
"secrets",
"machines",
"add",
"machine1",
age_keys[0].pubkey,
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"users",
"add",
"user1",
age_keys[1].pubkey,
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"users",
"add",
"user2",
age_keys[2].pubkey,
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-user",
"group1",
"user1",
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-user",
"group1",
"user2",
]
)
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey, test_flake.name])
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey, test_flake.name])
cli.run(["secrets", "groups", "add-user", "group1", "user1", test_flake.name])
cli.run(["secrets", "groups", "add-user", "group1", "user2", test_flake.name])
# To edit:
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
cmd = [
"--flake",
str(test_flake.path),
"secrets",
"import-sops",
"--group",
@ -37,15 +87,14 @@ def test_import_sops(
"--machine",
"machine1",
str(test_root.joinpath("data", "secrets.yaml")),
test_flake.name,
]
cli.run(cmd)
capsys.readouterr()
cli.run(["secrets", "users", "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "users", "list"])
users = sorted(capsys.readouterr().out.rstrip().split())
assert users == ["user1", "user2"]
capsys.readouterr()
cli.run(["secrets", "get", "secret-key", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "get", "secret-key"])
assert capsys.readouterr().out == "secret-value"

View File

@ -1,3 +1,4 @@
import pytest
from api import TestClient
from fixtures_flakes import FlakeForTest
@ -21,34 +22,76 @@ def test_machines(api: TestClient, test_flake: FlakeForTest) -> None:
assert response.json() == {"machines": [{"name": "test", "status": "unknown"}]}
def test_configure_machine(api: TestClient, test_flake: FlakeForTest) -> None:
@pytest.mark.with_core
def test_schema_errors(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
# make sure that eval errors do not raise an internal server error
response = api.put(
f"/api/{test_flake_with_core.name}/schema",
json={"imports": ["some-inavlid-import"]},
)
assert response.status_code == 422
assert (
"error: string 'some-inavlid-import' doesn't represent an absolute path"
in response.json()["detail"][0]["msg"]
)
@pytest.mark.with_core
def test_schema_invalid_clan_imports(
api: TestClient, test_flake_with_core: FlakeForTest
) -> None:
response = api.put(
f"/api/{test_flake_with_core.name}/schema",
json={"clanImports": ["non-existing-clan-module"]},
)
assert response.status_code == 400
assert (
"Some requested clan modules could not be found"
in response.json()["detail"]["msg"]
)
assert "non-existing-clan-module" in response.json()["detail"]["modules_not_found"]
@pytest.mark.with_core
def test_configure_machine(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
# ensure error 404 if machine does not exist when accessing the config
response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
response = api.get(f"/api/{test_flake_with_core.name}/machines/machine1/config")
assert response.status_code == 404
# ensure error 404 if machine does not exist when writing to the config
response = api.put(f"/api/{test_flake.name}/machines/machine1/config", json={})
response = api.put(
f"/api/{test_flake_with_core.name}/machines/machine1/config", json={}
)
assert response.status_code == 404
# create the machine
response = api.post(f"/api/{test_flake.name}/machines", json={"name": "machine1"})
response = api.post(
f"/api/{test_flake_with_core.name}/machines", json={"name": "machine1"}
)
assert response.status_code == 201
# ensure an empty config is returned by default for a new machine
response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
response = api.get(f"/api/{test_flake_with_core.name}/machines/machine1/config")
assert response.status_code == 200
assert response.json() == {"config": {}}
assert response.json() == {
"clanImports": [],
"clan": {},
}
# get jsonschema for machine
response = api.get(f"/api/{test_flake.name}/machines/machine1/schema")
# get jsonschema for without imports
response = api.put(
f"/api/{test_flake_with_core.name}/schema",
json={"clanImports": []},
)
assert response.status_code == 200
json_response = response.json()
assert "schema" in json_response and "properties" in json_response["schema"]
# an invalid config missing the fileSystems
invalid_config = dict(
clan=dict(
jitsi=dict(
clan=dict(),
services=dict(
nginx=dict(
enable=True,
),
),
@ -56,7 +99,7 @@ def test_configure_machine(api: TestClient, test_flake: FlakeForTest) -> None:
# verify an invalid config (fileSystems missing) fails
response = api.put(
f"/api/{test_flake.name}/machines/machine1/verify",
f"/api/{test_flake_with_core.name}/machines/machine1/verify",
json=invalid_config,
)
assert response.status_code == 200
@ -67,15 +110,15 @@ def test_configure_machine(api: TestClient, test_flake: FlakeForTest) -> None:
# set come invalid config (fileSystems missing)
response = api.put(
f"/api/{test_flake.name}/machines/machine1/config",
f"/api/{test_flake_with_core.name}/machines/machine1/config",
json=invalid_config,
)
assert response.status_code == 200
# ensure the config has actually been updated
response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
response = api.get(f"/api/{test_flake_with_core.name}/machines/machine1/config")
assert response.status_code == 200
assert response.json() == {"config": invalid_config}
assert response.json() == dict(clanImports=[], **invalid_config)
# the part of the config that makes the evaluation pass
fs_config = dict(
@ -96,8 +139,9 @@ def test_configure_machine(api: TestClient, test_flake: FlakeForTest) -> None:
# set some valid config
config2 = dict(
clan=dict(
jitsi=dict(
clan=dict(),
services=dict(
nginx=dict(
enable=True,
),
),
@ -105,45 +149,57 @@ def test_configure_machine(api: TestClient, test_flake: FlakeForTest) -> None:
)
response = api.put(
f"/api/{test_flake.name}/machines/machine1/config",
f"/api/{test_flake_with_core.name}/machines/machine1/config",
json=config2,
)
assert response.status_code == 200
assert response.json() == {"config": config2}
# ensure the config has been applied
response = api.get(
f"/api/{test_flake_with_core.name}/machines/machine1/config",
)
assert response.status_code == 200
assert response.json() == dict(clanImports=[], **config2)
# get the config again
response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
response = api.get(f"/api/{test_flake_with_core.name}/machines/machine1/config")
assert response.status_code == 200
assert response.json() == {"config": config2}
assert response.json() == {"clanImports": [], **config2}
# ensure PUT on the config is idempotent by passing the config again
# For example, this should not result in the boot.loader.grub.devices being
# set twice (eg. merged)
response = api.put(
f"/api/{test_flake.name}/machines/machine1/config",
f"/api/{test_flake_with_core.name}/machines/machine1/config",
json=config2,
)
assert response.status_code == 200
assert response.json() == {"config": config2}
# ensure the config has been applied
response = api.get(
f"/api/{test_flake_with_core.name}/machines/machine1/config",
)
assert response.status_code == 200
assert response.json() == dict(clanImports=[], **config2)
# verify the machine config evaluates
response = api.get(f"/api/{test_flake.name}/machines/machine1/verify")
response = api.get(f"/api/{test_flake_with_core.name}/machines/machine1/verify")
assert response.status_code == 200
assert response.json() == {"success": True, "error": None}
# get the schema with an extra module imported
response = api.put(
f"/api/{test_flake.name}/machines/machine1/schema",
json={"clanImports": ["fake-module"]},
f"/api/{test_flake_with_core.name}/schema",
json={"clanImports": ["diskLayouts"]},
)
# expect the result schema to contain the fake-module.fake-flag option
# expect the result schema to contain the deltachat option
assert response.status_code == 200
assert (
response.json()["schema"]["properties"]["fake-module"]["properties"][
"fake-flag"
]["type"]
== "boolean"
response.json()["schema"]["properties"]["diskLayouts"]["properties"][
"singleDiskExt4"
]["properties"]["device"]["type"]
== "string"
)
# new config importing an extra clanModule (clanModules.fake-module)
@ -159,20 +215,24 @@ def test_configure_machine(api: TestClient, test_flake: FlakeForTest) -> None:
# set the fake-module.fake-flag option to true
response = api.put(
f"/api/{test_flake.name}/machines/machine1/config",
f"/api/{test_flake_with_core.name}/machines/machine1/config",
json=config_with_imports,
)
assert response.status_code == 200
# ensure the config has been applied
response = api.get(
f"/api/{test_flake_with_core.name}/machines/machine1/config",
)
assert response.status_code == 200
assert response.json() == {
"config": {
"clanImports": ["fake-module"],
"clan": {
"fake-module": {
"fake-flag": True,
},
"clanImports": ["fake-module"],
"clan": {
"fake-module": {
"fake-flag": True,
},
**fs_config,
}
},
**fs_config,
}
# remove the import from the config
@ -181,8 +241,18 @@ def test_configure_machine(api: TestClient, test_flake: FlakeForTest) -> None:
**fs_config,
)
response = api.put(
f"/api/{test_flake.name}/machines/machine1/config",
f"/api/{test_flake_with_core.name}/machines/machine1/config",
json=config_with_empty_imports,
)
assert response.status_code == 200
assert response.json() == {"config": config_with_empty_imports}
# ensure the config has been applied
response = api.get(
f"/api/{test_flake_with_core.name}/machines/machine1/config",
)
assert response.status_code == 200
assert response.json() == {
"clanImports": ["fake-module"],
"clan": {},
**config_with_empty_imports,
}

View File

@ -1,8 +1,10 @@
import pytest
from fixtures_flakes import FlakeForTest
from clan_cli.config import machine
from clan_cli.config.schema import machine_schema
def test_schema_for_machine(test_flake: FlakeForTest) -> None:
schema = machine.schema_for_machine(test_flake.name, "machine1")
@pytest.mark.with_core
def test_schema_for_machine(test_flake_with_core: FlakeForTest) -> None:
schema = machine_schema(test_flake_with_core.name, config={})
assert "properties" in schema

View File

@ -24,41 +24,61 @@ def _test_identities(
cli = Cli()
sops_folder = test_flake.path / "sops"
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey, test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
what,
"add",
"foo",
age_keys[0].pubkey,
]
)
assert (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError):
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey, test_flake.name])
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
what,
"add",
"-f",
"foo",
age_keys[0].privkey,
test_flake.name,
]
)
capsys.readouterr() # empty the buffer
cli.run(["secrets", what, "get", "foo", test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
what,
"get",
"foo",
]
)
out = capsys.readouterr() # empty the buffer
assert age_keys[0].pubkey in out.out
capsys.readouterr() # empty the buffer
cli.run(["secrets", what, "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", what, "list"])
out = capsys.readouterr() # empty the buffer
assert "foo" in out.out
cli.run(["secrets", what, "remove", "foo", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", what, "remove", "foo"])
assert not (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError): # already removed
cli.run(["secrets", what, "remove", "foo", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", what, "remove", "foo"])
capsys.readouterr()
cli.run(["secrets", what, "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", what, "list"])
out = capsys.readouterr()
assert "foo" not in out.out
@ -80,35 +100,119 @@ def test_groups(
) -> None:
cli = Cli()
capsys.readouterr() # empty the buffer
cli.run(["secrets", "groups", "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "groups", "list"])
assert capsys.readouterr().out == ""
with pytest.raises(ClanError): # machine does not exist yet
cli.run(
["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name]
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-machine",
"group1",
"machine1",
]
)
with pytest.raises(ClanError): # user does not exist yet
cli.run(["secrets", "groups", "add-user", "groupb1", "user1", test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-user",
"groupb1",
"user1",
]
)
cli.run(
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
[
"--flake",
str(test_flake.path),
"secrets",
"machines",
"add",
"machine1",
age_keys[0].pubkey,
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-machine",
"group1",
"machine1",
]
)
cli.run(["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name])
# Should this fail?
cli.run(["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-machine",
"group1",
"machine1",
]
)
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake.name])
cli.run(["secrets", "groups", "add-user", "group1", "user1", test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-user",
"group1",
"user1",
]
)
capsys.readouterr() # empty the buffer
cli.run(["secrets", "groups", "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "groups", "list"])
out = capsys.readouterr().out
assert "user1" in out
assert "machine1" in out
cli.run(["secrets", "groups", "remove-user", "group1", "user1", test_flake.name])
cli.run(
["secrets", "groups", "remove-machine", "group1", "machine1", test_flake.name]
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"remove-user",
"group1",
"user1",
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"remove-machine",
"group1",
"machine1",
]
)
groups = os.listdir(test_flake.path / "sops" / "groups")
assert len(groups) == 0
@ -134,107 +238,249 @@ def test_secrets(
) -> None:
cli = Cli()
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "list"])
assert capsys.readouterr().out == ""
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key"))
cli.run(["secrets", "key", "generate"])
cli.run(["--flake", str(test_flake.path), "secrets", "key", "generate"])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "key", "show"])
cli.run(["--flake", str(test_flake.path), "secrets", "key", "show"])
key = capsys.readouterr().out
assert key.startswith("age1")
cli.run(["secrets", "users", "add", "testuser", key, test_flake.name])
cli.run(
["--flake", str(test_flake.path), "secrets", "users", "add", "testuser", key]
)
with pytest.raises(ClanError): # does not exist yet
cli.run(["secrets", "get", "nonexisting", test_flake.name])
cli.run(["secrets", "set", "initialkey", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "get", "nonexisting"])
cli.run(["--flake", str(test_flake.path), "secrets", "set", "initialkey"])
capsys.readouterr()
cli.run(["secrets", "get", "initialkey", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "get", "initialkey"])
assert capsys.readouterr().out == "foo"
capsys.readouterr()
cli.run(["secrets", "users", "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "users", "list"])
users = capsys.readouterr().out.rstrip().split("\n")
assert len(users) == 1, f"users: {users}"
owner = users[0]
monkeypatch.setenv("EDITOR", "cat")
cli.run(["secrets", "set", "--edit", "initialkey", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "set", "--edit", "initialkey"])
monkeypatch.delenv("EDITOR")
cli.run(["secrets", "rename", "initialkey", "key", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "rename", "initialkey", "key"])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "list"])
assert capsys.readouterr().out == "key\n"
cli.run(
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
[
"--flake",
str(test_flake.path),
"secrets",
"machines",
"add",
"machine1",
age_keys[0].pubkey,
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"machines",
"add-secret",
"machine1",
"key",
]
)
cli.run(["secrets", "machines", "add-secret", "machine1", "key", test_flake.name])
capsys.readouterr()
cli.run(["secrets", "machines", "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "machines", "list"])
assert capsys.readouterr().out == "machine1\n"
with use_key(age_keys[0].privkey, monkeypatch):
capsys.readouterr()
cli.run(["secrets", "get", "key", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(
["secrets", "machines", "remove-secret", "machine1", "key", test_flake.name]
[
"--flake",
str(test_flake.path),
"secrets",
"machines",
"remove-secret",
"machine1",
"key",
]
)
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey, test_flake.name])
cli.run(["secrets", "users", "add-secret", "user1", "key", test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"users",
"add",
"user1",
age_keys[1].pubkey,
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"users",
"add-secret",
"user1",
"key",
]
)
capsys.readouterr()
with use_key(age_keys[1].privkey, monkeypatch):
cli.run(["secrets", "get", "key", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(["secrets", "users", "remove-secret", "user1", "key", test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"users",
"remove-secret",
"user1",
"key",
]
)
with pytest.raises(ClanError): # does not exist yet
cli.run(
["secrets", "groups", "add-secret", "admin-group", "key", test_flake.name]
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-secret",
"admin-group",
"key",
]
)
cli.run(["secrets", "groups", "add-user", "admin-group", "user1", test_flake.name])
cli.run(["secrets", "groups", "add-user", "admin-group", owner, test_flake.name])
cli.run(["secrets", "groups", "add-secret", "admin-group", "key", test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-user",
"admin-group",
"user1",
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-user",
"admin-group",
owner,
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-secret",
"admin-group",
"key",
]
)
capsys.readouterr() # empty the buffer
cli.run(["secrets", "set", "--group", "admin-group", "key2", test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"set",
"--group",
"admin-group",
"key2",
]
)
with use_key(age_keys[1].privkey, monkeypatch):
capsys.readouterr()
cli.run(["secrets", "get", "key", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
assert capsys.readouterr().out == "foo"
# extend group will update secrets
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey, test_flake.name])
cli.run(["secrets", "groups", "add-user", "admin-group", "user2", test_flake.name])
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"users",
"add",
"user2",
age_keys[2].pubkey,
]
)
cli.run(
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"add-user",
"admin-group",
"user2",
]
)
with use_key(age_keys[2].privkey, monkeypatch): # user2
capsys.readouterr()
cli.run(["secrets", "get", "key", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(
["secrets", "groups", "remove-user", "admin-group", "user2", test_flake.name]
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"remove-user",
"admin-group",
"user2",
]
)
with pytest.raises(ClanError), use_key(age_keys[2].privkey, monkeypatch):
# user2 is not in the group anymore
capsys.readouterr()
cli.run(["secrets", "get", "key", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
print(capsys.readouterr().out)
cli.run(
["secrets", "groups", "remove-secret", "admin-group", "key", test_flake.name]
[
"--flake",
str(test_flake.path),
"secrets",
"groups",
"remove-secret",
"admin-group",
"key",
]
)
cli.run(["secrets", "remove", "key", test_flake.name])
cli.run(["secrets", "remove", "key2", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "remove", "key"])
cli.run(["--flake", str(test_flake.path), "secrets", "remove", "key2"])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", test_flake.name])
cli.run(["--flake", str(test_flake.path), "secrets", "list"])
assert capsys.readouterr().out == ""

View File

@ -1,3 +1,4 @@
import ipaddress
from typing import TYPE_CHECKING
import pytest
@ -23,43 +24,41 @@ def test_generate_secret(
cli = Cli()
cli.run(
[
"--flake",
str(test_flake_with_core.path),
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
test_flake_with_core.name,
]
)
cli.run(["secrets", "generate", "vm1", test_flake_with_core.name])
has_secret(test_flake_with_core.name, "vm1-age.key")
has_secret(test_flake_with_core.name, "vm1-zerotier-identity-secret")
cli.run(["--flake", str(test_flake_with_core.path), "secrets", "generate", "vm1"])
has_secret(test_flake_with_core.path, "vm1-age.key")
has_secret(test_flake_with_core.path, "vm1-zerotier-identity-secret")
network_id = machine_get_fact(
test_flake_with_core.name, "vm1", "zerotier-network-id"
)
assert len(network_id) == 16
age_key = (
sops_secrets_folder(test_flake_with_core.name)
.joinpath("vm1-age.key")
.joinpath("secret")
)
identity_secret = (
sops_secrets_folder(test_flake_with_core.name)
.joinpath("vm1-zerotier-identity-secret")
.joinpath("secret")
)
secrets_folder = sops_secrets_folder(test_flake_with_core.path)
age_key = secrets_folder / "vm1-age.key" / "secret"
identity_secret = secrets_folder / "vm1-zerotier-identity-secret" / "secret"
age_key_mtime = age_key.lstat().st_mtime_ns
secret1_mtime = identity_secret.lstat().st_mtime_ns
# test idempotency
cli.run(["secrets", "generate", "vm1", test_flake_with_core.name])
cli.run(["secrets", "generate", "vm1"])
assert age_key.lstat().st_mtime_ns == age_key_mtime
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
machine_path = (
sops_secrets_folder(test_flake_with_core.name)
.joinpath("vm1-zerotier-identity-secret")
.joinpath("machines")
.joinpath("vm1")
)
assert machine_path.exists()
assert (
secrets_folder / "vm1-zerotier-identity-secret" / "machines" / "vm1"
).exists()
cli.run(["secrets", "generate", "vm2"])
assert has_secret(test_flake_with_core.path, "vm2-age.key")
assert has_secret(test_flake_with_core.path, "vm2-zerotier-identity-secret")
ip = machine_get_fact(test_flake_with_core.name, "vm1", "zerotier-ip")
assert ipaddress.IPv6Address(ip).is_private
meshname = machine_get_fact(test_flake_with_core.name, "vm1", "zerotier-meshname")
assert len(meshname) == 26

View File

@ -39,7 +39,7 @@ def test_upload_secret(
check=True,
)
subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True)
cli.run(["secrets", "generate", "vm1", test_flake_with_core_and_pass.name])
cli.run(["secrets", "generate", "vm1"])
network_id = machine_get_fact(
test_flake_with_core_and_pass.name, "vm1", "zerotier-network-id"
)
@ -50,7 +50,7 @@ def test_upload_secret(
secret1_mtime = identity_secret.lstat().st_mtime_ns
# test idempotency
cli.run(["secrets", "generate", "vm1", test_flake_with_core_and_pass.name])
cli.run(["secrets", "generate", "vm1"])
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
flake = test_flake_with_core_and_pass.path.joinpath("flake.nix")
@ -58,7 +58,7 @@ def test_upload_secret(
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
flake.write_text(new_text)
cli.run(["secrets", "upload", "vm1", test_flake_with_core_and_pass.name])
cli.run(["secrets", "upload", "vm1"])
zerotier_identity_secret = (
test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret"
)

View File

@ -23,27 +23,31 @@ def test_secrets_upload(
cli = Cli()
cli.run(
[
"--flake",
str(test_flake_with_core.path),
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
test_flake_with_core.name,
]
)
cli.run(
[
"--flake",
str(test_flake_with_core.path),
"secrets",
"machines",
"add",
"vm1",
age_keys[1].pubkey,
test_flake_with_core.name,
]
)
monkeypatch.setenv("SOPS_NIX_SECRET", age_keys[0].privkey)
cli.run(["secrets", "set", "vm1-age.key", test_flake_with_core.name])
cli.run(
["--flake", str(test_flake_with_core.path), "secrets", "set", "vm1-age.key"]
)
flake = test_flake_with_core.path.joinpath("flake.nix")
host = host_group.hosts[0]
@ -51,7 +55,7 @@ def test_secrets_upload(
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
flake.write_text(new_text)
cli.run(["secrets", "upload", "vm1", test_flake_with_core.name])
cli.run(["--flake", str(test_flake_with_core.path), "secrets", "upload", "vm1"])
# the flake defines this path as the location where the sops key should be installed
sops_key = test_flake_with_core.path.joinpath("key.txt")

View File

@ -92,12 +92,13 @@ def test_create_local(
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli()
cmd = [
"--flake",
str(flake_with_vm_with_secrets.path),
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
flake_with_vm_with_secrets.name,
]
cli.run(cmd)

View File

@ -38,7 +38,6 @@ def test_create(
"add",
"user1",
age_keys[0].pubkey,
test_flake_with_core.name,
]
)
cli.run(["vms", "create", "vm1", test_flake_with_core.name])
cli.run(["vms", "create", "vm1"])

View File

@ -0,0 +1,8 @@
import deal
from clan_cli.task_manager import get_task
@deal.cases(get_task)
def test_get_task(case: deal.TestCase) -> None:
case()

View File

@ -14,6 +14,7 @@
inherit (config.packages) tea-create-pr;
};
nix-unit = pkgs.callPackage ./nix-unit { };
meshname = pkgs.callPackage ./meshname { };
inherit (pkgs.callPackages ./node-packages { }) prettier-plugin-tailwindcss;
};
};

30
pkgs/meshname/default.nix Normal file
View File

@ -0,0 +1,30 @@
{ lib
, buildGoModule
, fetchFromGitHub
}:
buildGoModule {
pname = "meshname";
version = "unstable-2023-11-08";
src = fetchFromGitHub {
owner = "Mic92";
repo = "meshname";
rev = "b3c0ec1cafcb91ae7801b139777ff5ffad4c8fed";
hash = "sha256-uPon66nc5vw2QbbrNPXcCkO7T0l0foYovyR1adL9JBg=";
};
subPackages = [ "cmd/meshnamed" ];
vendorHash = "sha256-kiNxB2R3Z6Z/Resr3r4jKCImVhyoOY55dEiV+JRUjDk=";
ldflags = [ "-s" "-w" ];
meta = with lib; {
description = "Meshname, a universal naming system for all IPv6-based mesh networks, including CJDNS and Yggdrasil";
homepage = "https://github.com/Mic92/meshname";
license = licenses.mit;
maintainers = with maintainers; [ mic92 ];
mainProgram = "meshnamed";
};
}

View File

@ -1,5 +1,5 @@
{ fetchzip }:
fetchzip {
url = "https://git.clan.lol/api/packages/clan/generic/ui/0hmpxzvxx9f4ic593hcw6gf7zf2ycwdw5bcxmfab3mkwb1s0yh1h/assets.tar.gz";
sha256 = "0hmpxzvxx9f4ic593hcw6gf7zf2ycwdw5bcxmfab3mkwb1s0yh1h";
url = "https://git.clan.lol/api/packages/clan/generic/ui/18kbw5nhyzawcn4jw4kmggfszg854sxdxpf8ghcciblxs66wif0i/assets.tar.gz";
sha256 = "18kbw5nhyzawcn4jw4kmggfszg854sxdxpf8ghcciblxs66wif0i";
}

View File

@ -1,4 +1,4 @@
import { setMachineSchema } from "@/api/machine/machine";
import { getMachineSchema } from "@/api/machine/machine";
import { useListClanModules } from "@/api/modules/modules";
import {
Alert,
@ -49,7 +49,7 @@ const updateSchema = ({
setSchemaError,
}: IupdateSchema) => {
formHooks.setValue("isSchemaLoading", true);
setMachineSchema(clanName, "example_machine", {
getMachineSchema(clanName, {
clanImports: modules,
})
.then((response) => {
@ -102,11 +102,12 @@ export default function ClanModules(props: ClanModulesProps) {
const [schemaError, setSchemaError] = useState<string | null>(null);
const selectedModules = formHooks.watch("modules");
useEffect(() => {
updateSchema({
clanName,
modules: formHooks.watch("modules"),
formHooks,
setSchemaError,
getMachineSchema(clanName, {
clanImports: [],
}).then((response) => {
if (response.statusText == "OK") {
formHooks.setValue("schema", response.data.schema);
}
});
// Only re-run if global clanName has changed
@ -122,13 +123,19 @@ export default function ClanModules(props: ClanModulesProps) {
} = event;
const newValue = typeof value === "string" ? value.split(",") : value;
formHooks.setValue("modules", newValue);
updateSchema({
clanName,
modules: newValue,
formHooks,
setSchemaError,
});
getMachineSchema(clanName, {
clanImports: newValue,
})
.then((response) => {
if (response.statusText == "OK") {
formHooks.setValue("schema", response.data.schema);
}
})
.catch((error) => {
formHooks.setValue("schema", {});
console.error({ error });
toast.error(`${error.message}`);
});
};
return (