add import-sops command to secrets
All checks were successful
build / test (pull_request) Successful in 21s
All checks were successful
build / test (pull_request) Successful in 21s
This commit is contained in:
parent
4cf82f3596
commit
1d1452ddd5
@ -2,6 +2,7 @@
|
||||
import argparse
|
||||
|
||||
from .groups import register_groups_parser
|
||||
from .import_sops import register_import_sops_parser
|
||||
from .machines import register_machines_parser
|
||||
from .secrets import register_secrets_parser
|
||||
from .users import register_users_parser
|
||||
@ -25,4 +26,7 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
machines_parser = subparser.add_parser("machines", help="manage machines")
|
||||
register_machines_parser(machines_parser)
|
||||
|
||||
import_sops_parser = subparser.add_parser("import-sops", help="import a sops file")
|
||||
register_import_sops_parser(import_sops_parser)
|
||||
|
||||
register_secrets_parser(subparser)
|
||||
|
51
pkgs/clan-cli/clan_cli/secrets/import_sops.py
Normal file
51
pkgs/clan-cli/clan_cli/secrets/import_sops.py
Normal file
@ -0,0 +1,51 @@
|
||||
import argparse
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from ..errors import ClanError
|
||||
from ..nix import nix_shell
|
||||
from .secrets import encrypt_secret
|
||||
|
||||
|
||||
def import_sops(args: argparse.Namespace) -> None:
|
||||
file = Path(args.sops_file)
|
||||
file_type = file.suffix
|
||||
|
||||
try:
|
||||
file.read_text()
|
||||
except OSError as e:
|
||||
raise ClanError(f"Could not read file {file}: {e}") from e
|
||||
if file_type == ".yaml":
|
||||
cmd = ["sops"]
|
||||
if args.input_type:
|
||||
cmd += ["--input-type", args.input_type]
|
||||
cmd += ["--output-type", "json", "--decrypt", args.sops_file]
|
||||
cmd = nix_shell(["sops"], cmd)
|
||||
try:
|
||||
res = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise ClanError(f"Could not import sops file {file}: {e}") from e
|
||||
secrets = json.loads(res.stdout)
|
||||
for k, v in secrets.items():
|
||||
if not isinstance(v, str):
|
||||
print(
|
||||
f"WARNING: {k} is not a string but {type(v)}, skipping",
|
||||
file=sys.stderr,
|
||||
)
|
||||
encrypt_secret(k, v)
|
||||
|
||||
|
||||
def register_import_sops_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument(
|
||||
"sops_file",
|
||||
type=str,
|
||||
help="the sops file to import (- for stdin)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"input_type",
|
||||
type=str,
|
||||
help="the input type of the sops file (yaml, json, ...)",
|
||||
)
|
||||
parser.set_defaults(func=import_sops)
|
@ -2,17 +2,15 @@ import argparse
|
||||
import getpass
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
from io import StringIO
|
||||
from pathlib import Path
|
||||
from typing import IO
|
||||
from typing import IO, Union
|
||||
|
||||
from .. import tty
|
||||
from ..errors import ClanError
|
||||
from ..nix import nix_shell
|
||||
from .folders import list_objects, sops_secrets_folder, sops_users_folder
|
||||
from .sops import SopsKey, encrypt_file, ensure_sops_key, read_key, update_keys
|
||||
from .sops import decrypt_file, encrypt_file, ensure_sops_key, read_key, update_keys
|
||||
from .types import VALID_SECRET_NAME, secret_name_type
|
||||
|
||||
|
||||
@ -28,12 +26,11 @@ def get_command(args: argparse.Namespace) -> None:
|
||||
secret_path = sops_secrets_folder() / secret / "secret"
|
||||
if not secret_path.exists():
|
||||
raise ClanError(f"Secret '{secret}' does not exist")
|
||||
cmd = nix_shell(["sops"], ["sops", "--decrypt", str(secret_path)])
|
||||
res = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True)
|
||||
print(res.stdout, end="")
|
||||
print(decrypt_file(secret_path), end="")
|
||||
|
||||
|
||||
def encrypt_secret(key: SopsKey, secret: Path, value: IO[str]) -> None:
|
||||
def encrypt_secret(secret: Path, value: Union[IO[str], str]) -> None:
|
||||
key = ensure_sops_key()
|
||||
keys = set([key.pubkey])
|
||||
for kind in ["users", "machines", "groups"]:
|
||||
if not (sops_secrets_folder() / kind).is_dir():
|
||||
@ -42,20 +39,19 @@ def encrypt_secret(key: SopsKey, secret: Path, value: IO[str]) -> None:
|
||||
keys.add(k)
|
||||
encrypt_file(secret / "secret", value, list(sorted(keys)))
|
||||
|
||||
# make sure we add ourselves to the key
|
||||
allow_member(users_folder(secret.name), sops_users_folder(), key.username)
|
||||
|
||||
|
||||
def set_command(args: argparse.Namespace) -> None:
|
||||
key = ensure_sops_key()
|
||||
secret_value = os.environ.get("SOPS_NIX_SECRET")
|
||||
if secret_value:
|
||||
encrypt_secret(key, sops_secrets_folder() / args.secret, StringIO(secret_value))
|
||||
encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret_value))
|
||||
elif tty.is_interactive():
|
||||
secret = getpass.getpass(prompt="Paste your secret: ")
|
||||
encrypt_secret(key, sops_secrets_folder() / args.secret, StringIO(secret))
|
||||
encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret))
|
||||
else:
|
||||
encrypt_secret(key, sops_secrets_folder() / args.secret, sys.stdin)
|
||||
|
||||
# make sure we add ourselves to the key
|
||||
allow_member(users_folder(args.secret), sops_users_folder(), key.username)
|
||||
encrypt_secret(sops_secrets_folder() / args.secret, sys.stdin)
|
||||
|
||||
|
||||
def remove_command(args: argparse.Namespace) -> None:
|
||||
|
@ -5,7 +5,7 @@ import subprocess
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import IO, Iterator
|
||||
from typing import IO, Iterator, Union
|
||||
|
||||
from .. import tty
|
||||
from ..dirs import user_config_dir
|
||||
@ -131,7 +131,9 @@ def update_keys(secret_path: Path, keys: list[str]) -> None:
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
|
||||
def encrypt_file(secret_path: Path, content: IO[str], keys: list[str]) -> None:
|
||||
def encrypt_file(
|
||||
secret_path: Path, content: Union[IO[str], str], keys: list[str]
|
||||
) -> None:
|
||||
folder = secret_path.parent
|
||||
folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@ -139,7 +141,10 @@ def encrypt_file(secret_path: Path, content: IO[str], keys: list[str]) -> None:
|
||||
with NamedTemporaryFile(delete=False) as f:
|
||||
try:
|
||||
with open(f.name, "w") as fd:
|
||||
shutil.copyfileobj(content, fd)
|
||||
if isinstance(content, str):
|
||||
fd.write(content)
|
||||
else:
|
||||
shutil.copyfileobj(content, fd)
|
||||
args = ["sops"]
|
||||
for key in keys:
|
||||
args.extend(["--age", key])
|
||||
@ -157,6 +162,12 @@ def encrypt_file(secret_path: Path, content: IO[str], keys: list[str]) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def decrypt_file(secret_path: Path) -> str:
|
||||
cmd = nix_shell(["sops"], ["sops", "--decrypt", str(secret_path)])
|
||||
res = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True)
|
||||
return res.stdout
|
||||
|
||||
|
||||
def write_key(path: Path, publickey: str, overwrite: bool) -> None:
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
|
Loading…
Reference in New Issue
Block a user