Merge pull request 'don't add user to a secret if they already can access the secret' (#107) from Mic92-mic92 into main
Some checks failed
build / test (push) Failing after 16s

Reviewed-on: #107
This commit is contained in:
Mic92 2023-08-09 08:22:14 +00:00
commit 0314fbbec5
8 changed files with 172 additions and 118 deletions

View File

@ -19,6 +19,39 @@ from .sops import decrypt_file, encrypt_file, ensure_sops_key, read_key, update_
from .types import VALID_SECRET_NAME, secret_name_type
def collect_keys_for_type(folder: Path) -> set[str]:
if not folder.exists():
return set()
keys = set()
for p in folder.iterdir():
if not p.is_symlink():
continue
try:
target = p.resolve()
except FileNotFoundError:
tty.warn(f"Ignoring broken symlink {p}")
continue
kind = target.parent.name
if folder.name != kind:
tty.warn(f"Expected {p} to point to {folder} but points to {target.parent}")
continue
keys.add(read_key(target))
return keys
def collect_keys_for_path(path: Path) -> set[str]:
keys = set([])
keys.update(collect_keys_for_type(path / "machines"))
keys.update(collect_keys_for_type(path / "users"))
groups = path / "groups"
if not groups.is_dir():
return keys
for group in groups.iterdir():
keys.update(collect_keys_for_type(group / "machines"))
keys.update(collect_keys_for_type(group / "users"))
return keys
def encrypt_secret(
secret: Path,
value: Union[IO[str], str],
@ -40,11 +73,7 @@ def encrypt_secret(
for group in add_groups:
allow_member(groups_folder(secret.name), sops_groups_folder(), group, False)
for kind in ["users", "machines", "groups"]:
if not (sops_secrets_folder() / kind).is_dir():
continue
k = read_key(sops_secrets_folder() / kind)
keys.add(k)
keys = collect_keys_for_path(secret)
if key.pubkey not in keys:
keys.add(key.pubkey)
@ -79,39 +108,6 @@ def groups_folder(group: str) -> Path:
return sops_secrets_folder() / group / "groups"
def collect_keys_for_type(folder: Path) -> list[str]:
if not folder.exists():
return []
keys = []
for p in folder.iterdir():
if not p.is_symlink():
continue
try:
target = p.resolve()
except FileNotFoundError:
tty.warn(f"Ignoring broken symlink {p}")
continue
kind = target.parent.name
if folder.name != kind:
tty.warn(f"Expected {p} to point to {folder} but points to {target.parent}")
continue
keys.append(read_key(target))
return keys
def collect_keys_for_path(path: Path) -> list[str]:
keys = []
keys += collect_keys_for_type(path / "machines")
keys += collect_keys_for_type(path / "users")
groups = path / "groups"
if not groups.is_dir():
return keys
for group in groups.iterdir():
keys += collect_keys_for_type(group / "machines")
keys += collect_keys_for_type(group / "users")
return keys
def allow_member(
group_folder: Path, source_folder: Path, name: str, do_update_keys: bool = True
) -> None:
@ -129,7 +125,10 @@ def allow_member(
user_target.symlink_to(os.path.relpath(source, user_target.parent))
if do_update_keys:
update_keys(group_folder.parent, collect_keys_for_path(group_folder.parent))
update_keys(
group_folder.parent,
list(sorted(collect_keys_for_path(group_folder.parent))),
)
def disallow_member(group_folder: Path, name: str) -> None:
@ -151,7 +150,9 @@ def disallow_member(group_folder: Path, name: str) -> None:
if len(os.listdir(group_folder.parent)) == 0:
os.rmdir(group_folder.parent)
update_keys(target.parent.parent, collect_keys_for_path(group_folder.parent))
update_keys(
target.parent.parent, list(sorted(collect_keys_for_path(group_folder.parent)))
)
def list_command(args: argparse.Namespace) -> None:

View File

@ -138,9 +138,7 @@ def encrypt_file(
folder.mkdir(parents=True, exist_ok=True)
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
with NamedTemporaryFile(delete=False) as dummy_manifest_file, NamedTemporaryFile(
delete=False
) as f:
with sops_manifest(keys) as manifest, NamedTemporaryFile(delete=False) as f:
try:
with open(f.name, "w") as fd:
if isinstance(content, str):
@ -148,9 +146,7 @@ def encrypt_file(
else:
shutil.copyfileobj(content, fd)
# we pass an empty manifest to pick up existing configuration of the user
args = ["sops", "--config", dummy_manifest_file.name]
for key in keys:
args.extend(["--age", key])
args = ["sops", "--config", str(manifest)]
args.extend(["-i", "--encrypt", str(f.name)])
cmd = nix_shell(["sops"], args)
subprocess.run(cmd, check=True)

View File

@ -79,6 +79,11 @@ python3.pkgs.buildPythonPackage {
'';
checkPhase = ''
PYTHONPATH= $out/bin/clan --help
if grep --include \*.py -q "breakpoint()" $out; then
echo "breakpoint() found in $out:"
grep --include \*.py -Rn "breakpoint()" $out
exit 1
fi
'';
meta.mainProgram = "clan";
}

View File

@ -3,4 +3,4 @@ import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "helpers"))
pytest_plugins = ["temporary_dir", "clan_flake", "root"]
pytest_plugins = ["temporary_dir", "clan_flake", "root", "test_keys"]

View File

@ -0,0 +1,14 @@
import argparse
from clan_cli.secrets import register_parser
class SecretCli:
def __init__(self) -> None:
self.parser = argparse.ArgumentParser()
register_parser(self.parser)
def run(self, args: list[str]) -> argparse.Namespace:
parsed = self.parser.parse_args(args)
parsed.func(parsed)
return parsed

View File

@ -0,0 +1,46 @@
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from environment import mock_env
from secret_cli import SecretCli
if TYPE_CHECKING:
from test_keys import KeyPair
def test_import_sops(
test_root: Path,
clan_flake: Path,
capsys: pytest.CaptureFixture,
test_keys: list["KeyPair"],
) -> None:
cli = SecretCli()
with mock_env(SOPS_AGE_KEY=test_keys[1].privkey):
cli.run(["machines", "add", "machine1", test_keys[0].pubkey])
cli.run(["users", "add", "user1", test_keys[1].pubkey])
cli.run(["users", "add", "user2", test_keys[2].pubkey])
cli.run(["groups", "add-user", "group1", "user1"])
cli.run(["groups", "add-user", "group1", "user2"])
# To edit:
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
cli.run(
[
"import-sops",
"--group",
"group1",
"--machine",
"machine1",
str(test_root.joinpath("data", "secrets.yaml")),
]
)
capsys.readouterr()
cli.run(["users", "list"])
users = sorted(capsys.readouterr().out.rstrip().split())
assert users == ["user1", "user2"]
capsys.readouterr()
cli.run(["get", "secret-key"])
assert capsys.readouterr().out == "secret-value"

View File

@ -0,0 +1,31 @@
import pytest
class KeyPair:
def __init__(self, pubkey: str, privkey: str) -> None:
self.pubkey = pubkey
self.privkey = privkey
KEYS = [
KeyPair(
"age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c",
"AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK",
),
KeyPair(
"age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62",
"AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ",
),
KeyPair(
"age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp",
"AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF",
),
]
@pytest.fixture
def test_keys() -> list[KeyPair]:
"""
Root directory of the tests
"""
return KEYS

View File

@ -1,45 +1,30 @@
import argparse
import os
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from environment import mock_env
from secret_cli import SecretCli
from clan_cli.errors import ClanError
from clan_cli.secrets import register_parser
class SecretCli:
def __init__(self) -> None:
self.parser = argparse.ArgumentParser()
register_parser(self.parser)
def run(self, args: list[str]) -> argparse.Namespace:
parsed = self.parser.parse_args(args)
parsed.func(parsed)
return parsed
PUBKEY = "age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c"
PRIVKEY = "AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK"
PUBKEY_2 = "age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62"
PRIVKEY_2 = "AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ"
PUBKEY_3 = "age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp"
PRIVKEY_3 = "AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF"
if TYPE_CHECKING:
from test_keys import KeyPair
def _test_identities(
what: str, clan_flake: Path, capsys: pytest.CaptureFixture
what: str,
clan_flake: Path,
capsys: pytest.CaptureFixture,
test_keys: list["KeyPair"],
) -> None:
cli = SecretCli()
sops_folder = clan_flake / "sops"
cli.run([what, "add", "foo", PUBKEY])
cli.run([what, "add", "foo", test_keys[0].pubkey])
assert (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError):
cli.run([what, "add", "foo", PUBKEY])
cli.run([what, "add", "foo", test_keys[0].pubkey])
cli.run(
[
@ -47,7 +32,7 @@ def _test_identities(
"add",
"-f",
"foo",
PRIVKEY,
test_keys[0].privkey,
]
)
capsys.readouterr() # empty the buffer
@ -68,15 +53,21 @@ def _test_identities(
assert "foo" not in out.out
def test_users(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
_test_identities("users", clan_flake, capsys)
def test_users(
clan_flake: Path, capsys: pytest.CaptureFixture, test_keys: list["KeyPair"]
) -> None:
_test_identities("users", clan_flake, capsys, test_keys)
def test_machines(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
_test_identities("machines", clan_flake, capsys)
def test_machines(
clan_flake: Path, capsys: pytest.CaptureFixture, test_keys: list["KeyPair"]
) -> None:
_test_identities("machines", clan_flake, capsys, test_keys)
def test_groups(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
def test_groups(
clan_flake: Path, capsys: pytest.CaptureFixture, test_keys: list["KeyPair"]
) -> None:
cli = SecretCli()
capsys.readouterr() # empty the buffer
cli.run(["groups", "list"])
@ -86,13 +77,13 @@ def test_groups(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
cli.run(["groups", "add-machine", "group1", "machine1"])
with pytest.raises(ClanError): # user does not exist yet
cli.run(["groups", "add-user", "groupb1", "user1"])
cli.run(["machines", "add", "machine1", PUBKEY])
cli.run(["machines", "add", "machine1", test_keys[0].pubkey])
cli.run(["groups", "add-machine", "group1", "machine1"])
# Should this fail?
cli.run(["groups", "add-machine", "group1", "machine1"])
cli.run(["users", "add", "user1", PUBKEY])
cli.run(["users", "add", "user1", test_keys[0].pubkey])
cli.run(["groups", "add-user", "group1", "user1"])
capsys.readouterr() # empty the buffer
@ -107,7 +98,9 @@ def test_groups(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
assert len(groups) == 0
def test_secrets(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
def test_secrets(
clan_flake: Path, capsys: pytest.CaptureFixture, test_keys: list["KeyPair"]
) -> None:
cli = SecretCli()
capsys.readouterr() # empty the buffer
cli.run(["list"])
@ -132,18 +125,18 @@ def test_secrets(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
cli.run(["list"])
assert capsys.readouterr().out == "key\n"
cli.run(["machines", "add", "machine1", PUBKEY])
cli.run(["machines", "add", "machine1", test_keys[0].pubkey])
cli.run(["machines", "add-secret", "machine1", "key"])
with mock_env(SOPS_AGE_KEY=PRIVKEY, SOPS_AGE_KEY_FILE=""):
with mock_env(SOPS_AGE_KEY=test_keys[1].privkey, SOPS_AGE_KEY_FILE=""):
capsys.readouterr()
cli.run(["get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(["machines", "remove-secret", "machine1", "key"])
cli.run(["users", "add", "user1", PUBKEY_2])
cli.run(["users", "add", "user1", test_keys[1].pubkey])
cli.run(["users", "add-secret", "user1", "key"])
with mock_env(SOPS_AGE_KEY=PRIVKEY_2, SOPS_AGE_KEY_FILE=""):
with mock_env(SOPS_AGE_KEY=test_keys[1].privkey, SOPS_AGE_KEY_FILE=""):
capsys.readouterr()
cli.run(["get", "key"])
assert capsys.readouterr().out == "foo"
@ -158,7 +151,7 @@ def test_secrets(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
capsys.readouterr() # empty the buffer
cli.run(["set", "--group", "admin-group", "key2"])
with mock_env(SOPS_AGE_KEY=PRIVKEY_2, SOPS_AGE_KEY_FILE=""):
with mock_env(SOPS_AGE_KEY=test_keys[1].privkey, SOPS_AGE_KEY_FILE=""):
capsys.readouterr()
cli.run(["get", "key"])
assert capsys.readouterr().out == "foo"
@ -170,35 +163,3 @@ def test_secrets(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
capsys.readouterr() # empty the buffer
cli.run(["list"])
assert capsys.readouterr().out == ""
def test_import_sops(
test_root: Path, clan_flake: Path, capsys: pytest.CaptureFixture
) -> None:
cli = SecretCli()
with mock_env(SOPS_AGE_KEY=PRIVKEY_2):
cli.run(["machines", "add", "machine1", PUBKEY])
cli.run(["users", "add", "user1", PUBKEY_2])
cli.run(["users", "add", "user2", PUBKEY_3])
# To edit:
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
cli.run(
[
"import-sops",
"--user",
"user1",
"--machine",
"machine1",
str(test_root.joinpath("data", "secrets.yaml")),
]
)
capsys.readouterr()
cli.run(["users", "list"])
users = sorted(capsys.readouterr().out.rstrip().split())
assert users == ["user1", "user2"]
capsys.readouterr()
cli.run(["get", "secret-key"])
assert capsys.readouterr().out == "secret-value"