clan-cli: fix clan ssh
--json
and --png
This fixes `clan ssh` with the `--json` and `--png` flags. It will now correctly use the actual fields that are present in the generated json. - probes if the ports are accessible - if accessible will attempt a single ssh connection with the provided password, in order to not spam ssh attempts Fixes #1177
This commit is contained in:
parent
a33a76ecd2
commit
498d29cca1
@ -1,17 +1,25 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from ..nix import nix_shell
|
from ..nix import nix_shell
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def ssh(
|
def ssh(
|
||||||
host: str,
|
host: str,
|
||||||
user: str = "root",
|
user: str = "root",
|
||||||
password: str | None = None,
|
password: str | None = None,
|
||||||
ssh_args: list[str] = [],
|
ssh_args: list[str] = [],
|
||||||
|
torify: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
packages = ["nixpkgs#tor", "nixpkgs#openssh"]
|
packages = ["nixpkgs#openssh"]
|
||||||
|
if torify:
|
||||||
|
packages.append("nixpkgs#tor")
|
||||||
|
|
||||||
password_args = []
|
password_args = []
|
||||||
if password:
|
if password:
|
||||||
packages.append("nixpkgs#sshpass")
|
packages.append("nixpkgs#sshpass")
|
||||||
@ -29,7 +37,12 @@ def ssh(
|
|||||||
"StrictHostKeyChecking=no",
|
"StrictHostKeyChecking=no",
|
||||||
f"{user}@{host}",
|
f"{user}@{host}",
|
||||||
]
|
]
|
||||||
cmd = nix_shell(packages, ["torify", *password_args, *_ssh_args])
|
|
||||||
|
cmd_args = [*password_args, *_ssh_args]
|
||||||
|
if torify:
|
||||||
|
cmd_args.insert(0, "torify")
|
||||||
|
|
||||||
|
cmd = nix_shell(packages, cmd_args)
|
||||||
subprocess.run(cmd)
|
subprocess.run(cmd)
|
||||||
|
|
||||||
|
|
||||||
@ -53,14 +66,39 @@ def qrcode_scan(picture_file: str) -> str:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def is_reachable(host: str) -> bool:
|
||||||
|
sock = socket.socket(
|
||||||
|
socket.AF_INET6 if ":" in host else socket.AF_INET, socket.SOCK_STREAM
|
||||||
|
)
|
||||||
|
sock.settimeout(2)
|
||||||
|
try:
|
||||||
|
sock.connect((host, 22))
|
||||||
|
sock.close()
|
||||||
|
return True
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def connect_ssh_from_json(ssh_data: dict[str, str]) -> None:
|
||||||
|
for address in ssh_data["local_addresses"]:
|
||||||
|
log.debug(f"Trying to reach host on: {address}")
|
||||||
|
if is_reachable(address):
|
||||||
|
ssh(host=address, password=ssh_data["password"])
|
||||||
|
exit(0)
|
||||||
|
else:
|
||||||
|
log.debug(f"Could not reach host on {address}")
|
||||||
|
log.debug(f'Trying to reach host via torify on {ssh_data["onion_address"]}')
|
||||||
|
ssh(host=ssh_data["onion_address"], password=ssh_data["password"], torify=True)
|
||||||
|
|
||||||
|
|
||||||
def main(args: argparse.Namespace) -> None:
|
def main(args: argparse.Namespace) -> None:
|
||||||
if args.json:
|
if args.json:
|
||||||
with open(args.json) as file:
|
with open(args.json) as file:
|
||||||
ssh_data = json.load(file)
|
ssh_data = json.load(file)
|
||||||
ssh(host=ssh_data["address"], password=ssh_data["password"])
|
connect_ssh_from_json(ssh_data)
|
||||||
elif args.png:
|
elif args.png:
|
||||||
ssh_data = json.loads(qrcode_scan(args.png))
|
ssh_data = json.loads(qrcode_scan(args.png))
|
||||||
ssh(host=ssh_data["address"], password=ssh_data["password"])
|
connect_ssh_from_json(ssh_data)
|
||||||
|
|
||||||
|
|
||||||
def register_parser(parser: argparse.ArgumentParser) -> None:
|
def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
|
@ -33,7 +33,6 @@ def test_ssh_no_pass(
|
|||||||
"shell",
|
"shell",
|
||||||
fp.any(),
|
fp.any(),
|
||||||
"-c",
|
"-c",
|
||||||
"torify",
|
|
||||||
"ssh",
|
"ssh",
|
||||||
"-o",
|
"-o",
|
||||||
"UserKnownHostsFile=/dev/null",
|
"UserKnownHostsFile=/dev/null",
|
||||||
@ -63,7 +62,6 @@ def test_ssh_with_pass(
|
|||||||
"shell",
|
"shell",
|
||||||
fp.any(),
|
fp.any(),
|
||||||
"-c",
|
"-c",
|
||||||
"torify",
|
|
||||||
"sshpass",
|
"sshpass",
|
||||||
"-p",
|
"-p",
|
||||||
fp.any(),
|
fp.any(),
|
||||||
@ -77,6 +75,65 @@ def test_ssh_with_pass(
|
|||||||
assert fp.call_count(cmd) == 1
|
assert fp.call_count(cmd) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_ssh_no_pass_with_torify(
|
||||||
|
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
host = "somehost"
|
||||||
|
user = "user"
|
||||||
|
if os.environ.get("IN_NIX_SANDBOX"):
|
||||||
|
monkeypatch.delenv("IN_NIX_SANDBOX")
|
||||||
|
cmd: list[str | utils.Any] = [
|
||||||
|
"nix",
|
||||||
|
fp.any(),
|
||||||
|
"shell",
|
||||||
|
fp.any(),
|
||||||
|
"-c",
|
||||||
|
"torify",
|
||||||
|
"ssh",
|
||||||
|
"-o",
|
||||||
|
"UserKnownHostsFile=/dev/null",
|
||||||
|
"-o",
|
||||||
|
"StrictHostKeyChecking=no",
|
||||||
|
f"{user}@{host}",
|
||||||
|
fp.any(),
|
||||||
|
]
|
||||||
|
fp.register(cmd)
|
||||||
|
cli.ssh(
|
||||||
|
host=host,
|
||||||
|
user=user,
|
||||||
|
torify=True,
|
||||||
|
)
|
||||||
|
assert fp.call_count(cmd) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_ssh_with_pass_with_torify(
|
||||||
|
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
host = "somehost"
|
||||||
|
user = "user"
|
||||||
|
if os.environ.get("IN_NIX_SANDBOX"):
|
||||||
|
monkeypatch.delenv("IN_NIX_SANDBOX")
|
||||||
|
cmd: list[str | utils.Any] = [
|
||||||
|
"nix",
|
||||||
|
fp.any(),
|
||||||
|
"shell",
|
||||||
|
fp.any(),
|
||||||
|
"-c",
|
||||||
|
"torify",
|
||||||
|
"sshpass",
|
||||||
|
"-p",
|
||||||
|
fp.any(),
|
||||||
|
]
|
||||||
|
fp.register(cmd)
|
||||||
|
cli.ssh(
|
||||||
|
host=host,
|
||||||
|
user=user,
|
||||||
|
password="XXX",
|
||||||
|
torify=True,
|
||||||
|
)
|
||||||
|
assert fp.call_count(cmd) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
|
def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
|
||||||
cmd: list[str | utils.Any] = [fp.any()]
|
cmd: list[str | utils.Any] = [fp.any()]
|
||||||
fp.register(cmd, stdout="https://test.test")
|
fp.register(cmd, stdout="https://test.test")
|
||||||
|
Loading…
Reference in New Issue
Block a user