qemu: init python modules for qmp and qga
This commit is contained in:
parent
cc21108c59
commit
56b6907740
77
pkgs/clan-cli/clan_cli/qemu/qga.py
Normal file
77
pkgs/clan-cli/clan_cli/qemu/qga.py
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
import base64
|
||||||
|
import json
|
||||||
|
import socket
|
||||||
|
from pathlib import Path
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
|
||||||
|
# qga is almost like qmp, but not quite, because:
|
||||||
|
# - server doesn't send initial message
|
||||||
|
# - no need to initialize by asking for capabilities
|
||||||
|
# - results need to be base64 decoded
|
||||||
|
class QgaSession:
|
||||||
|
def __init__(self, socket_file: Path | str) -> None:
|
||||||
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
# try to reconnect a couple of times if connetion refused
|
||||||
|
for _ in range(100):
|
||||||
|
try:
|
||||||
|
self.sock.connect(str(socket_file))
|
||||||
|
return
|
||||||
|
except ConnectionRefusedError:
|
||||||
|
sleep(0.1)
|
||||||
|
self.sock.connect(str(socket_file))
|
||||||
|
|
||||||
|
def get_response(self) -> dict:
|
||||||
|
result = self.sock.recv(9999999)
|
||||||
|
return json.loads(result)
|
||||||
|
|
||||||
|
# only execute, don't wait for response
|
||||||
|
def exec_cmd(self, cmd: str) -> None:
|
||||||
|
self.sock.send(
|
||||||
|
json.dumps(
|
||||||
|
{
|
||||||
|
"execute": "guest-exec",
|
||||||
|
"arguments": {
|
||||||
|
"path": "/bin/sh",
|
||||||
|
"arg": ["-l", "-c", cmd],
|
||||||
|
"capture-output": True,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
).encode("utf-8")
|
||||||
|
)
|
||||||
|
|
||||||
|
# run, wait for result, return exitcode and output
|
||||||
|
def run(self, cmd: str) -> tuple[int, str, str]:
|
||||||
|
self.exec_cmd(cmd)
|
||||||
|
result_pid = self.get_response()
|
||||||
|
pid = result_pid["return"]["pid"]
|
||||||
|
# loop until exited=true
|
||||||
|
status_payload = json.dumps(
|
||||||
|
{
|
||||||
|
"execute": "guest-exec-status",
|
||||||
|
"arguments": {
|
||||||
|
"pid": pid,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
).encode("utf-8")
|
||||||
|
while True:
|
||||||
|
self.sock.send(status_payload)
|
||||||
|
result = self.get_response()
|
||||||
|
if "error" in result and result["error"]["desc"].startswith("PID"):
|
||||||
|
raise Exception("PID could not be found")
|
||||||
|
if result["return"]["exited"]:
|
||||||
|
break
|
||||||
|
sleep(0.1)
|
||||||
|
|
||||||
|
exitcode = result["return"]["exitcode"]
|
||||||
|
stdout = (
|
||||||
|
""
|
||||||
|
if "out-data" not in result["return"]
|
||||||
|
else base64.b64decode(result["return"]["out-data"]).decode("utf-8")
|
||||||
|
)
|
||||||
|
stderr = (
|
||||||
|
""
|
||||||
|
if "err-data" not in result["return"]
|
||||||
|
else base64.b64decode(result["return"]["err-data"]).decode("utf-8")
|
||||||
|
)
|
||||||
|
return exitcode, stdout, stderr
|
317
pkgs/clan-cli/clan_cli/qemu/qmp.py
Normal file
317
pkgs/clan-cli/clan_cli/qemu/qmp.py
Normal file
@ -0,0 +1,317 @@
|
|||||||
|
# mypy: ignore-errors
|
||||||
|
|
||||||
|
""" QEMU Monitor Protocol Python class """
|
||||||
|
# Copyright (C) 2009, 2010 Red Hat Inc.
|
||||||
|
#
|
||||||
|
# Authors:
|
||||||
|
# Luiz Capitulino <lcapitulino@redhat.com>
|
||||||
|
#
|
||||||
|
# This work is licensed under the terms of the GNU GPL, version 2. See
|
||||||
|
# the COPYING file in the top-level directory.
|
||||||
|
|
||||||
|
import errno
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
class QMPError(Exception):
|
||||||
|
"""
|
||||||
|
QMP base exception
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class QMPConnectError(QMPError):
|
||||||
|
"""
|
||||||
|
QMP connection exception
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class QMPCapabilitiesError(QMPError):
|
||||||
|
"""
|
||||||
|
QMP negotiate capabilities exception
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class QMPTimeoutError(QMPError):
|
||||||
|
"""
|
||||||
|
QMP timeout exception
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class QEMUMonitorProtocol:
|
||||||
|
"""
|
||||||
|
Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
|
||||||
|
allow to handle commands and events.
|
||||||
|
"""
|
||||||
|
|
||||||
|
#: Logger object for debugging messages
|
||||||
|
logger: logging.Logger = logging.getLogger("QMP")
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
address: str | tuple[str, int],
|
||||||
|
server: bool = False,
|
||||||
|
nickname: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Create a QEMUMonitorProtocol class.
|
||||||
|
|
||||||
|
@param address: QEMU address, can be either a unix socket path (string)
|
||||||
|
or a tuple in the form ( address, port ) for a TCP
|
||||||
|
connection
|
||||||
|
@param server: server mode listens on the socket (bool)
|
||||||
|
@raise OSError on socket connection errors
|
||||||
|
@note No connection is established, this is done by the connect() or
|
||||||
|
accept() methods
|
||||||
|
"""
|
||||||
|
self.__events: list[dict[str, Any]] = []
|
||||||
|
self.__address: str | tuple[str, int] = address
|
||||||
|
self.__sock: socket.socket = self.__get_sock()
|
||||||
|
self.__sockfile: socket.SocketIO | None = None
|
||||||
|
self._nickname: str | None = nickname
|
||||||
|
if self._nickname:
|
||||||
|
self.logger = logging.getLogger("QMP").getChild(self._nickname)
|
||||||
|
if server:
|
||||||
|
self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
self.__sock.bind(self.__address)
|
||||||
|
self.__sock.listen(1)
|
||||||
|
|
||||||
|
def __get_sock(self) -> socket.socket:
|
||||||
|
if isinstance(self.__address, tuple):
|
||||||
|
family = socket.AF_INET
|
||||||
|
else:
|
||||||
|
family = socket.AF_UNIX
|
||||||
|
return socket.socket(family, socket.SOCK_STREAM)
|
||||||
|
|
||||||
|
def __negotiate_capabilities(self) -> dict[str, Any]:
|
||||||
|
greeting = self.__json_read()
|
||||||
|
if greeting is None or "QMP" not in greeting:
|
||||||
|
raise QMPConnectError
|
||||||
|
# Greeting seems ok, negotiate capabilities
|
||||||
|
resp = self.cmd("qmp_capabilities")
|
||||||
|
if resp and "return" in resp:
|
||||||
|
return greeting
|
||||||
|
raise QMPCapabilitiesError
|
||||||
|
|
||||||
|
def __json_read(self, only_event: bool = False) -> dict[str, Any] | None:
|
||||||
|
while True:
|
||||||
|
data = self.__sockfile.readline()
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
resp = json.loads(data)
|
||||||
|
if "event" in resp:
|
||||||
|
self.logger.debug("<<< %s", resp)
|
||||||
|
self.__events.append(resp)
|
||||||
|
if not only_event:
|
||||||
|
continue
|
||||||
|
return resp
|
||||||
|
|
||||||
|
def __get_events(self, wait: bool | float = False) -> None:
|
||||||
|
"""
|
||||||
|
Check for new events in the stream and cache them in __events.
|
||||||
|
|
||||||
|
@param wait (bool): block until an event is available.
|
||||||
|
@param wait (float): If wait is a float, treat it as a timeout value.
|
||||||
|
|
||||||
|
@raise QMPTimeoutError: If a timeout float is provided and the timeout
|
||||||
|
period elapses.
|
||||||
|
@raise QMPConnectError: If wait is True but no events could be
|
||||||
|
retrieved or if some other error occurred.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Check for new events regardless and pull them into the cache:
|
||||||
|
self.__sock.setblocking(0)
|
||||||
|
try:
|
||||||
|
self.__json_read()
|
||||||
|
except OSError as err:
|
||||||
|
if err.errno == errno.EAGAIN:
|
||||||
|
# No data available
|
||||||
|
pass
|
||||||
|
self.__sock.setblocking(1)
|
||||||
|
|
||||||
|
# Wait for new events, if needed.
|
||||||
|
# if wait is 0.0, this means "no wait" and is also implicitly false.
|
||||||
|
if not self.__events and wait:
|
||||||
|
if isinstance(wait, float):
|
||||||
|
self.__sock.settimeout(wait)
|
||||||
|
try:
|
||||||
|
ret = self.__json_read(only_event=True)
|
||||||
|
except socket.timeout:
|
||||||
|
raise QMPTimeoutError("Timeout waiting for event")
|
||||||
|
except Exception:
|
||||||
|
raise QMPConnectError("Error while reading from socket")
|
||||||
|
if ret is None:
|
||||||
|
raise QMPConnectError("Error while reading from socket")
|
||||||
|
self.__sock.settimeout(None)
|
||||||
|
|
||||||
|
def __enter__(self) -> "QEMUMonitorProtocol":
|
||||||
|
# Implement context manager enter function.
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> bool:
|
||||||
|
# Implement context manager exit function.
|
||||||
|
self.close()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def connect(self, negotiate: bool = True) -> dict[str, Any] | None:
|
||||||
|
"""
|
||||||
|
Connect to the QMP Monitor and perform capabilities negotiation.
|
||||||
|
|
||||||
|
@return QMP greeting dict, or None if negotiate is false
|
||||||
|
@raise OSError on socket connection errors
|
||||||
|
@raise QMPConnectError if the greeting is not received
|
||||||
|
@raise QMPCapabilitiesError if fails to negotiate capabilities
|
||||||
|
"""
|
||||||
|
self.__sock.connect(self.__address)
|
||||||
|
self.__sockfile = self.__sock.makefile()
|
||||||
|
if negotiate:
|
||||||
|
return self.__negotiate_capabilities()
|
||||||
|
return None
|
||||||
|
|
||||||
|
def accept(self, timeout: float | None = 15.0) -> dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Await connection from QMP Monitor and perform capabilities negotiation.
|
||||||
|
|
||||||
|
@param timeout: timeout in seconds (nonnegative float number, or
|
||||||
|
None). The value passed will set the behavior of the
|
||||||
|
underneath QMP socket as described in [1]. Default value
|
||||||
|
is set to 15.0.
|
||||||
|
@return QMP greeting dict
|
||||||
|
@raise OSError on socket connection errors
|
||||||
|
@raise QMPConnectError if the greeting is not received
|
||||||
|
@raise QMPCapabilitiesError if fails to negotiate capabilities
|
||||||
|
|
||||||
|
[1]
|
||||||
|
https://docs.python.org/3/library/socket.html#socket.socket.settimeout
|
||||||
|
"""
|
||||||
|
self.__sock.settimeout(timeout)
|
||||||
|
self.__sock, _ = self.__sock.accept()
|
||||||
|
self.__sockfile = self.__sock.makefile()
|
||||||
|
return self.__negotiate_capabilities()
|
||||||
|
|
||||||
|
def cmd_obj(self, qmp_cmd: dict[str, Any]) -> dict[str, Any] | None:
|
||||||
|
"""
|
||||||
|
Send a QMP command to the QMP Monitor.
|
||||||
|
|
||||||
|
@param qmp_cmd: QMP command to be sent as a Python dict
|
||||||
|
@return QMP response as a Python dict or None if the connection has
|
||||||
|
been closed
|
||||||
|
"""
|
||||||
|
self.logger.debug(">>> %s", qmp_cmd)
|
||||||
|
try:
|
||||||
|
self.__sock.sendall(json.dumps(qmp_cmd).encode("utf-8"))
|
||||||
|
except OSError as err:
|
||||||
|
if err.errno == errno.EPIPE:
|
||||||
|
return None
|
||||||
|
raise err
|
||||||
|
resp = self.__json_read()
|
||||||
|
self.logger.debug("<<< %s", resp)
|
||||||
|
return resp
|
||||||
|
|
||||||
|
def cmd(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
args: dict[str, Any] | None = None,
|
||||||
|
cmd_id: dict[str, Any] | list[Any] | str | int | None = None,
|
||||||
|
) -> dict[str, Any] | None:
|
||||||
|
"""
|
||||||
|
Build a QMP command and send it to the QMP Monitor.
|
||||||
|
|
||||||
|
@param name: command name (string)
|
||||||
|
@param args: command arguments (dict)
|
||||||
|
@param cmd_id: command id (dict, list, string or int)
|
||||||
|
"""
|
||||||
|
qmp_cmd: dict[str, Any] = {"execute": name}
|
||||||
|
if args:
|
||||||
|
qmp_cmd["arguments"] = args
|
||||||
|
if cmd_id:
|
||||||
|
qmp_cmd["id"] = cmd_id
|
||||||
|
return self.cmd_obj(qmp_cmd)
|
||||||
|
|
||||||
|
def command(self, cmd: str, **kwds: Any) -> Any:
|
||||||
|
"""
|
||||||
|
Build and send a QMP command to the monitor, report errors if any
|
||||||
|
"""
|
||||||
|
ret = self.cmd(cmd, kwds)
|
||||||
|
if "error" in ret:
|
||||||
|
raise Exception(ret["error"]["desc"])
|
||||||
|
return ret["return"]
|
||||||
|
|
||||||
|
def pull_event(self, wait: bool | float = False) -> dict[str, Any] | None:
|
||||||
|
"""
|
||||||
|
Pulls a single event.
|
||||||
|
|
||||||
|
@param wait (bool): block until an event is available.
|
||||||
|
@param wait (float): If wait is a float, treat it as a timeout value.
|
||||||
|
|
||||||
|
@raise QMPTimeoutError: If a timeout float is provided and the timeout
|
||||||
|
period elapses.
|
||||||
|
@raise QMPConnectError: If wait is True but no events could be
|
||||||
|
retrieved or if some other error occurred.
|
||||||
|
|
||||||
|
@return The first available QMP event, or None.
|
||||||
|
"""
|
||||||
|
self.__get_events(wait)
|
||||||
|
|
||||||
|
if self.__events:
|
||||||
|
return self.__events.pop(0)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_events(self, wait: bool | float = False) -> list[dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Get a list of available QMP events.
|
||||||
|
|
||||||
|
@param wait (bool): block until an event is available.
|
||||||
|
@param wait (float): If wait is a float, treat it as a timeout value.
|
||||||
|
|
||||||
|
@raise QMPTimeoutError: If a timeout float is provided and the timeout
|
||||||
|
period elapses.
|
||||||
|
@raise QMPConnectError: If wait is True but no events could be
|
||||||
|
retrieved or if some other error occurred.
|
||||||
|
|
||||||
|
@return The list of available QMP events.
|
||||||
|
"""
|
||||||
|
self.__get_events(wait)
|
||||||
|
return self.__events
|
||||||
|
|
||||||
|
def clear_events(self) -> None:
|
||||||
|
"""
|
||||||
|
Clear current list of pending events.
|
||||||
|
"""
|
||||||
|
self.__events = []
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
"""
|
||||||
|
Close the socket and socket file.
|
||||||
|
"""
|
||||||
|
if self.__sock:
|
||||||
|
self.__sock.close()
|
||||||
|
if self.__sockfile:
|
||||||
|
self.__sockfile.close()
|
||||||
|
|
||||||
|
def settimeout(self, timeout: float | None) -> None:
|
||||||
|
"""
|
||||||
|
Set the socket timeout.
|
||||||
|
|
||||||
|
@param timeout (float): timeout in seconds, or None.
|
||||||
|
@note This is a wrap around socket.settimeout
|
||||||
|
"""
|
||||||
|
self.__sock.settimeout(timeout)
|
||||||
|
|
||||||
|
def get_sock_fd(self) -> int:
|
||||||
|
"""
|
||||||
|
Get the socket file descriptor.
|
||||||
|
|
||||||
|
@return The file descriptor number.
|
||||||
|
"""
|
||||||
|
return self.__sock.fileno()
|
||||||
|
|
||||||
|
def is_scm_available(self) -> bool:
|
||||||
|
"""
|
||||||
|
Check if the socket allows for SCM_RIGHTS.
|
||||||
|
|
||||||
|
@return True if SCM_RIGHTS is available, otherwise False.
|
||||||
|
"""
|
||||||
|
return self.__sock.family == socket.AF_UNIX
|
@ -1,7 +1,4 @@
|
|||||||
import base64
|
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
import socket
|
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
@ -15,6 +12,8 @@ from fixtures_flakes import FlakeForTest, generate_flake
|
|||||||
from root import CLAN_CORE
|
from root import CLAN_CORE
|
||||||
|
|
||||||
from clan_cli.dirs import vm_state_dir
|
from clan_cli.dirs import vm_state_dir
|
||||||
|
from clan_cli.qemu.qga import QgaSession
|
||||||
|
from clan_cli.qemu.qmp import QEMUMonitorProtocol
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from age_keys import KeyPair
|
from age_keys import KeyPair
|
||||||
@ -22,79 +21,6 @@ if TYPE_CHECKING:
|
|||||||
no_kvm = not os.path.exists("/dev/kvm")
|
no_kvm = not os.path.exists("/dev/kvm")
|
||||||
|
|
||||||
|
|
||||||
# qga is almost like qmp, but not quite, because:
|
|
||||||
# - server doesn't send initial message
|
|
||||||
# - no need to initialize by asking for capabilities
|
|
||||||
# - results need to be base64 decoded
|
|
||||||
# TODO: move this to an extra file and make it available to other parts like GUI
|
|
||||||
class QgaSession:
|
|
||||||
def __init__(self, socket_file: Path | str) -> None:
|
|
||||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
||||||
# try to reconnect a couple of times if connetion refused
|
|
||||||
for _ in range(100):
|
|
||||||
try:
|
|
||||||
self.sock.connect(str(socket_file))
|
|
||||||
return
|
|
||||||
except ConnectionRefusedError:
|
|
||||||
sleep(0.1)
|
|
||||||
self.sock.connect(str(socket_file))
|
|
||||||
|
|
||||||
def get_response(self) -> dict:
|
|
||||||
result = self.sock.recv(9999999)
|
|
||||||
return json.loads(result)
|
|
||||||
|
|
||||||
# only execute, don't wait for response
|
|
||||||
def exec_cmd(self, cmd: str) -> None:
|
|
||||||
self.sock.send(
|
|
||||||
json.dumps(
|
|
||||||
{
|
|
||||||
"execute": "guest-exec",
|
|
||||||
"arguments": {
|
|
||||||
"path": "/bin/sh",
|
|
||||||
"arg": ["-l", "-c", cmd],
|
|
||||||
"capture-output": True,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
).encode("utf-8")
|
|
||||||
)
|
|
||||||
|
|
||||||
# run, wait for result, return exitcode and output
|
|
||||||
def run(self, cmd: str) -> tuple[int, str, str]:
|
|
||||||
self.exec_cmd(cmd)
|
|
||||||
result_pid = self.get_response()
|
|
||||||
pid = result_pid["return"]["pid"]
|
|
||||||
# loop until exited=true
|
|
||||||
status_payload = json.dumps(
|
|
||||||
{
|
|
||||||
"execute": "guest-exec-status",
|
|
||||||
"arguments": {
|
|
||||||
"pid": pid,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
).encode("utf-8")
|
|
||||||
while True:
|
|
||||||
self.sock.send(status_payload)
|
|
||||||
result = self.get_response()
|
|
||||||
if "error" in result and result["error"]["desc"].startswith("PID"):
|
|
||||||
raise Exception("PID could not be found")
|
|
||||||
if result["return"]["exited"]:
|
|
||||||
break
|
|
||||||
sleep(0.1)
|
|
||||||
|
|
||||||
exitcode = result["return"]["exitcode"]
|
|
||||||
stdout = (
|
|
||||||
""
|
|
||||||
if "out-data" not in result["return"]
|
|
||||||
else base64.b64decode(result["return"]["out-data"]).decode("utf-8")
|
|
||||||
)
|
|
||||||
stderr = (
|
|
||||||
""
|
|
||||||
if "err-data" not in result["return"]
|
|
||||||
else base64.b64decode(result["return"]["err-data"]).decode("utf-8")
|
|
||||||
)
|
|
||||||
return exitcode, stdout, stderr
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.impure
|
@pytest.mark.impure
|
||||||
def test_inspect(
|
def test_inspect(
|
||||||
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture
|
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture
|
||||||
@ -226,19 +152,6 @@ def test_vm_persistence(
|
|||||||
Type="oneshot",
|
Type="oneshot",
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
# TODO: implement shutdown via qmp instead of this hack
|
|
||||||
poweroff=dict(
|
|
||||||
description="Poweroff the machine",
|
|
||||||
wantedBy=["multi-user.target"],
|
|
||||||
after=["read_after_reboot.service"],
|
|
||||||
script="""
|
|
||||||
while [ ! -f /var/my-state/poweroff ]; do
|
|
||||||
sleep 0.1
|
|
||||||
done
|
|
||||||
sleep 0.1
|
|
||||||
poweroff
|
|
||||||
""",
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
clan=dict(virtualisation=dict(graphics=False)),
|
clan=dict(virtualisation=dict(graphics=False)),
|
||||||
@ -322,3 +235,9 @@ def test_vm_persistence(
|
|||||||
)
|
)
|
||||||
print(out)
|
print(out)
|
||||||
assert exitcode == 0, out
|
assert exitcode == 0, out
|
||||||
|
|
||||||
|
qmp = QEMUMonitorProtocol(
|
||||||
|
address=str(os.path.realpath(state_dir / "qmp.sock")),
|
||||||
|
)
|
||||||
|
qmp.connect()
|
||||||
|
qmp.cmd_obj({"execute": "system_powerdown"})
|
||||||
|
Loading…
Reference in New Issue
Block a user