get rid of pydantic in cli

This commit is contained in:
Jörg Thalheim 2023-11-21 11:36:50 +01:00 committed by Mic92
parent f9b3fe0765
commit 7afaaf8c5a
10 changed files with 11 additions and 264 deletions

View File

@ -3,20 +3,14 @@ import argparse
from pathlib import Path
from typing import Dict
from pydantic import AnyUrl
from pydantic.tools import parse_obj_as
from ..async_cmd import CmdOut, run, runforcli
from ..errors import ClanError
from ..nix import nix_command, nix_shell
DEFAULT_URL: AnyUrl = parse_obj_as(
AnyUrl,
"git+https://git.clan.lol/clan/clan-core?new-clan",
)
DEFAULT_URL: str = "git+https://git.clan.lol/clan/clan-core?new-clan"
async def create_flake(directory: Path, url: AnyUrl) -> Dict[str, CmdOut]:
async def create_flake(directory: Path, url: str) -> Dict[str, CmdOut]:
if not directory.exists():
directory.mkdir()
else:

View File

@ -5,8 +5,6 @@ import tempfile
from pathlib import Path
from typing import Any
from pydantic import AnyUrl
from .dirs import nixpkgs_flake, nixpkgs_source
@ -14,7 +12,7 @@ def nix_command(flags: list[str]) -> list[str]:
return ["nix", "--extra-experimental-features", "nix-command flakes"] + flags
def nix_flake_show(flake_url: AnyUrl | Path) -> list[str]:
def nix_flake_show(flake_url: str | Path) -> list[str]:
return nix_command(
[
"flake",

View File

@ -1,9 +0,0 @@
import logging
from pathlib import Path
from typing import Union
from pydantic import AnyUrl
log = logging.getLogger(__name__)
FlakeUrl = Union[AnyUrl, Path]

View File

@ -1,16 +1,16 @@
import argparse
import asyncio
import json
from dataclasses import dataclass
from pathlib import Path
from pydantic import AnyUrl, BaseModel
from ..async_cmd import run
from ..nix import nix_config, nix_eval
class VmConfig(BaseModel):
flake_url: AnyUrl | Path
@dataclass
class VmConfig:
flake_url: str | Path
flake_attr: str
cores: int
@ -18,7 +18,7 @@ class VmConfig(BaseModel):
graphics: bool
async def inspect_vm(flake_url: AnyUrl | Path, flake_attr: str) -> VmConfig:
async def inspect_vm(flake_url: str | Path, flake_attr: str) -> VmConfig:
config = nix_config()
system = config["system"]

View File

@ -1,6 +1,6 @@
import logging
from pydantic import AnyUrl, BaseModel, Extra
from pydantic import AnyUrl, BaseModel, Extra, parse_obj_as
from ..flakes.create import DEFAULT_URL
@ -8,7 +8,7 @@ log = logging.getLogger(__name__)
class FlakeCreateInput(BaseModel):
url: AnyUrl = DEFAULT_URL
url: AnyUrl = parse_obj_as(AnyUrl, DEFAULT_URL)
class MachineConfig(BaseModel):

View File

@ -5,7 +5,6 @@ from pydantic import BaseModel, Extra, Field
from ..async_cmd import CmdOut
from ..task_manager import TaskStatus
from ..vms.inspect import VmConfig
class Status(Enum):
@ -62,10 +61,6 @@ class FlakeAttrResponse(BaseModel):
flake_attrs: list[str]
class VmInspectResponse(BaseModel):
config: VmConfig
class FlakeAction(BaseModel):
id: str
uri: str

View File

@ -7,7 +7,7 @@ from fastapi.staticfiles import StaticFiles
from .assets import asset_path
from .error_handlers import clan_error_handler
from .routers import clan_modules, flake, health, machines, root, vms
from .routers import clan_modules, flake, health, machines, root
from .settings import settings
from .tags import tags_metadata
@ -31,7 +31,6 @@ def setup_app() -> FastAPI:
app.include_router(flake.router)
app.include_router(health.router)
app.include_router(machines.router)
app.include_router(vms.router)
# Needs to be last in register. Because of wildcard route
app.include_router(root.router)

View File

@ -1,68 +0,0 @@
import logging
from pathlib import Path
from typing import Annotated, Iterator
from uuid import UUID
from fastapi import APIRouter, Body, status
from fastapi.exceptions import HTTPException
from fastapi.responses import StreamingResponse
from pydantic import AnyUrl
from clan_cli.webui.routers.flake import get_attrs
from ...task_manager import get_task
from ...vms import create, inspect
from ..api_outputs import (
VmConfig,
VmCreateResponse,
VmInspectResponse,
VmStatusResponse,
)
from ..tags import Tags
log = logging.getLogger(__name__)
router = APIRouter()
# TODO: Check for directory traversal
@router.post("/api/vms/inspect", tags=[Tags.vm])
async def inspect_vm(
flake_url: Annotated[AnyUrl | Path, Body()], flake_attr: Annotated[str, Body()]
) -> VmInspectResponse:
config = await inspect.inspect_vm(flake_url, flake_attr)
return VmInspectResponse(config=config)
@router.get("/api/vms/{uuid}/status", tags=[Tags.vm])
async def get_vm_status(uuid: UUID) -> VmStatusResponse:
task = get_task(uuid)
log.debug(msg=f"error: {task.error}, task.status: {task.status}")
error = str(task.error) if task.error is not None else None
return VmStatusResponse(status=task.status, error=error)
@router.get("/api/vms/{uuid}/logs", tags=[Tags.vm])
async def get_vm_logs(uuid: UUID) -> StreamingResponse:
# Generator function that yields log lines as they are available
def stream_logs() -> Iterator[str]:
task = get_task(uuid)
yield from task.log_lines()
return StreamingResponse(
content=stream_logs(),
media_type="text/plain",
)
# TODO: Check for directory traversal
@router.post("/api/vms/create", tags=[Tags.vm])
async def create_vm(vm: Annotated[VmConfig, Body()]) -> VmCreateResponse:
flake_attrs = await get_attrs(vm.flake_url)
if vm.flake_attr not in flake_attrs:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Provided attribute '{vm.flake_attr}' does not exist.",
)
task = create.create_vm(vm)
return VmCreateResponse(uuid=str(task.uuid))

View File

@ -1,32 +0,0 @@
import pytest
from api import TestClient
from fixtures_flakes import FlakeForTest
@pytest.mark.impure
def test_inspect(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
response = api.post(
"/api/vms/inspect",
json=dict(flake_url=str(test_flake_with_core.path), flake_attr="vm1"),
)
# print(f"SLEEPING FOR EVER: {99999}", file=sys.stderr)
# time.sleep(99999)
assert response.status_code == 200, f"Failed to inspect vm: {response.text}"
config = response.json()["config"]
assert config.get("flake_attr") == "vm1"
assert config.get("cores") == 1
assert config.get("memory_size") == 1024
assert config.get("graphics") is False
def test_incorrect_uuid(api: TestClient) -> None:
uuid_endpoints = [
"/api/vms/{}/status",
"/api/vms/{}/logs",
]
for endpoint in uuid_endpoints:
response = api.get(endpoint.format("1234"))
assert response.status_code == 422, f"Failed to get vm status: {response.text}"

View File

@ -1,130 +0,0 @@
import os
from pathlib import Path
from typing import TYPE_CHECKING, Iterator
import pytest
from api import TestClient
from cli import Cli
from fixtures_flakes import FlakeForTest, create_flake
from httpx import SyncByteStream
from pydantic import AnyUrl
from root import CLAN_CORE
if TYPE_CHECKING:
from age_keys import KeyPair
@pytest.fixture
def flake_with_vm_with_secrets(
monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> Iterator[FlakeForTest]:
yield from create_flake(
monkeypatch,
temporary_home,
"test_flake_with_core_dynamic_machines",
CLAN_CORE,
machines=["vm_with_secrets"],
)
@pytest.fixture
def remote_flake_with_vm_without_secrets(
monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> Iterator[FlakeForTest]:
yield from create_flake(
monkeypatch,
temporary_home,
"test_flake_with_core_dynamic_machines",
CLAN_CORE,
machines=["vm_without_secrets"],
remote=True,
)
def generic_create_vm_test(api: TestClient, flake: Path | AnyUrl, vm: str) -> None:
print(f"flake_url: {flake} ")
response = api.post(
"/api/vms/create",
json=dict(
flake_url=str(flake),
flake_attr=vm,
cores=1,
memory_size=1024,
graphics=False,
),
)
assert response.status_code == 200, "Failed to create vm"
uuid = response.json()["uuid"]
assert len(uuid) == 36
assert uuid.count("-") == 4
response = api.get(f"/api/vms/{uuid}/status")
assert response.status_code == 200, "Failed to get vm status"
response = api.get(f"/api/vms/{uuid}/logs")
print("=========VM LOGS==========")
assert isinstance(response.stream, SyncByteStream)
for line in response.stream:
print(line.decode("utf-8"))
print("=========END LOGS==========")
assert response.status_code == 200, "Failed to get vm logs"
print("Get /api/vms/{uuid}/status")
response = api.get(f"/api/vms/{uuid}/status")
print("Finished Get /api/vms/{uuid}/status")
assert response.status_code == 200, "Failed to get vm status"
data = response.json()
assert (
data["status"] == "FINISHED"
), f"Expected to be finished, but got {data['status']} ({data})"
@pytest.mark.skipif(not os.path.exists("/dev/kvm"), reason="Requires KVM")
@pytest.mark.impure
def test_create_local(
api: TestClient,
monkeypatch: pytest.MonkeyPatch,
flake_with_vm_with_secrets: FlakeForTest,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli()
cmd = [
"--flake",
str(flake_with_vm_with_secrets.path),
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
]
cli.run(cmd)
generic_create_vm_test(api, flake_with_vm_with_secrets.path, "vm_with_secrets")
@pytest.mark.skipif(not os.path.exists("/dev/kvm"), reason="Requires KVM")
@pytest.mark.impure
def test_create_remote(
api: TestClient,
monkeypatch: pytest.MonkeyPatch,
remote_flake_with_vm_without_secrets: FlakeForTest,
) -> None:
generic_create_vm_test(
api, remote_flake_with_vm_without_secrets.path, "vm_without_secrets"
)
# TODO: We need a test that creates the same VM twice, and checks that the second time it fails
# TODO: Democlan needs a machine called testVM, which is headless and gets executed by this test below
# pytest -n0 -s tests/test_vms_api_create.py::test_create_from_democlan
# @pytest.mark.skipif(not os.path.exists("/dev/kvm"), reason="Requires KVM")
# @pytest.mark.impure
# def test_create_from_democlan(
# api: TestClient,
# test_democlan_url: AnyUrl) -> None:
# generic_create_vm_test(
# api, test_democlan_url, "defaultVM"
# )