add api for secret groups and decrypting secrets
parent
caa1c0dfd8
commit
e103a4186c
@ -23,29 +23,51 @@ def users_folder(group: str) -> Path:
|
||||
return sops_groups_folder() / group / "users"
|
||||
|
||||
|
||||
# TODO: make this a tree
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
class Group:
|
||||
def __init__(self, name: str, machines: list[str], users: list[str]) -> None:
|
||||
self.name = name
|
||||
self.machines = machines
|
||||
self.users = users
|
||||
|
||||
|
||||
def list_groups() -> list[Group]:
|
||||
groups = []
|
||||
folder = sops_groups_folder()
|
||||
if not folder.exists():
|
||||
return
|
||||
return groups
|
||||
|
||||
for group in os.listdir(folder):
|
||||
group_folder = folder / group
|
||||
for name in os.listdir(folder):
|
||||
group_folder = folder / name
|
||||
if not group_folder.is_dir():
|
||||
continue
|
||||
print(group)
|
||||
machines = machines_folder(group)
|
||||
if machines.is_dir():
|
||||
print("machines:")
|
||||
for f in machines.iterdir():
|
||||
machines_path = machines_folder(name)
|
||||
machines = []
|
||||
if machines_path.is_dir():
|
||||
for f in machines_path.iterdir():
|
||||
if validate_hostname(f.name):
|
||||
print(f.name)
|
||||
users = users_folder(group)
|
||||
if users.is_dir():
|
||||
print("users:")
|
||||
for f in users.iterdir():
|
||||
machines.append(f.name)
|
||||
users_path = users_folder(name)
|
||||
users = []
|
||||
if users_path.is_dir():
|
||||
for f in users_path.iterdir():
|
||||
if VALID_USER_NAME.match(f.name):
|
||||
print(f)
|
||||
users.append(f.name)
|
||||
groups.append(Group(name, machines, users))
|
||||
return groups
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
for group in list_groups():
|
||||
print(group.name)
|
||||
if group.machines:
|
||||
print("machines:")
|
||||
for machine in group.machines:
|
||||
print(f" {machine}")
|
||||
if group.users:
|
||||
print("users:")
|
||||
for user in group.users:
|
||||
print(f" {user}")
|
||||
print()
|
||||
|
||||
|
||||
def list_directory(directory: Path) -> str:
|
||||
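Review bot: In `list_groups`, the directory entry `name` is passed to `Group(name, machines, users)` and later printed by `list_command` without any name check, while machine entries must pass `validate_hostname` and user entries must pass `VALID_USER_NAME.match`. If group names are validated when a group is created (not visible in this hunk), applying the same check here would keep a stray or hand-made directory from being reported as a group by the new API. Entries that fail the machine or user checks are dropped silently, so a comment such as `# entries with invalid names are ignored, not reported` would make that intent explicit. Iterating with `for name in sorted(os.listdir(folder)):` would also give API callers a stable order.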
|
@ -183,13 +183,16 @@ def list_command(args: argparse.Namespace) -> None:
|
||||
print("\n".join(lst))
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
secret: str = args.secret
|
||||
def decrypt_secret(secret: str) -> str:
|
||||
ensure_sops_key()
|
||||
secret_path = sops_secrets_folder() / secret / "secret"
|
||||
if not secret_path.exists():
|
||||
raise ClanError(f"Secret '{secret}' does not exist")
|
||||
print(decrypt_file(secret_path), end="")
|
||||
return decrypt_file(secret_path)
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
print(decrypt_secret(args.secret), end="")
|
||||
|
||||
|
||||
def set_command(args: argparse.Namespace) -> None:
|
||||
|
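Review bot: `decrypt_secret` joins its `secret` argument directly into `sops_secrets_folder() / secret / "secret"`, so a value containing `..` or an absolute path resolves outside the secrets folder and `decrypt_file` is then called on that path. The CLI may constrain the name through argparse (not visible in this hunk), but as an importable API the function should enforce the boundary itself. Either validate the name before the join, using the project's secret-name check if it has one, or add `if not secret_path.resolve().is_relative_to(sops_secrets_folder().resolve()): raise ClanError(f"Secret '{secret}' does not exist")`. A one-line docstring saying that the function returns the decrypted plaintext and raises `ClanError` for a missing secret would also help callers treat the return value as sensitive.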