forked from clan/clan-core
Improved ClanURI
This commit is contained in:
parent
7a82f364c7
commit
0a9fb41785
@ -40,7 +40,7 @@ class ClanParameters:
|
|||||||
class ClanURI:
|
class ClanURI:
|
||||||
# Initialize the class with a clan:// URI
|
# Initialize the class with a clan:// URI
|
||||||
def __init__(self, uri: str) -> None:
|
def __init__(self, uri: str) -> None:
|
||||||
|
self._full_uri = uri
|
||||||
# Check if the URI starts with clan://
|
# Check if the URI starts with clan://
|
||||||
if uri.startswith("clan://"):
|
if uri.startswith("clan://"):
|
||||||
self._nested_uri = uri[7:]
|
self._nested_uri = uri[7:]
|
||||||
@ -54,13 +54,13 @@ class ClanURI:
|
|||||||
# Parse the query string into a dictionary
|
# Parse the query string into a dictionary
|
||||||
query = urllib.parse.parse_qs(self._components.query)
|
query = urllib.parse.parse_qs(self._components.query)
|
||||||
|
|
||||||
params: dict[str, str] = {}
|
new_params: dict[str, str] = {}
|
||||||
for field in dataclasses.fields(ClanParameters):
|
for field in dataclasses.fields(ClanParameters):
|
||||||
if field.name in query:
|
if field.name in query:
|
||||||
values = query[field.name]
|
values = query[field.name]
|
||||||
if len(values) > 1:
|
if len(values) > 1:
|
||||||
raise ClanError(f"Multiple values for parameter: {field.name}")
|
raise ClanError(f"Multiple values for parameter: {field.name}")
|
||||||
params[field.name] = values[0]
|
new_params[field.name] = values[0]
|
||||||
|
|
||||||
# Remove the field from the query dictionary
|
# Remove the field from the query dictionary
|
||||||
# clan uri and nested uri share one namespace for query parameters
|
# clan uri and nested uri share one namespace for query parameters
|
||||||
@ -69,7 +69,7 @@ class ClanURI:
|
|||||||
|
|
||||||
new_query = urllib.parse.urlencode(query, doseq=True)
|
new_query = urllib.parse.urlencode(query, doseq=True)
|
||||||
self._components = self._components._replace(query=new_query)
|
self._components = self._components._replace(query=new_query)
|
||||||
self.params = ClanParameters(**params)
|
self.params = ClanParameters(**new_params)
|
||||||
|
|
||||||
comb = (
|
comb = (
|
||||||
self._components.scheme,
|
self._components.scheme,
|
||||||
@ -97,10 +97,25 @@ class ClanURI:
|
|||||||
case _:
|
case _:
|
||||||
raise ClanError(f"Unsupported uri components: {self.scheme}")
|
raise ClanError(f"Unsupported uri components: {self.scheme}")
|
||||||
|
|
||||||
|
def get_full_uri(self) -> str:
|
||||||
|
return self._full_uri
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_path(cls, path: Path, params: ClanParameters) -> Self: # noqa
|
def from_path(cls, path: Path, params: ClanParameters | None = None) -> Self: # noqa
|
||||||
urlparams = urllib.parse.urlencode(params.__dict__)
|
return cls.from_str(str(path), params)
|
||||||
return cls(f"clan://{path}?{urlparams}")
|
|
||||||
|
@classmethod
|
||||||
|
def from_str(cls, url: str, params: ClanParameters | None = None) -> Self: # noqa
|
||||||
|
if params is None:
|
||||||
|
return cls(f"clan://{url}")
|
||||||
|
|
||||||
|
comp = urllib.parse.urlparse(url)
|
||||||
|
query = urllib.parse.parse_qs(comp.query)
|
||||||
|
query.update(params.__dict__)
|
||||||
|
new_query = urllib.parse.urlencode(query, doseq=True)
|
||||||
|
comp = comp._replace(query=new_query)
|
||||||
|
new_url = urllib.parse.urlunparse(comp)
|
||||||
|
return cls(f"clan://{new_url}")
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return f"ClanURI({self._components.geturl()})"
|
return f"ClanURI({self._components.geturl()})"
|
||||||
|
@ -3,11 +3,11 @@ import argparse
|
|||||||
import dataclasses
|
import dataclasses
|
||||||
import datetime
|
import datetime
|
||||||
import json
|
import json
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from clan_cli.flakes.inspect import FlakeConfig, inspect_flake
|
from clan_cli.flakes.inspect import FlakeConfig, inspect_flake
|
||||||
|
|
||||||
|
from ..clan_uri import ClanURI
|
||||||
from ..dirs import user_history_file
|
from ..dirs import user_history_file
|
||||||
from ..locked_open import locked_open
|
from ..locked_open import locked_open
|
||||||
|
|
||||||
@ -46,10 +46,12 @@ def list_history() -> list[HistoryEntry]:
|
|||||||
return logs
|
return logs
|
||||||
|
|
||||||
|
|
||||||
def add_history(path: Path) -> list[HistoryEntry]:
|
def add_history(uri: ClanURI) -> list[HistoryEntry]:
|
||||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||||
logs = list_history()
|
logs = list_history()
|
||||||
found = False
|
found = False
|
||||||
|
path = uri.get_internal()
|
||||||
|
machine = uri.params.flake_attr
|
||||||
|
|
||||||
for entry in logs:
|
for entry in logs:
|
||||||
if entry.flake.flake_url == str(path):
|
if entry.flake.flake_url == str(path):
|
||||||
@ -59,7 +61,7 @@ def add_history(path: Path) -> list[HistoryEntry]:
|
|||||||
if found:
|
if found:
|
||||||
break
|
break
|
||||||
|
|
||||||
flake = inspect_flake(path, "defaultVM")
|
flake = inspect_flake(path, machine)
|
||||||
flake.flake_url = str(flake.flake_url)
|
flake.flake_url = str(flake.flake_url)
|
||||||
history = HistoryEntry(
|
history = HistoryEntry(
|
||||||
flake=flake,
|
flake=flake,
|
||||||
@ -80,5 +82,7 @@ def add_history_command(args: argparse.Namespace) -> None:
|
|||||||
|
|
||||||
# takes a (sub)parser and configures it
|
# takes a (sub)parser and configures it
|
||||||
def register_add_parser(parser: argparse.ArgumentParser) -> None:
|
def register_add_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
parser.add_argument("path", type=Path, help="Path to the flake", default=Path("."))
|
parser.add_argument(
|
||||||
|
"uri", type=ClanURI, help="Path to the flake", default=ClanURI(".")
|
||||||
|
)
|
||||||
parser.set_defaults(func=add_history_command)
|
parser.set_defaults(func=add_history_command)
|
||||||
|
@ -101,6 +101,31 @@ def test_from_path_with_default() -> None:
|
|||||||
assert False
|
assert False
|
||||||
|
|
||||||
|
|
||||||
|
def test_from_str() -> None:
|
||||||
|
# Create a ClanURI object from a remote URI with parameters
|
||||||
|
uri_str = "https://example.com?password=asdasd&test=1234"
|
||||||
|
params = ClanParameters(flake_attr="myVM")
|
||||||
|
uri = ClanURI.from_str(url=uri_str, params=params)
|
||||||
|
assert uri.params.flake_attr == "myVM"
|
||||||
|
|
||||||
|
match uri.scheme:
|
||||||
|
case ClanScheme.HTTP.value(url):
|
||||||
|
assert url == "https://example.com?password=asdasd&test=1234" # type: ignore
|
||||||
|
case _:
|
||||||
|
assert False
|
||||||
|
|
||||||
|
uri_str = "~/Downloads/democlan"
|
||||||
|
params = ClanParameters(flake_attr="myVM")
|
||||||
|
uri = ClanURI.from_str(url=uri_str, params=params)
|
||||||
|
assert uri.params.flake_attr == "myVM"
|
||||||
|
assert uri.get_internal() == "~/Downloads/democlan"
|
||||||
|
|
||||||
|
uri_str = "~/Downloads/democlan"
|
||||||
|
uri = ClanURI.from_str(url=uri_str)
|
||||||
|
assert uri.params.flake_attr == "defaultVM"
|
||||||
|
assert uri.get_internal() == "~/Downloads/democlan"
|
||||||
|
|
||||||
|
|
||||||
def test_remote_with_all_params() -> None:
|
def test_remote_with_all_params() -> None:
|
||||||
# Create a ClanURI object from a remote URI with parameters
|
# Create a ClanURI object from a remote URI with parameters
|
||||||
uri = ClanURI("clan://https://example.com?flake_attr=myVM&password=1234")
|
uri = ClanURI("clan://https://example.com?flake_attr=myVM&password=1234")
|
||||||
|
@ -13,26 +13,26 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
|
|
||||||
def test_history_add(
|
def test_history_add(
|
||||||
test_flake: FlakeForTest,
|
test_flake_with_core: FlakeForTest,
|
||||||
) -> None:
|
) -> None:
|
||||||
cli = Cli()
|
cli = Cli()
|
||||||
cmd = [
|
cmd = [
|
||||||
"history",
|
"history",
|
||||||
"add",
|
"add",
|
||||||
str(test_flake.path),
|
str(test_flake_with_core.path),
|
||||||
]
|
]
|
||||||
breakpoint()
|
|
||||||
cli.run(cmd)
|
cli.run(cmd)
|
||||||
|
|
||||||
history_file = user_history_file()
|
history_file = user_history_file()
|
||||||
assert history_file.exists()
|
assert history_file.exists()
|
||||||
history = [HistoryEntry(**entry) for entry in json.loads(open(history_file).read())]
|
history = [HistoryEntry(**entry) for entry in json.loads(open(history_file).read())]
|
||||||
assert history[0].flake.flake_url == str(test_flake.path)
|
assert history[0].flake.flake_url == str(test_flake_with_core.path)
|
||||||
|
|
||||||
|
|
||||||
def test_history_list(
|
def test_history_list(
|
||||||
capsys: CaptureFixture,
|
capsys: CaptureFixture,
|
||||||
test_flake: FlakeForTest,
|
test_flake_with_core: FlakeForTest,
|
||||||
) -> None:
|
) -> None:
|
||||||
cli = Cli()
|
cli = Cli()
|
||||||
cmd = [
|
cmd = [
|
||||||
@ -41,8 +41,8 @@ def test_history_list(
|
|||||||
]
|
]
|
||||||
|
|
||||||
cli.run(cmd)
|
cli.run(cmd)
|
||||||
assert str(test_flake.path) not in capsys.readouterr().out
|
assert str(test_flake_with_core.path) not in capsys.readouterr().out
|
||||||
|
|
||||||
cli.run(["history", "add", str(test_flake.path)])
|
cli.run(["history", "add", str(test_flake_with_core.path)])
|
||||||
cli.run(cmd)
|
cli.run(cmd)
|
||||||
assert str(test_flake.path) in capsys.readouterr().out
|
assert str(test_flake_with_core.path) in capsys.readouterr().out
|
||||||
|
Loading…
Reference in New Issue
Block a user