forked from clan/clan-core

Compare commits


36 Commits

Author SHA1 Message Date
cc2318acdd vars: implement prompts 2024-07-22 15:31:55 +07:00
72019d6bcb Merge pull request 'Automatic flake update - 2024-07-22T00:00+00:00' (#1791) from flake-update-2024-07-22 into main 2024-07-22 00:06:51 +00:00
Clan Merge Bot 249a18a734 update flake lock - 2024-07-22T00:00+00:00
Flake lock file updates:

• Updated input 'disko':
    'github:nix-community/disko/786965e1b1ed3fd2018d78399984f461e2a44689' (2024-07-11)
  → 'github:nix-community/disko/bec6e3cde912b8acb915fecdc509eda7c973fb42' (2024-07-19)
• Updated input 'nixos-images':
    'github:nix-community/nixos-images/5eddae0afbcfd4283af5d6676d08ad059ca04b70' (2024-07-11)
  → 'github:nix-community/nixos-images/accee005735844d57b411d9969c5d0aabc6a55f6' (2024-07-21)
• Updated input 'nixpkgs':
    'github:NixOS/nixpkgs/0af9d835c27984b3265145f8e3cbc6c153479196' (2024-07-14)
  → 'github:NixOS/nixpkgs/4cc8b29327bed3d52b40041f810f49734298af46' (2024-07-21)
• Updated input 'sops-nix':
    'github:Mic92/sops-nix/0703ba03fd9c1665f8ab68cc3487302475164617' (2024-07-14)
  → 'github:Mic92/sops-nix/909e8cfb60d83321d85c8d17209d733658a21c95' (2024-07-21)
• Updated input 'treefmt-nix':
    'github:numtide/treefmt-nix/b92afa1501ac73f1d745526adc4f89b527595f14' (2024-07-14)
  → 'github:numtide/treefmt-nix/888bfb10a9b091d9ed2f5f8064de8d488f7b7c97' (2024-07-20)
2024-07-22 00:00:22 +00:00
6c7e9bafea Merge pull request 'root-password: fix password-store support' (#1789) from inventory-fixes into main 2024-07-21 16:14:47 +00:00
a1a36606e4 cleanup deprecated facts options 2024-07-21 18:11:30 +02:00
3d12aabf0c test_flake_with_core_and_pass: don't use deprecated options 2024-07-21 18:03:51 +02:00
e79e199c9a root-password: fix password-store support 2024-07-21 17:59:11 +02:00
1db0321163 Merge pull request 'Clan create fixes' (#1788) from inventory-fixes into main 2024-07-21 15:05:13 +00:00
d356a63d6c improve lsblk instructions 2024-07-21 16:39:01 +02:00
824c5d3f80 flake/create: fix "directory does not exist" error 2024-07-21 16:00:35 +02:00
563ead4652 only set git author / email when no one is set globally 2024-07-21 16:00:35 +02:00
79a6ad2715 drop binary cache from flake
This generates warnings for users of the CLI and confuses them.
In our CI systems we can just provide our binary cache.
2024-07-21 15:27:31 +02:00
2516f38c37 remove deprecated adwaita-icon-theme 2024-07-21 15:26:37 +02:00
f3c9c379e6 Merge pull request 'Reapply "clan.core: rename clan.{deployment,networking} -> clan.core.{deployment,networking}"' (#1787) from Qubasa/clan-core:Qubasa-main into main 2024-07-19 20:23:25 +00:00
3546586dde fixup! fixup! fixup! clan-vm-manager: Restore to known good version 2024-07-19 22:20:12 +02:00
aa792fedfd fixup! fixup! clan-vm-manager: Restore to known good version 2024-07-19 22:12:29 +02:00
f1182af5a1 fixup! clan-vm-manager: Restore to known good version 2024-07-19 22:12:17 +02:00
728f8f5758 clan-vm-manager: Restore to known good version 2024-07-19 22:07:34 +02:00
1cb69cb5fc Reapply "clan.core: rename clan.{deployment,networking} -> clan.core.{deployment,networking}"
This reverts commit 9778444706.

workaround upstream bug: https://github.com/NixOS/nixpkgs/issues/324802
2024-07-19 22:07:34 +02:00
f66b809866 Merge pull request 'Classgen: refactor functions' (#1785) from hsjobeki/clan-core:hsjobeki-main into main 2024-07-19 16:52:37 +00:00
6d441a1494 Classgen: make type order predictable 2024-07-19 18:49:16 +02:00
5c18f67fed Classgen: refactor functions 2024-07-19 18:20:22 +02:00
a7e3fd431d Merge pull request 'Classgen: add error reporting and combine common classes' (#1784) from hsjobeki/clan-core:hsjobeki-main into main 2024-07-19 11:07:06 +00:00
3435db68c8 Classgen: add error reporting and combine common classes 2024-07-19 13:03:38 +02:00
f00ddcad10 Merge pull request 'Docs: explain inventory imports' (#1783) from hsjobeki/clan-core:hsjobeki-rearrange-docs-front into main 2024-07-19 09:29:59 +00:00
988ed9dccd Docs: explain inventory imports 2024-07-19 11:26:34 +02:00
aab6a45cda Merge pull request 'Docs: remove generated inventory.md from git index' (#1782) from hsjobeki/clan-core:hsjobeki-rearrange-docs-front into main 2024-07-19 09:20:32 +00:00
afa0984b57 Docs: remove generated inventory.md from git index 2024-07-19 11:17:08 +02:00
ee65d3918b Merge pull request 'docs: add reference index pages' (#1781) from hsjobeki/clan-core:rearrange-docs-front into main 2024-07-19 09:11:33 +00:00
67b76c8ced Docs: generate api docs for inventory 2024-07-19 11:07:47 +02:00
13b8b949f9 docs: add reference index pages
- add index pages for each reference documentation category
- move concepts pages into the reference hierarchy
- render clanModules overview page in the style of the CLI overview
2024-07-19 10:27:04 +02:00
3a3f8e0756 Merge pull request 'Init: Autogenerate classes from nix interfaces' (#1778) from hsjobeki/clan-core:hsjobeki-main into main 2024-07-19 07:58:57 +00:00
6d49f5c926 Commit generated code otherwise CI cannot check types 2024-07-19 09:52:14 +02:00
c92ee71d42 Jsonschema: fix tests 2024-07-18 22:04:11 +02:00
07965598f5 Classgen: add mapped keys and more stuff 2024-07-18 21:58:36 +02:00
7e84eaa4b3 Init: Autogenerate classes from nix interfaces 2024-07-18 19:14:12 +02:00
116 changed files with 5366 additions and 492 deletions

.gitignore vendored

@ -14,6 +14,7 @@ nixos.qcow2
**/*.glade~
/docs/out
# dream2nix
.dream2nix


@ -1,9 +1,18 @@
{ pkgs, config, ... }:
{
pkgs,
config,
lib,
...
}:
{
users.mutableUsers = false;
users.users.root.hashedPasswordFile =
config.clan.core.facts.services.root-password.secret.password-hash.path;
sops.secrets."${config.clan.core.machineName}-password-hash".neededForUsers = true;
sops.secrets."${config.clan.core.machineName}-password-hash".neededForUsers = lib.mkIf (
config.clan.core.facts.secretStore == "sops"
) true;
clan.core.facts.services.root-password = {
secret.password = { };
secret.password-hash = { };


@ -2,7 +2,8 @@
{
options.clan.single-disk = {
device = lib.mkOption {
type = lib.types.str;
default = null;
type = lib.types.nullOr lib.types.str;
description = "The primary disk device to install the system on";
# Question: should we set a default here?
# default = "/dev/null";

docs/.gitignore vendored

@ -1,3 +1,6 @@
/site/reference
/site/reference/clan-core
/site/reference/clanModules
/site/reference/nix-api/inventory.md
/site/reference/cli
/site/static/Roboto-Regular.ttf
/site/static/FiraCode-VF.ttf


@ -48,11 +48,13 @@ nav:
- Mesh VPN: getting-started/mesh-vpn.md
- Backup & Restore: getting-started/backups.md
- Flake-parts: getting-started/flake-parts.md
- Concepts:
- Configuration: concepts/configuration.md
- Inventory: concepts/inventory.md
- Guides:
- guides/index.md
- Inventory: guides/inventory.md
- Reference:
- reference/index.md
- Clan Modules:
- reference/clanModules/index.md
- reference/clanModules/borgbackup-static.md
- reference/clanModules/borgbackup.md
- reference/clanModules/deltachat.md
@ -77,13 +79,13 @@ nav:
- reference/clanModules/zerotier-static-peers.md
- reference/clanModules/zt-tcp-relay.md
- CLI:
- reference/cli/index.md
- reference/cli/backups.md
- reference/cli/config.md
- reference/cli/facts.md
- reference/cli/flakes.md
- reference/cli/flash.md
- reference/cli/history.md
- reference/cli/index.md
- reference/cli/machines.md
- reference/cli/secrets.md
- reference/cli/show.md
@ -98,6 +100,10 @@ nav:
- reference/clan-core/state.md
- reference/clan-core/deployment.md
- reference/clan-core/networking.md
- Nix API:
- reference/nix-api/index.md
- buildClan: reference/nix-api/buildclan.md
- Inventory: reference/nix-api/inventory.md
- Contributing: contributing/contributing.md
- Blog:
- blog/index.md


@ -2,6 +2,7 @@
pkgs,
module-docs,
clan-cli-docs,
inventory-api-docs,
asciinema-player-js,
asciinema-player-css,
roboto,
@ -32,6 +33,7 @@ pkgs.stdenv.mkDerivation {
mkdir -p ./site/reference/cli
cp -af ${module-docs}/* ./site/reference/
cp -af ${clan-cli-docs}/* ./site/reference/cli/
cp -af ${inventory-api-docs} ./site/reference/nix-api/inventory.md
mkdir -p ./site/static/asciinema-player
ln -snf ${asciinema-player-js} ./site/static/asciinema-player/asciinema-player.min.js


@ -73,7 +73,7 @@
in
{
devShells.docs = pkgs.callPackage ./shell.nix {
inherit (self'.packages) docs clan-cli-docs;
inherit (self'.packages) docs clan-cli-docs inventory-api-docs;
inherit
asciinema-player-js
asciinema-player-css
@ -83,7 +83,7 @@
};
packages = {
docs = pkgs.python3.pkgs.callPackage ./default.nix {
inherit (self'.packages) clan-cli-docs;
inherit (self'.packages) clan-cli-docs inventory-api-docs;
inherit (inputs) nixpkgs;
inherit module-docs;
inherit asciinema-player-js;


@ -220,6 +220,11 @@ def render_roles(roles: list[str] | None, module_name: str) -> str:
return ""
clan_modules_descr = """Clan modules are [NixOS modules](https://wiki.nixos.org/wiki/NixOS_modules) which have been enhanced with additional features provided by Clan, with certain option types restricted to enable configuration through a graphical interface.
"""
def produce_clan_modules_docs() -> None:
if not CLAN_MODULES:
raise ValueError(
@ -245,6 +250,12 @@ def produce_clan_modules_docs() -> None:
# print(meta_map)
# {'borgbackup': '/nix/store/hi17dwgy7963ddd4ijh81fv0c9sbh8sw-options.json', ... }
modules_index = "# Modules Overview\n\n"
modules_index += clan_modules_descr
modules_index += "## Overview\n\n"
modules_index += '<div class="grid cards" markdown>\n\n'
for module_name, options_file in links.items():
readme_file = Path(CLAN_CORE_PATH) / "clanModules" / module_name / "README.md"
print(module_name, readme_file)
@ -254,6 +265,8 @@ def produce_clan_modules_docs() -> None:
frontmatter, readme_content = extract_frontmatter(readme, str(readme_file))
print(frontmatter, readme_content)
modules_index += build_option_card(module_name, frontmatter)
with open(Path(options_file) / "share/doc/nixos/options.json") as f:
options: dict[str, dict[str, Any]] = json.load(f)
print(f"Rendering options for {module_name}...")
@ -282,6 +295,39 @@ def produce_clan_modules_docs() -> None:
with open(outfile, "w") as of:
of.write(output)
modules_index += "</div>"
modules_index += "\n"
modules_outfile = Path(OUT) / "clanModules/index.md"
with open(modules_outfile, "w") as of:
of.write(modules_index)
def build_option_card(module_name: str, frontmatter: Frontmatter) -> str:
"""
Build the overview index card for each reference target option.
"""
def indent_all(text: str, indent_size: int = 4) -> str:
"""
Indent all lines in a string.
"""
indent = " " * indent_size
lines = text.split("\n")
indented_text = indent + ("\n" + indent).join(lines)
return indented_text
def to_md_li(module_name: str, frontmatter: Frontmatter) -> str:
md_li = (
f"""- **[{module_name}](./{"-".join(module_name.split(" "))}.md)**\n\n"""
)
md_li += f"""{indent_all("---", 4)}\n\n"""
fmd = f"\n{frontmatter.description.strip()}" if frontmatter.description else ""
md_li += f"""{indent_all(fmd, 4)}"""
return md_li
return f"{to_md_li(module_name, frontmatter)}\n\n"
if __name__ == "__main__":
produce_clan_core_docs()


@ -3,6 +3,7 @@
pkgs,
module-docs,
clan-cli-docs,
inventory-api-docs,
asciinema-player-js,
asciinema-player-css,
roboto,
@ -19,6 +20,8 @@ pkgs.mkShell {
mkdir -p ./site/reference/cli
cp -af ${module-docs}/* ./site/reference/
cp -af ${clan-cli-docs}/* ./site/reference/cli/
cp -af ${inventory-api-docs} ./site/reference/nix-api/inventory.md
chmod +w ./site/reference/*
echo "Generated API documentation in './site/reference/' "


@ -1,56 +0,0 @@
# Configuration
## Introduction
When managing machine configuration this can be done through many possible ways.
Ranging from writing `nix` expression in a `flake.nix` file; placing `autoincluded` files into your machine directory; or configuring everything in a simple UI (upcomming).
clan currently offers the following methods to configure machines:
!!! Success "Recommended for nix people"
- flake.nix (i.e. via `buildClan`)
- `machine` argument
- `inventory` argument
- machines/`machine_name`/configuration.nix (`autoincluded` if it exists)
???+ Note "Used by CLI & UI"
- inventory.json
- machines/`machine_name`/hardware-configuration.nix (`autoincluded` if it exists)
!!! Warning "Deprecated"
machines/`machine_name`/settings.json
## BuildClan
The core function that produces a clan. It returns a set of consistent configurations for all machines with ready-to-use secrets, backups and other services.
### Inputs
`directory`
: The directory containing the machines subdirectory
`machines`
: Allows to include machine-specific modules i.e. machines.${name} = { ... }
`meta`
: An optional set
: `{ name :: string, icon :: string, description :: string }`
`inventory`
: Service set for easily configuring distributed services, such as backups
: For more details see [Inventory](./inventory.md)
`specialArgs`
: Extra arguments to pass to nixosSystem i.e. useful to make self available
`pkgsForSystem`
: A function that maps from architecture to pkgs, if specified this nixpkgs will be only imported once for each system.
This improves performance, but all nipxkgs.* options will be ignored.
`(string -> pkgs )`


@ -1,5 +1,31 @@
# Configuration - How to configure clan with your own machines
Managing machine configurations can be done in the following ways:
- writing `nix` expressions in a `flake.nix` file,
- placing `autoincluded` files into your machine directory,
- configuring everything in a simple UI (upcoming).
Clan currently offers the following methods to configure machines:
!!! Success "Recommended for nix people"
- flake.nix (i.e. via `buildClan`)
- `machine` argument
- `inventory` argument
- machines/`machine_name`/configuration.nix (`autoincluded` if it exists)
???+ Note "Used by CLI & UI"
- inventory.json
- machines/`machine_name`/hardware-configuration.nix (`autoincluded` if it exists)
!!! Warning "Deprecated"
machines/`machine_name`/settings.json
## Global configuration
In the `flake.nix` file:


@ -0,0 +1,5 @@
# Guides
Detailed guides on the following subtopics:
- [Inventory](./inventory.md): Configuring Services across machine boundaries


@ -2,31 +2,35 @@
`Inventory` is an abstract service layer for consistently configuring distributed services across machine boundaries.
## Meta
See [Inventory API Documentation](../reference/nix-api/inventory.md)
This guide will walk you through setting up a backup-service, where the inventory becomes useful.
## Prerequisites Meta (optional)
Metadata about the clan will be displayed upfront in the upcoming clan-app; make sure to choose a unique name.
Make sure to set `name` either via `inventory.meta` OR via `clan.meta`.
```{.nix hl_lines="3-8"}
buildClan {
inventory = {
meta = {
# The following options are available
# name: string # Required, name of the clan.
# description: null | string
# icon: null | string
name = "Superclan";
description = "Awesome backups and family stuff";
};
};
}
```
## Machines
## How to add machines
Machines and a small pieve of their configuration can be added via `inventory.machines`.
Machines can be added via `inventory.machines` OR via `buildClan` directly.
!!! Note
It doesn't matter where the machine gets introduced to buildClan - all declarations are valid and duplicates are merged.
However the clan-app (UI) will create machines in the inventory, because it cannot create arbitrary nixos configs.
However the clan-app (UI) will create machines in the inventory, because it cannot create arbitrary nix code or nixos configs.
In the following example `backup_server` is one machine - it may specify parts of its configuration in different places.
@ -78,9 +82,10 @@ A module can be added to one or multiple machines via `Roles`. clan's `Role` int
Each service can still be customized and configured according to the modules options.
- Per instance configuration via `services.<serviceName>.<instanceName>.config`
- Per role configuration via `services.<serviceName>.<instanceName>.roles.<roleName>.config`
- Per machine configuration via `services.<serviceName>.<instanceName>.machines.<machineName>.config`
### Configuration Examples
### Setting up the Backup Service
!!! Example "Borgbackup Example"
@ -112,49 +117,33 @@ Each service can still be customized and configured according to the modules opt
}
```
!!! Example "Packages Example"
### Scaling the Backup
This example shows how to add `pkgs.firefox` via the inventory interface.
The inventory allows machines to set **Tags**
```{.nix hl_lines="8-11"}
buildClan {
inventory = {
machines = {
"sara" = {};
"jon" = {};
};
services = {
packages.set_1 = {
roles.default.machines = [ "jon" "sara" ];
# Packages is a configuration option of the "packages" clanModule
config.packages = ["firefox"];
};
};
};
}
```
### Tags
It is possible to add services to multiple machines via tags. The service instance gets added in the specified role. In this case `role = "default"`
It is possible to add services to multiple machines via tags. The service instance gets added in the specified role. In this case `role = "client"`
!!! Example "Tags Example"
```{.nix hl_lines="5 8 13"}
```{.nix hl_lines="9 12 17"}
buildClan {
inventory = {
machines = {
"sara" = {
tags = ["browsing"];
"backup_server" = {
# Don't include any nixos config here
# See inventory.Machines for available options
};
"jon" = {
tags = ["browsing"];
tags = [ "backup" ];
};
"sara" = {
tags = [ "backup" ];
};
};
services = {
packages.set_1 = {
roles.default.tags = [ "browsing" ];
config.packages = ["firefox"];
borgbackup.instance_1 = {
roles.client.tags = [ "backup" ];
roles.server.machines = [ "backup_server" ];
};
};
};
@ -164,9 +153,8 @@ It is possible to add services to multiple machines via tags. The service instan
### Multiple Service Instances
!!! danger "Important"
Not all modules support multiple instances yet.
Some modules have support for adding multiple instances of the same service in different roles or configurations.
Not all modules implement support for multiple instances yet.
Multiple instances can add complexity; refer to each module's documentation for its intended usage.
!!! Example
@ -194,9 +182,11 @@ Some modules have support for adding multiple instances of the same service in d
}
```
### Schema specification
### API specification
The complete schema specification can be retrieved via:
**The complete schema specification is available [here](../reference/nix-api/inventory.md)**
Or it can be built at any time via:
```sh
nix build git+https://git.clan.lol/clan/clan-core#inventory-schema


@ -0,0 +1,8 @@
# Documentation
This section of the site contains information about the following topics:
- How to use the [Clan CLI](./cli/index.md)
- Available services and application [modules](./clanModules/index.md)
- [Configuration options](./clan-core/index.md) controlling the essential features
- Descriptions of the [Nix interfaces](./nix-api/index.md) for defining a Clan


@ -0,0 +1,29 @@
# buildClan
The core [function](https://git.clan.lol/clan/clan-core/src/branch/main/lib/build-clan/default.nix) that produces a Clan. It returns a set of consistent configurations for all machines with ready-to-use secrets, backups and other services.
## Inputs
`directory`
: The directory containing the machines subdirectory
`machines`
: Allows including machine-specific modules, i.e. machines.${name} = { ... }
`meta`
: An optional set
: `{ name :: string, icon :: string, description :: string }`
`inventory`
: Service set for easily configuring distributed services, such as backups
: For more details see [Inventory](./inventory.md)
`specialArgs`
: Extra arguments to pass to nixosSystem i.e. useful to make self available
`pkgsForSystem`
: A function that maps from architecture to pkgs. If specified, nixpkgs will be imported only once per system.
This improves performance, but all nixpkgs.* options will be ignored.
`(string -> pkgs )`


@ -0,0 +1,6 @@
# Nix API Overview
There are two top-level components of the Nix API, which together allow for the declarative definition of a Clan:
- the [Inventory](./inventory.md), a structure representing the machines, services, custom configurations, and other data that constitute a Clan, and
- the [`buildClan`](./buildclan.md) function, which constructs a Clan from an Inventory definition.


@ -7,11 +7,11 @@
]
},
"locked": {
"lastModified": 1720661479,
"narHash": "sha256-nsGgA14vVn0GGiqEfomtVgviRJCuSR3UEopfP8ixW1I=",
"lastModified": 1721417620,
"narHash": "sha256-6q9b1h8fI3hXg2DG6/vrKWCeG8c5Wj2Kvv22RCgedzg=",
"owner": "nix-community",
"repo": "disko",
"rev": "786965e1b1ed3fd2018d78399984f461e2a44689",
"rev": "bec6e3cde912b8acb915fecdc509eda7c973fb42",
"type": "github"
},
"original": {
@ -48,11 +48,11 @@
]
},
"locked": {
"lastModified": 1720659757,
"narHash": "sha256-ltzUuCsEfPA9CYM9BAnwObBGqDyQIs2OLkbVMeOOk00=",
"lastModified": 1721571445,
"narHash": "sha256-2MnlPVcNJZ9Nbu90kFyo7+lng366gswErP4FExfrUbc=",
"owner": "nix-community",
"repo": "nixos-images",
"rev": "5eddae0afbcfd4283af5d6676d08ad059ca04b70",
"rev": "accee005735844d57b411d9969c5d0aabc6a55f6",
"type": "github"
},
"original": {
@ -63,11 +63,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1720977633,
"narHash": "sha256-if0qaFmAe8X01NsVRK5e9Asg9mEWVkHrA9WuqM5jB70=",
"lastModified": 1721571961,
"narHash": "sha256-jfF4gpRUpTBY2OxDB0FRySsgNGOiuDckEtu7YDQom3Y=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "0af9d835c27984b3265145f8e3cbc6c153479196",
"rev": "4cc8b29327bed3d52b40041f810f49734298af46",
"type": "github"
},
"original": {
@ -95,11 +95,11 @@
"nixpkgs-stable": []
},
"locked": {
"lastModified": 1720926522,
"narHash": "sha256-eTpnrT6yu1vp8C0B5fxHXhgKxHoYMoYTEikQx///jxY=",
"lastModified": 1721531171,
"narHash": "sha256-AsvPw7T0tBLb53xZGcUC3YPqlIpdxoSx56u8vPCr6gU=",
"owner": "Mic92",
"repo": "sops-nix",
"rev": "0703ba03fd9c1665f8ab68cc3487302475164617",
"rev": "909e8cfb60d83321d85c8d17209d733658a21c95",
"type": "github"
},
"original": {
@ -115,11 +115,11 @@
]
},
"locked": {
"lastModified": 1720930114,
"narHash": "sha256-VZK73b5hG5bSeAn97TTcnPjXUXtV7j/AtS4KN8ggCS0=",
"lastModified": 1721458737,
"narHash": "sha256-wNXLQ/ATs1S4Opg1PmuNoJ+Wamqj93rgZYV3Di7kxkg=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "b92afa1501ac73f1d745526adc4f89b527595f14",
"rev": "888bfb10a9b091d9ed2f5f8064de8d488f7b7c97",
"type": "github"
},
"original": {


@ -1,11 +1,6 @@
{
description = "clan.lol base operating system";
nixConfig.extra-substituters = [ "https://cache.clan.lol" ];
nixConfig.extra-trusted-public-keys = [
"cache.clan.lol-1:3KztgSAB5R1M+Dz7vzkBGzXdodizbgLXGXKXlcQLA28="
];
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable-small";
disko.url = "github:nix-community/disko";


@ -12,18 +12,32 @@
treefmt.programs.nixfmt.package = pkgs.nixfmt-rfc-style;
treefmt.programs.deadnix.enable = true;
treefmt.programs.mypy.directories = {
"pkgs/clan-cli" = {
extraPythonPackages = self'.packages.clan-cli.testDependencies;
modules = [ "clan_cli" ];
};
"pkgs/clan-app" = {
extraPythonPackages =
# clan-app currently only exists on linux
(self'.packages.clan-app.externalTestDeps or [ ]) ++ self'.packages.clan-cli.testDependencies;
modules = [ "clan_app" ];
};
};
treefmt.programs.mypy.directories =
{
"pkgs/clan-cli" = {
extraPythonPackages = self'.packages.clan-cli.testDependencies;
modules = [ "clan_cli" ];
};
"pkgs/clan-app" = {
extraPythonPackages =
# clan-app currently only exists on linux
(self'.packages.clan-app.externalTestDeps or [ ]) ++ self'.packages.clan-cli.testDependencies;
modules = [ "clan_app" ];
};
}
// (
if pkgs.stdenv.isLinux then
{
"pkgs/clan-vm-manager" = {
extraPythonPackages =
# clan-app currently only exists on linux
self'.packages.clan-vm-manager.testDependencies ++ self'.packages.clan-cli.testDependencies;
modules = [ "clan_vm_manager" ];
};
}
else
{ }
);
treefmt.programs.ruff.check = true;
treefmt.programs.ruff.format = true;


@ -26,9 +26,6 @@
},
"roles": {
"default": {
"config": {
"packages": ["vim"]
},
"imports": [],
"machines": ["test-inventory-machine"],
"tags": []


@ -20,6 +20,32 @@ let
};
importsOption = lib.mkOption {
description = ''
List of imported '.nix' files.
Each filename must be a string and is interpreted relative to the 'directory' passed to buildClan.
The import only happens if the machine is part of the service or role.
## Example
To import the `special.nix` file
```
. Clan Directory
flake.nix
...
modules
special.nix
...
```
```nix
{
imports = [ "modules/special.nix" ];
}
```
'';
default = [ ];
type = types.listOf types.str;
};
@ -41,6 +67,7 @@ in
options = {
inherit (metaOptions) name description icon;
tags = lib.mkOption {
default = [ ];
apply = lib.unique;
type = types.listOf types.str;
@ -49,16 +76,10 @@ in
default = null;
type = types.nullOr types.str;
};
deploy = lib.mkOption {
default = { };
type = types.submodule {
options = {
targetHost = lib.mkOption {
default = null;
type = types.nullOr types.str;
};
};
};
deploy.targetHost = lib.mkOption {
description = "Configuration for the deployment of the machine";
default = null;
type = types.nullOr types.str;
};
};
}


@ -46,6 +46,29 @@ in
};
# Inventory schema with concrete module implementations
packages.inventory-api-docs = pkgs.stdenv.mkDerivation {
name = "inventory-schema";
buildInputs = [ ];
src = ./.;
buildPhase = ''
cat <<EOF > "$out"
# Inventory API
*Inventory* is an abstract service layer for consistently configuring distributed services across machine boundaries.
The following is a specification of the inventory in [cuelang](https://cuelang.org/) format.
\`\`\`cue
EOF
cat ${self'.packages.inventory-schema-pretty}/schema.cue >> $out
cat <<EOF >> $out
\`\`\`
EOF
'';
};
packages.inventory-schema = pkgs.stdenv.mkDerivation {
name = "inventory-schema";
buildInputs = [ pkgs.cue ];


@ -51,10 +51,13 @@ let
type = "object";
additionalProperties = false;
properties = {
meta =
inventorySchema.properties.services.additionalProperties.additionalProperties.properties.meta;
meta = {
title = "service-meta";
} // inventorySchema.properties.services.additionalProperties.additionalProperties.properties.meta;
config = {
title = "${moduleName}-config";
default = { };
} // moduleSchema;
roles = {
type = "object";
@ -69,6 +72,7 @@ let
{
properties.config = {
title = "${moduleName}-config";
default = { };
} // moduleSchema;
};
}) (rolesOf moduleName)
@ -80,6 +84,7 @@ let
{
additionalProperties.properties.config = {
title = "${moduleName}-config";
default = { };
} // moduleSchema;
};
};


@ -318,7 +318,7 @@ rec {
# return jsonschema property definition for submodule
# then (lib.attrNames (option.type.getSubOptions option.loc).opt)
then
parseOptions' (option.type.getSubOptions option.loc)
example // description // parseOptions' (option.type.getSubOptions option.loc)
# throw error if option type is not supported
else
notSupported option;


@ -279,6 +279,7 @@ in
expected = {
type = "object";
additionalProperties = false;
description = "Test Description";
properties = {
opt = {
type = "boolean";
@ -303,6 +304,7 @@ in
expected = {
type = "object";
additionalProperties = false;
description = "Test Description";
properties = {
opt = {
type = "boolean";


@ -39,7 +39,7 @@ in
vars = {
generators = lib.flip lib.mapAttrs config.clan.core.vars.generators (
_name: generator: {
inherit (generator) dependencies finalScript;
inherit (generator) dependencies finalScript prompts;
files = lib.flip lib.mapAttrs generator.files (_name: file: { inherit (file) secret; });
}
);


@ -108,8 +108,9 @@ in
Prompts are available to the generator script as files.
For example, a prompt named 'prompt1' will be available via $prompts/prompt1
'';
default = { };
type = attrsOf (submodule {
options = {
options = options {
description = {
description = ''
The description of the prompted value


@ -88,7 +88,7 @@ let
fsType = "ext4";
};
${config.clan.core.secretsUploadDirectory} = {
${config.clan.core.facts.secretUploadDirectory} = {
device = "secrets";
fsType = "9p";
neededForBoot = true;


@ -6,7 +6,7 @@ from pathlib import Path
from clan_cli.cmd import run_no_stdout
from clan_cli.errors import ClanCmdError, ClanError
from clan_cli.inventory import Inventory, Service
from clan_cli.inventory import Inventory, load_inventory
from clan_cli.nix import nix_eval
from . import API
@ -150,24 +150,6 @@ def get_module_info(
)
@API.register
def update_module_instance(
base_path: str, module_name: str, instance_name: str, instance_config: Service
) -> Inventory:
inventory = Inventory.load_file(base_path)
module_instances = inventory.services.get(module_name, {})
module_instances[instance_name] = instance_config
inventory.services[module_name] = module_instances
inventory.persist(
base_path, f"Updated module instance {module_name}/{instance_name}"
)
return inventory
@API.register
def get_inventory(base_path: str) -> Inventory:
return Inventory.load_file(base_path)
return load_inventory(base_path)


@ -6,7 +6,7 @@ from pathlib import Path
from clan_cli.api import API
from clan_cli.arg_actions import AppendOptionAction
from clan_cli.inventory import Inventory, InventoryMeta
from clan_cli.inventory import Meta, load_inventory, save_inventory
from ..cmd import CmdOut, run
from ..errors import ClanError
@ -18,9 +18,11 @@ minimal_template_url: str = "git+https://git.clan.lol/clan/clan-core#templates.m
@dataclass
class CreateClanResponse:
git_init: CmdOut
flake_init: CmdOut
git_init: CmdOut | None
git_add: CmdOut
git_config: CmdOut
git_config_username: CmdOut | None
git_config_email: CmdOut | None
flake_update: CmdOut
@ -29,14 +31,18 @@ class CreateOptions:
directory: Path | str
# Metadata for the clan
# Metadata can be shown with `clan show`
meta: InventoryMeta | None = None
meta: Meta | None = None
# URL to the template to use. Defaults to the "minimal" template
template_url: str = minimal_template_url
def git_command(directory: Path, *args: str) -> list[str]:
return nix_shell(["nixpkgs#git"], ["git", "-C", str(directory), *args])
@API.register
def create_clan(options: CreateOptions) -> CreateClanResponse:
directory = Path(options.directory)
directory = Path(options.directory).resolve()
template_url = options.template_url
if not directory.exists():
directory.mkdir()
@ -52,7 +58,6 @@ def create_clan(options: CreateOptions) -> CreateClanResponse:
description="Directory already exists and is not empty.",
)
cmd_responses = {}
command = nix_command(
[
"flake",
@ -61,44 +66,46 @@ def create_clan(options: CreateOptions) -> CreateClanResponse:
template_url,
]
)
out = run(command, cwd=directory)
flake_init = run(command, cwd=directory)
## Begin: setup git
command = nix_shell(["nixpkgs#git"], ["git", "init"])
out = run(command, cwd=directory)
cmd_responses["git init"] = out
git_init = None
if not directory.joinpath(".git").exists():
git_init = run(git_command(directory, "init"))
git_add = run(git_command(directory, "add", "."))
command = nix_shell(["nixpkgs#git"], ["git", "add", "."])
out = run(command, cwd=directory)
cmd_responses["git add"] = out
# check if username is set
has_username = run(git_command(directory, "config", "user.name"), check=False)
git_config_username = None
if has_username.returncode != 0:
git_config_username = run(
git_command(directory, "config", "user.name", "clan-tool")
)
command = nix_shell(["nixpkgs#git"], ["git", "config", "user.name", "clan-tool"])
out = run(command, cwd=directory)
cmd_responses["git config"] = out
command = nix_shell(
["nixpkgs#git"], ["git", "config", "user.email", "clan@example.com"]
)
out = run(command, cwd=directory)
cmd_responses["git config"] = out
## End: setup git
has_username = run(git_command(directory, "config", "user.email"), check=False)
git_config_email = None
if has_username.returncode != 0:
git_config_email = run(
git_command(directory, "config", "user.email", "clan@example.com")
)
# Write inventory.json file
inventory = Inventory.load_file(directory)
inventory = load_inventory(directory)
if options.meta is not None:
inventory.meta = options.meta
# Persist creates a commit message for each change
inventory.persist(directory, "Init inventory")
save_inventory(inventory, directory, "Init inventory")
command = ["nix", "flake", "update"]
out = run(command, cwd=directory)
cmd_responses["flake update"] = out
flake_update = run(
nix_shell(["nixpkgs#nix"], ["nix", "flake", "update"]), cwd=directory
)
response = CreateClanResponse(
git_init=cmd_responses["git init"],
git_add=cmd_responses["git add"],
git_config=cmd_responses["git config"],
flake_update=cmd_responses["flake update"],
flake_init=flake_init,
git_init=git_init,
git_add=git_add,
git_config_username=git_config_username,
git_config_email=git_config_email,
flake_update=flake_update,
)
return response
@ -113,7 +120,7 @@ def register_create_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--meta",
help=f"""Metadata to set for the clan. Available options are: {", ".join([f.name for f in fields(InventoryMeta)]) }""",
help=f"""Metadata to set for the clan. Available options are: {", ".join([f.name for f in fields(Meta)]) }""",
nargs=2,
metavar=("name", "value"),
action=AppendOptionAction,
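
For orientation, a minimal sketch (not part of the diff; the import path, directory, and metadata are invented) of calling the reworked create flow and checking which git steps actually ran:

```python
from pathlib import Path

from clan_cli.flakes.create import CreateOptions, create_clan  # module path assumed
from clan_cli.inventory import Meta

response = create_clan(
    CreateOptions(
        directory=Path.home() / "clans" / "demo",   # invented target directory
        meta=Meta(name="Demo clan"),
    )
)

# git_init and the git_config_* steps are now optional: they stay None when the
# directory is already a git repository or when user.name / user.email are set globally.
print(response.flake_init.returncode)
print(response.git_init, response.git_config_username, response.git_config_email)
```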


@ -6,7 +6,7 @@ from urllib.parse import urlparse
from clan_cli.api import API
from clan_cli.errors import ClanCmdError, ClanError
from clan_cli.inventory import InventoryMeta
from clan_cli.inventory import Meta
from ..cmd import run_no_stdout
from ..nix import nix_eval
@ -15,7 +15,7 @@ log = logging.getLogger(__name__)
@API.register
def show_clan_meta(uri: str | Path) -> InventoryMeta:
def show_clan_meta(uri: str | Path) -> Meta:
cmd = nix_eval(
[
f"{uri}#clanInternals.inventory.meta",
@ -61,7 +61,7 @@ def show_clan_meta(uri: str | Path) -> InventoryMeta:
description="Icon path must be a URL or a relative path.",
)
return InventoryMeta(
return Meta(
name=clan_meta.get("name"),
description=clan_meta.get("description", None),
icon=icon_path,


@ -1,20 +1,20 @@
from dataclasses import dataclass
from clan_cli.api import API
from clan_cli.inventory import Inventory, InventoryMeta
from clan_cli.inventory import Meta, load_inventory, save_inventory
@dataclass
class UpdateOptions:
directory: str
meta: InventoryMeta
meta: Meta
@API.register
def update_clan_meta(options: UpdateOptions) -> InventoryMeta:
inventory = Inventory.load_file(options.directory)
def update_clan_meta(options: UpdateOptions) -> Meta:
inventory = load_inventory(options.directory)
inventory.meta = options.meta
inventory.persist(options.directory, "Update clan meta")
save_inventory(inventory, options.directory, "Update clan metadata")
return inventory.meta


@ -18,9 +18,7 @@ class FlakeId:
), f"Flake {self._value} has an invalid type: {type(self._value)}"
def __str__(self) -> str:
return str(
self._value
) # The __str__ method returns a custom string representation
return str(self._value)
@property
def path(self) -> Path:


@ -1,16 +1,44 @@
# ruff: noqa: N815
# ruff: noqa: N806
import dataclasses
import json
from dataclasses import asdict, dataclass, field, is_dataclass
from dataclasses import fields, is_dataclass
from pathlib import Path
from typing import Any, Literal
from types import UnionType
from typing import Any, get_args, get_origin
from clan_cli.errors import ClanError
from clan_cli.git import commit_file
from .classes import (
Inventory,
Machine,
MachineDeploy,
Meta,
Service,
ServiceBorgbackup,
ServiceBorgbackupRole,
ServiceBorgbackupRoleClient,
ServiceBorgbackupRoleServer,
ServiceMeta,
)
# Re-export classes here
# This allows renaming classes in the generated code
__all__ = [
"Service",
"Machine",
"Meta",
"Inventory",
"MachineDeploy",
"ServiceBorgbackup",
"ServiceMeta",
"ServiceBorgbackupRole",
"ServiceBorgbackupRoleClient",
"ServiceBorgbackupRoleServer",
]
def sanitize_string(s: str) -> str:
return s.replace("\\", "\\\\").replace('"', '\\"')
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
def dataclass_to_dict(obj: Any) -> Any:
@ -22,8 +50,11 @@ def dataclass_to_dict(obj: Any) -> Any:
"""
if is_dataclass(obj):
return {
sanitize_string(k): dataclass_to_dict(v)
for k, v in asdict(obj).items() # type: ignore
# Use either the original name or name
sanitize_string(
field.metadata.get("original_name", field.name)
): dataclass_to_dict(getattr(obj, field.name))
for field in fields(obj) # type: ignore
}
elif isinstance(obj, list | tuple):
return [dataclass_to_dict(item) for item in obj]
@ -37,149 +68,133 @@ def dataclass_to_dict(obj: Any) -> Any:
return obj
@dataclass
class DeploymentInfo:
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint
def get_second_type(type_hint: type[dict]) -> type:
"""
Deployment information for a machine.
Get the value type of a dictionary type hint
"""
args = get_args(type_hint)
if len(args) == 2:
# Return the second argument, which should be the value type (Machine)
return args[1]
targetHost: str | None = None
raise ValueError(f"Invalid type hint for dict: {type_hint}")
@dataclass
class Machine:
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
"""
Inventory machine model.
DO NOT EDIT THIS CLASS.
Any changes here must be reflected in the inventory interface file and potentially other nix files.
- Persisted to the inventory.json file
- Source of truth to generate each clan machine.
- For hardware deployment, the machine must declare the host system.
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if data is None:
return None
name: str
deploy: DeploymentInfo = field(default_factory=DeploymentInfo)
description: str | None = None
icon: str | None = None
tags: list[str] = field(default_factory=list)
system: Literal["x86_64-linux"] | str | None = None
try:
# Attempt to create an instance of the data_class
field_values = {}
for field in fields(t):
original_name = field.metadata.get("original_name", field.name)
@staticmethod
def from_dict(data: dict[str, Any]) -> "Machine":
targetHost = data.get("deploy", {}).get("targetHost", None)
return Machine(
name=data["name"],
description=data.get("description", None),
icon=data.get("icon", None),
tags=data.get("tags", []),
system=data.get("system", None),
deploy=DeploymentInfo(targetHost),
)
field_value = data.get(original_name)
field_type = get_inner_type(field.type) # type: ignore
if original_name in data:
# If the field is another dataclass, recursively instantiate it
if is_dataclass(field_type):
field_value = from_dict(field_type, field_value)
elif isinstance(field_type, Path | str) and isinstance(
field_value, str
):
field_value = (
Path(field_value) if field_type == Path else field_value
)
elif get_origin(field_type) is dict and isinstance(field_value, dict):
# The field is a dictionary with a specific type
inner_type = get_second_type(field_type)
field_value = {
k: from_dict(inner_type, v) for k, v in field_value.items()
}
elif get_origin(field_type) is list and isinstance(field_value, list):
# The field is a list; only dataclass element types need recursive conversion
inner_type = get_args(field_type)[0]
if is_dataclass(inner_type):
field_value = [from_dict(inner_type, v) for v in field_value]
# Set the value
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
# Fields with default value
# a: Int = 1
# b: list = Field(default_factory=list)
if original_name in data or field_value is not None:
field_values[field.name] = field_value
else:
# Fields without default value
# a: Int
field_values[field.name] = field_value
return t(**field_values)
except (TypeError, ValueError) as e:
print(f"Failed to instantiate {t.__name__}: {e} {data}")
return None
# raise ClanError(f"Failed to instantiate {t.__name__}: {e}")
@dataclass
class MachineServiceConfig:
config: dict[str, Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
def get_path(flake_dir: str | Path) -> Path:
"""
Get the path to the inventory file in the flake directory
"""
return (Path(flake_dir) / "inventory.json").resolve()
@dataclass
class ServiceMeta:
name: str
description: str | None = None
icon: str | None = None
# Default inventory
default_inventory = Inventory(
meta=Meta(name="New Clan"), machines={}, services=Service()
)
@dataclass
class Role:
config: dict[str, Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
machines: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
def load_inventory(
flake_dir: str | Path, default: Inventory = default_inventory
) -> Inventory:
"""
Load the inventory file from the flake directory
If no file is found, returns the default inventory
"""
inventory = default_inventory
inventory_file = get_path(flake_dir)
if inventory_file.exists():
with open(inventory_file) as f:
try:
res = json.load(f)
inventory = from_dict(Inventory, res)
except json.JSONDecodeError as e:
# Error decoding the inventory file
raise ClanError(f"Error decoding inventory file: {e}")
return inventory
@dataclass
class Service:
meta: ServiceMeta
roles: dict[str, Role]
config: dict[str, Any] = field(default_factory=dict)
imports: list[str] = field(default_factory=list)
machines: dict[str, MachineServiceConfig] = field(default_factory=dict)
def save_inventory(inventory: Inventory, flake_dir: str | Path, message: str) -> None:
"""
Write the inventory to the flake directory
and commit it to git with the given message
"""
inventory_file = get_path(flake_dir)
@staticmethod
def from_dict(d: dict[str, Any]) -> "Service":
return Service(
meta=ServiceMeta(**d.get("meta", {})),
roles={name: Role(**role) for name, role in d.get("roles", {}).items()},
machines=(
{
name: MachineServiceConfig(**machine)
for name, machine in d.get("machines", {}).items()
}
if d.get("machines")
else {}
),
config=d.get("config", {}),
imports=d.get("imports", []),
)
with open(inventory_file, "w") as f:
json.dump(dataclass_to_dict(inventory), f, indent=2)
@dataclass
class InventoryMeta:
name: str
description: str | None = None
icon: str | None = None
@dataclass
class Inventory:
meta: InventoryMeta
machines: dict[str, Machine]
services: dict[str, dict[str, Service]]
@staticmethod
def from_dict(d: dict[str, Any]) -> "Inventory":
return Inventory(
meta=InventoryMeta(**d.get("meta", {})),
machines={
name: Machine.from_dict(machine)
for name, machine in d.get("machines", {}).items()
},
services={
name: {
role: Service.from_dict(service)
for role, service in services.items()
}
for name, services in d.get("services", {}).items()
},
)
@staticmethod
def get_path(flake_dir: str | Path) -> Path:
return Path(flake_dir) / "inventory.json"
@staticmethod
def load_file(flake_dir: str | Path) -> "Inventory":
inventory = Inventory(
machines={}, services={}, meta=InventoryMeta(name="New Clan")
)
inventory_file = Inventory.get_path(flake_dir)
if inventory_file.exists():
with open(inventory_file) as f:
try:
res = json.load(f)
inventory = Inventory.from_dict(res)
except json.JSONDecodeError as e:
raise ClanError(f"Error decoding inventory file: {e}")
return inventory
def persist(self, flake_dir: str | Path, message: str) -> None:
inventory_file = Inventory.get_path(flake_dir)
with open(inventory_file, "w") as f:
json.dump(dataclass_to_dict(self), f, indent=2)
commit_file(inventory_file, Path(flake_dir), commit_message=message)
commit_file(inventory_file, Path(flake_dir), commit_message=message)
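
For orientation, a minimal sketch (not part of the diff; the flake path and machine values are invented) of how the new functional helpers are meant to be used together with the generated classes:

```python
# Sketch only: save_inventory commits via git, so the directory is assumed
# to be a git work tree.
from pathlib import Path

from clan_cli.inventory import (
    Inventory,
    Machine,
    MachineDeploy,
    dataclass_to_dict,
    from_dict,
    load_inventory,
    save_inventory,
)

flake_dir = Path("/tmp/my-clan")  # invented path

# Falls back to the default inventory (meta.name = "New Clan") if inventory.json is missing.
inventory = load_inventory(flake_dir)

# Machines are plain generated dataclasses now.
inventory.machines["jon"] = Machine(
    name="jon",
    deploy=MachineDeploy(targetHost="root@jon.local"),
    tags=["backup"],
)

# Round trip: serialization uses the Nix-facing field names stored in field metadata,
# and from_dict rebuilds nested dataclasses from plain dicts.
assert from_dict(Inventory, dataclass_to_dict(inventory)).machines["jon"].name == "jon"

# Writes inventory.json and commits it with the given message.
save_inventory(inventory, flake_dir, "Add machine jon")
```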


@ -0,0 +1,162 @@
# DO NOT EDIT THIS FILE MANUALLY. IT IS GENERATED.
#
# ruff: noqa: N815
# ruff: noqa: N806
# ruff: noqa: F401
# fmt: off
from dataclasses import dataclass, field
from typing import Any
@dataclass
class MachineDeploy:
targetHost: None | str = field(default = None)
@dataclass
class Machine:
deploy: MachineDeploy
name: str
description: None | str = field(default = None)
icon: None | str = field(default = None)
system: None | str = field(default = None)
tags: list[str] = field(default_factory = list)
@dataclass
class Meta:
name: str
description: None | str = field(default = None)
icon: None | str = field(default = None)
@dataclass
class BorgbackupConfigDestination:
name: str
repo: str
@dataclass
class BorgbackupConfig:
destinations: dict[str, BorgbackupConfigDestination] = field(default_factory = dict)
@dataclass
class ServiceBorgbackupMachine:
config: BorgbackupConfig = field(default_factory = BorgbackupConfig)
imports: list[str] = field(default_factory = list)
@dataclass
class ServiceMeta:
name: str
description: None | str = field(default = None)
icon: None | str = field(default = None)
@dataclass
class ServiceBorgbackupRoleClient:
config: BorgbackupConfig = field(default_factory = BorgbackupConfig)
imports: list[str] = field(default_factory = list)
machines: list[str] = field(default_factory = list)
tags: list[str] = field(default_factory = list)
@dataclass
class ServiceBorgbackupRoleServer:
config: BorgbackupConfig = field(default_factory = BorgbackupConfig)
imports: list[str] = field(default_factory = list)
machines: list[str] = field(default_factory = list)
tags: list[str] = field(default_factory = list)
@dataclass
class ServiceBorgbackupRole:
client: ServiceBorgbackupRoleClient
server: ServiceBorgbackupRoleServer
@dataclass
class ServiceBorgbackup:
meta: ServiceMeta
roles: ServiceBorgbackupRole
config: BorgbackupConfig = field(default_factory = BorgbackupConfig)
machines: dict[str, ServiceBorgbackupMachine] = field(default_factory = dict)
@dataclass
class PackagesConfig:
packages: list[str] = field(default_factory = list)
@dataclass
class ServicePackageMachine:
config: PackagesConfig = field(default_factory = PackagesConfig)
imports: list[str] = field(default_factory = list)
@dataclass
class ServicePackageRoleDefault:
config: PackagesConfig = field(default_factory = PackagesConfig)
imports: list[str] = field(default_factory = list)
machines: list[str] = field(default_factory = list)
tags: list[str] = field(default_factory = list)
@dataclass
class ServicePackageRole:
default: ServicePackageRoleDefault
@dataclass
class ServicePackage:
meta: ServiceMeta
roles: ServicePackageRole
config: PackagesConfig = field(default_factory = PackagesConfig)
machines: dict[str, ServicePackageMachine] = field(default_factory = dict)
@dataclass
class SingleDiskConfig:
device: None | str = field(default = None)
@dataclass
class ServiceSingleDiskMachine:
config: SingleDiskConfig = field(default_factory = SingleDiskConfig)
imports: list[str] = field(default_factory = list)
@dataclass
class ServiceSingleDiskRoleDefault:
config: SingleDiskConfig = field(default_factory = SingleDiskConfig)
imports: list[str] = field(default_factory = list)
machines: list[str] = field(default_factory = list)
tags: list[str] = field(default_factory = list)
@dataclass
class ServiceSingleDiskRole:
default: ServiceSingleDiskRoleDefault
@dataclass
class ServiceSingleDisk:
meta: ServiceMeta
roles: ServiceSingleDiskRole
config: SingleDiskConfig = field(default_factory = SingleDiskConfig)
machines: dict[str, ServiceSingleDiskMachine] = field(default_factory = dict)
@dataclass
class Service:
borgbackup: dict[str, ServiceBorgbackup] = field(default_factory = dict)
packages: dict[str, ServicePackage] = field(default_factory = dict)
single_disk: dict[str, ServiceSingleDisk] = field(default_factory = dict, metadata = {"original_name": "single-disk"})
@dataclass
class Inventory:
meta: Meta
services: Service
machines: dict[str, Machine] = field(default_factory = dict)
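
As a quick illustration (not from the diff; the import path and concrete values are assumptions), the borgbackup/tags scenario from the inventory guide maps onto these generated classes like this:

```python
# Assumed import path: classgen writes this file to clan_cli/inventory/classes.py.
from clan_cli.inventory.classes import (
    Inventory,
    Machine,
    MachineDeploy,
    Meta,
    Service,
    ServiceBorgbackup,
    ServiceBorgbackupRole,
    ServiceBorgbackupRoleClient,
    ServiceBorgbackupRoleServer,
    ServiceMeta,
)

inventory = Inventory(
    meta=Meta(name="Superclan"),
    machines={
        "backup_server": Machine(name="backup_server", deploy=MachineDeploy()),
        "jon": Machine(name="jon", deploy=MachineDeploy(), tags=["backup"]),
        "sara": Machine(name="sara", deploy=MachineDeploy(), tags=["backup"]),
    },
    services=Service(
        borgbackup={
            "instance_1": ServiceBorgbackup(
                meta=ServiceMeta(name="instance_1"),
                roles=ServiceBorgbackupRole(
                    client=ServiceBorgbackupRoleClient(tags=["backup"]),
                    server=ServiceBorgbackupRoleServer(machines=["backup_server"]),
                ),
            )
        }
    ),
)
```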


@ -7,7 +7,7 @@ from ..api import API
from ..clan_uri import FlakeId
from ..errors import ClanError
from ..git import commit_file
from ..inventory import Inventory, Machine
from ..inventory import Machine, MachineDeploy, get_path, load_inventory, save_inventory
log = logging.getLogger(__name__)
@ -20,11 +20,11 @@ def create_machine(flake: FlakeId, machine: Machine) -> None:
"Machine name must be a valid hostname", location="Create Machine"
)
inventory = Inventory.load_file(flake.path)
inventory = load_inventory(flake.path)
inventory.machines.update({machine.name: machine})
inventory.persist(flake.path, f"Create machine {machine.name}")
save_inventory(inventory, flake.path, f"Create machine {machine.name}")
commit_file(Inventory.get_path(flake.path), Path(flake.path))
commit_file(get_path(flake.path), Path(flake.path))
def create_command(args: argparse.Namespace) -> None:
@ -36,6 +36,7 @@ def create_command(args: argparse.Namespace) -> None:
description=args.description,
tags=args.tags,
icon=args.icon,
deploy=MachineDeploy(),
),
)
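
A short sketch (flake path and host invented; import paths assumed) of the updated call, which now requires an explicit `MachineDeploy`:

```python
from clan_cli.clan_uri import FlakeId
from clan_cli.inventory import Machine, MachineDeploy
from clan_cli.machines.create import create_machine  # module path assumed

create_machine(
    FlakeId("/path/to/my-clan"),
    Machine(
        name="jon",
        deploy=MachineDeploy(targetHost="root@192.168.1.2"),
        tags=["backup"],
    ),
)
```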


@ -6,18 +6,18 @@ from ..clan_uri import FlakeId
from ..completions import add_dynamic_completer, complete_machines
from ..dirs import specific_machine_dir
from ..errors import ClanError
from ..inventory import Inventory
from ..inventory import load_inventory, save_inventory
@API.register
def delete_machine(flake: FlakeId, name: str) -> None:
inventory = Inventory.load_file(flake.path)
inventory = load_inventory(flake.path)
machine = inventory.machines.pop(name, None)
if machine is None:
raise ClanError(f"Machine {name} does not exist")
inventory.persist(flake.path, f"Delete machine {name}")
save_inventory(inventory, flake.path, f"Delete machine {name}")
folder = specific_machine_dir(flake.path, name)
if folder.exists():


@ -4,7 +4,7 @@ import logging
from pathlib import Path
from clan_cli.api import API
from clan_cli.inventory import Machine
from clan_cli.inventory import Machine, from_dict
from ..cmd import run_no_stdout
from ..nix import nix_eval
@ -24,7 +24,7 @@ def list_machines(flake_url: str | Path, debug: bool = False) -> dict[str, Machi
proc = run_no_stdout(cmd)
res = proc.stdout.strip()
data = {name: Machine.from_dict(v) for name, v in json.loads(res).items()}
data = {name: from_dict(Machine, v) for name, v in json.loads(res).items()}
return data
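
For completeness, a usage sketch (flake path invented; import path assumed) showing that machines now arrive as generated `Machine` dataclasses built via the generic `from_dict` helper:

```python
from clan_cli.machines.list import list_machines  # module path assumed

machines = list_machines("/path/to/my-clan")
for name, machine in machines.items():
    # deploy.targetHost may be None if no target host is configured
    print(name, machine.deploy.targetHost)
```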


@ -25,6 +25,9 @@ class Machine:
_eval_cache: dict[str, str] = field(default_factory=dict)
_build_cache: dict[str, Path] = field(default_factory=dict)
def get_id(self) -> str:
return f"{self.flake}#{self.name}"
def flush_caches(self) -> None:
self.cached_deployment = None
self._build_cache.clear()


@ -4,7 +4,7 @@ import logging
import os
import subprocess
import sys
from collections.abc import Callable
from getpass import getpass
from graphlib import TopologicalSorter
from pathlib import Path
from tempfile import TemporaryDirectory
@ -35,11 +35,10 @@ def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str:
"""
print(prompt, flush=True)
proc = subprocess.run(["cat"], stdout=subprocess.PIPE, text=True)
log.info("Input received. Processing...")
return proc.stdout
def bubblewrap_cmd(generator: str, generator_dir: Path, dep_tmpdir: Path) -> list[str]:
def bubblewrap_cmd(generator: str, generator_dir: Path, tmpdir: Path) -> list[str]:
# fmt: off
return nix_shell(
[
@ -52,7 +51,7 @@ def bubblewrap_cmd(generator: str, generator_dir: Path, dep_tmpdir: Path) -> lis
"--tmpfs", "/usr/lib/systemd",
"--dev", "/dev",
"--bind", str(generator_dir), str(generator_dir),
"--ro-bind", str(dep_tmpdir), str(dep_tmpdir),
"--ro-bind", str(tmpdir), str(tmpdir),
"--unshare-all",
"--unshare-user",
"--uid", "1000",
@ -92,7 +91,7 @@ def decrypt_dependencies(
def dependencies_as_dir(
decrypted_dependencies: dict[str, dict[str, bytes]],
tmpdir: Path,
) -> Path:
) -> None:
for dep_generator, files in decrypted_dependencies.items():
dep_generator_dir = tmpdir / dep_generator
dep_generator_dir.mkdir()
@ -102,7 +101,6 @@ def dependencies_as_dir(
file_path.touch()
file_path.chmod(0o600)
file_path.write_bytes(file)
return tmpdir
def execute_generator(
@ -112,7 +110,6 @@ def execute_generator(
secret_vars_store: SecretStoreBase,
public_vars_store: FactStoreBase,
dep_tmpdir: Path,
prompt: Callable[[str], str],
) -> bool:
generator_dir = dep_tmpdir / generator_name
# check if all secrets exist and generate them if at least one is missing
@ -124,11 +121,7 @@ def execute_generator(
msg = f"flake is not a Path: {machine.flake}"
msg += "fact/secret generation is only supported for local flakes"
# compatibility for old outputs.nix users
generator = machine.vars_generators[generator_name]["finalScript"]
# if machine.vars_data[generator_name]["generator"]["prompt"]:
# prompt_value = prompt(machine.vars_data[generator_name]["generator"]["prompt"])
# env["prompt_value"] = prompt_value
# build temporary file tree of dependencies
decrypted_dependencies = decrypt_dependencies(
@ -138,10 +131,26 @@ def execute_generator(
generator_dir.mkdir(parents=True)
env["out"] = str(generator_dir)
with TemporaryDirectory() as tmp:
dep_tmpdir = dependencies_as_dir(decrypted_dependencies, Path(tmp))
tmpdir = Path(tmp)
deps_tempdir = tmpdir / "deps"
prompts_tmpdir = tmpdir / "prompts"
# populate dependency inputs
dependencies_as_dir(decrypted_dependencies, deps_tempdir)
# populate prompted values
# TODO: make this rest API friendly
if machine.vars_generators[generator_name]["prompts"]:
prompts_tmpdir.mkdir()
env["prompts"] = str(prompts_tmpdir)
for prompt_name, prompt in machine.vars_generators[generator_name][
"prompts"
].items():
prompt_file = prompts_tmpdir / prompt_name
value = prompt_func(prompt["description"], prompt["type"])
prompt_file.write_text(value)
env["in"] = str(dep_tmpdir)
if sys.platform == "linux":
cmd = bubblewrap_cmd(generator, generator_dir, dep_tmpdir=dep_tmpdir)
cmd = bubblewrap_cmd(generator, generator_dir, tmpdir=tmpdir)
else:
cmd = ["bash", "-c", generator]
run(
@ -177,9 +186,18 @@ def execute_generator(
return True
def prompt_func(text: str) -> str:
print(f"{text}: ")
return read_multiline_input()
def prompt_func(description: str, input_type: str) -> str:
print(f"Enter the value for {description}: ")
if input_type == "line":
result = input()
elif input_type == "multiline":
result = read_multiline_input()
elif input_type == "hidden":
result = getpass()
else:
raise ClanError(f"Unknown input type: {input_type} for prompt {description}")
log.info("Input received. Processing...")
return result
def _get_subgraph(graph: dict[str, set], vertex: str) -> dict[str, set]:
@ -198,7 +216,6 @@ def _generate_vars_for_machine(
generator_name: str | None,
regenerate: bool,
tmpdir: Path,
prompt: Callable[[str], str] = prompt_func,
) -> bool:
local_temp = tmpdir / machine.name
local_temp.mkdir()
@ -216,13 +233,6 @@ def _generate_vars_for_machine(
f"Could not find generator with name: {generator_name}. The following generators are available: {generators}"
)
# if generator_name:
# machine_generator_facts = {
# generator_name: machine.vars_generators[generator_name]
# }
# else:
# machine_generator_facts = machine.vars_generators
graph = {
gen_name: set(generator["dependencies"])
for gen_name, generator in machine.vars_generators.items()
@ -251,7 +261,6 @@ def _generate_vars_for_machine(
secret_vars_store=secret_vars_store,
public_vars_store=public_vars_store,
dep_tmpdir=local_temp,
prompt=prompt,
)
if machine_updated:
# flush caches to make sure the new secrets are available in evaluation
@ -263,7 +272,6 @@ def generate_vars(
machines: list[Machine],
generator_name: str | None,
regenerate: bool,
prompt: Callable[[str], str] = prompt_func,
) -> bool:
was_regenerated = False
with TemporaryDirectory() as tmp:
@ -273,7 +281,7 @@ def generate_vars(
errors = 0
try:
was_regenerated |= _generate_vars_for_machine(
machine, generator_name, regenerate, tmpdir, prompt
machine, generator_name, regenerate, tmpdir
)
except Exception as exc:
log.error(f"Failed to generate facts for {machine.name}: {exc}")
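
A sketch of driving the new prompt flow from Python (import paths assumed; the flake path is invented). Declared prompts are asked via `prompt_func` (line / multiline / hidden) and written to `$prompts/<name>` before the generator script runs:

```python
from clan_cli.clan_uri import FlakeId
from clan_cli.machines.machines import Machine  # module path assumed
from clan_cli.vars.generate import generate_vars  # module path assumed

machine = Machine(name="jon", flake=FlakeId("/path/to/my-clan"))

# Regenerate all vars generators for this machine; prompted values are collected
# interactively and exposed to each generator as files under $prompts/<name>.
generate_vars([machine], generator_name=None, regenerate=True)
```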


@ -21,6 +21,10 @@ class VmConfig:
graphics: bool
waypipe: bool = False
def __post_init__(self) -> None:
if isinstance(self.flake_url, str):
self.flake_url = FlakeId(self.flake_url)
def inspect_vm(machine: Machine) -> VmConfig:
data = json.loads(machine.eval_nix("config.clan.core.vm.inspect"))


@ -1,8 +1,13 @@
import os
import random
from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from clan_cli.qemu.qmp import QEMUMonitorProtocol
from ..errors import ClanError
from .inspect import VmConfig
@ -145,3 +150,25 @@ def qemu_command(
else:
command.append("-nographic")
return QemuCommand(command, vsock_cid=vsock_cid)
class QMPWrapper:
def __init__(self, state_dir: Path) -> None:
# These sockets here are just symlinks to the real sockets which
# are created by the run.py file. The reason being that we run into
# file path length issues on Linux. If no qemu process is running
# the symlink will be dangling.
self._qmp_socket: Path = state_dir / "qmp.sock"
self._qga_socket: Path = state_dir / "qga.sock"
@contextmanager
def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]:
rpath = self._qmp_socket.resolve()
if not rpath.exists():
raise ClanError(f"qmp socket {rpath} does not exist. Is the VM running?")
qmp = QEMUMonitorProtocol(str(rpath))
qmp.connect()
try:
yield qmp
finally:
qmp.close()
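A brief usage sketch for `QMPWrapper`; the flake URL and machine name are examples, and `vm_state_dir` comes from `clan_cli.dirs`:

```python
from pathlib import Path

from clan_cli.dirs import vm_state_dir
from clan_cli.vms.qemu import QMPWrapper

# Resolve the per-VM state dir that holds the qmp.sock symlink created by run.py.
state_dir: Path = vm_state_dir(flake_url="git+https://example.org/my-clan", vm_name="my_machine")

# Ask the running VM to power down gracefully over QMP.
with QMPWrapper(state_dir).qmp_ctx() as qmp:
    qmp.command("system_powerdown")
```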

View File

@ -110,7 +110,7 @@ def run_vm(
nix_options: list[str] = [],
) -> None:
with ExitStack() as stack:
machine = Machine(vm.machine_name, vm.flake_url)
machine = Machine(name=vm.machine_name, flake=vm.flake_url)
log.debug(f"Creating VM for {machine}")
# store the temporary rootfs inside XDG_CACHE_HOME on the host

View File

@ -21,6 +21,9 @@
clan-core-path,
nixpkgs,
includedRuntimeDeps,
inventory-schema,
classgen,
}:
let
pythonDependencies = [
@ -60,6 +63,8 @@ let
rm $out/clan_cli/config/jsonschema
ln -sf ${nixpkgs'} $out/clan_cli/nixpkgs
cp -r ${../../lib/jsonschema} $out/clan_cli/config/jsonschema
${classgen}/bin/classgen ${inventory-schema}/schema.json $out/clan_cli/inventory/classes.py
'';
# Create a custom nixpkgs for use within the project

View File

@ -9,7 +9,7 @@
{ self', pkgs, ... }:
let
flakeLock = lib.importJSON (self + /flake.lock);
flakeInputs = (builtins.removeAttrs inputs [ "self" ]);
flakeInputs = builtins.removeAttrs inputs [ "self" ];
flakeLockVendoredDeps = flakeLock // {
nodes =
flakeLock.nodes
@ -38,7 +38,6 @@
'';
in
{
devShells.clan-cli = pkgs.callPackage ./shell.nix {
inherit (self'.packages) clan-cli clan-cli-full;
inherit self';
@ -46,6 +45,7 @@
packages = {
clan-cli = pkgs.python3.pkgs.callPackage ./default.nix {
inherit (inputs) nixpkgs;
inherit (self'.packages) inventory-schema classgen;
clan-core-path = clanCoreWithVendoredDeps;
includedRuntimeDeps = [
"age"
@ -54,6 +54,7 @@
};
clan-cli-full = pkgs.python3.pkgs.callPackage ./default.nix {
inherit (inputs) nixpkgs;
inherit (self'.packages) inventory-schema classgen;
clan-core-path = clanCoreWithVendoredDeps;
includedRuntimeDeps = lib.importJSON ./clan_cli/nix/allowed-programs.json;
};
@ -64,6 +65,8 @@
buildInputs = [ pkgs.python3 ];
installPhase = ''
${self'.packages.classgen}/bin/classgen ${self'.packages.inventory-schema}/schema.json ./clan_cli/inventory/classes.py
python docs.py reference
mkdir -p $out
cp -r out/* $out
@ -77,6 +80,8 @@
buildInputs = [ pkgs.python3 ];
installPhase = ''
${self'.packages.classgen}/bin/classgen ${self'.packages.inventory-schema}/schema.json ./clan_cli/inventory/classes.py
python api.py > $out
'';
};
@ -84,6 +89,35 @@
default = self'.packages.clan-cli;
};
checks = self'.packages.clan-cli.tests;
checks = self'.packages.clan-cli.tests // {
inventory-classes-up-to-date = pkgs.stdenv.mkDerivation {
name = "inventory-classes-up-to-date";
src = ./clan_cli/inventory;
env = {
classFile = "classes.py";
};
installPhase = ''
${self'.packages.classgen}/bin/classgen ${self'.packages.inventory-schema}/schema.json b_classes.py
file1=$classFile
file2=b_classes.py
echo "Comparing $file1 and $file2"
if cmp -s "$file1" "$file2"; then
echo "Files are identical"
echo "Classes file is up to date"
else
echo "Classes file is out of date or has been modified"
echo "run ./update.sh in the inventory directory to update the classes file"
echo "--------------------------------\n"
diff "$file1" "$file2"
echo "--------------------------------\n\n"
exit 1
fi
touch $out
'';
};
};
};
}

View File

@ -45,5 +45,9 @@ mkShell {
# Needed for impure tests
ln -sfT ${clan-cli.nixpkgs} "$PKG_ROOT/clan_cli/nixpkgs"
# Generate classes.py from inventory schema
# This file is in .gitignore
${self'.packages.classgen}/bin/classgen ${self'.packages.inventory-schema}/schema.json $PKG_ROOT/clan_cli/inventory/classes.py
'';
}

View File

@ -3,10 +3,10 @@
clan.core.networking.targetHost = "__CLAN_TARGET_ADDRESS__";
system.stateVersion = lib.version;
sops.age.keyFile = "__CLAN_SOPS_KEY_PATH__";
clan.core.secretsUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.core.facts.secretUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.virtualisation.graphics = false;
clan.core.networking.zerotier.controller.enable = true;
clan.core.facts.networking.zerotier.controller.enable = true;
networking.useDHCP = false;
systemd.services.shutdown-after-boot = {

View File

@ -3,7 +3,7 @@
clan.core.networking.targetHost = "__CLAN_TARGET_ADDRESS__";
system.stateVersion = lib.version;
sops.age.keyFile = "__CLAN_SOPS_KEY_PATH__";
clan.core.secretsUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.core.facts.secretUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.virtualisation.graphics = false;
clan.core.networking.zerotier.controller.enable = true;

View File

@ -18,7 +18,7 @@
clan.core.networking.targetHost = "__CLAN_TARGET_ADDRESS__";
system.stateVersion = lib.version;
sops.age.keyFile = "__CLAN_SOPS_KEY_PATH__";
clan.core.secretsUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.core.facts.secretUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.core.sops.defaultGroups = [ "admins" ];
clan.virtualisation.graphics = false;
@ -48,7 +48,7 @@
clan.core.networking.targetHost = "__CLAN_TARGET_ADDRESS__";
system.stateVersion = lib.version;
sops.age.keyFile = "__CLAN_SOPS_KEY_PATH__";
clan.core.secretsUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.core.facts.secretUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.core.networking.zerotier.networkId = "82b44b162ec6c013";
};
};

View File

@ -25,8 +25,8 @@
clan.core.networking.targetHost = "__CLAN_TARGET_ADDRESS__";
system.stateVersion = lib.version;
clan.core.secretStore = "password-store";
clan.core.secretsUploadDirectory = lib.mkForce "__CLAN_SOPS_KEY_DIR__/secrets";
clan.core.facts.secretStore = "password-store";
clan.core.facts.secretUploadDirectory = lib.mkForce "__CLAN_SOPS_KEY_DIR__/secrets";
clan.core.networking.zerotier.controller.enable = true;

View File

@ -8,7 +8,7 @@ from clan_cli.config.machine import (
verify_machine_config,
)
from clan_cli.config.schema import machine_schema
from clan_cli.inventory import Machine
from clan_cli.inventory import Machine, MachineDeploy
from clan_cli.machines.create import create_machine
from clan_cli.machines.list import list_machines
@ -31,6 +31,7 @@ def test_create_machine_on_minimal_clan(test_flake_minimal: FlakeForTest) -> Non
description="A test machine",
tags=["test"],
icon=None,
deploy=MachineDeploy(),
),
)

View File

@ -4,9 +4,19 @@ from typing import TYPE_CHECKING
import pytest
from fixtures_flakes import FlakeForTest
from clan_cli.api.modules import list_modules, update_module_instance
from clan_cli.api.modules import list_modules
from clan_cli.clan_uri import FlakeId
from clan_cli.inventory import Machine, Role, Service, ServiceMeta
from clan_cli.inventory import (
Machine,
MachineDeploy,
ServiceBorgbackup,
ServiceBorgbackupRole,
ServiceBorgbackupRoleClient,
ServiceBorgbackupRoleServer,
ServiceMeta,
load_inventory,
save_inventory,
)
from clan_cli.machines.create import create_machine
from clan_cli.nix import nix_eval, run_no_stdout
@ -51,21 +61,30 @@ def test_add_module_to_inventory(
]
)
create_machine(
FlakeId(base_path), Machine(name="machine1", tags=[], system="x86_64-linux")
)
update_module_instance(
base_path,
"borgbackup",
"borgbackup1",
Service(
meta=ServiceMeta(name="borgbackup"),
roles={
"client": Role(machines=["machine1"]),
"server": Role(machines=["machine1"]),
},
FlakeId(base_path),
Machine(
name="machine1", tags=[], system="x86_64-linux", deploy=MachineDeploy()
),
)
inventory = load_inventory(base_path)
inventory.services.borgbackup = {
"borg1": ServiceBorgbackup(
meta=ServiceMeta(name="borg1"),
roles=ServiceBorgbackupRole(
client=ServiceBorgbackupRoleClient(
machines=["machine1"],
),
server=ServiceBorgbackupRoleServer(
machines=["machine1"],
),
),
)
}
save_inventory(inventory, base_path, "Add borgbackup service")
cmd = ["facts", "generate", "--flake", str(test_flake_with_core.path), "machine1"]
cli.run(cmd)

View File

@ -1,6 +1,7 @@
import os
from collections import defaultdict
from collections.abc import Callable
from io import StringIO
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
@ -55,7 +56,8 @@ def test_dependencies_as_files() -> None:
),
)
with TemporaryDirectory() as tmpdir:
dep_tmpdir = dependencies_as_dir(decrypted_dependencies, Path(tmpdir))
dep_tmpdir = Path(tmpdir)
dependencies_as_dir(decrypted_dependencies, dep_tmpdir)
assert dep_tmpdir.is_dir()
assert (dep_tmpdir / "gen_1" / "var_1a").read_bytes() == b"var_1a"
assert (dep_tmpdir / "gen_1" / "var_1b").read_bytes() == b"var_1b"
@ -232,3 +234,27 @@ def test_dependant_generators(
)
assert child_file_path.is_file()
assert child_file_path.read_text() == "hello\n"
def test_prompt(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
) -> None:
config = nested_dict()
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
my_generator["files"]["my_value"]["secret"] = False
my_generator["prompts"]["prompt1"]["description"] = "dream2nix"
my_generator["script"] = "cat $prompts/prompt1 > $out/my_value"
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config),
)
monkeypatch.chdir(flake.path)
monkeypatch.setattr("sys.stdin", StringIO("my input"))
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
var_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_value"
)
assert var_file_path.is_file()
assert var_file_path.read_text() == "my input"

View File

@ -0,0 +1,7 @@
# shellcheck shell=bash
source_up
watch_file flake-module.nix shell.nix default.nix
# Because we depend on nixpkgs sources, uploading to builders takes a long time
use flake .#clan-vm-manager --builders ''

1
pkgs/clan-vm-manager/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
**/.vscode

View File

@ -0,0 +1,31 @@
{
"clientRemote": "",
"gitRemote": "",
"gitSha": "",
"treeEntries": [
{
"label": "source of problem",
"entryType": 0,
"author": "lhebendanz",
"locations": [
{
"path": "../clan-cli/clan_cli/history/add.py",
"startLine": 45,
"endLine": 59,
"label": "",
"description": ""
}
],
"details": {
"severity": "",
"difficulty": "",
"type": "",
"description": "",
"exploit": "",
"recommendation": "Short term, \nLong term, \n"
}
}
],
"auditedFiles": [],
"resolvedEntries": []
}

View File

@ -0,0 +1,94 @@
# Clan VM Manager
Provides a simple interface for managing your locally registered clans.
![app-preview](screenshots/image.png)
## Available commands
Run this application
```bash
./bin/clan-vm-manager
```
Join the default machine of a clan
```bash
./bin/clan-vm-manager [clan-uri]
```
Join a specific machine of a clan
```bash
./bin/clan-vm-manager [clan-uri]#[machine]
```
For more available commands see the developer section below.
## Developing this Application
### Debugging Style and Layout
```bash
# Enable the GTK debugger
gsettings set org.gtk.Settings.Debug enable-inspector-keybinding true
# Start the application with the debugger attached
GTK_DEBUG=interactive ./bin/clan-vm-manager --debug
```
Appending the `--debug` flag enables debug logging, printed to the console.
### Profiling
To activate profiling, run
```bash
PERF=1 ./bin/clan-vm-manager
```
### Library Components
> Note:
>
> We have noticed bugs when starting some CLI commands through the integrated VS Code terminal.
> If you encounter issues, make sure to run the commands in a regular OS shell.
Libadwaita (Adw) has a demo application showing all widgets. You can run it by executing
```bash
adwaita-1-demo
```
GTK4 has a demo application showing all widgets. You can run it by executing
```bash
gtk4-widget-factory
```
To find available icons, execute
```bash
gtk4-icon-browser
```
### Links
Here are some important documentation links related to the Clan VM Manager:
- [Adw PyGObject Reference](http://lazka.github.io/pgi-docs/index.html#Adw-1): This link provides the PyGObject reference documentation for the Adw library, which is used in the Clan VM Manager. It contains detailed information about the Adw widgets and their usage.
- [GTK4 PyGObject Reference](http://lazka.github.io/pgi-docs/index.html#Gtk-4.0): This link provides the PyGObject reference documentation for GTK4, the toolkit used for building the user interface of the Clan VM Manager. It includes information about GTK4 widgets, signals, and other features.
- [Adw Widget Gallery](https://gnome.pages.gitlab.gnome.org/libadwaita/doc/main/widget-gallery.html): This link showcases a widget gallery for Adw, allowing you to see the available widgets and their visual appearance. It can be helpful for designing the user interface of the Clan VM Manager.
- [Python + GTK3 Tutorial](https://python-gtk-3-tutorial.readthedocs.io/en/latest/textview.html): Although the Clan VM Manager uses GTK4, this tutorial for GTK3 can still be useful as it covers the basics of building GTK-based applications with Python. It includes examples and explanations for various GTK widgets, including text views.
- [GNOME Human Interface Guidelines](https://developer.gnome.org/hig/): This link provides the GNOME Human Interface Guidelines, which offer design and usability recommendations for creating GNOME applications. It covers topics such as layout, navigation, and interaction patterns.
## Error handling
> Error dialogs should be avoided where possible, since they are disruptive.
>
> For simple non-critical errors, toasts can be a good alternative.
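The toast helpers in `clan_vm_manager.singletons.toast` follow this recommendation. A minimal sketch, assuming the application singletons are already initialized (the messages and keys are examples):

```python
from clan_vm_manager.singletons.toast import InfoToast, ToastOverlay, WarningToast

# Non-critical feedback: deduplicated by key, so repeated events show only one toast.
ToastOverlay.use().add_toast_unique(
    InfoToast("Machine list refreshed").toast, "info.machines.refreshed"
)

# Recoverable problem: still a toast rather than a blocking error dialog.
ToastOverlay.use().add_toast_unique(
    WarningToast("VM exited with error. Exitcode: 1").toast, "warning.vm.exit"
)
```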

View File

@ -0,0 +1,13 @@
#!/usr/bin/env python3
import sys
from pathlib import Path
module_path = Path(__file__).parent.parent.absolute()
sys.path.insert(0, str(module_path))
sys.path.insert(0, str(module_path.parent / "clan_cli"))
from clan_vm_manager import main # NOQA
if __name__ == "__main__":
main()

View File

@ -0,0 +1,43 @@
{
"folders": [
{
"path": "."
},
{
"path": "../clan-cli/clan_cli"
},
{
"path": "../clan-cli/tests"
},
{
"path": "../../nixosModules"
},
{
"path": "../../lib/build-clan"
}
],
"settings": {
"python.linting.mypyEnabled": true,
"files.exclude": {
"**/__pycache__": true,
"**/.direnv": true,
"**/.hypothesis": true,
"**/.mypy_cache": true,
"**/.reports": true,
"**/.ruff_cache": true,
"**/result/**": true,
"/nix/store/**": true
},
"search.exclude": {
"**/__pycache__": true,
"**/.direnv": true,
"**/.hypothesis": true,
"**/.mypy_cache": true,
"**/.reports": true,
"**/.ruff_cache": true,
"**/result/": true,
"/nix/store/**": true
},
"files.autoSave": "off"
}
}

View File

@ -0,0 +1,14 @@
import logging
import sys
from clan_cli.profiler import profile
from clan_vm_manager.app import MainApplication
log = logging.getLogger(__name__)
@profile
def main(argv: list[str] = sys.argv) -> int:
app = MainApplication()
return app.run(argv)

View File

@ -0,0 +1,6 @@
import sys
from . import main
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
import logging
from typing import Any, ClassVar
import gi
from clan_vm_manager import assets
from clan_vm_manager.singletons.toast import InfoToast, ToastOverlay
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from clan_cli.custom_logger import setup_logging
from gi.repository import Adw, Gdk, Gio, Gtk
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.singletons.use_join import GLib, GObject
from .windows.main_window import MainWindow
log = logging.getLogger(__name__)
class MainApplication(Adw.Application):
"""
This class is initialized every time the app is started.
Only the Adw.ApplicationWindow is a singleton.
So don't use any singletons in the Adw.Application class.
"""
__gsignals__: ClassVar = {
"join_request": (GObject.SignalFlags.RUN_FIRST, None, [str]),
}
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(
application_id="org.clan.vm-manager",
flags=Gio.ApplicationFlags.HANDLES_COMMAND_LINE,
)
self.add_main_option(
"debug",
ord("d"),
GLib.OptionFlags.NONE,
GLib.OptionArg.NONE,
"enable debug mode",
None,
)
self.window: MainWindow | None = None
self.connect("activate", self.on_activate)
self.connect("shutdown", self.on_shutdown)
def on_shutdown(self, source: "MainApplication") -> None:
log.debug("Shutting down Adw.Application")
if self.get_windows() == []:
log.warning("No windows to destroy")
if self.window:
# TODO: Doesn't seem to raise the destroy signal. Need to investigate
# self.get_windows() returns an empty list. Desync between window and application?
self.window.close()
# Killing vms directly. This is dirty
self.window.kill_vms()
else:
log.error("No window to destroy")
def do_command_line(self, command_line: Any) -> int:
options = command_line.get_options_dict()
# convert GVariantDict -> GVariant -> dict
options = options.end().unpack()
if "debug" in options and self.window is None:
setup_logging(logging.DEBUG, root_log_name=__name__.split(".")[0])
setup_logging(logging.DEBUG, root_log_name="clan_cli")
elif self.window is None:
setup_logging(logging.INFO, root_log_name=__name__.split(".")[0])
log.debug("Debug logging enabled")
if "debug" in options:
ToastOverlay.use().add_toast_unique(
InfoToast("Debug logging enabled").toast, "info.debugging.enabled"
)
args = command_line.get_arguments()
self.activate()
if len(args) > 1:
uri = args[1]
self.emit("join_request", uri)
return 0
def on_window_hide_unhide(self, *_args: Any) -> None:
if not self.window:
log.error("No window to hide/unhide")
return
if self.window.is_visible():
self.window.hide()
else:
self.window.present()
def dummy_menu_entry(self) -> None:
log.info("Dummy menu entry called")
def on_activate(self, source: "MainApplication") -> None:
if not self.window:
self.init_style()
self.window = MainWindow(config=ClanConfig(initial_view="list"))
self.window.set_application(self)
self.window.show()
# TODO: For css styling
def init_style(self) -> None:
resource_path = assets.loc / "style.css"
log.debug(f"Style css path: {resource_path}")
css_provider = Gtk.CssProvider()
css_provider.load_from_path(str(resource_path))
display = Gdk.Display.get_default()
assert display is not None
Gtk.StyleContext.add_provider_for_display(
display,
css_provider,
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
)

View File

@ -0,0 +1,7 @@
from pathlib import Path
loc: Path = Path(__file__).parent
def get_asset(name: str | Path) -> Path:
return loc / name

Binary files not shown (10 new image files; sizes: 108 KiB, 95 KiB, 106 KiB, 3.1 KiB, 104 KiB, 98 KiB, 155 KiB, 86 KiB, 163 KiB, 183 KiB).

View File

@ -0,0 +1,66 @@
/* Insert custom styles here */
navigation-view {
padding: 5px;
/* padding-left: 5px;
padding-right: 5px;
padding-bottom: 5px; */
}
avatar {
margin: 2px;
}
.trust {
padding-top: 25px;
padding-bottom: 25px;
}
.join-list {
margin-top: 1px;
margin-left: 2px;
margin-right: 2px;
}
.progress-bar {
margin-right: 25px;
min-width: 200px;
}
.group-list {
background-color: inherit;
}
.group-list > .activatable:hover {
background-color: unset;
}
.group-list > row {
margin-top: 12px;
border-bottom: unset;
}
.vm-list {
margin-top: 25px;
margin-bottom: 25px;
}
.no-shadow {
box-shadow: none;
}
.search-entry {
margin-bottom: 12px;
}
searchbar {
margin-bottom: 25px;
}
.log-view {
margin-top: 12px;
font-family: monospace;
padding: 8px;
}

Binary file not shown (new image file, 152 KiB).

View File

@ -0,0 +1,132 @@
import logging
import os
import signal
import sys
import traceback
from pathlib import Path
from typing import Any
import gi
gi.require_version("GdkPixbuf", "2.0")
import dataclasses
import multiprocessing as mp
from collections.abc import Callable
log = logging.getLogger(__name__)
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def _kill_group(proc: mp.Process) -> None:
pid = proc.pid
if proc.is_alive() and pid:
os.killpg(pid, signal.SIGTERM)
else:
log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")
@dataclasses.dataclass(frozen=True)
class MPProcess:
name: str
proc: mp.Process
out_file: Path
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None:
_kill_group(proc=self.proc)
def _set_proc_name(name: str) -> None:
if sys.platform != "linux":
return
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to the given name via prctl(PR_SET_NAME, option 15)
prctl(15, name.encode(), 0, 0, 0)
def _init_proc(
func: Callable,
out_file: Path,
proc_name: str,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
**kwargs: Any,
) -> None:
# Create a new process group
os.setsid()
# Open stdout and stderr
with open(out_file, "w") as out_fd:
os.dup2(out_fd.fileno(), sys.stdout.fileno())
os.dup2(out_fd.fileno(), sys.stderr.fileno())
# Print some information
pid = os.getpid()
gpid = os.getpgid(pid=pid)
# Set the process name
_set_proc_name(proc_name)
# Close stdin
sys.stdin.close()
linebreak = "=" * 5
# Execute the main function
print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
try:
func(**kwargs)
except Exception as ex:
traceback.print_exc()
if on_except is not None:
on_except(ex, mp.current_process())
# Kill the new process and all its children by sending a SIGTERM signal to the process group
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
os.killpg(gpid, signal.SIGTERM)
sys.exit(1)
# Don't use a finally block here, because we want the exitcode to be set to
# 0 if the function returns normally
def spawn(
*,
out_file: Path,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
func: Callable,
**kwargs: Any,
) -> MPProcess:
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="forkserver")
# Set names
proc_name = f"MPExec:{func.__name__}"
# Start the process
proc = mp.Process(
target=_init_proc,
args=(func, out_file, proc_name, on_except),
name=proc_name,
kwargs=kwargs,
)
proc.start()
# Return the process
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
return mp_proc
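A minimal usage sketch for `spawn` as defined above; the worker function and log path are made up, but the call shape mirrors how clan-vm-manager launches `vms.run.build_vm` and `vms.run.run_vm`:

```python
from pathlib import Path


def worker(greeting: str) -> None:
    print(greeting)


if __name__ == "__main__":
    proc = spawn(
        out_file=Path("/tmp/worker.log"),  # child stdout/stderr are redirected here
        on_except=None,
        func=worker,
        greeting="hello from a detached process group",
    )
    proc.proc.join()   # wait for the worker to exit
    proc.kill_group()  # SIGTERMs the process group; here it only logs that it already exited
```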

View File

@ -0,0 +1,220 @@
import logging
from collections.abc import Callable
from typing import Any, Generic, TypeVar
import gi
gi.require_version("Gio", "2.0")
from gi.repository import Gio, GObject
log = logging.getLogger(__name__)
# Define type variables for key and value types
K = TypeVar("K") # Key type
V = TypeVar(
"V", bound=GObject.Object
) # Value type, bound to GObject.GObject or its subclasses
class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
"""
A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
Only use self[key] and del self[key] for accessing the items for better performance.
This class could be optimized by having the objects remember their position in the list.
"""
def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
super().__init__()
self.gtype = gtype
self.key_gen = key_gen
# From Python 3.7 onwards dictionaries are ordered by default
self._items: dict[K, V] = dict()
##################################
# #
# Gio.ListStore Interface #
# #
##################################
@classmethod
def new(cls: Any, gtype: type[V]) -> "GKVStore":
return cls.__new__(cls, gtype)
def append(self, item: V) -> None:
key = self.key_gen(item)
self[key] = item
def find(self, item: V) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if v == item:
return True, i
return False, -1
def find_with_equal_func(
self, item: V, equal_func: Callable[[V, V], bool]
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item):
return True, i
return False, -1
def find_with_equal_func_full(
self, item: V, equal_func: Callable[[V, V, Any], bool], user_data: Any
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item, user_data):
return True, i
return False, -1
def insert(self, position: int, item: V) -> None:
log.warning("Inserting is O(n) in GKVStore. Better use append")
log.warning(
"This functions may have incorrect items_changed signal behavior. Please test it"
)
key = self.key_gen(item)
if key in self._items:
raise ValueError("Key already exists in the dictionary")
if position < 0 or position > len(self._items):
raise IndexError("Index out of range")
# Temporary storage for items to be reinserted
temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]
# Delete items from the original dict
for k in list(self.keys())[position:]:
del self._items[k]
# Insert the new key-value pair
self._items[key] = item
# Reinsert the items
for i, (k, v) in enumerate(temp_list):
self._items[k] = v
# Notify the model of the changes
self.items_changed(position, 0, 1)
def insert_sorted(
self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any
) -> None:
raise NotImplementedError("insert_sorted is not implemented in GKVStore")
def remove(self, position: int) -> None:
if position < 0 or position >= self.get_n_items():
return
key = self.keys()[position]
del self[key]
self.items_changed(position, 1, 0)
def remove_all(self) -> None:
n_removed = len(self._items)  # capture before clearing so items_changed reports the removals
self._items.clear()
self.items_changed(0, n_removed, 0)
def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None:
raise NotImplementedError("sort is not implemented in GKVStore")
def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
raise NotImplementedError("splice is not implemented in GKVStore")
##################################
# #
# Gio.ListModel Interface #
# #
##################################
def get_item(self, position: int) -> V | None:
if position < 0 or position >= self.get_n_items():
return None
# Access items by index since OrderedDict does not support direct indexing
key = list(self._items.keys())[position]
return self._items[key]
def do_get_item(self, position: int) -> V | None:
return self.get_item(position)
def get_item_type(self) -> Any:
return self.gtype.__gtype__ # type: ignore[attr-defined]
def do_get_item_type(self) -> GObject.GType:
return self.get_item_type()
def get_n_items(self) -> int:
return len(self._items)
def do_get_n_items(self) -> int:
return self.get_n_items()
##################################
# #
# Dict Interface #
# #
##################################
def keys(self) -> list[K]:
return list(self._items.keys())
def values(self) -> list[V]:
return list(self._items.values())
def items(self) -> list[tuple[K, V]]:
return list(self._items.items())
def get(self, key: K, default: V | None = None) -> V | None:
return self._items.get(key, default)
# O(1) operation if the key does not exist, O(n) if it does
def __setitem__(self, key: K, value: V) -> None:
# If the key already exists, remove it O(n)
if key in self._items:
log.debug("Updating an existing key in GKVStore is O(n)")
position = self.keys().index(key)
self._items[key] = value
self.items_changed(position, 1, 1)
else:
# Add the new key-value pair
self._items[key] = value
position = max(len(self._items) - 1, 0)
self.items_changed(position, 0, 1)
# O(n) operation
def __delitem__(self, key: K) -> None:
position = self.keys().index(key)
del self._items[key]
self.items_changed(position, 1, 0)
def __len__(self) -> int:
return len(self._items)
# O(1) operation
def __getitem__(self, key: K) -> V: # type: ignore[override]
return self._items[key]
def __contains__(self, key: K) -> bool: # type: ignore[override]
return key in self._items
def __str__(self) -> str:
resp = "GKVStore(\n"
for k, v in self._items.items():
resp += f"{k}: {v}\n"
resp += ")"
return resp
def __repr__(self) -> str:
return self._items.__str__()
##################################
# #
# Custom Methods #
# #
##################################
def first(self) -> V:
return self.values()[0]
def last(self) -> V:
return self.values()[-1]
def register_on_change(
self, callback: Callable[["GKVStore[K,V]", int, int, int], None]
) -> None:
self.connect("items-changed", callback)

View File

@ -0,0 +1,10 @@
from dataclasses import dataclass
import gi
gi.require_version("Gtk", "4.0")
@dataclass
class ClanConfig:
initial_view: str

View File

@ -0,0 +1,74 @@
import logging
from collections.abc import Callable
from typing import TypeVar
import gi
from clan_vm_manager import assets
gi.require_version("Adw", "1")
from gi.repository import Adw, GdkPixbuf, Gio, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
class EmptySplash(Gtk.Box):
def __init__(self, on_join: Callable[[str], None]) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
self.on_join = on_join
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
clan_icon = self.load_image(str(assets.get_asset("clan_black_notext.png")))
if clan_icon:
image = Gtk.Image.new_from_pixbuf(clan_icon)
else:
image = Gtk.Image.new_from_icon_name("image-missing")
# same as the clamp
image.set_pixel_size(400)
image.set_opacity(0.5)
image.set_margin_top(20)
image.set_margin_bottom(10)
vbox.append(image)
empty_label = Gtk.Label(label="Welcome to Clan! Join your first clan.")
join_entry = Gtk.Entry()
join_entry.set_placeholder_text("clan://<url>")
join_entry.set_hexpand(True)
join_button = Gtk.Button(label="Join")
join_button.connect("clicked", self._on_join, join_entry)
join_entry.connect("activate", lambda e: self._on_join(join_button, e))
clamp = Adw.Clamp()
clamp.set_maximum_size(400)
clamp.set_margin_bottom(40)
vbox.append(empty_label)
hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
hbox.append(join_entry)
hbox.append(join_button)
vbox.append(hbox)
clamp.set_child(vbox)
self.append(clamp)
def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None:
try:
pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path)
return pixbuf
except Exception as e:
log.error(f"Failed to load image: {e}")
return None
def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None:
"""
Callback for the join button
Extracts the text from the entry and calls the on_join callback
"""
log.info(f"Splash screen: Joining {entry.get_text()}")
self.on_join(entry.get_text())

File diff suppressed because it is too large.

View File

@ -0,0 +1,384 @@
import logging
import multiprocessing as mp
import os
import tempfile
import threading
import time
import weakref
from collections.abc import Callable, Generator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import IO, ClassVar
import gi
from clan_cli import vms
from clan_cli.clan_uri import ClanURI
from clan_cli.dirs import vm_state_dir
from clan_cli.history.add import HistoryEntry
from clan_cli.machines.machines import Machine
from clan_cli.vms.qemu import QMPWrapper
from clan_vm_manager.components.executor import MPProcess, spawn
from clan_vm_manager.singletons.toast import (
InfoToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
class VMObject(GObject.Object):
# Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = {
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, []),
"vm_build_notify": (GObject.SignalFlags.RUN_FIRST, None, [bool, bool]),
}
def __init__(
self,
icon: Path,
data: HistoryEntry,
build_log_cb: Callable[[Gio.File], None],
) -> None:
super().__init__()
# Store the data from the history entry
self.data: HistoryEntry = data
self.build_log_cb = build_log_cb
# Create a process object to store the VM process
self.vm_process: MPProcess = MPProcess(
"vm_dummy", mp.Process(), Path("./dummy")
)
self.build_process: MPProcess = MPProcess(
"build_dummy", mp.Process(), Path("./dummy")
)
self._start_thread: threading.Thread = threading.Thread()
self.machine: Machine | None = None
self.qmp_wrap: QMPWrapper | None = None
# Watcher to stop the VM
self.KILL_TIMEOUT: int = 20 # seconds
self._stop_thread: threading.Thread = threading.Thread()
# Build progress bar vars
self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar()
self.progress_bar.hide()
self.progress_bar.set_hexpand(True) # Horizontally expand
self.prog_bar_id: int = 0
# Create a temporary directory to store the logs
self.log_dir: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory(
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
)
self._logs_id: int = 0
self._log_file: IO[str] | None = None
# To be able to set the switch state programmatically
# we need to store the handler id returned by the connect method
# and block the signal while we change the state. This is cursed.
self.switch: Gtk.Switch = Gtk.Switch()
self.switch_handler_id: int = self.switch.connect(
"notify::active", self._on_switch_toggle
)
self.connect("vm_status_changed", self._on_vm_status_changed)
# Make sure the VM is killed when the reference to this object is dropped
self._finalizer: weakref.finalize = weakref.finalize(self, self._kill_ref_drop)
def _vm_status_changed_task(self) -> bool:
self.emit("vm_status_changed")
return GLib.SOURCE_REMOVE
def update(self, data: HistoryEntry) -> None:
self.data = data
def _on_vm_status_changed(self, source: "VMObject") -> None:
# Signal may be emitted multiple times
self.emit("vm_build_notify", self.is_building(), self.is_running())
prev_state = self.switch.get_state()
next_state = self.is_running() and not self.is_building()
self.switch.set_state(next_state)
if prev_state is False and next_state is True:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"{source.data.flake.flake_attr} started").toast,
"success.vm.start",
)
if self.switch.get_sensitive() is False and not self.is_building():
self.switch.set_sensitive(True)
exit_vm = self.vm_process.proc.exitcode
exit_build = self.build_process.proc.exitcode
exitc = exit_vm or exit_build
if not self.is_running() and exitc != 0:
with self.switch.handler_block(self.switch_handler_id):
self.switch.set_active(False)
log.error(f"VM exited with error. Exitcode: {exitc}")
ToastOverlay.use().add_toast_unique(
WarningToast(f"VM exited with error. Exitcode: {exitc}").toast,
"warning.vm.exit",
)
def _on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None:
if switch.get_active():
switch.set_state(False)
switch.set_sensitive(False)
self.start()
else:
switch.set_state(True)
self.shutdown()
switch.set_sensitive(False)
# We use a context manager to create the machine object
# and make sure it is destroyed when the context is exited
@contextmanager
def _create_machine(self) -> Generator[Machine, None, None]:
uri = ClanURI.from_str(
url=str(self.data.flake.flake_url), machine_name=self.data.flake.flake_attr
)
if uri.flake.is_local():
self.machine = Machine(
name=self.data.flake.flake_attr,
flake=uri.flake,
)
if uri.flake.is_remote():
self.machine = Machine(
name=self.data.flake.flake_attr,
flake=uri.flake,
)
assert self.machine is not None
state_dir = vm_state_dir(
flake_url=str(self.machine.flake.url), vm_name=self.machine.name
)
self.qmp_wrap = QMPWrapper(state_dir)
assert self.machine is not None
yield self.machine
self.machine = None
def _pulse_progress_bar_task(self) -> bool:
if self.progress_bar.is_visible():
self.progress_bar.pulse()
return GLib.SOURCE_CONTINUE
else:
return GLib.SOURCE_REMOVE
def __start(self) -> None:
with self._create_machine() as machine:
# Start building VM
tstart = datetime.now()
log.info(f"Building VM {self.get_id()}")
log_dir = Path(str(self.log_dir.name))
# Start the build process
self.build_process = spawn(
on_except=None,
out_file=log_dir / "build.log",
func=vms.run.build_vm,
machine=machine,
tmpdir=log_dir,
)
gfile = Gio.File.new_for_path(str(log_dir / "build.log"))
# Gio documentation:
# Obtains a file monitor for the given file.
# If no file notification mechanism exists, then regular polling of the file is used.
g_monitor = gfile.monitor_file(Gio.FileMonitorFlags.NONE, None)
g_monitor.connect("changed", self.on_logs_changed)
GLib.idle_add(self._vm_status_changed_task)
self.switch.set_sensitive(True)
# Start the logs watcher
self._logs_id = GLib.timeout_add(
50, self._get_logs_task, self.build_process
)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.build_process.out_file}")
# Start the progress bar and show it
self.progress_bar.show()
self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar_task)
if self.prog_bar_id == 0:
log.error("Couldn't spawn a progress bar task")
# Wait for the build to finish then hide the progress bar
self.build_process.proc.join()
tend = datetime.now()
log.info(f"VM {self.get_id()} build took {tend - tstart}s")
self.progress_bar.hide()
# Check if the VM was built successfully
if self.build_process.proc.exitcode != 0:
log.error(f"Failed to build VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
return
log.info(f"Successfully built VM {self.get_id()}")
# Start the VM
self.vm_process = spawn(
on_except=None,
out_file=Path(str(self.log_dir.name)) / "vm.log",
func=vms.run.run_vm,
vm=self.data.flake.vm,
cachedir=log_dir,
socketdir=log_dir,
)
log.debug(f"Started VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
# Start the logs watcher
self._logs_id = GLib.timeout_add(50, self._get_logs_task, self.vm_process)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}")
# Wait for the VM to stop
self.vm_process.proc.join()
log.debug(f"VM {self.get_id()} has stopped")
GLib.idle_add(self._vm_status_changed_task)
def on_logs_changed(
self,
monitor: Gio.FileMonitor,
file: Gio.File,
other_file: Gio.File,
event_type: Gio.FileMonitorEvent,
) -> None:
if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
# File was changed and the changes were written to disk
# wire up the callback for setting the logs
self.build_log_cb(file)
def start(self) -> None:
if self.is_running():
log.warning("VM is already running. Ignoring start request")
self.emit("vm_status_changed", self)
return
log.debug(f"VM state dir {self.log_dir.name}")
self._start_thread = threading.Thread(target=self.__start)
self._start_thread.start()
def _get_logs_task(self, proc: MPProcess) -> bool:
if not proc.out_file.exists():
return GLib.SOURCE_CONTINUE
if not self._log_file:
try:
self._log_file = open(proc.out_file)
except Exception as ex:
log.exception(ex)
self._log_file = None
return GLib.SOURCE_REMOVE
line = os.read(self._log_file.fileno(), 4096)
if len(line) != 0:
print(line.decode("utf-8"), end="", flush=True)
if not proc.proc.is_alive():
log.debug("Removing logs watcher")
self._log_file = None
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
def is_running(self) -> bool:
return self._start_thread.is_alive()
def is_building(self) -> bool:
return self.build_process.proc.is_alive()
def is_shutting_down(self) -> bool:
return self._stop_thread.is_alive()
def get_id(self) -> str:
return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"
def __stop(self) -> None:
log.info(f"Stopping VM {self.get_id()}")
start_time = datetime.now()
while self.is_running():
diff = datetime.now() - start_time
if diff.seconds > self.KILL_TIMEOUT:
log.error(
f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it"
)
self.vm_process.kill_group()
break
if self.is_building():
log.info(f"VM {self.get_id()} is still building. Killing it")
self.build_process.kill_group()
break
if not self.machine:
log.error(f"Machine object is None. Killing VM {self.get_id()}")
self.vm_process.kill_group()
break
# Try to shutdown the VM gracefully using QMP
try:
assert self.qmp_wrap is not None
with self.qmp_wrap.qmp_ctx() as qmp:
qmp.command("system_powerdown")
except Exception as ex:
log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}")
# Try 20 times to stop the VM
time.sleep(self.KILL_TIMEOUT / 20)
GLib.idle_add(self._vm_status_changed_task)
log.debug(f"VM {self.get_id()} has stopped")
ToastOverlay.use().add_toast_unique(
InfoToast(f"Stopped {self.get_id()}").toast, "info.vm.exit"
)
def shutdown(self) -> None:
if not self.is_running():
log.warning("VM not running. Ignoring shutdown request.")
self.emit("vm_status_changed", self)
return
if self.is_shutting_down():
log.warning("Shutdown already in progress")
self.emit("vm_status_changed", self)
return
self._stop_thread = threading.Thread(target=self.__stop)
self._stop_thread.start()
def _kill_ref_drop(self) -> None:
if self.is_running():
log.warning("Killing VM due to reference drop")
self.kill()
def kill(self) -> None:
if not self.is_running():
log.warning(f"Tried to kill VM {self.get_id()} is not running")
return
log.info(f"Killing VM {self.get_id()} now")
if self.vm_process.proc.is_alive():
self.vm_process.kill_group()
if self.build_process.proc.is_alive():
self.build_process.kill_group()
def read_whole_log(self) -> str:
if not self.vm_process.out_file.exists():
log.error(f"Log file {self.vm_process.out_file} does not exist")
return ""
return self.vm_process.out_file.read_text()
def __str__(self) -> str:
return f"VM({self.get_id()})"
def __repr__(self) -> str:
return self.__str__()

View File

@ -0,0 +1,150 @@
import logging
from collections.abc import Callable
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Adw
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.views.logs import Logs
log = logging.getLogger(__name__)
class ToastOverlay:
"""
The ToastOverlay manages the display of toasts.
It should be used as a singleton in your application to prevent duplicate toasts.
Usage: ToastOverlay.use().add_toast_unique(toast, "some.unique.key")
"""
# For some reason, the adw toast overlay cannot be subclassed
# That's why it is added as a class property
overlay: Adw.ToastOverlay
active_toasts: set[str]
_instance: "None | ToastOverlay" = None
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "ToastOverlay":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.overlay = Adw.ToastOverlay()
cls.active_toasts = set()
return cls._instance
def add_toast_unique(self, toast: Adw.Toast, key: str) -> None:
if key not in self.active_toasts:
self.active_toasts.add(key)
self.overlay.add_toast(toast)
toast.connect("dismissed", lambda toast: self.active_toasts.remove(key))
class ErrorToast:
toast: Adw.Toast
def __init__(
self, message: str, persistent: bool = False, details: str = ""
) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"""<span foreground='red'>❌ Error </span> {message}"""
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.HIGH)
self.toast.set_button_label("Show more")
if persistent:
self.toast.set_timeout(0)
views = ViewStack.use().view
# we cannot check this type, python is not smart enough
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
logs_view.set_message(details)
self.toast.connect(
"button-clicked",
lambda _: views.set_visible_child_name("logs"),
)
class WarningToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"<span foreground='orange'>⚠ Warning </span> {message}"
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class InfoToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(f"<span>❕</span> {message}")
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class SuccessToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(f"<span foreground='green'>✅</span> {message}")
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class LogToast:
toast: Adw.Toast
def __init__(
self,
message: str,
on_button_click: Callable[[], None],
button_label: str = "More",
persistent: bool = False,
) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"""Logs are available <span weight="regular">{message}</span>"""
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
self.toast.set_button_label(button_label)
self.toast.connect(
"button-clicked",
lambda _: on_button_click(),
)

View File

@ -0,0 +1,114 @@
import logging
import threading
from collections.abc import Callable
from typing import Any, ClassVar, cast
import gi
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry, add_history
from clan_cli.machines.machines import Machine
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.singletons.use_vms import ClanStore
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class JoinValue(GObject.Object):
__gsignals__: ClassVar = {
"join_finished": (GObject.SignalFlags.RUN_FIRST, None, []),
}
url: ClanURI
entry: HistoryEntry | None
def _join_finished_task(self) -> bool:
self.emit("join_finished")
return GLib.SOURCE_REMOVE
def __init__(self, url: ClanURI) -> None:
super().__init__()
self.url: ClanURI = url
self.entry: HistoryEntry | None = None
def __join(self) -> None:
new_entry = add_history(self.url)
self.entry = new_entry
GLib.idle_add(self._join_finished_task)
def join(self) -> None:
threading.Thread(target=self.__join).start()
class JoinList:
"""
This is a singleton.
It is initialized with the first call of use()
"""
_instance: "None | JoinList" = None
list_store: Gio.ListStore
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "JoinList":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(JoinValue)
ClanStore.use().register_on_deep_change(cls._instance._rerender_join_list)
return cls._instance
def _rerender_join_list(
self, source: GKVStore, position: int, removed: int, added: int
) -> None:
self.list_store.items_changed(
0, self.list_store.get_n_items(), self.list_store.get_n_items()
)
def is_empty(self) -> bool:
return self.list_store.get_n_items() == 0
def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None:
"""
Add a join request.
Calling it repeatedly queues one additional join request per call.
"""
value = JoinValue(uri)
machine_id = Machine(uri.machine_name, uri.flake)
machine_id_list = []
for machine_obj in self.list_store:
mvalue: ClanURI = cast(JoinValue, machine_obj).url
machine = Machine(mvalue.machine_name, mvalue.flake)
machine_id_list.append(machine.get_id())
if machine_id.get_id() in machine_id_list:
log.info(f"Join request already exists: {value.url}. Ignoring.")
return
value.connect("join_finished", self._on_join_finished)
value.connect("join_finished", after_join)
self.list_store.append(value)
def _on_join_finished(self, source: JoinValue) -> None:
log.info(f"Join finished: {source.url}")
self.discard(source)
assert source.entry is not None
ClanStore.use().push_history_entry(source.entry)
def discard(self, value: JoinValue) -> None:
(has, idx) = self.list_store.find(value)
if has:
self.list_store.remove(idx)
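A usage sketch for queuing a join request; the URI is an example, and `after_join` receives the finished `JoinValue` once its history entry has been added:

```python
from clan_cli.clan_uri import ClanURI


def after_join(value: JoinValue) -> None:
    print(f"Joined {value.url}")


JoinList.use().push(ClanURI("clan://git+https://example.org/my-clan#machine1"), after_join)
```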

View File

@ -0,0 +1,36 @@
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Adw
class ViewStack:
"""
This is a singleton.
It is initialized with the first call of use()
Usage:
ViewStack.use().set_visible()
ViewStack.use() can also be called before the data is needed. e.g. to eliminate/reduce waiting time.
"""
_instance: "None | ViewStack" = None
view: Adw.ViewStack
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "ViewStack":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.view = Adw.ViewStack()
return cls._instance

View File

@ -0,0 +1,183 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, ClassVar
import gi
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry
from clan_cli.machines.machines import Machine
from clan_vm_manager import assets
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.components.vmobj import VMObject
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.views.logs import Logs
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class VMStore(GKVStore):
def __init__(self) -> None:
super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr)
class Emitter(GObject.GObject):
__gsignals__: ClassVar = {
"is_ready": (GObject.SignalFlags.RUN_FIRST, None, []),
}
class ClanStore:
_instance: "None | ClanStore" = None
_clan_store: GKVStore[str, VMStore]
_emitter: Emitter
# set the vm that is outputting logs
# build logs are automatically streamed to the logs-view
_logging_vm: VMObject | None = None
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
raise RuntimeError("Call use() instead")
@classmethod
def use(cls: Any) -> "ClanStore":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls._clan_store = GKVStore(
VMStore, lambda store: store.first().data.flake.flake_url
)
cls._emitter = Emitter()
return cls._instance
def emit(self, signal: str) -> None:
self._emitter.emit(signal)
def connect(self, signal: str, cb: Callable[..., Any]) -> None:
self._emitter.connect(signal, cb)
def set_logging_vm(self, ident: str) -> VMObject | None:
vm = self.get_vm(ClanURI(f"clan://{ident}"))
if vm is not None:
self._logging_vm = vm
return self._logging_vm
def register_on_deep_change(
self, callback: Callable[[GKVStore, int, int, int], None]
) -> None:
"""
Register a callback that is called when a clan_store or one of the included VMStores changes
"""
def on_vmstore_change(
store: VMStore, position: int, removed: int, added: int
) -> None:
callback(store, position, removed, added)
def on_clanstore_change(
store: "GKVStore", position: int, removed: int, added: int
) -> None:
if added > 0:
store.values()[position].register_on_change(on_vmstore_change)
callback(store, position, removed, added)
self.clan_store.register_on_change(on_clanstore_change)
@property
def clan_store(self) -> GKVStore[str, VMStore]:
return self._clan_store
def create_vm_task(self, vm: HistoryEntry) -> bool:
self.push_history_entry(vm)
return GLib.SOURCE_REMOVE
def push_history_entry(self, entry: HistoryEntry) -> None:
# TODO: We shouldn't do this here but in the list view
if entry.flake.icon is None:
icon: Path = assets.loc / "placeholder.jpeg"
else:
icon = Path(entry.flake.icon)
def log_details(gfile: Gio.File) -> None:
self.log_details(vm, gfile)
vm = VMObject(icon=icon, data=entry, build_log_cb=log_details)
self.push(vm)
def log_details(self, vm: VMObject, gfile: Gio.File) -> None:
views = ViewStack.use().view
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
def file_read_callback(
source_object: Gio.File, result: Gio.AsyncResult, _user_data: Any
) -> None:
try:
# Finish the asynchronous read operation
res = source_object.load_contents_finish(result)
_success, contents, _etag_out = res
# Convert the byte array to a string and print it
logs_view.set_message(contents.decode("utf-8"))
except Exception as e:
print(f"Error reading file: {e}")
# only one vm can output logs at a time
if vm == self._logging_vm:
gfile.load_contents_async(None, file_read_callback, None)
# we cannot check this type, python is not smart enough
def push(self, vm: VMObject) -> None:
url = str(vm.data.flake.flake_url)
# Only write to the store if the Clan is not already in it
# Every write to the KVStore rerenders bound widgets to the clan_store
if url not in self.clan_store:
log.debug(f"Creating new VMStore for {url}")
vm_store = VMStore()
vm_store.append(vm)
self.clan_store[url] = vm_store
else:
vm_store = self.clan_store[url]
machine = vm.data.flake.flake_attr
old_vm = vm_store.get(machine)
if old_vm:
log.info(
f"VM {vm.data.flake.flake_attr} already exists in store. Updating data field."
)
old_vm.update(vm.data)
else:
log.debug(f"Appending VM {vm.data.flake.flake_attr} to store")
vm_store.append(vm)
def remove(self, vm: VMObject) -> None:
del self.clan_store[str(vm.data.flake.flake_url)][vm.data.flake.flake_attr]
def get_vm(self, uri: ClanURI) -> None | VMObject:
flake_id = Machine(uri.machine_name, uri.flake).get_id()
vm_store = self.clan_store.get(flake_id)
if vm_store is None:
return None
machine = vm_store.get(uri.machine_name, None)
return machine
def get_running_vms(self) -> list[VMObject]:
return [
vm
for clan in self.clan_store.values()
for vm in clan.values()
if vm.is_running()
]
def kill_all(self) -> None:
for vm in self.get_running_vms():
vm.kill()
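A brief sketch of looking up and stopping VMs through the store; the URI is illustrative:

```python
from clan_cli.clan_uri import ClanURI

vm = ClanStore.use().get_vm(ClanURI("clan://git+https://example.org/my-clan#machine1"))
if vm is not None:
    vm.shutdown()  # graceful stop via QMP, with a kill fallback after the timeout

ClanStore.use().kill_all()  # force-stop anything still running
```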

View File

@ -0,0 +1,61 @@
import os
from collections.abc import Callable
from functools import partial
from typing import Any, Literal, TypeVar
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GObject, Gtk
# Define a TypeVar that is bound to GObject.Object
ListItem = TypeVar("ListItem", bound=GObject.Object)
def create_details_list(
model: Gio.ListStore, render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget]
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class PreferencesValue(GObject.Object):
variant: Literal["CPU", "MEMORY"]
editable: bool
data: Any
def __init__(
self, variant: Literal["CPU", "MEMORY"], editable: bool, data: Any
) -> None:
super().__init__()
self.variant = variant
self.editable = editable
self.data = data
class Details(Gtk.Box):
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
preferences_store = Gio.ListStore.new(PreferencesValue)
preferences_store.append(PreferencesValue("CPU", True, 1))
self.details_list = create_details_list(
model=preferences_store, render_row=self.render_entry_row
)
self.append(self.details_list)
def render_entry_row(
self, boxed_list: Gtk.ListBox, item: PreferencesValue
) -> Gtk.Widget:
cores: int | None = os.cpu_count()
fcores = float(cores) if cores else 1.0
row = Adw.SpinRow.new_with_range(0, fcores, 1)
row.set_value(item.data)
return row

View File

@ -0,0 +1,356 @@
import base64
import logging
from collections.abc import Callable
from functools import partial
from typing import Any, TypeVar
import gi
from clan_cli.clan_uri import ClanURI
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.components.list_splash import EmptySplash
from clan_vm_manager.components.vmobj import VMObject
from clan_vm_manager.singletons.toast import (
LogToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
from clan_vm_manager.singletons.use_join import JoinList, JoinValue
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.singletons.use_vms import ClanStore, VMStore
from clan_vm_manager.views.logs import Logs
gi.require_version("Adw", "1")
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
def create_boxed_list(
model: CustomStore,
render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget],
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.add_css_class("no-shadow")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class ClanList(Gtk.Box):
"""
The ClanList is the composition of
the ClanListToolbar
the clanListView
# ------------------------ #
# - Tools <Start> <Stop> < Edit> #
# ------------------------ #
# - List Items
# - <...>
# ------------------------#
"""
def __init__(self, config: ClanConfig) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
app.connect("join_request", self.on_join_request)
self.log_label: Gtk.Label = Gtk.Label()
# Add join list
self.join_boxed_list = create_boxed_list(
model=JoinList.use().list_store, render_row=self.render_join_row
)
self.join_boxed_list.add_css_class("join-list")
self.append(self.join_boxed_list)
clan_store = ClanStore.use()
clan_store.connect("is_ready", self.display_splash)
self.group_list = create_boxed_list(
model=clan_store.clan_store, render_row=self.render_group_row
)
self.group_list.add_css_class("group-list")
self.append(self.group_list)
self.splash = EmptySplash(on_join=lambda x: self.on_join_request(x, x))
def display_splash(self, source: GKVStore) -> None:
print("Displaying splash")
if (
ClanStore.use().clan_store.get_n_items() == 0
and JoinList.use().list_store.get_n_items() == 0
):
self.append(self.splash)
def render_group_row(
self, boxed_list: Gtk.ListBox, vm_store: VMStore
) -> Gtk.Widget:
self.remove(self.splash)
vm = vm_store.first()
log.debug("Rendering group row for %s", vm.data.flake.flake_url)
grp = Adw.PreferencesGroup()
grp.set_title(vm.data.flake.clan_name)
grp.set_description(vm.data.flake.flake_url)
add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s"))
add_action.connect("activate", self.on_add)
app = Gio.Application.get_default()
assert app is not None
app.add_action(add_action)
# menu_model = Gio.Menu()
# TODO: Make this lazy, blocks UI startup for too long
# for vm in machines.list.list_machines(flake_url=vm.data.flake.flake_url):
# if vm not in vm_store:
# menu_model.append(vm, f"app.add::{vm}")
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
add_button = Gtk.Button()
add_button_content = Adw.ButtonContent.new()
add_button_content.set_label("Add machine")
add_button_content.set_icon_name("list-add-symbolic")
add_button.add_css_class("flat")
add_button.set_child(add_button_content)
# add_button.set_has_frame(False)
# add_button.set_menu_model(menu_model)
# add_button.set_label("Add machine")
box.append(add_button)
grp.set_header_suffix(box)
vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row)
grp.add(vm_list)
return grp
def on_add(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Adding new machine", target)
def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VMObject) -> Gtk.Widget:
# Remove no-shadow class if attached
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
flake = vm.data.flake
row = Adw.ActionRow()
# ====== Display Avatar ======
avatar = Adw.Avatar()
machine_icon = flake.vm.machine_icon
# If there is a machine icon, display it; otherwise
# display the clan icon
if machine_icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(machine_icon)))
elif flake.icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(flake.icon)))
else:
avatar.set_text(flake.clan_name + " " + flake.flake_attr)
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
# ====== Display Name And Url =====
row.set_title(flake.flake_attr)
row.set_title_lines(1)
row.set_title_selectable(True)
# If there is a machine description, display it; otherwise
# display the clan name
if flake.vm.machine_description:
row.set_subtitle(flake.vm.machine_description)
else:
row.set_subtitle(flake.clan_name)
row.set_subtitle_lines(1)
# ==== Display build progress bar ====
build_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
build_box.set_valign(Gtk.Align.CENTER)
build_box.append(vm.progress_bar)
build_box.set_homogeneous(False)  # allow children to have different sizes
row.add_suffix(build_box)
# ==== Action buttons ====
button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
button_box.set_valign(Gtk.Align.CENTER)
## Drop down menu
open_action = Gio.SimpleAction.new("edit", GLib.VariantType.new("s"))
open_action.connect("activate", self.on_edit)
action_id = base64.b64encode(vm.get_id().encode("utf-8")).decode("utf-8")
build_logs_action = Gio.SimpleAction.new(
f"logs.{action_id}", GLib.VariantType.new("s")
)
build_logs_action.connect("activate", self.on_show_build_logs)
build_logs_action.set_enabled(False)
app = Gio.Application.get_default()
assert app is not None
app.add_action(open_action)
app.add_action(build_logs_action)
# set a callback function for conditionally enabling the build_logs action
def on_vm_build_notify(
vm: VMObject, is_building: bool, is_running: bool
) -> None:
build_logs_action.set_enabled(is_building or is_running)
app.add_action(build_logs_action)
if is_building:
ToastOverlay.use().add_toast_unique(
LogToast(
"""Build process running ...""",
on_button_click=lambda: self.show_vm_build_logs(vm.get_id()),
).toast,
f"info.build.running.{vm}",
)
vm.connect("vm_build_notify", on_vm_build_notify)
menu_model = Gio.Menu()
menu_model.append("Edit", f"app.edit::{vm.get_id()}")
menu_model.append("Show Logs", f"app.logs.{action_id}::{vm.get_id()}")
pref_button = Gtk.MenuButton()
pref_button.set_icon_name("open-menu-symbolic")
pref_button.set_menu_model(menu_model)
button_box.append(pref_button)
## VM switch button
switch_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
switch_box.set_valign(Gtk.Align.CENTER)
switch_box.append(vm.switch)
button_box.append(switch_box)
row.add_suffix(button_box)
return row
def on_edit(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Editing settings for machine", target)
def on_show_build_logs(self, _: Any, parameter: Any) -> None:
target = parameter.get_string()
self.show_vm_build_logs(target)
def show_vm_build_logs(self, target: str) -> None:
vm = ClanStore.use().set_logging_vm(target)
if vm is None:
raise ValueError(f"VM {target} not found")
views = ViewStack.use().view
# Reset the logs view
logs: Logs = views.get_child_by_name("logs") # type: ignore
if logs is None:
raise ValueError("Logs view not found")
name = vm.machine.name if vm.machine else "Unknown"
logs.set_title(f"""📄<span weight="normal"> {name}</span>""")
# initial message. Streaming happens automatically when the file is changed by the build process
with open(vm.build_process.out_file) as f:
logs.set_message(f.read())
views.set_visible_child_name("logs")
def render_join_row(
self, boxed_list: Gtk.ListBox, join_val: JoinValue
) -> Gtk.Widget:
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
log.debug("Rendering join row for %s", join_val.url)
row = Adw.ActionRow()
row.set_title(join_val.url.machine_name)
row.set_subtitle(str(join_val.url))
row.add_css_class("trust")
vm = ClanStore.use().get_vm(join_val.url)
# Can't do this here because clan store is empty at this point
if vm is not None:
sub = row.get_subtitle()
assert sub is not None
ToastOverlay.use().add_toast_unique(
WarningToast(
f"""<span weight="regular">{join_val.url.machine_name!s}</span> Already exists. Joining again will update it"""
).toast,
"warning.duplicate.join",
)
row.set_subtitle(
sub + "\nClan already exists. Joining again will update it"
)
avatar = Adw.Avatar()
avatar.set_text(str(join_val.url.machine_name))
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
cancel_button = Gtk.Button(label="Cancel")
cancel_button.add_css_class("error")
cancel_button.connect("clicked", partial(self.on_discard_clicked, join_val))
self.cancel_button = cancel_button
trust_button = Gtk.Button(label="Join")
trust_button.add_css_class("success")
trust_button.connect("clicked", partial(self.on_trust_clicked, join_val))
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
box.append(cancel_button)
box.append(trust_button)
row.add_suffix(box)
return row
def on_join_request(self, source: Any, url: str) -> None:
log.debug("Join request: %s", url)
clan_uri = ClanURI(url)
JoinList.use().push(clan_uri, self.on_after_join)
def on_after_join(self, source: JoinValue) -> None:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"Updated {source.url.machine_name}").toast,
"success.join",
)
# If the join request list is empty, disable the shadow artefact
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")
def on_trust_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
source.set_sensitive(False)
self.cancel_button.set_sensitive(False)
value.join()
def on_discard_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
JoinList.use().discard(value)
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")

View File

@@ -0,0 +1,65 @@
import logging
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, Gtk
from clan_vm_manager.singletons.use_views import ViewStack
log = logging.getLogger(__name__)
class Logs(Gtk.Box):
"""
Simple log view
This includes a banner, a text view, and a button to close the log and navigate back to the overview
"""
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
self.banner = Adw.Banner.new("")
self.banner.set_use_markup(True)
self.banner.set_revealed(True)
self.banner.set_button_label("Close")
self.banner.connect(
"button-clicked",
lambda _: ViewStack.use().view.set_visible_child_name("list"),
)
self.text_view = Gtk.TextView()
self.text_view.set_editable(False)
self.text_view.set_wrap_mode(Gtk.WrapMode.WORD)
self.text_view.add_css_class("log-view")
self.append(self.banner)
self.append(self.text_view)
def set_title(self, title: str) -> None:
self.banner.set_title(title)
def set_message(self, message: str) -> None:
"""
Set the log message. This will delete any previous message
"""
buffer = self.text_view.get_buffer()
buffer.set_text(message)
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)
def append_message(self, message: str) -> None:
"""
Append to the end of any existing log message
"""
buffer = self.text_view.get_buffer()
end_iter = buffer.get_end_iter()
buffer.insert(end_iter, message) # type: ignore
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)

View File

@@ -0,0 +1,156 @@
import dataclasses
import json
import logging
import sys
import threading
from collections.abc import Callable
from pathlib import Path
from threading import Lock
from typing import Any
import gi
from clan_cli.api import API
gi.require_version("WebKit", "6.0")
from gi.repository import GLib, WebKit
site_index: Path = (
Path(sys.argv[0]).absolute()
/ Path("../..")
/ Path("clan_vm_manager/.webui/index.html")
).resolve()
log = logging.getLogger(__name__)
def dataclass_to_dict(obj: Any) -> Any:
"""
Utility function to convert dataclasses to dictionaries.
It recursively converts nested dataclasses and the contents of lists, tuples, and dictionaries.
It does NOT convert member functions.
"""
if dataclasses.is_dataclass(obj):
return {k: dataclass_to_dict(v) for k, v in dataclasses.asdict(obj).items()}
elif isinstance(obj, list | tuple):
return [dataclass_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {k: dataclass_to_dict(v) for k, v in obj.items()}
else:
return obj
class WebView:
def __init__(self, methods: dict[str, Callable]) -> None:
self.method_registry: dict[str, Callable] = methods
self.webview = WebKit.WebView()
settings = self.webview.get_settings()
# settings.
settings.set_property("enable-developer-extras", True)
self.webview.set_settings(settings)
self.manager = self.webview.get_user_content_manager()
# Can be called with: window.webkit.messageHandlers.gtk.postMessage("...")
# Important: it seems postMessage must be given some payload, otherwise it won't trigger the event
self.manager.register_script_message_handler("gtk")
self.manager.connect("script-message-received", self.on_message_received)
self.webview.load_uri(f"file://{site_index}")
# global mutex lock to ensure functions run sequentially
self.mutex_lock = Lock()
self.queue_size = 0
def on_message_received(
self, user_content_manager: WebKit.UserContentManager, message: Any
) -> None:
payload = json.loads(message.to_json(0))
method_name = payload["method"]
handler_fn = self.method_registry[method_name]
log.debug(f"Received message: {payload}")
log.debug(f"Queue size: {self.queue_size} (Wait)")
def threaded_wrapper() -> bool:
"""
Ensures only one API function runs at a time:
waits until no other function holds the global lock, then
starts a thread that runs the potentially long-running API function.
"""
if not self.mutex_lock.locked():
thread = threading.Thread(
target=self.threaded_handler,
args=(
handler_fn,
payload.get("data"),
method_name,
),
)
thread.start()
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
GLib.idle_add(
threaded_wrapper,
)
self.queue_size += 1
def threaded_handler(
self,
handler_fn: Callable[
...,
Any,
],
data: dict[str, Any] | None,
method_name: str,
) -> None:
with self.mutex_lock:
log.debug("Executing... ", method_name)
log.debug(f"{data}")
if data is None:
result = handler_fn()
else:
reconciled_arguments = {}
for k, v in data.items():
# Some functions expect to be called with dataclass instances,
# but the JS API sends plain dictionaries.
# Introspect the function and build the expected dataclass from the dict
# dynamically, based on the introspected argument type.
arg_type = API.get_method_argtype(method_name, k)
if dataclasses.is_dataclass(arg_type):
reconciled_arguments[k] = arg_type(**v)
else:
reconciled_arguments[k] = v
result = handler_fn(**reconciled_arguments)
serialized = json.dumps(dataclass_to_dict(result))
# Use idle_add to queue the response call to js on the main GTK thread
GLib.idle_add(self.return_data_to_js, method_name, serialized)
self.queue_size -= 1
log.debug(f"Done: Remaining queue size: {self.queue_size}")
def return_data_to_js(self, method_name: str, serialized: str) -> bool:
# This function must be run on the main GTK thread to interact with the webview
# result = method_fn(data) # takes very long
# serialized = result
self.webview.evaluate_javascript(
f"""
window.clan.{method_name}(`{serialized}`);
""",
-1,
None,
None,
None,
)
return GLib.SOURCE_REMOVE
def get_webview(self) -> WebKit.WebView:
return self.webview
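# A minimal, hedged sketch of the conversion helpers above: dataclass_to_dict
# flattens nested dataclasses for JSON, and the reverse step mirrors the
# arg_type(**data) reconciliation done in threaded_handler. _DemoDisk and
# _DemoMachine are hypothetical classes used only for this illustration.
if __name__ == "__main__":
    @dataclasses.dataclass
    class _DemoDisk:
        device: str
        size_gb: int

    @dataclasses.dataclass
    class _DemoMachine:
        name: str
        disks: list[_DemoDisk]

    machine = _DemoMachine(name="demo-machine", disks=[_DemoDisk("/dev/vda", 20)])

    # Dataclass -> plain dict, ready for json.dumps and the JS side.
    as_dict = dataclass_to_dict(machine)
    print(json.dumps(as_dict))
    # {"name": "demo-machine", "disks": [{"device": "/dev/vda", "size_gb": 20}]}

    # Dict -> dataclass, as threaded_handler does for introspected argument types.
    disk = _DemoDisk(**as_dict["disks"][0])
    print(disk)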

View File

@@ -0,0 +1,92 @@
import logging
import threading
import gi
from clan_cli.api import API
from clan_cli.history.list import list_history
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.singletons.toast import ToastOverlay
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.singletons.use_vms import ClanStore
from clan_vm_manager.views.details import Details
from clan_vm_manager.views.list import ClanList
from clan_vm_manager.views.logs import Logs
from clan_vm_manager.views.webview import WebView
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GLib, Gtk
from clan_vm_manager.components.trayicon import TrayIcon
log = logging.getLogger(__name__)
class MainWindow(Adw.ApplicationWindow):
def __init__(self, config: ClanConfig) -> None:
super().__init__()
self.set_title("Clan Manager")
self.set_default_size(980, 850)
overlay = ToastOverlay.use().overlay
view = Adw.ToolbarView()
overlay.set_child(view)
self.set_content(overlay)
header = Adw.HeaderBar()
view.add_top_bar(header)
app = Gio.Application.get_default()
assert app is not None
self.tray_icon: TrayIcon = TrayIcon(app)
# Initialize the ClanStore in a background thread
threading.Thread(target=self._populate_vms).start()
# Initialize all views
stack_view = ViewStack.use().view
clamp = Adw.Clamp()
clamp.set_child(stack_view)
clamp.set_maximum_size(1000)
scroll = Gtk.ScrolledWindow()
scroll.set_propagate_natural_height(True)
scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
scroll.set_child(clamp)
stack_view.add_named(ClanList(config), "list")
stack_view.add_named(Details(), "details")
stack_view.add_named(Logs(), "logs")
webview = WebView(methods=API._registry)
stack_view.add_named(webview.get_webview(), "webview")
stack_view.set_visible_child_name(config.initial_view)
view.set_content(scroll)
self.connect("destroy", self.on_destroy)
def _set_clan_store_ready(self) -> bool:
ClanStore.use().emit("is_ready")
return GLib.SOURCE_REMOVE
def _populate_vms(self) -> None:
# Execute `clan flakes add <path>` (pointing at democlan) beforehand for this to work
# TODO: Make list_history a generator function
for entry in list_history():
GLib.idle_add(ClanStore.use().create_vm_task, entry)
GLib.idle_add(self._set_clan_store_ready)
def kill_vms(self) -> None:
log.debug("Killing all VMs")
ClanStore.use().kill_all()
def on_destroy(self, source: "Adw.ApplicationWindow") -> None:
log.info("====Destroying Adw.ApplicationWindow===")
ClanStore.use().kill_all()
self.tray_icon.destroy()
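# A minimal, hedged sketch of the threading pattern used by _populate_vms above:
# a worker thread produces entries off the main loop and hands each one to the
# GTK main thread via GLib.idle_add. fake_list_history is hypothetical and
# stands in for clan_cli.history.list.list_history.
if __name__ == "__main__":
    loop = GLib.MainLoop()

    def fake_list_history() -> list[str]:
        return ["clan://demo#machine-1", "clan://demo#machine-2"]

    def add_to_store(entry: str) -> bool:
        # Runs on the GLib main loop: the safe place to touch GTK/GObject state.
        print("adding", entry)
        return GLib.SOURCE_REMOVE  # run once, then drop this idle source

    def worker() -> None:
        for entry in fake_list_history():
            GLib.idle_add(add_to_store, entry)
        GLib.idle_add(loop.quit)

    threading.Thread(target=worker).start()
    loop.run()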

View File

@@ -0,0 +1,172 @@
{
adwaita-icon-theme,
clan-cli,
copyDesktopItems,
fontconfig,
gobject-introspection,
gtk4,
libadwaita,
makeDesktopItem,
pygobject-stubs,
pygobject3,
pytest, # Testing framework
pytest-cov, # Generate coverage reports
pytest-subprocess, # fake the real subprocess behavior to make your tests more independent.
pytest-timeout, # Add timeouts to your tests
pytest-xdist, # Run tests in parallel on multiple cores
python3,
runCommand,
setuptools,
webkitgtk_6_0,
webview-ui,
wrapGAppsHook,
}:
let
source = ./.;
desktop-file = makeDesktopItem {
name = "org.clan.vm-manager";
exec = "clan-vm-manager %u";
icon = ./clan_vm_manager/assets/clan_white.png;
desktopName = "Clan Manager";
startupWMClass = "clan";
mimeTypes = [ "x-scheme-handler/clan" ];
};
# Dependencies that are directly used in the project but are not internal python packages
externalPythonDeps = [
pygobject3
pygobject-stubs
gtk4
libadwaita
webkitgtk_6_0
adwaita-icon-theme
];
# Deps including python packages from the local project
allPythonDeps = [ (python3.pkgs.toPythonModule clan-cli) ] ++ externalPythonDeps;
# Runtime binary dependencies required by the application
runtimeDependencies = [
];
# Dependencies required for running tests
externalTestDeps =
externalPythonDeps
++ runtimeDependencies
++ [
pytest # Testing framework
pytest-cov # Generate coverage reports
pytest-subprocess # fake the real subprocess behavior to make your tests more independent.
pytest-xdist # Run tests in parallel on multiple cores
pytest-timeout # Add timeouts to your tests
];
# Dependencies required for running tests
testDependencies = runtimeDependencies ++ allPythonDeps ++ externalTestDeps;
# Setup Python environment with all dependencies for running tests
pythonWithTestDeps = python3.withPackages (_ps: testDependencies);
in
python3.pkgs.buildPythonApplication rec {
name = "clan-vm-manager";
src = source;
format = "pyproject";
makeWrapperArgs = [
"--set FONTCONFIG_FILE ${fontconfig.out}/etc/fonts/fonts.conf"
# This prevents problems with mixed glibc versions that might occur when the
# cli is called through a browser built against another glibc
"--unset LD_LIBRARY_PATH"
];
# Deps needed only at build time
nativeBuildInputs = [
setuptools
copyDesktopItems
wrapGAppsHook
gobject-introspection
];
# Setting buildInputs and propagatedBuildInputs to the same values ensures
# that all necessary dependencies are consistently available both
# at build time and at runtime.
buildInputs = allPythonDeps ++ runtimeDependencies;
propagatedBuildInputs = allPythonDeps ++ runtimeDependencies;
# also re-expose dependencies so we test them in CI
passthru = {
tests = {
clan-vm-manager-pytest =
runCommand "clan-vm-manager-pytest" { inherit buildInputs propagatedBuildInputs nativeBuildInputs; }
''
cp -r ${source} ./src
chmod +w -R ./src
cd ./src
export FONTCONFIG_FILE=${fontconfig.out}/etc/fonts/fonts.conf
export FONTCONFIG_PATH=${fontconfig.out}/etc/fonts
mkdir -p .home/.local/share/fonts
export HOME=.home
fc-cache --verbose
# > fc-cache succeeded
echo "Loaded the following fonts ..."
fc-list
echo "STARTING ..."
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1
${pythonWithTestDeps}/bin/python -m pytest -s -m "not impure" ./tests
touch $out
'';
clan-vm-manager-no-breakpoints = runCommand "clan-vm-manager-no-breakpoints" { } ''
if grep --include \*.py -Rq "breakpoint()" ${source}; then
echo "breakpoint() found in ${source}:"
grep --include \*.py -Rn "breakpoint()" ${source}
exit 1
fi
touch $out
'';
};
};
# Additional pass-through attributes
passthru.desktop-file = desktop-file;
passthru.externalPythonDeps = externalPythonDeps;
passthru.externalTestDeps = externalTestDeps;
passthru.runtimeDependencies = runtimeDependencies;
passthru.testDependencies = testDependencies;
# TODO: place webui in lib/python3.11/site-packages/clan_vm_manager
postInstall = ''
mkdir -p $out/clan_vm_manager/.webui
cp -r ${webview-ui}/lib/node_modules/@clan/webview-ui/dist/* $out/clan_vm_manager/.webui
'';
# Don't leak python packages into a devshell.
# It can be very confusing if you `nix run` and the cli then gets loaded from the devshell instead.
postFixup = ''
rm $out/nix-support/propagated-build-inputs
'';
checkPhase = ''
export FONTCONFIG_FILE=${fontconfig.out}/etc/fonts/fonts.conf
export FONTCONFIG_PATH=${fontconfig.out}/etc/fonts
mkdir -p .home/.local/share/fonts
export HOME=.home
fc-cache --verbose
# > fc-cache succeeded
echo "Loaded the following fonts ..."
fc-list
PYTHONPATH= $out/bin/clan-vm-manager --help
'';
desktopItems = [ desktop-file ];
}

54 pkgs/clan-vm-manager/demo.sh Executable file
View File

@@ -0,0 +1,54 @@
#!/usr/bin/env bash
set -e -o pipefail
check_git_tag() {
local repo_path="$1"
local target_tag="$2"
# Change directory to the specified Git repository
pushd "$repo_path" > /dev/null 2>&1
# shellcheck disable=SC2181
if [ $? -ne 0 ]; then
echo "Error: Failed to change directory to $repo_path"
return 1
fi
# Get the current Git tag
local current_tag
current_tag=$(git describe --tags --exact-match 2>/dev/null)
# Restore the original directory
popd > /dev/null 2>&1
# Check if the current tag matches the target tag
if [ "$current_tag" = "$target_tag" ]; then
echo "Current Git tag in $repo_path is $target_tag"
else
echo "Error: Current Git tag in $repo_path is not $target_tag"
exit 1
fi
}
if [ -z "$1" ]; then
echo "Usage: $0 <democlan>"
exit 1
fi
democlan="$1"
check_git_tag "$democlan" "v2.2"
check_git_tag "." "demo-v2.3"
rm -rf ~/.config/clan
clan history add "clan://$democlan#localsend-wayland1"
clear
cat << EOF
Open up this link in a browser:
"clan://$democlan#localsend-wayland2"
EOF

View File

@@ -0,0 +1,24 @@
{ ... }:
{
perSystem =
{
config,
pkgs,
lib,
system,
...
}:
if lib.elem system lib.platforms.darwin then
{ }
else
{
devShells.clan-vm-manager = pkgs.callPackage ./shell.nix {
inherit (config.packages) clan-vm-manager webview-ui;
};
packages.clan-vm-manager = pkgs.python3.pkgs.callPackage ./default.nix {
inherit (config.packages) clan-cli webview-ui;
};
checks = config.packages.clan-vm-manager.tests;
};
}

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
CLAN=$(nix build .#clan-vm-manager --print-out-paths)
if ! command -v xdg-mime &> /dev/null; then
echo "Warning: 'xdg-mime' is not available. The desktop file cannot be installed."
fi
# install desktop file
set -eou pipefail
DESKTOP_FILE_NAME=org.clan.vm-manager.desktop
DESKTOP_DST=~/.local/share/applications/"$DESKTOP_FILE_NAME"
DESKTOP_SRC="$CLAN/share/applications/$DESKTOP_FILE_NAME"
UI_BIN="$CLAN/bin/clan-vm-manager"
cp -f "$DESKTOP_SRC" "$DESKTOP_DST"
sleep 2
sed -i "s|Exec=.*clan-vm-manager|Exec=$UI_BIN|" "$DESKTOP_DST"
xdg-mime default "$DESKTOP_FILE_NAME" x-scheme-handler/clan
echo "==== Validating desktop file installation ===="
set -x
desktop-file-validate "$DESKTOP_DST"
set +xeou pipefail

View File

@@ -0,0 +1,7 @@
# WebKit GTK doesn't interop flawlessly with the Solid.js build result
1. WebKit expects the script tag to be in `body`, but Solid.js puts it in the head.
2. Script and CSS files are loaded with `type="module"` and `crossorigin` attributes set; WebKit silently fails to load them.
3. Paths to resources must not start with "/", because WebKit interprets them relative to the filesystem rather than the base URL.
4. WebKit doesn't support native features such as directly handling external URLs (i.e. opening them in the default browser).
5. Other problems to be found?

View File

@@ -0,0 +1,48 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "clan-vm-manager"
description = "clan vm manager"
dynamic = ["version"]
scripts = { clan-vm-manager = "clan_vm_manager:main" }
[project.urls]
Homepage = "https://clan.lol/"
Documentation = "https://docs.clan.lol/"
Repository = "https://git.clan.lol/clan/clan-core"
[tool.setuptools.packages.find]
exclude = ["result"]
[tool.setuptools.package-data]
clan_vm_manager = ["**/assets/*"]
[tool.pytest.ini_options]
testpaths = "tests"
faulthandler_timeout = 60
log_level = "DEBUG"
log_format = "%(levelname)s: %(message)s\n %(pathname)s:%(lineno)d::%(funcName)s"
addopts = "--cov . --cov-report term --cov-report html:.reports/html --no-cov-on-fail --durations 5 --color=yes --new-first" # Add --pdb for debugging
norecursedirs = "tests/helpers"
markers = ["impure"]
[tool.mypy]
python_version = "3.11"
warn_redundant_casts = true
disallow_untyped_calls = true
disallow_untyped_defs = true
no_implicit_optional = true
[[tool.mypy.overrides]]
module = "argcomplete.*"
ignore_missing_imports = true
[tool.ruff]
target-version = "py311"
line-length = 88
lint.select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
lint.ignore = ["E501", "E402", "N802", "ANN101", "ANN401", "A003"]

Some files were not shown because too many files have changed in this diff.