Merge pull request 'clan_uri: Support all other formats by just differentiating between remote and local' (#680) from Qubasa-main into main
This commit is contained in:
commit
8870351737
@ -1,17 +1,18 @@
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.nix import nix_eval
|
||||
|
||||
from .cmd import run
|
||||
|
||||
|
||||
def get_clan_module_names(
|
||||
flake_dir: Path,
|
||||
) -> tuple[list[str], str | None]:
|
||||
) -> list[str]:
|
||||
"""
|
||||
Get the list of clan modules from the clan-core flake input
|
||||
"""
|
||||
proc = subprocess.run(
|
||||
proc = run(
|
||||
nix_eval(
|
||||
[
|
||||
"--impure",
|
||||
@ -25,11 +26,8 @@ def get_clan_module_names(
|
||||
""",
|
||||
],
|
||||
),
|
||||
capture_output=True,
|
||||
text=True,
|
||||
cwd=flake_dir,
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
return [], proc.stderr
|
||||
|
||||
module_names = json.loads(proc.stdout)
|
||||
return module_names, None
|
||||
return module_names
|
||||
|
@ -10,38 +10,24 @@ from typing import Self
|
||||
from .errors import ClanError
|
||||
|
||||
|
||||
def url_ok(url: str) -> None:
|
||||
# Create a request object with the URL and the HEAD method
|
||||
req = urllib.request.Request(url, method="HEAD")
|
||||
try:
|
||||
# Open the URL and get the response object
|
||||
res = urllib.request.urlopen(req)
|
||||
|
||||
# Return True if the status code is 200 (OK)
|
||||
if not res.getcode() == 200:
|
||||
raise ClanError(f"URL has status code: {res.status_code}")
|
||||
except urllib.error.URLError as ex:
|
||||
raise ClanError(f"URL error: {ex}")
|
||||
|
||||
|
||||
# Define an enum with different members that have different values
|
||||
class ClanScheme(Enum):
|
||||
# Use the dataclass decorator to add fields and methods to the members
|
||||
@member
|
||||
@dataclass
|
||||
class HTTP:
|
||||
class REMOTE:
|
||||
url: str # The url field holds the HTTP URL
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"HTTP({self.url})" # The __str__ method returns a custom string representation
|
||||
return f"REMOTE({self.url})" # The __str__ method returns a custom string representation
|
||||
|
||||
@member
|
||||
@dataclass
|
||||
class FILE:
|
||||
class LOCAL:
|
||||
path: Path # The path field holds the local path
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"FILE({self.path})" # The __str__ method returns a custom string representation
|
||||
return f"LOCAL({self.path})" # The __str__ method returns a custom string representation
|
||||
|
||||
|
||||
# Parameters defined here will be DELETED from the nested uri
|
||||
@ -96,26 +82,16 @@ class ClanURI:
|
||||
)
|
||||
|
||||
match comb:
|
||||
case ("http" | "https", _, _, _, _, _):
|
||||
self.scheme = ClanScheme.HTTP.value(self._components.geturl()) # type: ignore
|
||||
case ("file", "", path, "", "", "") | ("", "", path, "", "", ""): # type: ignore
|
||||
self.scheme = ClanScheme.FILE.value(Path(path))
|
||||
self.scheme = ClanScheme.LOCAL.value(Path(path).resolve()) # type: ignore
|
||||
case _:
|
||||
raise ClanError(f"Unsupported uri components: {comb}")
|
||||
|
||||
def check_exits(self) -> None:
|
||||
match self.scheme:
|
||||
case ClanScheme.FILE.value(path):
|
||||
if not path.exists():
|
||||
raise ClanError(f"File does not exist: {path}")
|
||||
case ClanScheme.HTTP.value(url):
|
||||
return url_ok(url)
|
||||
self.scheme = ClanScheme.REMOTE.value(self._components.geturl()) # type: ignore
|
||||
|
||||
def get_internal(self) -> str:
|
||||
match self.scheme:
|
||||
case ClanScheme.FILE.value(path):
|
||||
case ClanScheme.LOCAL.value(path):
|
||||
return str(path)
|
||||
case ClanScheme.HTTP.value(url):
|
||||
case ClanScheme.REMOTE.value(url):
|
||||
return url
|
||||
case _:
|
||||
raise ClanError(f"Unsupported uri components: {self.scheme}")
|
||||
|
@ -1,11 +1,11 @@
|
||||
import logging
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from subprocess import PIPE, Popen
|
||||
from typing import Any, NamedTuple
|
||||
|
||||
from .custom_logger import get_caller
|
||||
from .errors import ClanError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -17,40 +17,47 @@ class CmdOut(NamedTuple):
|
||||
cwd: Path | None = None
|
||||
|
||||
|
||||
def run(cmd: list[str], cwd: Path | None = None) -> CmdOut:
|
||||
cwd_res = None
|
||||
if cwd is not None:
|
||||
if not cwd.exists():
|
||||
raise ClanError(f"Working directory {cwd} does not exist")
|
||||
if not cwd.is_dir():
|
||||
raise ClanError(f"Working directory {cwd} is not a directory")
|
||||
cwd_res = cwd.resolve()
|
||||
log.debug(
|
||||
f"Command: {shlex.join(cmd)}\nWorking directory: {cwd_res}\nCaller : {get_caller()}"
|
||||
def run(cmd: list[str], cwd: Path = Path.cwd()) -> CmdOut:
|
||||
# Start the subprocess
|
||||
process = subprocess.Popen(
|
||||
cmd, cwd=str(cwd), stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
proc = Popen(
|
||||
args=cmd,
|
||||
stderr=PIPE,
|
||||
stdout=PIPE,
|
||||
text=True,
|
||||
cwd=cwd_res,
|
||||
)
|
||||
stdout, stderr = proc.communicate()
|
||||
|
||||
if proc.returncode != 0:
|
||||
# Initialize empty strings for output and error
|
||||
output = b""
|
||||
error = b""
|
||||
|
||||
# Iterate over the stdout stream
|
||||
for c in iter(lambda: process.stdout.read(1), b""): # type: ignore
|
||||
# Convert bytes to string and append to output
|
||||
output += c
|
||||
# Write to terminal
|
||||
sys.stdout.buffer.write(c)
|
||||
# Iterate over the stderr stream
|
||||
for c in iter(lambda: process.stderr.read(1), b""): # type: ignore
|
||||
# Convert bytes to string and append to error
|
||||
error += c
|
||||
# Write to terminal
|
||||
sys.stderr.buffer.write(c)
|
||||
# Wait for the subprocess to finish
|
||||
process.wait()
|
||||
|
||||
output_str = output.decode("utf-8")
|
||||
error_str = error.decode("utf-8")
|
||||
|
||||
if process.returncode != 0:
|
||||
raise ClanError(
|
||||
f"""
|
||||
command: {shlex.join(cmd)}
|
||||
working directory: {cwd_res}
|
||||
exit code: {proc.returncode}
|
||||
working directory: {cwd}
|
||||
exit code: {process.returncode}
|
||||
stderr:
|
||||
{stderr}
|
||||
{error_str}
|
||||
stdout:
|
||||
{stdout}
|
||||
{output_str}
|
||||
"""
|
||||
)
|
||||
|
||||
return CmdOut(stdout, stderr, cwd=cwd)
|
||||
return CmdOut(output_str, error_str, cwd=cwd)
|
||||
|
||||
|
||||
def runforcli(func: Callable[..., dict[str, CmdOut]], *args: Any) -> None:
|
||||
|
@ -1,4 +1,3 @@
|
||||
import copy
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@ -46,9 +45,7 @@ def user_gcroot_dir() -> Path:
|
||||
def specific_groot_dir(*, clan_name: str, flake_url: str) -> Path:
|
||||
# Always build icon so that we can symlink it to the gcroot
|
||||
gcroot_dir = user_gcroot_dir()
|
||||
# burl = base64.urlsafe_b64encode(flake_url.encode()).decode()
|
||||
burl = copy.copy(flake_url).replace("/", "_").replace(":", "_")
|
||||
burl = urllib.parse.quote_plus(burl)
|
||||
burl = urllib.parse.quote_plus(flake_url)
|
||||
clan_gcroot = gcroot_dir / f"{clan_name}-{burl}"
|
||||
|
||||
clan_gcroot.mkdir(parents=True, exist_ok=True)
|
||||
|
@ -65,7 +65,6 @@ def list_history() -> list[HistoryEntry]:
|
||||
|
||||
# TODO: Add all vm entries to history
|
||||
def add_history(uri: ClanURI) -> list[HistoryEntry]:
|
||||
uri.check_exits()
|
||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||
logs = list_history()
|
||||
found = False
|
||||
|
@ -30,7 +30,6 @@ def _locked_open(filename: str | Path, mode: str = "r") -> Generator:
|
||||
def write_history_file(data: Any) -> None:
|
||||
with _locked_open(user_history_file(), "w+") as f:
|
||||
f.write(json.dumps(data, cls=EnhancedJSONEncoder, indent=4))
|
||||
f.truncate()
|
||||
|
||||
|
||||
def read_history_file() -> list[dict]:
|
||||
|
@ -1,11 +1,9 @@
|
||||
import argparse
|
||||
import json
|
||||
import shlex
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from ..errors import ClanError
|
||||
from ..cmd import run
|
||||
from ..nix import nix_config, nix_eval
|
||||
|
||||
|
||||
@ -31,17 +29,8 @@ def inspect_vm(flake_url: str | Path, flake_attr: str) -> VmConfig:
|
||||
]
|
||||
)
|
||||
|
||||
proc = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE)
|
||||
assert proc.stdout is not None
|
||||
if proc.returncode != 0:
|
||||
raise ClanError(
|
||||
f"""
|
||||
command: {shlex.join(cmd)}
|
||||
exit code: {proc.returncode}
|
||||
stdout:
|
||||
{proc.stdout}
|
||||
"""
|
||||
)
|
||||
proc = run(cmd)
|
||||
|
||||
data = json.loads(proc.stdout)
|
||||
return VmConfig(flake_url=flake_url, flake_attr=flake_attr, **data)
|
||||
|
||||
|
@ -10,6 +10,7 @@ from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import IO
|
||||
|
||||
from ..cmd import run
|
||||
from ..dirs import module_root, specific_groot_dir
|
||||
from ..errors import ClanError
|
||||
from ..nix import nix_build, nix_config, nix_shell
|
||||
@ -20,9 +21,10 @@ log = logging.getLogger(__name__)
|
||||
|
||||
def get_qemu_version() -> list[int]:
|
||||
# Run the command and capture the output
|
||||
output = subprocess.check_output(["qemu-kvm", "--version"])
|
||||
res = run(["qemu-kvm", "--version"])
|
||||
|
||||
# Decode the output from bytes to string
|
||||
output_str = output.decode("utf-8")
|
||||
output_str = res.stdout
|
||||
# Split the output by newline and get the first line
|
||||
first_line = output_str.split("\n")[0]
|
||||
# Split the first line by space and get the third element
|
||||
@ -39,7 +41,7 @@ def graphics_options(vm: VmConfig) -> list[str]:
|
||||
|
||||
# Check if the version is greater than 8.1.3 to enable virtio audio
|
||||
# if get_qemu_version() > [8, 1, 3]:
|
||||
# common = ["-audio", "driver=pa,model=virtio"]
|
||||
# common = ["-audio", "driver=pa,model=virtio"]
|
||||
|
||||
if vm.wayland:
|
||||
# fmt: off
|
||||
@ -135,16 +137,7 @@ def get_vm_create_info(vm: VmConfig, nix_options: list[str]) -> dict[str, str]:
|
||||
specific_groot_dir(clan_name=vm.clan_name, flake_url=str(vm.flake_url))
|
||||
/ f"vm-{machine}",
|
||||
)
|
||||
proc = subprocess.run(
|
||||
cmd,
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
text=True,
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
raise ClanError(
|
||||
f"Failed to build vm config: {shlex.join(cmd)} failed with: {proc.returncode}"
|
||||
)
|
||||
proc = run(cmd)
|
||||
try:
|
||||
return json.loads(Path(proc.stdout.strip()).read_text())
|
||||
except json.JSONDecodeError as e:
|
||||
|
@ -1,9 +1,6 @@
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from clan_cli.clan_uri import ClanParameters, ClanScheme, ClanURI
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
|
||||
def test_get_internal() -> None:
|
||||
@ -12,7 +9,7 @@ def test_get_internal() -> None:
|
||||
assert uri.get_internal() == "https://example.com?password=1234"
|
||||
|
||||
uri = ClanURI("clan://~/Downloads")
|
||||
assert uri.get_internal() == "~/Downloads"
|
||||
assert uri.get_internal().endswith("/Downloads")
|
||||
|
||||
uri = ClanURI("clan:///home/user/Downloads")
|
||||
assert uri.get_internal() == "/home/user/Downloads"
|
||||
@ -25,24 +22,18 @@ def test_local_uri() -> None:
|
||||
# Create a ClanURI object from a local URI
|
||||
uri = ClanURI("clan://file:///home/user/Downloads")
|
||||
match uri.scheme:
|
||||
case ClanScheme.FILE.value(path):
|
||||
case ClanScheme.LOCAL.value(path):
|
||||
assert path == Path("/home/user/Downloads") # type: ignore
|
||||
case _:
|
||||
assert False
|
||||
|
||||
|
||||
def test_unsupported_schema() -> None:
|
||||
with pytest.raises(ClanError, match="Unsupported uri components: .*"):
|
||||
# Create a ClanURI object from an unsupported URI
|
||||
ClanURI("clan://ftp://ftp.example.com")
|
||||
|
||||
|
||||
def test_is_remote() -> None:
|
||||
# Create a ClanURI object from a remote URI
|
||||
uri = ClanURI("clan://https://example.com")
|
||||
|
||||
match uri.scheme:
|
||||
case ClanScheme.HTTP.value(url):
|
||||
case ClanScheme.REMOTE.value(url):
|
||||
assert url == "https://example.com" # type: ignore
|
||||
case _:
|
||||
assert False
|
||||
@ -51,7 +42,7 @@ def test_is_remote() -> None:
|
||||
def test_direct_local_path() -> None:
|
||||
# Create a ClanURI object from a remote URI
|
||||
uri = ClanURI("clan://~/Downloads")
|
||||
assert uri.get_internal() == "~/Downloads"
|
||||
assert uri.get_internal().endswith("/Downloads")
|
||||
|
||||
|
||||
def test_direct_local_path2() -> None:
|
||||
@ -67,7 +58,7 @@ def test_remote_with_clanparams() -> None:
|
||||
assert uri.params.flake_attr == "defaultVM"
|
||||
|
||||
match uri.scheme:
|
||||
case ClanScheme.HTTP.value(url):
|
||||
case ClanScheme.REMOTE.value(url):
|
||||
assert url == "https://example.com" # type: ignore
|
||||
case _:
|
||||
assert False
|
||||
@ -81,7 +72,7 @@ def test_from_path_with_custom() -> None:
|
||||
assert uri.params.flake_attr == "myVM"
|
||||
|
||||
match uri.scheme:
|
||||
case ClanScheme.FILE.value(path):
|
||||
case ClanScheme.LOCAL.value(path):
|
||||
assert path == Path("/home/user/Downloads") # type: ignore
|
||||
case _:
|
||||
assert False
|
||||
@ -95,7 +86,7 @@ def test_from_path_with_default() -> None:
|
||||
assert uri.params.flake_attr == "defaultVM"
|
||||
|
||||
match uri.scheme:
|
||||
case ClanScheme.FILE.value(path):
|
||||
case ClanScheme.LOCAL.value(path):
|
||||
assert path == Path("/home/user/Downloads") # type: ignore
|
||||
case _:
|
||||
assert False
|
||||
@ -109,7 +100,7 @@ def test_from_str() -> None:
|
||||
assert uri.params.flake_attr == "myVM"
|
||||
|
||||
match uri.scheme:
|
||||
case ClanScheme.HTTP.value(url):
|
||||
case ClanScheme.REMOTE.value(url):
|
||||
assert url == "https://example.com?password=asdasd&test=1234" # type: ignore
|
||||
case _:
|
||||
assert False
|
||||
@ -118,17 +109,17 @@ def test_from_str() -> None:
|
||||
params = ClanParameters(flake_attr="myVM")
|
||||
uri = ClanURI.from_str(url=uri_str, params=params)
|
||||
assert uri.params.flake_attr == "myVM"
|
||||
assert uri.get_internal() == "~/Downloads/democlan"
|
||||
assert uri.get_internal().endswith("/Downloads/democlan")
|
||||
|
||||
uri_str = "~/Downloads/democlan"
|
||||
uri = ClanURI.from_str(url=uri_str)
|
||||
assert uri.params.flake_attr == "defaultVM"
|
||||
assert uri.get_internal() == "~/Downloads/democlan"
|
||||
assert uri.get_internal().endswith("/Downloads/democlan")
|
||||
|
||||
uri_str = "clan://~/Downloads/democlan"
|
||||
uri = ClanURI.from_str(url=uri_str)
|
||||
assert uri.params.flake_attr == "defaultVM"
|
||||
assert uri.get_internal() == "~/Downloads/democlan"
|
||||
assert uri.get_internal().endswith("/Downloads/democlan")
|
||||
|
||||
|
||||
def test_remote_with_all_params() -> None:
|
||||
@ -137,7 +128,7 @@ def test_remote_with_all_params() -> None:
|
||||
assert uri.params.flake_attr == "myVM"
|
||||
|
||||
match uri.scheme:
|
||||
case ClanScheme.HTTP.value(url):
|
||||
case ClanScheme.REMOTE.value(url):
|
||||
assert url == "https://example.com?password=1234" # type: ignore
|
||||
case _:
|
||||
assert False
|
||||
|
Loading…
Reference in New Issue
Block a user