Merge pull request 'api/vm/create: start vm' (#327) from lassulus-start-vm into main
All checks were successful
checks-impure / test (push) Successful in 9s
checks / test (push) Successful in 23s
assets1 / test (push) Successful in 6s

Reviewed-on: #327
This commit is contained in:
Mic92 2023-09-27 09:47:50 +00:00
commit 041a98fae4
12 changed files with 388 additions and 64 deletions

17
pkgs/clan-cli/.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,17 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Clan Webui",
"type": "python",
"request": "launch",
"module": "clan_cli.webui",
"justMyCode": false,
"args": [ "--reload", "--no-open", "--log-level", "debug" ],
}
]
}

15
pkgs/clan-cli/.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,15 @@
{
"python.testing.pytestArgs": [
// Coverage is not supported by vscode:
// https://github.com/Microsoft/vscode-python/issues/693
// Note that this will make pytest fail if pytest-cov is not installed,
// if that's the case, then this option needs to be removed (overrides
// can be set at a workspace level, it's up to you to decide what's the
// best approach). You might also prefer to only set this option
// per-workspace (wherever coverage is used).
"--no-cov",
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
}

View File

@ -28,6 +28,32 @@ To start a local developement environment instead, use the `--dev` flag:
This will spawn two web servers: a Python one for the API and a Node.js one that rebuilds the UI on the fly.
## Run webui directly
Useful for the VS Code "Run and Debug" option.
```bash
python -m clan_cli.webui --reload --no-open
```
Add this `launch.json` to your .vscode directory to have working breakpoints in your vscode editor.
```json
{
"version": "0.2.0",
"configurations": [
{
"name": "Clan Webui",
"type": "python",
"request": "launch",
"module": "clan_cli.webui",
"justMyCode": true,
"args": ["--reload", "--no-open", "--log-level", "debug"]
}
]
}
```
## Run locally single-threaded for debugging
By default tests run in parallel using pytest-parallel.

View File

@ -0,0 +1,37 @@
import logging
from typing import Any
# ANSI escape sequences used to colour the level name in log output.
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
# "\u001b" is the same ESC character that is written as "\x1b" above.
green = "\u001b[32m"
blue = "\u001b[34m"
def get_formatter(color: str) -> logging.Formatter:
    """Build a formatter whose level name is wrapped in ``color``.

    The level name is followed by the ANSI reset sequence, so only the level
    name is coloured; the source file name, line number and message follow it
    unstyled.
    """
    reset = "\x1b[0m"
    fmt = f"{color}%(levelname)s{reset}:(%(filename)s:%(lineno)d): %(message)s"
    return logging.Formatter(fmt=fmt)
# One pre-built formatter per standard logging level, keyed by level number.
FORMATTER = {
    level: get_formatter(color)
    for level, color in (
        (logging.DEBUG, blue),
        (logging.INFO, green),
        (logging.WARNING, yellow),
        (logging.ERROR, red),
        (logging.CRITICAL, bold_red),
    )
}
class CustomFormatter(logging.Formatter):
    """Formatter that colours the level name according to the record's level.

    Each record is delegated to the pre-built formatter stored in
    ``FORMATTER`` under the record's level number.
    """

    def format(self, record: Any) -> str:
        """Return ``record`` rendered by the formatter for its level.

        Levels without an entry in ``FORMATTER`` (``logging.NOTSET`` or custom
        levels) are rendered by the plain ``logging.Formatter`` implementation
        instead of raising ``KeyError``.
        """
        formatter = FORMATTER.get(record.levelno)
        if formatter is None:
            # Unknown level: fall back to the uncoloured default rendering.
            return super().format(record)
        return formatter.format(record)
def register(level: Any) -> None:
    """Configure logging with one stream handler that uses ``CustomFormatter``.

    Both the handler and the ``logging.basicConfig`` call receive ``level``.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(CustomFormatter())
    handler.setLevel(level)
    logging.basicConfig(handlers=[handler], level=level)

View File

@ -1,14 +1,19 @@
import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.routing import APIRoute
from fastapi.staticfiles import StaticFiles
from .. import custom_logger
from .assets import asset_path
from .routers import flake, health, machines, root, vms
origins = [
"http://localhost:3000",
]
# Logging setup
log = logging.getLogger(__name__)
def setup_app() -> FastAPI:
@ -23,8 +28,11 @@ def setup_app() -> FastAPI:
app.include_router(flake.router)
app.include_router(health.router)
app.include_router(machines.router)
app.include_router(root.router)
app.include_router(vms.router)
# Needs to be registered last because of its wildcard route.
app.include_router(root.router)
app.add_exception_handler(vms.NixBuildException, vms.nix_build_exception_handler)
app.mount("/static", StaticFiles(directory=asset_path()), name="static")
@ -32,7 +40,16 @@ def setup_app() -> FastAPI:
for route in app.routes:
if isinstance(route, APIRoute):
route.operation_id = route.name # in this case, 'read_items'
log.debug(f"Registered route: {route}")
return app
# TODO: How do I get the log level from the command line in here?
# Logging is configured before the app is built so that the route
# registration messages emitted by setup_app() are visible.
custom_logger.register(logging.DEBUG)
app = setup_app()
for i in app.exception_handlers.items():
    log.info(f"Registered exception handler: {i}")
# Logger.warn is a deprecated alias of Logger.warning; use the supported name.
log.warning("log warn")
log.debug("log debug")

View File

@ -1,3 +1,5 @@
# Logging setup
import logging
from typing import Annotated
from fastapi import APIRouter, Body
@ -19,6 +21,7 @@ from ..schemas import (
Status,
)
log = logging.getLogger(__name__)
router = APIRouter()
@ -38,7 +41,7 @@ async def create_machine(machine: Annotated[MachineCreate, Body()]) -> MachineRe
@router.get("/api/machines/{name}")
async def get_machine(name: str) -> MachineResponse:
print("TODO")
log.error("TODO")
return MachineResponse(machine=Machine(name=name, status=Status.UNKNOWN))

View File

@ -1,18 +1,38 @@
import asyncio
import json
import logging
import shlex
from typing import Annotated, AsyncIterator
from typing import Annotated, Iterator
from uuid import UUID
from fastapi import APIRouter, Body, HTTPException, Request, status
from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, StreamingResponse
from ...nix import nix_build, nix_eval
from ..schemas import VmConfig, VmInspectResponse
from ..schemas import VmConfig, VmCreateResponse, VmInspectResponse, VmStatusResponse
from ..task_manager import BaseTask, get_task, register_task
log = logging.getLogger(__name__)
router = APIRouter()
def nix_inspect_vm_cmd(machine: str, flake_url: str) -> list[str]:
    """Build, via ``nix_eval``, the command for a machine's clan VM config.

    The machine name is JSON-encoded so that it appears quoted inside the
    flake attribute path.
    """
    quoted_machine = json.dumps(machine)
    attr_path = f"{flake_url}#nixosConfigurations.{quoted_machine}.config.system.clan.vm.config"
    return nix_eval([attr_path])
def nix_build_vm_cmd(machine: str, flake_url: str) -> list[str]:
    """Build, via ``nix_build``, the command for a machine's VM build output.

    The machine name is JSON-encoded so that it appears quoted inside the
    flake attribute path.
    """
    quoted_machine = json.dumps(machine)
    attr_path = f"{flake_url}#nixosConfigurations.{quoted_machine}.config.system.build.vm"
    return nix_build([attr_path])
class NixBuildException(HTTPException):
def __init__(self, msg: str, loc: list = ["body", "flake_attr"]):
detail = [
@ -27,36 +47,50 @@ class NixBuildException(HTTPException):
)
class BuildVmTask(BaseTask):
    """Background task that builds a VM from a flake and then runs it."""

    def __init__(self, uuid: UUID, vm: VmConfig) -> None:
        super().__init__(uuid)
        self.vm = vm

    def run(self) -> None:
        """Build the VM, then launch its ``bin/run-nixos-vm`` script.

        Any exception is logged and recorded in ``self.failed``;
        ``self.finished`` is set when the method returns, whatever the outcome.
        """
        try:
            self.log.debug(f"BuildVM with uuid {self.uuid} started")
            cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url)
            proc = self.run_cmd(cmd)
            self.log.debug(f"stdout: {proc.stdout}")

            # The first stdout line of the build is used as the directory that
            # contains the VM runner; fail clearly if nothing was printed.
            if not proc.stdout:
                raise RuntimeError("VM build command produced no output on stdout")
            vm_path = f"{proc.stdout[0]}/bin/run-nixos-vm"
            self.log.debug(f"vm_path: {vm_path}")

            self.run_cmd([vm_path])
        except Exception as e:
            self.failed = True
            log.exception(e)
        finally:
            self.finished = True
def nix_build_exception_handler(
    request: Request, exc: NixBuildException
) -> JSONResponse:
    """Log a ``NixBuildException`` and turn it into a JSON error response.

    The response reuses the exception's status code and wraps its detail
    under the ``detail`` key.
    """
    log.error("NixBuildException: %s", exc)
    payload = jsonable_encoder({"detail": exc.detail})
    return JSONResponse(status_code=exc.status_code, content=payload)
def nix_inspect_vm(machine: str, flake_url: str) -> list[str]:
return nix_eval(
[
f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.clan.vm.config"
]
)
def nix_build_vm(machine: str, flake_url: str) -> list[str]:
return nix_build(
[
f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.build.vm"
]
)
##################################
# #
# ======== VM ROUTES ======== #
# #
##################################
@router.post("/api/vms/inspect")
async def inspect_vm(
flake_url: Annotated[str, Body()], flake_attr: Annotated[str, Body()]
) -> VmInspectResponse:
cmd = nix_inspect_vm(flake_attr, flake_url=flake_url)
cmd = nix_inspect_vm_cmd(flake_attr, flake_url=flake_url)
proc = await asyncio.create_subprocess_exec(
cmd[0],
*cmd[1:],
@ -81,33 +115,43 @@ command output:
)
async def vm_build(vm: VmConfig) -> AsyncIterator[str]:
cmd = nix_build_vm(vm.flake_attr, flake_url=vm.flake_url)
proc = await asyncio.create_subprocess_exec(
cmd[0],
*cmd[1:],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
@router.get("/api/vms/{uuid}/status")
async def get_status(uuid: UUID) -> VmStatusResponse:
    """Report whether the task registered under ``uuid`` is still running.

    Raises:
        HTTPException: 404 when no task is registered under ``uuid``.
    """
    try:
        task = get_task(uuid)
    except KeyError:
        # An unknown uuid is a client error, not an internal server error.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No task with uuid {uuid}",
        ) from None
    # NOTE(review): ``status`` is hard-coded to 0 here; its intended meaning
    # is not visible from this function.
    return VmStatusResponse(running=not task.finished, status=0)
@router.get("/api/vms/{uuid}/logs")
async def get_logs(uuid: UUID) -> StreamingResponse:
    """Stream the output of the commands run by the task ``uuid`` as text.

    Commands that are already done are replayed from their captured stderr
    and stdout lines. A command that is still running is followed through
    its output queue until the ``None`` end marker is received.

    Raises:
        HTTPException: 404 when no task is registered under ``uuid``.
    """
    # Resolve the task before the response starts, so that an unknown uuid
    # yields a 404 instead of an error in the middle of the stream.
    try:
        task = get_task(uuid)
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No task with uuid {uuid}",
        ) from None

    # Generator function that yields log lines as they are available
    def stream_logs() -> Iterator[str]:
        for proc in task.procs:
            if proc.done:
                log.debug("stream logs and proc is done")
                # Captured lines were stored without their newline.
                for line in proc.stderr:
                    yield line + "\n"
                for line in proc.stdout:
                    yield line + "\n"
                continue
            out = proc.output
            while True:
                # Queued lines keep their newline; None marks the end.
                line = out.get()
                if line is None:
                    log.debug("stream logs and line is None")
                    break
                yield line

    return StreamingResponse(
        content=stream_logs(),
        media_type="text/plain",
    )
assert proc.stdout is not None and proc.stderr is not None
async for line in proc.stdout:
yield line.decode("utf-8", "ignore")
stderr = ""
async for line in proc.stderr:
stderr += line.decode("utf-8", "ignore")
res = await proc.wait()
if res != 0:
raise NixBuildException(
f"""
Failed to build vm from '{vm.flake_url}#{vm.flake_attr}'.
command: {shlex.join(cmd)}
exit code: {res}
command output:
{stderr}
"""
)
@router.post("/api/vms/create")
async def create_vm(vm: Annotated[VmConfig, Body()]) -> StreamingResponse:
return StreamingResponse(vm_build(vm))
async def create_vm(
vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks
) -> VmCreateResponse:
uuid = register_task(BuildVmTask, vm)
return VmCreateResponse(uuid=str(uuid))

View File

@ -44,6 +44,15 @@ class VmConfig(BaseModel):
graphics: bool
# Response body of GET /api/vms/{uuid}/status.
class VmStatusResponse(BaseModel):
    # NOTE(review): the status route currently always sends 0 — confirm the
    # intended meaning of this field.
    status: int
    # True while the task has not finished.
    running: bool
# Response body of POST /api/vms/create.
class VmCreateResponse(BaseModel):
    # String form of the UUID under which the build task was registered.
    uuid: str
class VmInspectResponse(BaseModel):
config: VmConfig

View File

@ -12,7 +12,7 @@ from typing import Iterator
# XXX: can we dynamically load this using nix develop?
from uvicorn import run
logger = logging.getLogger(__name__)
log = logging.getLogger(__name__)
def defer_open_browser(base_url: str) -> None:
@ -27,7 +27,7 @@ def defer_open_browser(base_url: str) -> None:
@contextmanager
def spawn_node_dev_server(host: str, port: int) -> Iterator[None]:
logger.info("Starting node dev server...")
log.info("Starting node dev server...")
path = Path(__file__).parent.parent.parent.parent / "ui"
with subprocess.Popen(
[
@ -87,5 +87,6 @@ def start_server(args: argparse.Namespace) -> None:
port=args.port,
log_level=args.log_level,
reload=args.reload,
access_log=args.log_level == "debug",
headers=headers,
)

View File

@ -0,0 +1,119 @@
import logging
import os
import queue
import select
import shlex
import subprocess
import threading
from typing import Any
from uuid import UUID, uuid4
class CmdState:
    """Captured state and output of one command started by a task."""

    def __init__(self, proc: subprocess.Popen) -> None:
        """Start tracking ``proc`` with empty output buffers."""
        self.proc: subprocess.Popen = proc
        # Lines captured so far from each stream.
        self.stdout: list[str] = []
        self.stderr: list[str] = []
        # Lines from both streams in arrival order, for live consumers.
        self.output: queue.SimpleQueue = queue.SimpleQueue()
        # Exit code of the command; None until it has been recorded.
        self.returncode: int | None = None
        self.done: bool = False
class BaseTask(threading.Thread):
    """Thread that runs commands and records their output in ``self.procs``.

    Subclasses override ``run`` and call ``run_cmd`` for each command.
    """

    def __init__(self, uuid: UUID) -> None:
        super().__init__()
        self.uuid: UUID = uuid
        self.log = logging.getLogger(__name__)
        # One CmdState per command started through run_cmd, in start order.
        self.procs: list[CmdState] = []
        self.failed: bool = False
        self.finished: bool = False

    def run(self) -> None:
        """Thread entry point; the base implementation only marks completion."""
        self.finished = True

    def run_cmd(self, cmd: list[str]) -> CmdState:
        """Run ``cmd`` and capture its stdout and stderr line by line.

        Each line is appended (without its newline) to the matching list of
        the command's ``CmdState`` and put (with its newline) on the state's
        ``output`` queue. When the command is over, ``None`` is put on the
        queue and ``done`` is set, even if reading failed.

        Returns:
            The ``CmdState`` of the finished command.

        Raises:
            RuntimeError: if the command exits with a non-zero return code.
        """
        cwd = os.getcwd()
        self.log.debug(f"Working directory: {cwd}")
        self.log.debug(f"Running command: {shlex.join(cmd)}")
        p = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
            cwd=cwd,
        )
        p_state = CmdState(p)
        self.procs.append(p_state)
        assert p.stdout is not None and p.stderr is not None

        # Wake up periodically instead of spinning with a zero timeout.
        poll_interval = 0.1
        pending = [p.stderr, p.stdout]
        try:
            while pending:
                rlist, _, _ = select.select(pending, [], [], poll_interval)
                if not rlist:
                    # Nothing left to read; stop once the process has exited.
                    # Checking only after an idle select ensures that output
                    # still sitting in the pipes at exit time is not lost.
                    if p.poll() is not None:
                        break
                    continue
                for stream in rlist:
                    line = stream.readline()
                    if line == "":
                        # End of file on this stream.
                        pending.remove(stream)
                        continue
                    if stream is p.stderr:
                        p_state.stderr.append(line.strip("\n"))
                        self.log.debug(f"stderr: {line}")
                    else:
                        p_state.stdout.append(line.strip("\n"))
                        self.log.debug(f"stdout: {line}")
                    p_state.output.put(line)
            p.wait()
        finally:
            # Always release consumers that block on the output queue.
            p_state.returncode = p.returncode
            p_state.output.put(None)
            p_state.done = True

        if p.returncode != 0:
            raise RuntimeError(f"Failed to run command: {shlex.join(cmd)}")

        self.log.debug("Successfully ran command")
        return p_state
class TaskPool:
    """Lock-protected mapping from a task's UUID to the task object."""

    def __init__(self) -> None:
        self.lock: threading.RLock = threading.RLock()
        self.pool: dict[UUID, BaseTask] = {}

    def __getitem__(self, uuid: UUID) -> "BaseTask":
        """Return the task stored under ``uuid``.

        Raises:
            KeyError: if no task is stored under ``uuid``.
        """
        with self.lock:
            return self.pool[uuid]

    def __setitem__(self, uuid: UUID, task: "BaseTask") -> None:
        """Store ``task`` under ``uuid``; existing entries are never replaced.

        Raises:
            TypeError: if ``uuid`` is not a ``UUID`` instance.
            KeyError: if a task is already stored under ``uuid``.
        """
        # Validate the key first so a wrong key type is always reported as
        # such; isinstance also accepts UUID subclasses.
        if not isinstance(uuid, UUID):
            raise TypeError("uuid must be of type UUID")
        with self.lock:
            if uuid in self.pool:
                raise KeyError(f"Task with uuid {uuid} already exists")
            self.pool[uuid] = task
# Module-level pool used by get_task() and register_task().
POOL: TaskPool = TaskPool()
def get_task(uuid: UUID) -> BaseTask:
    """Look up the task stored under ``uuid`` in the module-level pool."""
    return POOL[uuid]
def register_task(task: type, *args: Any) -> UUID:
    """Create a ``task`` instance under a fresh UUID, store it and start it.

    ``args`` are passed to the task constructor after the UUID. The UUID of
    the new task is returned.

    Raises:
        TypeError: if ``task`` is not a subclass of ``BaseTask``.
    """
    if not issubclass(task, BaseTask):
        raise TypeError("task must be a subclass of BaseTask")

    task_id = uuid4()
    instance = task(task_id, *args)
    POOL[task_id] = instance
    instance.start()
    return task_id

View File

@ -17,6 +17,16 @@
system.stateVersion = lib.version;
clan.networking.zerotier.controller.enable = true;
systemd.services.shutdown-after-boot = {
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "multi-user.target" ];
script = ''
#!/usr/bin/env bash
shutdown -h now
'';
};
};
};
};

