1
0
forked from clan/clan-core

Merge pull request 'clan_vm_manager: Partially working process executor with killpg' (#656) from Qubasa-main into main

This commit is contained in:
clan-bot 2023-12-19 17:05:21 +00:00
commit 30e7e06f59
7 changed files with 240 additions and 10 deletions

View File

@ -1,6 +1,7 @@
# Import the urllib.parse, enum and dataclasses modules
import dataclasses
import urllib.parse
import urllib.request
from dataclasses import dataclass
from enum import Enum, member
from pathlib import Path
@ -9,6 +10,19 @@ from typing import Self
from .errors import ClanError
def url_ok(url: str) -> None:
# Create a request object with the URL and the HEAD method
req = urllib.request.Request(url, method="HEAD")
try:
# Open the URL and get the response object
res = urllib.request.urlopen(req)
# Return True if the status code is 200 (OK)
if not res.status_code == 200:
raise ClanError(f"URL has status code: {res.status_code}")
except urllib.error.URLError as ex:
raise ClanError(f"URL error: {ex}")
# Define an enum with different members that have different values
class ClanScheme(Enum):
# Use the dataclass decorator to add fields and methods to the members
@ -84,16 +98,24 @@ class ClanURI:
case ("http" | "https", _, _, _, _, _):
self.scheme = ClanScheme.HTTP.value(self._components.geturl()) # type: ignore
case ("file", "", path, "", "", "") | ("", "", path, "", "", ""): # type: ignore
self.scheme = ClanScheme.FILE.value(Path(path)) # type: ignore
self.scheme = ClanScheme.FILE.value(Path(path))
case _:
raise ClanError(f"Unsupported uri components: {comb}")
def check_exits(self) -> None:
match self.scheme:
case ClanScheme.FILE.value(path):
if not path.exists():
raise ClanError(f"File does not exist: {path}")
case ClanScheme.HTTP.value(url):
return url_ok(url)
def get_internal(self) -> str:
match self.scheme:
case ClanScheme.FILE.value(path):
return str(path) # type: ignore
return str(path)
case ClanScheme.HTTP.value(url):
return url # type: ignore
return url
case _:
raise ClanError(f"Unsupported uri components: {self.scheme}")

View File

@ -47,6 +47,7 @@ def list_history() -> list[HistoryEntry]:
def add_history(uri: ClanURI) -> list[HistoryEntry]:
uri.check_exits()
user_history_file().parent.mkdir(parents=True, exist_ok=True)
logs = list_history()
found = False

View File

@ -30,6 +30,7 @@ warn_redundant_casts = true
disallow_untyped_calls = true
disallow_untyped_defs = true
no_implicit_optional = true
disable_error_code = ["has-type"]
exclude = "clan_cli.nixpkgs"
[[tool.mypy.overrides]]

View File

@ -1,6 +1,11 @@
import argparse
from .app import register_join_parser, register_overview_parser, show_overview
from .app import (
register_join_parser,
register_overview_parser,
register_run_parser,
show_overview,
)
def main() -> None:
@ -16,6 +21,8 @@ def main() -> None:
register_overview_parser(subparser.add_parser("overview", help="overview screen"))
register_run_parser(subparser.add_parser("run", help="run a vm"))
# Executed when no command is given
parser.set_defaults(func=show_overview)
args = parser.parse_args()

View File

@ -103,3 +103,37 @@ def show_overview(args: argparse.Namespace) -> None:
def register_overview_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=show_overview)
# Define a function that writes to the memfd
def dummy_f(msg: str) -> None:
import sys
import time
print(f"Receeived message {msg}")
c = 0
while True: # Simulate a long running process
print(f"out: Hello from process c={c}", file=sys.stdout)
print(f"err: Hello from process c={c}", file=sys.stderr)
user = input("Enter to continue: \n")
if user == "q":
raise Exception("User quit")
print(f"User entered {user}", file=sys.stdout)
print(f"User entered {user}", file=sys.stderr)
time.sleep(1) # Wait for 1 second
c += 1
def show_run_vm(parser: argparse.ArgumentParser) -> None:
from pathlib import Path
from .executor import spawn
log_path = Path(".").resolve()
proc = spawn(wait_stdin_con=True, log_path=log_path, func=dummy_f, msg="Hello")
input("Press enter to kill process: ")
proc.kill_group()
def register_run_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=show_run_vm)

