clan-core/pkgs/clan-cli/clan_cli/flash.py
2024-05-16 15:08:24 +02:00

297 lines
8.4 KiB
Python

import argparse
import importlib
import json
import logging
import os
import re
import shutil
import textwrap
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
from .cmd import Log, run
from .errors import ClanError
from .facts.secret_modules import SecretStoreBase
from .machines.machines import Machine
from .nix import nix_shell
log = logging.getLogger(__name__)
def list_available_ssh_keys(ssh_dir: Path = Path("~/.ssh").expanduser()) -> list[Path]:
"""
Function to list all available SSH public keys in the default .ssh directory.
Returns a list of paths to available public key files.
"""
public_key_patterns = ["*.pub"]
available_keys: list[Path] = []
# Check for public key files
for pattern in public_key_patterns:
for key_path in ssh_dir.glob(pattern):
if key_path.is_file():
available_keys.append(key_path)
return available_keys
def read_public_key_contents(public_keys: list[Path]) -> list[str]:
"""
Function to read and return the contents of available SSH public keys.
Returns a list containing the contents of each public key.
"""
public_key_contents = []
for key_path in public_keys:
try:
with open(key_path.expanduser()) as key_file:
public_key_contents.append(key_file.read().strip())
except FileNotFoundError:
log.error(f"Public key file not found: {key_path}")
return public_key_contents
def get_keymap_and_locale() -> dict[str, str]:
locale = "en_US.UTF-8"
keymap = "en"
# Execute the `localectl status` command
result = run(["localectl", "status"])
if result.returncode == 0:
output = result.stdout
# Extract the Keymap (X11 Layout)
keymap_match = re.search(r"X11 Layout:\s+(.*)", output)
if keymap_match:
keymap = keymap_match.group(1)
# Extract the System Locale (LANG only)
locale_match = re.search(r"System Locale:\s+LANG=(.*)", output)
if locale_match:
locale = locale_match.group(1)
return {"keymap": keymap, "locale": locale}
def flash_machine(
machine: Machine,
*,
mode: str,
disks: dict[str, str],
system_config: dict[str, Any],
dry_run: bool,
debug: bool,
) -> None:
secret_facts_module = importlib.import_module(machine.secret_facts_module)
secret_facts_store: SecretStoreBase = secret_facts_module.SecretStore(
machine=machine
)
with TemporaryDirectory() as tmpdir_:
tmpdir = Path(tmpdir_)
upload_dir = machine.secrets_upload_directory
if upload_dir.startswith("/"):
local_dir = tmpdir / upload_dir[1:]
else:
local_dir = tmpdir / upload_dir
local_dir.mkdir(parents=True)
secret_facts_store.upload(local_dir)
disko_install = []
if os.geteuid() != 0:
if shutil.which("sudo") is None:
raise ClanError(
"sudo is required to run disko-install as a non-root user"
)
disko_install.append("sudo")
disko_install.append("disko-install")
if dry_run:
disko_install.append("--dry-run")
if debug:
disko_install.append("--debug")
for name, device in disks.items():
disko_install.extend(["--disk", name, device])
disko_install.extend(["--extra-files", str(local_dir), upload_dir])
disko_install.extend(["--flake", str(machine.flake) + "#" + machine.name])
disko_install.extend(["--mode", str(mode)])
disko_install.extend(
[
"--system-config",
json.dumps(system_config),
]
)
cmd = nix_shell(
["nixpkgs#disko"],
disko_install,
)
run(cmd, log=Log.BOTH, error_msg=f"Failed to flash {machine}")
@dataclass
class FlashOptions:
flake: Path
machine: str
disks: dict[str, str]
ssh_keys_path: list[Path]
dry_run: bool
confirm: bool
debug: bool
mode: str
language: str
keymap: str
class AppendDiskAction(argparse.Action):
def __init__(self, option_strings: str, dest: str, **kwargs: Any) -> None:
super().__init__(option_strings, dest, **kwargs)
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: str | Sequence[str] | None,
option_string: str | None = None,
) -> None:
disks = getattr(namespace, self.dest)
assert isinstance(values, list), "values must be a list"
disks[values[0]] = values[1]
def flash_command(args: argparse.Namespace) -> None:
opts = FlashOptions(
flake=args.flake,
machine=args.machine,
disks=args.disk,
ssh_keys_path=args.ssh_pubkey,
dry_run=args.dry_run,
confirm=not args.yes,
debug=args.debug,
mode=args.mode,
language=args.lang,
keymap=args.keymap,
)
machine = Machine(opts.machine, flake=opts.flake)
if opts.confirm and not opts.dry_run:
disk_str = ", ".join(f"{name}={device}" for name, device in opts.disks.items())
msg = f"Install {machine.name}"
if disk_str != "":
msg += f" to {disk_str}"
msg += "? [y/N] "
ask = input(msg)
if ask != "y":
return
root_keys = read_public_key_contents(opts.ssh_keys_path)
if opts.confirm and not root_keys:
msg = "Should we add your SSH public keys to the root user? [y/N] "
ask = input(msg)
if ask == "y":
pubkeys = list_available_ssh_keys()
root_keys.extend(read_public_key_contents(pubkeys))
else:
raise ClanError(
"No SSH public keys provided. Use --ssh-pubkey to add keys."
)
elif not opts.confirm and not root_keys:
pubkeys = list_available_ssh_keys()
root_keys.extend(read_public_key_contents(pubkeys))
# If ssh-pubkeys set, we don't need to ask for confirmation
elif opts.confirm and root_keys:
pass
elif not opts.confirm and root_keys:
pass
else:
raise ClanError("Invalid state")
localectl = get_keymap_and_locale()
extra_config = {
"users": {
"users": {"root": {"openssh": {"authorizedKeys": {"keys": root_keys}}}}
},
"console": {
"keyMap": opts.keymap if opts.keymap else localectl["keymap"],
},
"i18n": {
"defaultLocale": opts.language if opts.language else localectl["locale"],
},
}
flash_machine(
machine,
mode=opts.mode,
disks=opts.disks,
system_config=extra_config,
dry_run=opts.dry_run,
debug=opts.debug,
)
def register_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"machine",
type=str,
help="machine to install",
)
parser.add_argument(
"--disk",
type=str,
nargs=2,
metavar=("name", "value"),
action=AppendDiskAction,
help="device to flash to",
default={},
)
mode_help = textwrap.dedent("""\
Specify the mode of operation. Valid modes are: format, mount."
Format will format the disk before installing.
Mount will mount the disk before installing.
Mount is useful for updating an existing system without losing data.
""")
parser.add_argument(
"--mode",
type=str,
help=mode_help,
choices=["format", "mount"],
default="format",
)
parser.add_argument(
"--ssh-pubkey",
type=Path,
action="append",
default=[],
help="ssh pubkey file to add to the root user. Can be used multiple times",
)
parser.add_argument(
"--lang",
type=str,
help="system language",
)
parser.add_argument(
"--keymap",
type=str,
help="system keymap",
)
parser.add_argument(
"--yes",
action="store_true",
help="do not ask for confirmation",
default=False,
)
parser.add_argument(
"--dry-run",
help="Only build the system, don't flash it",
default=False,
action="store_true",
)
parser.set_defaults(func=flash_command)