From 0c2c9bb91a88befb977309bf94eae60ffee6f627 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexis=20M=C3=A9taireau?= Date: Tue, 25 Mar 2025 12:20:10 +0100 Subject: [PATCH] Use a `Runtime` class to get information about container runtimes This is useful to avoid parsing too many times the settings. --- dangerzone/container_utils.py | 62 +++++++++++----------- dangerzone/isolation_provider/container.py | 31 ++++++----- tests/isolation_provider/test_container.py | 38 ++++++++----- tests/test_container_utils.py | 20 ++++--- 4 files changed, 85 insertions(+), 66 deletions(-) diff --git a/dangerzone/container_utils.py b/dangerzone/container_utils.py index d446947..f4c15c9 100644 --- a/dangerzone/container_utils.py +++ b/dangerzone/container_utils.py @@ -3,7 +3,8 @@ import os import platform import shutil import subprocess -from typing import List, Tuple +from pathlib import Path +from typing import List, Optional, Tuple from . import errors from .settings import Settings @@ -14,29 +15,26 @@ CONTAINER_NAME = "dangerzone.rocks/dangerzone" log = logging.getLogger(__name__) -def get_runtime_name() -> str: - settings = Settings() - try: - runtime_name = settings.get("container_runtime") - except KeyError: - return "podman" if platform.system() == "Linux" else "docker" - return runtime_name - - -def get_runtime() -> str: - container_tech = get_runtime_name() - runtime = shutil.which(container_tech) - if runtime is None: - # Fallback to the container runtime path from the settings +class Runtime(object): + def __init__(self) -> None: settings = Settings() - runtime_path = settings.get("container_runtime_path") - if os.path.exists(runtime_path): - return runtime_path - raise errors.NoContainerTechException(container_tech) - return runtime + + if settings.custom_runtime_specified(): + self.path = Path(settings.get("container_runtime")) + self.name = self.path.stem + else: + self.name = self.get_default_runtime_name() + binary_path = shutil.which(self.name) + if binary_path is None or not os.path.exists(binary_path): + raise errors.NoContainerTechException(self.name) + self.path = Path(binary_path) + + @staticmethod + def get_default_runtime_name() -> str: + return "podman" if platform.system() == "Linux" else "docker" -def get_runtime_version() -> Tuple[int, int]: +def get_runtime_version(runtime: Optional[Runtime] = None) -> Tuple[int, int]: """Get the major/minor parts of the Docker/Podman version. Some of the operations we perform in this module rely on some Podman features @@ -45,15 +43,15 @@ def get_runtime_version() -> Tuple[int, int]: just knowing the major and minor version, since writing/installing a full-blown semver parser is an overkill. """ - # Get the Docker/Podman version, using a Go template. - runtime_name = get_runtime_name() + runtime = runtime or Runtime() - if runtime_name == "podman": + # Get the Docker/Podman version, using a Go template. + if runtime.name == "podman": query = "{{.Client.Version}}" else: query = "{{.Server.Version}}" - cmd = [get_runtime(), "version", "-f", query] + cmd = [str(runtime.path), "version", "-f", query] try: version = subprocess.run( cmd, @@ -85,10 +83,11 @@ def list_image_tags() -> List[str]: images. This can be useful when we want to find which are the local image tags, and which image ID does the "latest" tag point to. """ + runtime = Runtime() return ( subprocess.check_output( [ - get_runtime(), + str(runtime.path), "image", "list", "--format", @@ -105,19 +104,21 @@ def list_image_tags() -> List[str]: def add_image_tag(image_id: str, new_tag: str) -> None: """Add a tag to the Dangerzone image.""" + runtime = Runtime() log.debug(f"Adding tag '{new_tag}' to image '{image_id}'") subprocess.check_output( - [get_runtime(), "tag", image_id, new_tag], + [str(runtime.path), "tag", image_id, new_tag], startupinfo=get_subprocess_startupinfo(), ) def delete_image_tag(tag: str) -> None: """Delete a Dangerzone image tag.""" + runtime = Runtime() log.warning(f"Deleting old container image: {tag}") try: subprocess.check_output( - [get_runtime(), "rmi", "--force", tag], + [str(runtime.name), "rmi", "--force", tag], startupinfo=get_subprocess_startupinfo(), ) except Exception as e: @@ -134,11 +135,12 @@ def get_expected_tag() -> str: def load_image_tarball() -> None: + runtime = Runtime() log.info("Installing Dangerzone container image...") tarball_path = get_resource_path("container.tar") try: res = subprocess.run( - [get_runtime(), "load", "-i", str(tarball_path)], + [str(runtime.path), "load", "-i", str(tarball_path)], startupinfo=get_subprocess_startupinfo(), capture_output=True, check=True, @@ -163,7 +165,7 @@ def load_image_tarball() -> None: # `share/image-id.txt` and delete the incorrect tag. # # [1] https://github.com/containers/podman/issues/16490 - if get_runtime_name() == "podman" and get_runtime_version() == (3, 4): + if runtime.name == "podman" and get_runtime_version(runtime) == (3, 4): expected_tag = get_expected_tag() bad_tag = f"localhost/{expected_tag}:latest" good_tag = f"{CONTAINER_NAME}:{expected_tag}" diff --git a/dangerzone/isolation_provider/container.py b/dangerzone/isolation_provider/container.py index cbb930a..b8c28fd 100644 --- a/dangerzone/isolation_provider/container.py +++ b/dangerzone/isolation_provider/container.py @@ -6,6 +6,7 @@ import subprocess from typing import List, Tuple from .. import container_utils, errors +from ..container_utils import Runtime from ..document import Document from ..util import get_resource_path, get_subprocess_startupinfo from .base import IsolationProvider, terminate_process_group @@ -50,7 +51,8 @@ class Container(IsolationProvider): * Do not map the host user to the container, with `--userns nomap` (available from Podman 4.1 onwards) """ - if container_utils.get_runtime_name() == "podman": + runtime = Runtime() + if runtime.name == "podman": security_args = ["--log-driver", "none"] security_args += ["--security-opt", "no-new-privileges"] if container_utils.get_runtime_version() >= (4, 1): @@ -123,12 +125,11 @@ class Container(IsolationProvider): @staticmethod def is_available() -> bool: - container_runtime = container_utils.get_runtime() - runtime_name = container_utils.get_runtime_name() + runtime = Runtime() # Can we run `docker/podman image ls` without an error with subprocess.Popen( - [container_runtime, "image", "ls"], + [str(runtime.path), "image", "ls"], stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, startupinfo=get_subprocess_startupinfo(), @@ -136,14 +137,15 @@ class Container(IsolationProvider): _, stderr = p.communicate() if p.returncode != 0: raise errors.NotAvailableContainerTechException( - runtime_name, stderr.decode() + runtime.name, stderr.decode() ) return True def check_docker_desktop_version(self) -> Tuple[bool, str]: # On windows and darwin, check that the minimum version is met version = "" - runtime_is_docker = container_utils.get_runtime_name() == "docker" + runtime = Runtime() + runtime_is_docker = runtime.name == "docker" platform_is_not_linux = platform.system() != "Linux" if runtime_is_docker and platform_is_not_linux: @@ -196,7 +198,7 @@ class Container(IsolationProvider): command: List[str], name: str, ) -> subprocess.Popen: - container_runtime = container_utils.get_runtime() + runtime = Runtime() security_args = self.get_runtime_security_args() debug_args = [] if self.debug: @@ -218,7 +220,7 @@ class Container(IsolationProvider): + image_name + command ) - return self.exec([container_runtime] + args) + return self.exec([str(runtime.path)] + args) def kill_container(self, name: str) -> None: """Terminate a spawned container. @@ -230,8 +232,8 @@ class Container(IsolationProvider): connected to the Docker daemon, and killing it will just close the associated standard streams. """ - container_runtime = container_utils.get_runtime() - cmd = [container_runtime, "kill", name] + runtime = Runtime() + cmd = [str(runtime.path), "kill", name] try: # We do not check the exit code of the process here, since the container may # have stopped right before invoking this command. In that case, the @@ -287,10 +289,10 @@ class Container(IsolationProvider): # after a podman kill / docker kill invocation, this will likely be the case, # else the container runtime (Docker/Podman) has experienced a problem, and we # should report it. - container_runtime = container_utils.get_runtime() + runtime = Runtime() name = self.doc_to_pixels_container_name(document) all_containers = subprocess.run( - [container_runtime, "ps", "-a"], + [str(runtime.path), "ps", "-a"], capture_output=True, startupinfo=get_subprocess_startupinfo(), ) @@ -301,6 +303,7 @@ class Container(IsolationProvider): # FIXME hardcoded 1 until length conversions are better handled # https://github.com/freedomofpress/dangerzone/issues/257 return 1 + runtime = Runtime() n_cpu = 1 # type: ignore [unreachable] if platform.system() == "Linux": @@ -309,11 +312,11 @@ class Container(IsolationProvider): if cpu_count is not None: n_cpu = cpu_count - elif container_utils.get_runtime_name() == "docker": + elif runtime.name == "docker": # For Windows and MacOS containers run in VM # So we obtain the CPU count for the VM n_cpu_str = subprocess.check_output( - [container_utils.get_runtime(), "info", "--format", "{{.NCPU}}"], + [str(runtime.path), "info", "--format", "{{.NCPU}}"], text=True, startupinfo=get_subprocess_startupinfo(), ) diff --git a/tests/isolation_provider/test_container.py b/tests/isolation_provider/test_container.py index 797986e..8a5f170 100644 --- a/tests/isolation_provider/test_container.py +++ b/tests/isolation_provider/test_container.py @@ -5,7 +5,8 @@ import pytest from pytest_mock import MockerFixture from pytest_subprocess import FakeProcess -from dangerzone import container_utils, errors +from dangerzone import errors +from dangerzone.container_utils import Runtime from dangerzone.isolation_provider.container import Container from dangerzone.isolation_provider.qubes import is_qubes_native_conversion from dangerzone.util import get_resource_path @@ -24,42 +25,51 @@ def provider() -> Container: return Container() +@pytest.fixture +def runtime_path() -> str: + return str(Runtime().path) + + class TestContainer(IsolationProviderTest): - def test_is_available_raises(self, provider: Container, fp: FakeProcess) -> None: + def test_is_available_raises( + self, provider: Container, fp: FakeProcess, runtime_path: str + ) -> None: """ NotAvailableContainerTechException should be raised when the "podman image ls" command fails. """ fp.register_subprocess( - [container_utils.get_runtime(), "image", "ls"], + [runtime_path, "image", "ls"], returncode=-1, stderr="podman image ls logs", ) with pytest.raises(errors.NotAvailableContainerTechException): provider.is_available() - def test_is_available_works(self, provider: Container, fp: FakeProcess) -> None: + def test_is_available_works( + self, provider: Container, fp: FakeProcess, runtime_path: str + ) -> None: """ No exception should be raised when the "podman image ls" can return properly. """ fp.register_subprocess( - [container_utils.get_runtime(), "image", "ls"], + [runtime_path, "image", "ls"], ) provider.is_available() def test_install_raise_if_image_cant_be_installed( - self, provider: Container, fp: FakeProcess + self, provider: Container, fp: FakeProcess, runtime_path: str ) -> None: """When an image installation fails, an exception should be raised""" fp.register_subprocess( - [container_utils.get_runtime(), "image", "ls"], + [runtime_path, "image", "ls"], ) # First check should return nothing. fp.register_subprocess( [ - container_utils.get_runtime(), + runtime_path, "image", "list", "--format", @@ -71,7 +81,7 @@ class TestContainer(IsolationProviderTest): fp.register_subprocess( [ - container_utils.get_runtime(), + runtime_path, "load", "-i", get_resource_path("container.tar").absolute(), @@ -83,22 +93,22 @@ class TestContainer(IsolationProviderTest): provider.install() def test_install_raises_if_still_not_installed( - self, provider: Container, fp: FakeProcess + self, provider: Container, fp: FakeProcess, runtime_path: str ) -> None: """When an image keep being not installed, it should return False""" fp.register_subprocess( - [container_utils.get_runtime(), "version", "-f", "{{.Client.Version}}"], + [runtime_path, "version", "-f", "{{.Client.Version}}"], stdout="4.0.0", ) fp.register_subprocess( - [container_utils.get_runtime(), "image", "ls"], + [runtime_path, "image", "ls"], ) # First check should return nothing. fp.register_subprocess( [ - container_utils.get_runtime(), + runtime_path, "image", "list", "--format", @@ -110,7 +120,7 @@ class TestContainer(IsolationProviderTest): fp.register_subprocess( [ - container_utils.get_runtime(), + runtime_path, "load", "-i", get_resource_path("container.tar").absolute(), diff --git a/tests/test_container_utils.py b/tests/test_container_utils.py index e921507..5b55d59 100644 --- a/tests/test_container_utils.py +++ b/tests/test_container_utils.py @@ -2,7 +2,7 @@ from pathlib import Path from pytest_mock import MockerFixture -from dangerzone.container_utils import get_runtime_name +from dangerzone.container_utils import Runtime from dangerzone.settings import Settings @@ -10,19 +10,23 @@ def test_get_runtime_name_from_settings(mocker: MockerFixture, tmp_path: Path) - mocker.patch("dangerzone.settings.get_config_dir", return_value=tmp_path) settings = Settings() - settings.set("container_runtime", "new-kid-on-the-block", autosave=True) + settings.set( + "container_runtime", "/opt/somewhere/new-kid-on-the-block", autosave=True + ) - assert get_runtime_name() == "new-kid-on-the-block" + assert Runtime().name == "new-kid-on-the-block" -def test_get_runtime_name_linux(mocker: MockerFixture) -> None: +def test_get_runtime_name_linux(mocker: MockerFixture, tmp_path: Path) -> None: + mocker.patch("dangerzone.settings.get_config_dir", return_value=tmp_path) mocker.patch("platform.system", return_value="Linux") - assert get_runtime_name() == "podman" + assert Runtime().name == "podman" -def test_get_runtime_name_non_linux(mocker: MockerFixture) -> None: +def test_get_runtime_name_non_linux(mocker: MockerFixture, tmp_path: Path) -> None: mocker.patch("platform.system", return_value="Windows") - assert get_runtime_name() == "docker" + mocker.patch("dangerzone.settings.get_config_dir", return_value=tmp_path) + assert Runtime().name == "docker" mocker.patch("platform.system", return_value="Something else") - assert get_runtime_name() == "docker" + assert Runtime().name == "docker"