View File

@ -0,0 +1,168 @@
import os
import signal
import sys
import traceback
from pathlib import Path
from typing import Any
import gi
gi.require_version("GdkPixbuf", "2.0")
import multiprocessing as mp
from collections.abc import Callable
OUT_FILE: Path | None = None
IN_FILE: Path | None = None
class MPProcess:
def __init__(
self, *, name: str, proc: mp.Process, out_file: Path, in_file: Path
) -> None:
self.name = name
self.proc = proc
self.out_file = out_file
self.in_file = in_file
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None:
pid = self.proc.pid
assert pid is not None
os.killpg(pid, signal.SIGTERM)
def _set_proc_name(name: str) -> None:
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to "my_process"
prctl(15, name.encode(), 0, 0, 0)
def _signal_handler(signum: int, frame: Any) -> None:
signame = signal.strsignal(signum)
print("Signal received:", signame)
# Delete files
if OUT_FILE is not None:
OUT_FILE.unlink()
if IN_FILE is not None:
IN_FILE.unlink()
# Restore the default handler
signal.signal(signal.SIGTERM, signal.SIG_DFL)
# Re-raise the signal
os.kill(os.getpid(), signum)
def _init_proc(
func: Callable,
out_file: Path,
in_file: Path,
wait_stdin_connect: bool,
proc_name: str,
**kwargs: Any,
) -> None:
# Set the global variables
global OUT_FILE, IN_FILE
OUT_FILE = out_file
IN_FILE = in_file
# Create a new process group
os.setsid()
# Open stdout and stderr
out_fd = os.open(str(out_file), flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC)
os.dup2(out_fd, sys.stdout.fileno())
os.dup2(out_fd, sys.stderr.fileno())
# Print some information
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Started new process pid={pid} gpid={gpid}")
# Register the signal handler for SIGINT
signal.signal(signal.SIGTERM, _signal_handler)
# Set the process name
_set_proc_name(proc_name)
# Open stdin
flags = None
if wait_stdin_connect:
print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr)
flags = os.O_RDONLY
else:
flags = os.O_RDONLY | os.O_NONBLOCK
in_fd = os.open(str(in_file), flags=flags)
os.dup2(in_fd, sys.stdin.fileno())
# Execute the main function
print(f"Executing function {func.__name__} now", file=sys.stderr)
try:
func(**kwargs)
except Exception:
traceback.print_exc()
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}")
os.killpg(gpid, signal.SIGTERM)
def spawn(
*, wait_stdin_con: bool, log_path: Path, func: Callable, **kwargs: Any
) -> MPProcess:
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="spawn")
# Set names
proc_name = f"MPExec:{func.__name__}"
out_file = log_path / "out.log"
in_file = log_path / "in.fifo"
# Create stdin fifo
if in_file.exists():
in_file.unlink()
os.mkfifo(in_file)
# Start the process
proc = mp.Process(
target=_init_proc,
args=(func, out_file, in_file, wait_stdin_con, proc_name),
name=proc_name,
kwargs=kwargs,
)
proc.start()
# Print some information
assert proc.pid is not None
print(f"Started process '{proc_name}'")
print(f"Arguments: {kwargs}")
if wait_stdin_con:
cmd = f"cat - > {in_file}"
print(f"Connect to stdin with : {cmd}")
cmd = f"tail -f {out_file}"
print(f"Connect to stdout with: {cmd}")
# Return the process
mp_proc = MPProcess(
name=proc_name,
proc=proc,
out_file=out_file,
in_file=in_file,
)
return mp_proc

View File

@ -4,9 +4,10 @@ from pathlib import Path
from typing import Any
import gi
from clan_cli import history, vms
from clan_cli import history
gi.require_version("GdkPixbuf", "2.0")
from gi.repository import GdkPixbuf
from clan_vm_manager import assets
@ -46,11 +47,7 @@ class VMBase:
def run(self) -> None:
print(f"Running VM {self.name}")
import asyncio
# raise Exception("Cannot run VMs yet")
vm = asyncio.run(vms.run.inspect_vm(flake_url=self.url, flake_attr="defaultVM"))
vms.run.run_vm(vm)
# vm = vms.run.inspect_vm(flake_url=self.url, flake_attr="defaultVM")
@dataclass(frozen=True)