clan-core/nixosModules/clanCore/zerotier/generate-network.py
Jörg Thalheim 74a3c85c29
Some checks failed
checks-impure / test (pull_request) Failing after 7s
checks / test (pull_request) Successful in 23s
move zerotier secret generation into nixos module
2023-09-26 17:57:43 +02:00

143 lines
4.2 KiB
Python

import argparse
import json
import socket
import subprocess
import time
import urllib.request
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Iterator, Optional
class ClanError(Exception):
pass
def try_bind_port(port: int) -> bool:
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
with tcp, udp:
try:
tcp.bind(("127.0.0.1", port))
udp.bind(("127.0.0.1", port))
return True
except OSError:
return False
def try_connect_port(port: int) -> bool:
sock = socket.socket(socket.AF_INET)
result = sock.connect_ex(("127.0.0.1", port))
sock.close()
return result == 0
def find_free_port(port_range: range) -> Optional[int]:
for port in port_range:
if try_bind_port(port):
return port
return None
class ZerotierController:
def __init__(self, port: int, home: Path) -> None:
self.port = port
self.home = home
self.authtoken = (home / "authtoken.secret").read_text()
self.secret = (home / "identity.secret").read_text()
def _http_request(
self,
path: str,
method: str = "GET",
headers: dict[str, str] = {},
data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
body = None
headers = headers.copy()
if data is not None:
body = json.dumps(data).encode("ascii")
headers["Content-Type"] = "application/json"
headers["X-ZT1-AUTH"] = self.authtoken
url = f"http://127.0.0.1:{self.port}{path}"
req = urllib.request.Request(url, headers=headers, method=method, data=body)
resp = urllib.request.urlopen(req)
return json.load(resp)
def status(self) -> dict[str, Any]:
return self._http_request("/status")
def create_network(self, data: dict[str, Any] = {}) -> dict[str, Any]:
identity = (self.home / "identity.public").read_text()
node_id = identity.split(":")[0]
return self._http_request(
f"/controller/network/{node_id}______", method="POST", data=data
)
def get_network(self, id: str) -> dict[str, Any]:
return self._http_request(f"/controller/network/{id}")
@contextmanager
def zerotier_controller() -> Iterator[ZerotierController]:
# This check could be racy but it's unlikely in practice
controller_port = find_free_port(range(10000, 65535))
if controller_port is None:
raise ClanError("cannot find a free port for zerotier controller")
with TemporaryDirectory() as d:
tempdir = Path(d)
home = tempdir / "zerotier-one"
home.mkdir()
cmd = [
"fakeroot",
"--",
"zerotier-one",
f"-p{controller_port}",
str(home),
]
with subprocess.Popen(cmd) as p:
try:
print(
f"wait for controller to be started on 127.0.0.1:{controller_port}...",
)
while not try_connect_port(controller_port):
status = p.poll()
if status is not None:
raise ClanError(
f"zerotier-one has been terminated unexpected with {status}"
)
time.sleep(0.1)
print()
yield ZerotierController(controller_port, home)
finally:
p.kill()
p.wait()
# TODO: allow merging more network configuration here
def create_network() -> dict:
with zerotier_controller() as controller:
network = controller.create_network()
return {
"secret": controller.secret,
"networkid": network["nwid"],
}
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("network_id")
parser.add_argument("identity_secret")
args = parser.parse_args()
zerotier = create_network()
Path(args.network_id).write_text(zerotier["networkid"])
Path(args.identity_secret).write_text(zerotier["secret"])
if __name__ == "__main__":
main()