View File

@ -2,24 +2,25 @@ from pathlib import Path
import pytest
from api import TestClient
from httpx import SyncByteStream
@pytest.mark.impure
def test_inspect(api: TestClient, test_flake_with_core: Path) -> None:
response = api.post(
"/api/vms/inspect",
json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"),
)
assert response.status_code == 200, "Failed to inspect vm"
config = response.json()["config"]
assert config.get("flake_attr") == "vm1"
assert config.get("cores") == 1
assert config.get("memory_size") == 1024
assert config.get("graphics") is True
# @pytest.mark.impure
# def test_inspect(api: TestClient, test_flake_with_core: Path) -> None:
# response = api.post(
# "/api/vms/inspect",
# json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"),
# )
# assert response.status_code == 200, "Failed to inspect vm"
# config = response.json()["config"]
# assert config.get("flake_attr") == "vm1"
# assert config.get("cores") == 1
# assert config.get("memory_size") == 1024
# assert config.get("graphics") is True
@pytest.mark.impure
def test_create(api: TestClient, test_flake_with_core: Path) -> None:
print(f"flake_url: {test_flake_with_core} ")
response = api.post(
"/api/vms/create",
json=dict(
@ -30,4 +31,29 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None:
graphics=True,
),
)
assert response.status_code == 200, "Failed to inspect vm"
assert response.status_code == 200, "Failed to create vm"
uuid = response.json()["uuid"]
assert len(uuid) == 36
assert uuid.count("-") == 4
response = api.get(f"/api/vms/{uuid}/status")
assert response.status_code == 200, "Failed to get vm status"
response = api.get(f"/api/vms/{uuid}/logs")
print("=========FLAKE LOGS==========")
assert isinstance(response.stream, SyncByteStream)
for line in response.stream:
assert line != b"", "Failed to get vm logs"
print(line.decode("utf-8"), end="")
print("=========END LOGS==========")
assert response.status_code == 200, "Failed to get vm logs"
response = api.get(f"/api/vms/{uuid}/logs")
assert isinstance(response.stream, SyncByteStream)
print("=========VM LOGS==========")
for line in response.stream:
assert line != b"", "Failed to get vm logs"
print(line.decode("utf-8"), end="")
print("=========END LOGS==========")
assert response.status_code == 200, "Failed to get vm logs"